mirror of
https://github.com/ppy/osu.git
synced 2025-02-13 14:13:18 +08:00
Clean up spectator streaming client with new hub connector
This commit is contained in:
parent
28b815ffe1
commit
f76f92515e
@ -3,7 +3,6 @@
|
||||
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using NUnit.Framework;
|
||||
using osu.Framework.Allocation;
|
||||
using osu.Framework.Bindables;
|
||||
@ -233,6 +232,8 @@ namespace osu.Game.Tests.Visual.Gameplay
|
||||
|
||||
public class TestSpectatorStreamingClient : SpectatorStreamingClient
|
||||
{
|
||||
protected override IBindable<bool> IsConnected { get; } = new BindableBool(false);
|
||||
|
||||
public readonly User StreamingUser = new User { Id = 55, Username = "Test user" };
|
||||
|
||||
public new BindableList<int> PlayingUsers => (BindableList<int>)base.PlayingUsers;
|
||||
@ -244,11 +245,6 @@ namespace osu.Game.Tests.Visual.Gameplay
|
||||
{
|
||||
}
|
||||
|
||||
protected override Task Connect()
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public void StartPlay(int beatmapId)
|
||||
{
|
||||
this.beatmapId = beatmapId;
|
||||
|
@ -4,7 +4,6 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using NUnit.Framework;
|
||||
using osu.Framework.Allocation;
|
||||
using osu.Framework.Bindables;
|
||||
@ -96,6 +95,8 @@ namespace osu.Game.Tests.Visual.Multiplayer
|
||||
|
||||
public class TestMultiplayerStreaming : SpectatorStreamingClient
|
||||
{
|
||||
protected override IBindable<bool> IsConnected { get; } = new BindableBool(false);
|
||||
|
||||
public new BindableList<int> PlayingUsers => (BindableList<int>)base.PlayingUsers;
|
||||
|
||||
private readonly int totalUsers;
|
||||
@ -163,8 +164,6 @@ namespace osu.Game.Tests.Visual.Multiplayer
|
||||
((ISpectatorClient)this).UserSentFrames(userId, new FrameDataBundle(header, Array.Empty<LegacyReplayFrame>()));
|
||||
}
|
||||
}
|
||||
|
||||
protected override Task Connect() => Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -8,13 +8,9 @@ using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
using JetBrains.Annotations;
|
||||
using Microsoft.AspNetCore.SignalR.Client;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Newtonsoft.Json;
|
||||
using osu.Framework;
|
||||
using osu.Framework.Allocation;
|
||||
using osu.Framework.Bindables;
|
||||
using osu.Framework.Graphics;
|
||||
using osu.Framework.Logging;
|
||||
using osu.Framework.Graphics.Containers;
|
||||
using osu.Game.Beatmaps;
|
||||
using osu.Game.Online.API;
|
||||
using osu.Game.Replays.Legacy;
|
||||
@ -27,14 +23,18 @@ using osu.Game.Screens.Play;
|
||||
|
||||
namespace osu.Game.Online.Spectator
|
||||
{
|
||||
public class SpectatorStreamingClient : Component, ISpectatorClient
|
||||
public class SpectatorStreamingClient : CompositeDrawable, ISpectatorClient
|
||||
{
|
||||
/// <summary>
|
||||
/// The maximum milliseconds between frame bundle sends.
|
||||
/// </summary>
|
||||
public const double TIME_BETWEEN_SENDS = 200;
|
||||
|
||||
private HubConnection connection;
|
||||
private readonly HubClientConnector connector;
|
||||
|
||||
protected virtual IBindable<bool> IsConnected => connector.IsConnected;
|
||||
|
||||
private HubConnection connection => connector.CurrentConnection;
|
||||
|
||||
private readonly List<int> watchingUsers = new List<int>();
|
||||
|
||||
@ -44,13 +44,6 @@ namespace osu.Game.Online.Spectator
|
||||
|
||||
private readonly BindableList<int> playingUsers = new BindableList<int>();
|
||||
|
||||
private readonly IBindable<APIState> apiState = new Bindable<APIState>();
|
||||
|
||||
private bool isConnected;
|
||||
|
||||
[Resolved]
|
||||
private IAPIProvider api { get; set; }
|
||||
|
||||
[CanBeNull]
|
||||
private IBeatmap currentBeatmap;
|
||||
|
||||
@ -82,114 +75,50 @@ namespace osu.Game.Online.Spectator
|
||||
/// </summary>
|
||||
public event Action<int, SpectatorState> OnUserFinishedPlaying;
|
||||
|
||||
private readonly string endpoint;
|
||||
|
||||
public SpectatorStreamingClient(EndpointConfiguration endpoints)
|
||||
{
|
||||
endpoint = endpoints.SpectatorEndpointUrl;
|
||||
InternalChild = connector = new HubClientConnector("Spectator client", endpoints.SpectatorEndpointUrl)
|
||||
{
|
||||
OnNewConnection = newConnection =>
|
||||
{
|
||||
// until strong typed client support is added, each method must be manually bound
|
||||
// (see https://github.com/dotnet/aspnetcore/issues/15198)
|
||||
newConnection.On<int, SpectatorState>(nameof(ISpectatorClient.UserBeganPlaying), ((ISpectatorClient)this).UserBeganPlaying);
|
||||
newConnection.On<int, FrameDataBundle>(nameof(ISpectatorClient.UserSentFrames), ((ISpectatorClient)this).UserSentFrames);
|
||||
newConnection.On<int, SpectatorState>(nameof(ISpectatorClient.UserFinishedPlaying), ((ISpectatorClient)this).UserFinishedPlaying);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
[BackgroundDependencyLoader]
|
||||
private void load()
|
||||
{
|
||||
apiState.BindTo(api.State);
|
||||
apiState.BindValueChanged(apiStateChanged, true);
|
||||
}
|
||||
|
||||
private void apiStateChanged(ValueChangedEvent<APIState> state)
|
||||
{
|
||||
switch (state.NewValue)
|
||||
IsConnected.BindValueChanged(connected =>
|
||||
{
|
||||
case APIState.Failing:
|
||||
case APIState.Offline:
|
||||
connection?.StopAsync();
|
||||
connection = null;
|
||||
break;
|
||||
|
||||
case APIState.Online:
|
||||
Task.Run(Connect);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
protected virtual async Task Connect()
|
||||
{
|
||||
if (connection != null)
|
||||
return;
|
||||
|
||||
var builder = new HubConnectionBuilder()
|
||||
.WithUrl(endpoint, options => { options.Headers.Add("Authorization", $"Bearer {api.AccessToken}"); });
|
||||
|
||||
if (RuntimeInfo.SupportsJIT)
|
||||
builder.AddMessagePackProtocol();
|
||||
else
|
||||
{
|
||||
// eventually we will precompile resolvers for messagepack, but this isn't working currently
|
||||
// see https://github.com/neuecc/MessagePack-CSharp/issues/780#issuecomment-768794308.
|
||||
builder.AddNewtonsoftJsonProtocol(options => { options.PayloadSerializerSettings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore; });
|
||||
}
|
||||
|
||||
connection = builder.Build();
|
||||
// until strong typed client support is added, each method must be manually bound (see https://github.com/dotnet/aspnetcore/issues/15198)
|
||||
connection.On<int, SpectatorState>(nameof(ISpectatorClient.UserBeganPlaying), ((ISpectatorClient)this).UserBeganPlaying);
|
||||
connection.On<int, FrameDataBundle>(nameof(ISpectatorClient.UserSentFrames), ((ISpectatorClient)this).UserSentFrames);
|
||||
connection.On<int, SpectatorState>(nameof(ISpectatorClient.UserFinishedPlaying), ((ISpectatorClient)this).UserFinishedPlaying);
|
||||
|
||||
connection.Closed += async ex =>
|
||||
{
|
||||
isConnected = false;
|
||||
playingUsers.Clear();
|
||||
|
||||
if (ex != null)
|
||||
if (connected.NewValue)
|
||||
{
|
||||
Logger.Log($"Spectator client lost connection: {ex}", LoggingTarget.Network);
|
||||
await tryUntilConnected();
|
||||
// get all the users that were previously being watched
|
||||
int[] users;
|
||||
|
||||
lock (userLock)
|
||||
{
|
||||
users = watchingUsers.ToArray();
|
||||
watchingUsers.Clear();
|
||||
}
|
||||
|
||||
// resubscribe to watched users.
|
||||
foreach (var userId in users)
|
||||
WatchUser(userId);
|
||||
|
||||
// re-send state in case it wasn't received
|
||||
if (isPlaying)
|
||||
beginPlaying();
|
||||
}
|
||||
};
|
||||
|
||||
await tryUntilConnected();
|
||||
|
||||
async Task tryUntilConnected()
|
||||
{
|
||||
Logger.Log("Spectator client connecting...", LoggingTarget.Network);
|
||||
|
||||
while (api.State.Value == APIState.Online)
|
||||
else
|
||||
{
|
||||
try
|
||||
{
|
||||
// reconnect on any failure
|
||||
await connection.StartAsync();
|
||||
Logger.Log("Spectator client connected!", LoggingTarget.Network);
|
||||
|
||||
// get all the users that were previously being watched
|
||||
int[] users;
|
||||
|
||||
lock (userLock)
|
||||
{
|
||||
users = watchingUsers.ToArray();
|
||||
watchingUsers.Clear();
|
||||
}
|
||||
|
||||
// success
|
||||
isConnected = true;
|
||||
|
||||
// resubscribe to watched users
|
||||
foreach (var userId in users)
|
||||
WatchUser(userId);
|
||||
|
||||
// re-send state in case it wasn't received
|
||||
if (isPlaying)
|
||||
beginPlaying();
|
||||
|
||||
break;
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Logger.Log($"Spectator client connection error: {e}", LoggingTarget.Network);
|
||||
await Task.Delay(5000);
|
||||
}
|
||||
playingUsers.Clear();
|
||||
}
|
||||
}
|
||||
}, true);
|
||||
}
|
||||
|
||||
Task ISpectatorClient.UserBeganPlaying(int userId, SpectatorState state)
|
||||
@ -240,14 +169,14 @@ namespace osu.Game.Online.Spectator
|
||||
{
|
||||
Debug.Assert(isPlaying);
|
||||
|
||||
if (!isConnected) return;
|
||||
if (!IsConnected.Value) return;
|
||||
|
||||
connection.SendAsync(nameof(ISpectatorServer.BeginPlaySession), currentState);
|
||||
}
|
||||
|
||||
public void SendFrames(FrameDataBundle data)
|
||||
{
|
||||
if (!isConnected) return;
|
||||
if (!IsConnected.Value) return;
|
||||
|
||||
lastSend = connection.SendAsync(nameof(ISpectatorServer.SendFrameData), data);
|
||||
}
|
||||
@ -257,7 +186,7 @@ namespace osu.Game.Online.Spectator
|
||||
isPlaying = false;
|
||||
currentBeatmap = null;
|
||||
|
||||
if (!isConnected) return;
|
||||
if (!IsConnected.Value) return;
|
||||
|
||||
connection.SendAsync(nameof(ISpectatorServer.EndPlaySession), currentState);
|
||||
}
|
||||
@ -271,7 +200,7 @@ namespace osu.Game.Online.Spectator
|
||||
|
||||
watchingUsers.Add(userId);
|
||||
|
||||
if (!isConnected)
|
||||
if (!IsConnected.Value)
|
||||
return;
|
||||
}
|
||||
|
||||
@ -284,7 +213,7 @@ namespace osu.Game.Online.Spectator
|
||||
{
|
||||
watchingUsers.Remove(userId);
|
||||
|
||||
if (!isConnected)
|
||||
if (!IsConnected.Value)
|
||||
return;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user