1
0
mirror of https://github.com/ppy/osu.git synced 2025-03-19 05:57:19 +08:00

Merge branch 'thread-safe-spectator-client'

This commit is contained in:
smoogipoo 2021-05-21 16:00:24 +09:00
commit 36aa186c6e
3 changed files with 122 additions and 151 deletions

View File

@ -10,6 +10,7 @@ using System.Linq;
using System.Threading.Tasks;
using osu.Framework.Allocation;
using osu.Framework.Bindables;
using osu.Framework.Development;
using osu.Framework.Graphics;
using osu.Game.Beatmaps;
using osu.Game.Online.API;
@ -38,8 +39,6 @@ namespace osu.Game.Online.Spectator
private readonly List<int> watchingUsers = new List<int>();
private readonly object userLock = new object();
public IBindableList<int> PlayingUsers => playingUsers;
private readonly BindableList<int> playingUsers = new BindableList<int>();
@ -81,18 +80,13 @@ namespace osu.Game.Online.Spectator
[BackgroundDependencyLoader]
private void load()
{
IsConnected.BindValueChanged(connected =>
IsConnected.BindValueChanged(connected => Schedule(() =>
{
if (connected.NewValue)
{
// get all the users that were previously being watched
int[] users;
lock (userLock)
{
users = watchingUsers.ToArray();
watchingUsers.Clear();
}
int[] users = watchingUsers.ToArray();
watchingUsers.Clear();
// resubscribe to watched users.
foreach (var userId in users)
@ -104,18 +98,15 @@ namespace osu.Game.Online.Spectator
}
else
{
lock (userLock)
{
playingUsers.Clear();
playingUserStates.Clear();
}
playingUsers.Clear();
playingUserStates.Clear();
}
}, true);
}), true);
}
Task ISpectatorClient.UserBeganPlaying(int userId, SpectatorState state)
{
lock (userLock)
Schedule(() =>
{
if (!playingUsers.Contains(userId))
playingUsers.Add(userId);
@ -125,35 +116,37 @@ namespace osu.Game.Online.Spectator
// We don't want the user states to update unless the player is being watched, otherwise calling BindUserBeganPlaying() can lead to double invocations.
if (watchingUsers.Contains(userId))
playingUserStates[userId] = state;
}
OnUserBeganPlaying?.Invoke(userId, state);
OnUserBeganPlaying?.Invoke(userId, state);
});
return Task.CompletedTask;
}
Task ISpectatorClient.UserFinishedPlaying(int userId, SpectatorState state)
{
lock (userLock)
Schedule(() =>
{
playingUsers.Remove(userId);
playingUserStates.Remove(userId);
}
OnUserFinishedPlaying?.Invoke(userId, state);
OnUserFinishedPlaying?.Invoke(userId, state);
});
return Task.CompletedTask;
}
Task ISpectatorClient.UserSentFrames(int userId, FrameDataBundle data)
{
OnNewFrames?.Invoke(userId, data);
Schedule(() => OnNewFrames?.Invoke(userId, data));
return Task.CompletedTask;
}
public void BeginPlaying(GameplayBeatmap beatmap, Score score)
{
Debug.Assert(ThreadSafety.IsUpdateThread);
if (IsPlaying)
throw new InvalidOperationException($"Cannot invoke {nameof(BeginPlaying)} when already playing");
@ -177,33 +170,38 @@ namespace osu.Game.Online.Spectator
if (!IsPlaying)
return;
IsPlaying = false;
currentBeatmap = null;
// This method is most commonly called via Dispose(), which is asynchronous.
// Todo: This should not be a thing, but requires framework changes.
Schedule(() =>
{
IsPlaying = false;
currentBeatmap = null;
EndPlayingInternal(currentState);
EndPlayingInternal(currentState);
});
}
public void WatchUser(int userId)
{
lock (userLock)
{
if (watchingUsers.Contains(userId))
return;
Debug.Assert(ThreadSafety.IsUpdateThread);
watchingUsers.Add(userId);
}
if (watchingUsers.Contains(userId))
return;
watchingUsers.Add(userId);
WatchUserInternal(userId);
}
public void StopWatchingUser(int userId)
{
lock (userLock)
// This method is most commonly called via Dispose(), which is asynchronous.
// Todo: This should not be a thing, but requires framework changes.
Schedule(() =>
{
watchingUsers.Remove(userId);
}
StopWatchingUserInternal(userId);
StopWatchingUserInternal(userId);
});
}
protected abstract Task BeginPlayingInternal(SpectatorState state);
@ -234,6 +232,8 @@ namespace osu.Game.Online.Spectator
public void HandleFrame(ReplayFrame frame)
{
Debug.Assert(ThreadSafety.IsUpdateThread);
if (frame is IConvertibleReplayFrame convertible)
pendingFrames.Enqueue(convertible.ToLegacy(currentBeatmap));
@ -265,8 +265,7 @@ namespace osu.Game.Online.Spectator
/// <returns><c>true</c> if successful (the user is playing), <c>false</c> otherwise.</returns>
public bool TryGetPlayingUserState(int userId, out SpectatorState state)
{
lock (userLock)
return playingUserStates.TryGetValue(userId, out state);
return playingUserStates.TryGetValue(userId, out state);
}
/// <summary>
@ -277,16 +276,13 @@ namespace osu.Game.Online.Spectator
public void BindUserBeganPlaying(Action<int, SpectatorState> callback, bool runOnceImmediately = false)
{
// The lock is taken before the event is subscribed to to prevent doubling of events.
lock (userLock)
{
OnUserBeganPlaying += callback;
OnUserBeganPlaying += callback;
if (!runOnceImmediately)
return;
if (!runOnceImmediately)
return;
foreach (var (userId, state) in playingUserStates)
callback(userId, state);
}
foreach (var (userId, state) in playingUserStates)
callback(userId, state);
}
}
}

View File

@ -55,8 +55,6 @@ namespace osu.Game.Screens.Play.HUD
foreach (var userId in playingUsers)
{
spectatorClient.WatchUser(userId);
// probably won't be required in the final implementation.
var resolvedUser = userLookupCache.GetUserAsync(userId).Result;
@ -80,6 +78,8 @@ namespace osu.Game.Screens.Play.HUD
// BindableList handles binding in a really bad way (Clear then AddRange) so we need to do this manually..
foreach (int userId in playingUsers)
{
spectatorClient.WatchUser(userId);
if (!multiplayerClient.CurrentMatchPlayingUserIds.Contains(userId))
usersChanged(this, new NotifyCollectionChangedEventArgs(NotifyCollectionChangedAction.Remove, new[] { userId }));
}

View File

@ -42,9 +42,6 @@ namespace osu.Game.Screens.Spectate
[Resolved]
private UserLookupCache userLookupCache { get; set; }
// A lock is used to synchronise access to spectator/gameplay states, since this class is a screen which may become non-current and stop receiving updates at any point.
private readonly object stateLock = new object();
private readonly Dictionary<int, User> userMap = new Dictionary<int, User>();
private readonly Dictionary<int, GameplayState> gameplayStates = new Dictionary<int, GameplayState>();
@ -63,8 +60,11 @@ namespace osu.Game.Screens.Spectate
{
base.LoadComplete();
populateAllUsers().ContinueWith(_ => Schedule(() =>
getAllUsers().ContinueWith(users => Schedule(() =>
{
foreach (var u in users.Result)
userMap[u.Id] = u;
spectatorClient.BindUserBeganPlaying(userBeganPlaying, true);
spectatorClient.OnUserFinishedPlaying += userFinishedPlaying;
spectatorClient.OnNewFrames += userSentFrames;
@ -72,27 +72,23 @@ namespace osu.Game.Screens.Spectate
managerUpdated = beatmaps.ItemUpdated.GetBoundCopy();
managerUpdated.BindValueChanged(beatmapUpdated);
lock (stateLock)
{
foreach (var (id, _) in userMap)
spectatorClient.WatchUser(id);
}
foreach (var (id, _) in userMap)
spectatorClient.WatchUser(id);
}));
}
private Task populateAllUsers()
private Task<User[]> getAllUsers()
{
var userLookupTasks = new List<Task>();
var userLookupTasks = new List<Task<User>>();
foreach (var u in userIds)
{
userLookupTasks.Add(userLookupCache.GetUserAsync(u).ContinueWith(task =>
{
if (!task.IsCompletedSuccessfully)
return;
return null;
lock (stateLock)
userMap[u] = task.Result;
return task.Result;
}));
}
@ -104,16 +100,13 @@ namespace osu.Game.Screens.Spectate
if (!e.NewValue.TryGetTarget(out var beatmapSet))
return;
lock (stateLock)
foreach (var (userId, _) in userMap)
{
foreach (var (userId, _) in userMap)
{
if (!spectatorClient.TryGetPlayingUserState(userId, out var userState))
continue;
if (!spectatorClient.TryGetPlayingUserState(userId, out var userState))
continue;
if (beatmapSet.Beatmaps.Any(b => b.OnlineBeatmapID == userState.BeatmapID))
updateGameplayState(userId);
}
if (beatmapSet.Beatmaps.Any(b => b.OnlineBeatmapID == userState.BeatmapID))
updateGameplayState(userId);
}
}
@ -122,101 +115,89 @@ namespace osu.Game.Screens.Spectate
if (state.RulesetID == null || state.BeatmapID == null)
return;
lock (stateLock)
{
if (!userMap.ContainsKey(userId))
return;
if (!userMap.ContainsKey(userId))
return;
// The user may have stopped playing.
if (!spectatorClient.TryGetPlayingUserState(userId, out _))
return;
// The user may have stopped playing.
if (!spectatorClient.TryGetPlayingUserState(userId, out _))
return;
Schedule(() => OnUserStateChanged(userId, state));
Schedule(() => OnUserStateChanged(userId, state));
updateGameplayState(userId);
}
updateGameplayState(userId);
}
private void updateGameplayState(int userId)
{
lock (stateLock)
Debug.Assert(userMap.ContainsKey(userId));
// The user may have stopped playing.
if (!spectatorClient.TryGetPlayingUserState(userId, out var spectatorState))
return;
var user = userMap[userId];
var resolvedRuleset = rulesets.AvailableRulesets.FirstOrDefault(r => r.ID == spectatorState.RulesetID)?.CreateInstance();
if (resolvedRuleset == null)
return;
var resolvedBeatmap = beatmaps.QueryBeatmap(b => b.OnlineBeatmapID == spectatorState.BeatmapID);
if (resolvedBeatmap == null)
return;
var score = new Score
{
Debug.Assert(userMap.ContainsKey(userId));
// The user may have stopped playing.
if (!spectatorClient.TryGetPlayingUserState(userId, out var spectatorState))
return;
var user = userMap[userId];
var resolvedRuleset = rulesets.AvailableRulesets.FirstOrDefault(r => r.ID == spectatorState.RulesetID)?.CreateInstance();
if (resolvedRuleset == null)
return;
var resolvedBeatmap = beatmaps.QueryBeatmap(b => b.OnlineBeatmapID == spectatorState.BeatmapID);
if (resolvedBeatmap == null)
return;
var score = new Score
ScoreInfo = new ScoreInfo
{
ScoreInfo = new ScoreInfo
{
Beatmap = resolvedBeatmap,
User = user,
Mods = spectatorState.Mods.Select(m => m.ToMod(resolvedRuleset)).ToArray(),
Ruleset = resolvedRuleset.RulesetInfo,
},
Replay = new Replay { HasReceivedAllFrames = false },
};
Beatmap = resolvedBeatmap,
User = user,
Mods = spectatorState.Mods.Select(m => m.ToMod(resolvedRuleset)).ToArray(),
Ruleset = resolvedRuleset.RulesetInfo,
},
Replay = new Replay { HasReceivedAllFrames = false },
};
var gameplayState = new GameplayState(score, resolvedRuleset, beatmaps.GetWorkingBeatmap(resolvedBeatmap));
var gameplayState = new GameplayState(score, resolvedRuleset, beatmaps.GetWorkingBeatmap(resolvedBeatmap));
gameplayStates[userId] = gameplayState;
Schedule(() => StartGameplay(userId, gameplayState));
}
gameplayStates[userId] = gameplayState;
Schedule(() => StartGameplay(userId, gameplayState));
}
private void userSentFrames(int userId, FrameDataBundle bundle)
{
lock (stateLock)
if (!userMap.ContainsKey(userId))
return;
if (!gameplayStates.TryGetValue(userId, out var gameplayState))
return;
// The ruleset instance should be guaranteed to be in sync with the score via ScoreLock.
Debug.Assert(gameplayState.Ruleset != null && gameplayState.Ruleset.RulesetInfo.Equals(gameplayState.Score.ScoreInfo.Ruleset));
foreach (var frame in bundle.Frames)
{
if (!userMap.ContainsKey(userId))
return;
IConvertibleReplayFrame convertibleFrame = gameplayState.Ruleset.CreateConvertibleReplayFrame();
convertibleFrame.FromLegacy(frame, gameplayState.Beatmap.Beatmap);
if (!gameplayStates.TryGetValue(userId, out var gameplayState))
return;
var convertedFrame = (ReplayFrame)convertibleFrame;
convertedFrame.Time = frame.Time;
// The ruleset instance should be guaranteed to be in sync with the score via ScoreLock.
Debug.Assert(gameplayState.Ruleset != null && gameplayState.Ruleset.RulesetInfo.Equals(gameplayState.Score.ScoreInfo.Ruleset));
foreach (var frame in bundle.Frames)
{
IConvertibleReplayFrame convertibleFrame = gameplayState.Ruleset.CreateConvertibleReplayFrame();
convertibleFrame.FromLegacy(frame, gameplayState.Beatmap.Beatmap);
var convertedFrame = (ReplayFrame)convertibleFrame;
convertedFrame.Time = frame.Time;
gameplayState.Score.Replay.Frames.Add(convertedFrame);
}
gameplayState.Score.Replay.Frames.Add(convertedFrame);
}
}
private void userFinishedPlaying(int userId, SpectatorState state)
{
lock (stateLock)
{
if (!userMap.ContainsKey(userId))
return;
if (!userMap.ContainsKey(userId))
return;
if (!gameplayStates.TryGetValue(userId, out var gameplayState))
return;
if (!gameplayStates.TryGetValue(userId, out var gameplayState))
return;
gameplayState.Score.Replay.HasReceivedAllFrames = true;
gameplayState.Score.Replay.HasReceivedAllFrames = true;
gameplayStates.Remove(userId);
Schedule(() => EndGameplay(userId));
}
gameplayStates.Remove(userId);
Schedule(() => EndGameplay(userId));
}
/// <summary>
@ -245,15 +226,12 @@ namespace osu.Game.Screens.Spectate
/// <param name="userId">The user to stop spectating.</param>
protected void RemoveUser(int userId)
{
lock (stateLock)
{
userFinishedPlaying(userId, null);
userFinishedPlaying(userId, null);
userIds.Remove(userId);
userMap.Remove(userId);
userIds.Remove(userId);
userMap.Remove(userId);
spectatorClient.StopWatchingUser(userId);
}
spectatorClient.StopWatchingUser(userId);
}
protected override void Dispose(bool isDisposing)
@ -266,11 +244,8 @@ namespace osu.Game.Screens.Spectate
spectatorClient.OnUserFinishedPlaying -= userFinishedPlaying;
spectatorClient.OnNewFrames -= userSentFrames;
lock (stateLock)
{
foreach (var (userId, _) in userMap)
spectatorClient.StopWatchingUser(userId);
}
foreach (var (userId, _) in userMap)
spectatorClient.StopWatchingUser(userId);
}
managerUpdated?.UnbindAll();