1
0
mirror of https://github.com/ppy/osu.git synced 2024-12-15 19:12:57 +08:00

Merge pull request #16975 from peppy/spectator-reliability

Improve resilience of spectator frame communication to ensure replays are sent in full
This commit is contained in:
Dan Balasescu 2022-02-25 22:57:19 +09:00 committed by GitHub
commit 6ce6eaf03d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 169 additions and 88 deletions

View File

@ -257,7 +257,7 @@ namespace osu.Game.Tests.Visual.Gameplay
sendFrames();
waitForPlayer();
AddStep("send passed", () => spectatorClient.EndPlay(streamingUser.Id, SpectatedUserState.Passed));
AddStep("send passed", () => spectatorClient.SendEndPlay(streamingUser.Id, SpectatedUserState.Passed));
AddUntilStep("state is passed", () => spectatorClient.WatchedUserStates[streamingUser.Id].State == SpectatedUserState.Passed);
start();
@ -275,7 +275,7 @@ namespace osu.Game.Tests.Visual.Gameplay
sendFrames();
waitForPlayer();
AddStep("send quit", () => spectatorClient.EndPlay(streamingUser.Id));
AddStep("send quit", () => spectatorClient.SendEndPlay(streamingUser.Id));
AddUntilStep("state is quit", () => spectatorClient.WatchedUserStates[streamingUser.Id].State == SpectatedUserState.Quit);
start();
@ -293,7 +293,7 @@ namespace osu.Game.Tests.Visual.Gameplay
sendFrames();
waitForPlayer();
AddStep("send failed", () => spectatorClient.EndPlay(streamingUser.Id, SpectatedUserState.Failed));
AddStep("send failed", () => spectatorClient.SendEndPlay(streamingUser.Id, SpectatedUserState.Failed));
AddUntilStep("state is failed", () => spectatorClient.WatchedUserStates[streamingUser.Id].State == SpectatedUserState.Failed);
start();
@ -312,16 +312,16 @@ namespace osu.Game.Tests.Visual.Gameplay
private void waitForPlayer() => AddUntilStep("wait for player", () => (Stack.CurrentScreen as Player)?.IsLoaded == true);
private void start(int? beatmapId = null) => AddStep("start play", () => spectatorClient.StartPlay(streamingUser.Id, beatmapId ?? importedBeatmapId));
private void start(int? beatmapId = null) => AddStep("start play", () => spectatorClient.SendStartPlay(streamingUser.Id, beatmapId ?? importedBeatmapId));
private void finish(SpectatedUserState state = SpectatedUserState.Quit) => AddStep("end play", () => spectatorClient.EndPlay(streamingUser.Id, state));
private void finish(SpectatedUserState state = SpectatedUserState.Quit) => AddStep("end play", () => spectatorClient.SendEndPlay(streamingUser.Id, state));
private void checkPaused(bool state) =>
AddUntilStep($"game is {(state ? "paused" : "playing")}", () => player.ChildrenOfType<DrawableRuleset>().First().IsPaused.Value == state);
private void sendFrames(int count = 10)
{
AddStep("send frames", () => spectatorClient.SendFrames(streamingUser.Id, count));
AddStep("send frames", () => spectatorClient.SendFramesFromUser(streamingUser.Id, count));
}
private void loadSpectatingScreen()

View File

@ -41,8 +41,12 @@ namespace osu.Game.Tests.Visual.Gameplay
private Replay replay;
private TestSpectatorClient spectatorClient;
private ManualClock manualClock;
private TestReplayRecorder recorder;
private OsuSpriteText latencyDisplay;
private TestFramedReplayInputHandler replayHandler;
@ -54,7 +58,6 @@ namespace osu.Game.Tests.Visual.Gameplay
{
replay = new Replay();
manualClock = new ManualClock();
SpectatorClient spectatorClient;
Child = new DependencyProvidingContainer
{
@ -76,7 +79,7 @@ namespace osu.Game.Tests.Visual.Gameplay
{
recordingManager = new TestRulesetInputManager(TestSceneModSettings.CreateTestRulesetInfo(), 0, SimultaneousBindingMode.Unique)
{
Recorder = new TestReplayRecorder
Recorder = recorder = new TestReplayRecorder
{
ScreenSpaceToGamefield = pos => recordingManager.ToLocalSpace(pos),
},
@ -143,22 +146,52 @@ namespace osu.Game.Tests.Visual.Gameplay
});
}
[Test]
public void TestBasic()
{
AddUntilStep("received frames", () => replay.Frames.Count > 50);
AddStep("stop sending frames", () => recorder.Expire());
AddUntilStep("wait for all frames received", () => replay.Frames.Count == recorder.SentFrames.Count);
}
[Test]
public void TestWithSendFailure()
{
AddUntilStep("received frames", () => replay.Frames.Count > 50);
int framesReceivedSoFar = 0;
int frameSendAttemptsSoFar = 0;
AddStep("start failing sends", () =>
{
spectatorClient.ShouldFailSendingFrames = true;
framesReceivedSoFar = replay.Frames.Count;
frameSendAttemptsSoFar = spectatorClient.FrameSendAttempts;
});
AddUntilStep("wait for send attempts", () => spectatorClient.FrameSendAttempts > frameSendAttemptsSoFar + 5);
AddAssert("frames did not increase", () => framesReceivedSoFar == replay.Frames.Count);
AddStep("stop failing sends", () => spectatorClient.ShouldFailSendingFrames = false);
AddUntilStep("wait for next frames", () => framesReceivedSoFar < replay.Frames.Count);
AddStep("stop sending frames", () => recorder.Expire());
AddUntilStep("wait for all frames received", () => replay.Frames.Count == recorder.SentFrames.Count);
AddAssert("ensure frames were received in the correct sequence", () => replay.Frames.Select(f => f.Time).SequenceEqual(recorder.SentFrames.Select(f => f.Time)));
}
private void onNewFrames(int userId, FrameDataBundle frames)
{
Logger.Log($"Received {frames.Frames.Count} new frames ({string.Join(',', frames.Frames.Select(f => ((int)f.Time).ToString()))})");
foreach (var legacyFrame in frames.Frames)
{
var frame = new TestReplayFrame();
frame.FromLegacy(legacyFrame, null);
replay.Frames.Add(frame);
}
}
[Test]
public void TestBasic()
{
AddStep("Wait for user input", () => { });
Logger.Log($"Received {frames.Frames.Count} new frames (total {replay.Frames.Count} of {recorder.SentFrames.Count})");
}
private double latency = SpectatorClient.TIME_BETWEEN_SENDS;
@ -318,6 +351,8 @@ namespace osu.Game.Tests.Visual.Gameplay
internal class TestReplayRecorder : ReplayRecorder<TestAction>
{
public List<ReplayFrame> SentFrames = new List<ReplayFrame>();
public TestReplayRecorder()
: base(new Score
{
@ -332,7 +367,9 @@ namespace osu.Game.Tests.Visual.Gameplay
protected override ReplayFrame HandleFrame(Vector2 mousePosition, List<TestAction> actions, ReplayFrame previousFrame)
{
return new TestReplayFrame(Time.Current, mousePosition, actions.ToArray());
var testReplayFrame = new TestReplayFrame(Time.Current, mousePosition, actions.ToArray());
SentFrames.Add(testReplayFrame);
return testReplayFrame;
}
}
}

View File

@ -34,7 +34,7 @@ namespace osu.Game.Tests.Visual.Multiplayer
foreach ((int userId, var _) in clocks)
{
SpectatorClient.StartPlay(userId, 0);
SpectatorClient.SendStartPlay(userId, 0);
OnlinePlayDependencies.MultiplayerClient.AddUser(new APIUser { Id = userId });
}
});
@ -68,10 +68,10 @@ namespace osu.Game.Tests.Visual.Multiplayer
// For player 2, send frames in sets of 10.
for (int i = 0; i < 100; i++)
{
SpectatorClient.SendFrames(PLAYER_1_ID, 1);
SpectatorClient.SendFramesFromUser(PLAYER_1_ID, 1);
if (i % 10 == 0)
SpectatorClient.SendFrames(PLAYER_2_ID, 10);
SpectatorClient.SendFramesFromUser(PLAYER_2_ID, 10);
}
});

View File

@ -71,11 +71,11 @@ namespace osu.Game.Tests.Visual.Multiplayer
loadSpectateScreen(false);
AddWaitStep("wait a bit", 10);
AddStep("load player first_player_id", () => SpectatorClient.StartPlay(PLAYER_1_ID, importedBeatmapId));
AddStep("load player first_player_id", () => SpectatorClient.SendStartPlay(PLAYER_1_ID, importedBeatmapId));
AddUntilStep("one player added", () => spectatorScreen.ChildrenOfType<Player>().Count() == 1);
AddWaitStep("wait a bit", 10);
AddStep("load player second_player_id", () => SpectatorClient.StartPlay(PLAYER_2_ID, importedBeatmapId));
AddStep("load player second_player_id", () => SpectatorClient.SendStartPlay(PLAYER_2_ID, importedBeatmapId));
AddUntilStep("two players added", () => spectatorScreen.ChildrenOfType<Player>().Count() == 2);
}
@ -134,8 +134,8 @@ namespace osu.Game.Tests.Visual.Multiplayer
TeamID = 1,
};
SpectatorClient.StartPlay(player1.UserID, importedBeatmapId);
SpectatorClient.StartPlay(player2.UserID, importedBeatmapId);
SpectatorClient.SendStartPlay(player1.UserID, importedBeatmapId);
SpectatorClient.SendStartPlay(player2.UserID, importedBeatmapId);
playingUsers.Add(player1);
playingUsers.Add(player2);
@ -361,7 +361,7 @@ namespace osu.Game.Tests.Visual.Multiplayer
// to ensure negative gameplay start time does not affect spectator, send frames exactly after StartGameplay().
// (similar to real spectating sessions in which the first frames get sent between StartGameplay() and player load complete)
AddStep("send frames at gameplay start", () => getInstance(PLAYER_1_ID).OnGameplayStarted += () => SpectatorClient.SendFrames(PLAYER_1_ID, 100));
AddStep("send frames at gameplay start", () => getInstance(PLAYER_1_ID).OnGameplayStarted += () => SpectatorClient.SendFramesFromUser(PLAYER_1_ID, 100));
AddUntilStep("wait for player load", () => spectatorScreen.AllPlayersLoaded);
@ -398,7 +398,7 @@ namespace osu.Game.Tests.Visual.Multiplayer
};
OnlinePlayDependencies.MultiplayerClient.AddUser(user.User, true);
SpectatorClient.StartPlay(id, beatmapId ?? importedBeatmapId);
SpectatorClient.SendStartPlay(id, beatmapId ?? importedBeatmapId);
playingUsers.Add(user);
}
@ -412,7 +412,7 @@ namespace osu.Game.Tests.Visual.Multiplayer
var user = playingUsers.Single(u => u.UserID == userId);
OnlinePlayDependencies.MultiplayerClient.RemoveUser(user.User.AsNonNull());
SpectatorClient.EndPlay(userId);
SpectatorClient.SendEndPlay(userId);
playingUsers.Remove(user);
});
@ -425,7 +425,7 @@ namespace osu.Game.Tests.Visual.Multiplayer
AddStep("send frames", () =>
{
foreach (int id in userIds)
SpectatorClient.SendFrames(id, count);
SpectatorClient.SendFramesFromUser(id, count);
});
}

View File

@ -58,7 +58,7 @@ namespace osu.Game.Tests.Visual.Multiplayer
foreach (int user in users)
{
SpectatorClient.StartPlay(user, Beatmap.Value.BeatmapInfo.OnlineID);
SpectatorClient.SendStartPlay(user, Beatmap.Value.BeatmapInfo.OnlineID);
multiplayerUsers.Add(OnlinePlayDependencies.MultiplayerClient.AddUser(new APIUser { Id = user }, true));
}

View File

@ -62,7 +62,7 @@ namespace osu.Game.Tests.Visual.Multiplayer
foreach (int user in users)
{
SpectatorClient.StartPlay(user, Beatmap.Value.BeatmapInfo.OnlineID);
SpectatorClient.SendStartPlay(user, Beatmap.Value.BeatmapInfo.OnlineID);
var roomUser = OnlinePlayDependencies.MultiplayerClient.AddUser(new APIUser { Id = user }, true);
roomUser.MatchState = new TeamVersusUserState

View File

@ -56,9 +56,9 @@ namespace osu.Game.Tests.Visual.Online
[Test]
public void TestBasicDisplay()
{
AddStep("Add playing user", () => spectatorClient.StartPlay(streamingUser.Id, 0));
AddStep("Add playing user", () => spectatorClient.SendStartPlay(streamingUser.Id, 0));
AddUntilStep("Panel loaded", () => currentlyPlaying.ChildrenOfType<UserGridPanel>()?.FirstOrDefault()?.User.Id == 2);
AddStep("Remove playing user", () => spectatorClient.EndPlay(streamingUser.Id));
AddStep("Remove playing user", () => spectatorClient.SendEndPlay(streamingUser.Id));
AddUntilStep("Panel no longer present", () => !currentlyPlaying.ChildrenOfType<UserGridPanel>().Any());
}

View File

@ -57,14 +57,14 @@ namespace osu.Game.Online.Spectator
return connection.SendAsync(nameof(ISpectatorServer.BeginPlaySession), state);
}
protected override Task SendFramesInternal(FrameDataBundle data)
protected override Task SendFramesInternal(FrameDataBundle bundle)
{
if (!IsConnected.Value)
return Task.CompletedTask;
Debug.Assert(connection != null);
return connection.SendAsync(nameof(ISpectatorServer.SendFrameData), data);
return connection.SendAsync(nameof(ISpectatorServer.SendFrameData), bundle);
}
protected override Task EndPlayingInternal(SpectatorState state)

View File

@ -12,6 +12,7 @@ using osu.Framework.Allocation;
using osu.Framework.Bindables;
using osu.Framework.Development;
using osu.Framework.Graphics;
using osu.Framework.Logging;
using osu.Game.Beatmaps;
using osu.Game.Online.API;
using osu.Game.Replays.Legacy;
@ -45,18 +46,6 @@ namespace osu.Game.Online.Spectator
/// </summary>
public IBindableList<int> PlayingUsers => playingUsers;
/// <summary>
/// All users currently being watched.
/// </summary>
private readonly List<int> watchedUsers = new List<int>();
private readonly BindableDictionary<int, SpectatorState> watchedUserStates = new BindableDictionary<int, SpectatorState>();
private readonly BindableList<int> playingUsers = new BindableList<int>();
private readonly SpectatorState currentState = new SpectatorState();
private IBeatmap? currentBeatmap;
private Score? currentScore;
/// <summary>
/// Whether the local user is playing.
/// </summary>
@ -77,6 +66,28 @@ namespace osu.Game.Online.Spectator
/// </summary>
public event Action<int, SpectatorState>? OnUserFinishedPlaying;
/// <summary>
/// All users currently being watched.
/// </summary>
private readonly List<int> watchedUsers = new List<int>();
private readonly BindableDictionary<int, SpectatorState> watchedUserStates = new BindableDictionary<int, SpectatorState>();
private readonly BindableList<int> playingUsers = new BindableList<int>();
private readonly SpectatorState currentState = new SpectatorState();
private IBeatmap? currentBeatmap;
private Score? currentScore;
private readonly Queue<FrameDataBundle> pendingFrameBundles = new Queue<FrameDataBundle>();
private readonly Queue<LegacyReplayFrame> pendingFrames = new Queue<LegacyReplayFrame>();
private double lastPurgeTime;
private Task? lastSend;
private const int max_pending_frames = 30;
[BackgroundDependencyLoader]
private void load()
{
@ -94,6 +105,7 @@ namespace osu.Game.Online.Spectator
// re-send state in case it wasn't received
if (IsPlaying)
// TODO: this is likely sent out of order after a reconnect scenario. needs further consideration.
BeginPlayingInternal(currentState);
}
else
@ -168,7 +180,20 @@ namespace osu.Game.Online.Spectator
});
}
public void SendFrames(FrameDataBundle data) => lastSend = SendFramesInternal(data);
public void HandleFrame(ReplayFrame frame) => Schedule(() =>
{
if (!IsPlaying)
{
Logger.Log($"Frames arrived at {nameof(SpectatorClient)} outside of gameplay scope and will be ignored.");
return;
}
if (frame is IConvertibleReplayFrame convertible)
pendingFrames.Enqueue(convertible.ToLegacy(currentBeatmap));
if (pendingFrames.Count > max_pending_frames)
purgePendingFrames();
});
public void EndPlaying(GameplayState state)
{
@ -180,7 +205,7 @@ namespace osu.Game.Online.Spectator
return;
if (pendingFrames.Count > 0)
purgePendingFrames(true);
purgePendingFrames();
IsPlaying = false;
currentBeatmap = null;
@ -222,7 +247,7 @@ namespace osu.Game.Online.Spectator
protected abstract Task BeginPlayingInternal(SpectatorState state);
protected abstract Task SendFramesInternal(FrameDataBundle data);
protected abstract Task SendFramesInternal(FrameDataBundle bundle);
protected abstract Task EndPlayingInternal(SpectatorState state);
@ -230,53 +255,57 @@ namespace osu.Game.Online.Spectator
protected abstract Task StopWatchingUserInternal(int userId);
private readonly Queue<LegacyReplayFrame> pendingFrames = new Queue<LegacyReplayFrame>();
private double lastSendTime;
private Task? lastSend;
private const int max_pending_frames = 30;
protected override void Update()
{
base.Update();
if (pendingFrames.Count > 0 && Time.Current - lastSendTime > TIME_BETWEEN_SENDS)
if (pendingFrames.Count > 0 && Time.Current - lastPurgeTime > TIME_BETWEEN_SENDS)
purgePendingFrames();
}
public void HandleFrame(ReplayFrame frame)
private void purgePendingFrames()
{
Debug.Assert(ThreadSafety.IsUpdateThread);
if (!IsPlaying)
return;
if (frame is IConvertibleReplayFrame convertible)
pendingFrames.Enqueue(convertible.ToLegacy(currentBeatmap));
if (pendingFrames.Count > max_pending_frames)
purgePendingFrames();
}
private void purgePendingFrames(bool force = false)
{
if (lastSend?.IsCompleted == false && !force)
return;
if (pendingFrames.Count == 0)
return;
var frames = pendingFrames.ToArray();
pendingFrames.Clear();
Debug.Assert(currentScore != null);
SendFrames(new FrameDataBundle(currentScore.ScoreInfo, frames));
var frames = pendingFrames.ToArray();
var bundle = new FrameDataBundle(currentScore.ScoreInfo, frames);
lastSendTime = Time.Current;
pendingFrames.Clear();
lastPurgeTime = Time.Current;
pendingFrameBundles.Enqueue(bundle);
sendNextBundleIfRequired();
}
private void sendNextBundleIfRequired()
{
Debug.Assert(ThreadSafety.IsUpdateThread);
if (lastSend?.IsCompleted == false)
return;
if (!pendingFrameBundles.TryPeek(out var bundle))
return;
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
lastSend = tcs.Task;
SendFramesInternal(bundle).ContinueWith(t => Schedule(() =>
{
bool wasSuccessful = t.Exception == null;
// If the last bundle send wasn't successful, try again without dequeuing.
if (wasSuccessful)
pendingFrameBundles.Dequeue();
tcs.SetResult(wasSuccessful);
sendNextBundleIfRequired();
}));
}
}
}

View File

@ -21,10 +21,17 @@ namespace osu.Game.Tests.Visual.Spectator
public class TestSpectatorClient : SpectatorClient
{
/// <summary>
/// Maximum number of frames sent per bundle via <see cref="SendFrames"/>.
/// Maximum number of frames sent per bundle via <see cref="SendFramesFromUser"/>.
/// </summary>
public const int FRAME_BUNDLE_SIZE = 10;
/// <summary>
/// Whether to force send operations to fail (simulating a network issue).
/// </summary>
public bool ShouldFailSendingFrames { get; set; }
public int FrameSendAttempts { get; private set; }
public override IBindable<bool> IsConnected { get; } = new Bindable<bool>(true);
public IReadOnlyDictionary<int, ReplayFrame> LastReceivedUserFrames => lastReceivedUserFrames;
@ -47,7 +54,7 @@ namespace osu.Game.Tests.Visual.Spectator
/// </summary>
/// <param name="userId">The user to start play for.</param>
/// <param name="beatmapId">The playing beatmap id.</param>
public void StartPlay(int userId, int beatmapId)
public void SendStartPlay(int userId, int beatmapId)
{
userBeatmapDictionary[userId] = beatmapId;
userNextFrameDictionary[userId] = 0;
@ -59,7 +66,7 @@ namespace osu.Game.Tests.Visual.Spectator
/// </summary>
/// <param name="userId">The user to end play for.</param>
/// <param name="state">The spectator state to end play with.</param>
public void EndPlay(int userId, SpectatedUserState state = SpectatedUserState.Quit)
public void SendEndPlay(int userId, SpectatedUserState state = SpectatedUserState.Quit)
{
if (!userBeatmapDictionary.ContainsKey(userId))
return;
@ -74,14 +81,14 @@ namespace osu.Game.Tests.Visual.Spectator
userBeatmapDictionary.Remove(userId);
}
public new void Schedule(Action action) => base.Schedule(action);
/// <summary>
/// Sends frames for an arbitrary user, in bundles containing 10 frames each.
/// This bypasses the standard queueing mechanism completely and should only be used to test cases where multiple users need to be sending data.
/// Importantly, <see cref="ShouldFailSendingFrames"/> will have no effect.
/// </summary>
/// <param name="userId">The user to send frames for.</param>
/// <param name="count">The total number of frames to send.</param>
public void SendFrames(int userId, int count)
public void SendFramesFromUser(int userId, int count)
{
var frames = new List<LegacyReplayFrame>();
@ -123,7 +130,15 @@ namespace osu.Game.Tests.Visual.Spectator
return ((ISpectatorClient)this).UserBeganPlaying(api.LocalUser.Value.Id, state);
}
protected override Task SendFramesInternal(FrameDataBundle data) => ((ISpectatorClient)this).UserSentFrames(api.LocalUser.Value.Id, data);
protected override Task SendFramesInternal(FrameDataBundle bundle)
{
FrameSendAttempts++;
if (ShouldFailSendingFrames)
return Task.FromException(new InvalidOperationException());
return ((ISpectatorClient)this).UserSentFrames(api.LocalUser.Value.Id, bundle);
}
protected override Task EndPlayingInternal(SpectatorState state) => ((ISpectatorClient)this).UserFinishedPlaying(api.LocalUser.Value.Id, state);