1
0
mirror of https://github.com/ppy/osu.git synced 2024-11-19 09:32:54 +08:00
osu-lazer/osu.Game/Online/Spectator/SpectatorStreamingClient.cs

336 lines
10 KiB
C#
Raw Normal View History

2020-10-22 18:41:10 +08:00
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using osu.Framework;
using osu.Framework.Allocation;
using osu.Framework.Bindables;
using osu.Framework.Graphics;
using osu.Framework.Logging;
using osu.Game.Beatmaps;
using osu.Game.Online.API;
2020-10-22 18:17:19 +08:00
using osu.Game.Replays.Legacy;
2020-10-23 16:24:19 +08:00
using osu.Game.Rulesets;
2020-10-22 16:29:38 +08:00
using osu.Game.Rulesets.Mods;
using osu.Game.Rulesets.Replays;
using osu.Game.Rulesets.Replays.Types;
using osu.Game.Scoring;
using osu.Game.Screens.Play;
namespace osu.Game.Online.Spectator
{
public class SpectatorStreamingClient : Component, ISpectatorClient
{
/// <summary>
/// The maximum milliseconds between frame bundle sends.
/// </summary>
public const double TIME_BETWEEN_SENDS = 200;
// The SignalR hub connection. Null until Connect() builds one, and reset to null when the API state becomes Failing/Offline.
private HubConnection connection;
// IDs of users requested via WatchUser(); the list is snapshotted, cleared and re-subscribed each time a connection is established.
private readonly List<int> watchingUsers = new List<int>();
2020-12-02 18:02:49 +08:00
// Lock object held whenever watchingUsers is read or modified.
private readonly object userLock = new object();
/// <summary>
/// The IDs of users that have begun (and not yet finished) a play session, exposed as a read-only bindable view.
/// </summary>
public IBindableList<int> PlayingUsers => playingUsers;
// Backing list for PlayingUsers; updated from the UserBeganPlaying / UserFinishedPlaying callbacks and cleared when the connection closes.
private readonly BindableList<int> playingUsers = new BindableList<int>();
// Local bindable which is bound to the API provider's state in load().
private readonly IBindable<APIState> apiState = new Bindable<APIState>();
2020-10-22 14:27:04 +08:00
// Set to true once StartAsync() succeeds and back to false by the connection's Closed handler; gates every outgoing hub call.
private bool isConnected;
// API provider; supplies the online state and the access token used for the Authorization header.
[Resolved]
private IAPIProvider api { get; set; }
// The playable beatmap of the current play session (set in BeginPlaying, cleared in EndPlaying); passed to ToLegacy() when converting frames.
[CanBeNull]
private IBeatmap currentBeatmap;
// The score of the current play session (set in BeginPlaying); its ScoreInfo accompanies every outgoing frame bundle.
[CanBeNull]
private Score currentScore;
2020-10-23 16:24:19 +08:00
[Resolved]
2020-10-27 09:59:24 +08:00
private IBindable<RulesetInfo> currentRuleset { get; set; }
2020-10-23 16:24:19 +08:00
2020-10-22 16:29:38 +08:00
[Resolved]
2020-10-27 09:59:24 +08:00
private IBindable<IReadOnlyList<Mod>> currentMods { get; set; }
2020-10-22 16:29:38 +08:00
// State object sent with BeginPlaySession / EndPlaySession; its BeatmapID, RulesetID and Mods are populated in BeginPlaying().
private readonly SpectatorState currentState = new SpectatorState();
// True between BeginPlaying() and EndPlaying(); used on (re)connection to decide whether the play state must be re-sent.
private bool isPlaying;
/// <summary>
/// Called whenever new frames arrive from the server.
/// </summary>
public event Action<int, FrameDataBundle> OnNewFrames;
2020-10-22 17:10:27 +08:00
2020-10-26 19:05:11 +08:00
/// <summary>
/// Called whenever a user starts a play session.
/// </summary>
public event Action<int, SpectatorState> OnUserBeganPlaying;
/// <summary>
2020-11-01 21:39:10 +08:00
/// Called whenever a user finishes a play session.
2020-10-26 19:05:11 +08:00
/// </summary>
public event Action<int, SpectatorState> OnUserFinishedPlaying;
private readonly string endpoint;
/// <summary>
/// Creates a spectator streaming client which will connect to the spectator endpoint of the given configuration.
/// </summary>
/// <param name="endpoints">The endpoint configuration; only its <c>SpectatorEndpointUrl</c> is read and stored.</param>
public SpectatorStreamingClient(EndpointConfiguration endpoints) => endpoint = endpoints.SpectatorEndpointUrl;
/// <summary>
/// Mirrors the API provider's state into <c>apiState</c> and subscribes <c>apiStateChanged</c> to it,
/// requesting an immediate first invocation.
/// </summary>
[BackgroundDependencyLoader]
private void load()
{
    apiState.BindTo(api.State);

    // the handler is also asked to run once straight away rather than only on the next change.
    apiState.BindValueChanged(apiStateChanged, runOnceImmediately: true);
}
/// <summary>
/// Reacts to a change of API state: tears the hub connection down when the API is failing or offline,
/// and kicks off a (background) connection attempt when it comes online.
/// </summary>
/// <param name="state">The state change event; only <c>NewValue</c> is inspected.</param>
private void apiStateChanged(ValueChangedEvent<APIState> state)
{
    switch (state.NewValue)
    {
        case APIState.Failing:
        case APIState.Offline:
            // Flag the client as disconnected *before* dropping the connection reference.
            // The Closed handler only resets the flag once the (un-awaited) stop completes, so without this
            // any send method passing its "isConnected" check in the meantime would dereference a null connection.
            isConnected = false;

            // fire-and-forget: the stop is not awaited.
            connection?.StopAsync();
            connection = null;
            break;

        case APIState.Online:
            // connect on a thread pool thread so the caller is not blocked.
            Task.Run(Connect);
            break;
    }
}
/// <summary>
/// Builds the hub connection (if one does not already exist), binds the incoming client methods,
/// and keeps attempting to start the connection for as long as the API is online.
/// </summary>
/// <remarks>
/// When the connection closes with an error, the same start-until-connected loop is run again.
/// On every successful start the previously watched users are re-subscribed and, if a play session
/// is in progress, the play state is re-sent.
/// </remarks>
/// <returns>A task which completes once connected, or once the API is no longer online.</returns>
protected virtual async Task Connect()
{
    // a connection has already been built; nothing to do.
    if (connection != null)
        return;

    var hubBuilder = new HubConnectionBuilder()
        .WithUrl(endpoint, options => options.Headers.Add("Authorization", $"Bearer {api.AccessToken}"));

    if (RuntimeInfo.SupportsJIT)
    {
        hubBuilder.AddMessagePackProtocol();
    }
    else
    {
        // eventually we will precompile resolvers for messagepack, but this isn't working currently
        // see https://github.com/neuecc/MessagePack-CSharp/issues/780#issuecomment-768794308.
        hubBuilder.AddNewtonsoftJsonProtocol(options => options.PayloadSerializerSettings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore);
    }

    connection = hubBuilder.Build();

    // until strong typed client support is added, each method must be manually bound (see https://github.com/dotnet/aspnetcore/issues/15198)
    var client = (ISpectatorClient)this;
    connection.On<int, SpectatorState>(nameof(ISpectatorClient.UserBeganPlaying), client.UserBeganPlaying);
    connection.On<int, FrameDataBundle>(nameof(ISpectatorClient.UserSentFrames), client.UserSentFrames);
    connection.On<int, SpectatorState>(nameof(ISpectatorClient.UserFinishedPlaying), client.UserFinishedPlaying);

    connection.Closed += async error =>
    {
        isConnected = false;
        playingUsers.Clear();

        // a null exception means the close carried no error; no reconnection is attempted in that case.
        if (error == null)
            return;

        Logger.Log($"Spectator client lost connection: {error}", LoggingTarget.Network);
        await tryUntilConnected();
    };

    await tryUntilConnected();

    // Repeatedly attempts to start the connection, waiting five seconds between failures,
    // until it succeeds or the API is no longer online.
    async Task tryUntilConnected()
    {
        Logger.Log("Spectator client connecting...", LoggingTarget.Network);

        while (api.State.Value == APIState.Online)
        {
            try
            {
                // reconnect on any failure
                await connection.StartAsync();
                Logger.Log("Spectator client connected!", LoggingTarget.Network);

                // take a snapshot of all the users that were previously being watched,
                // emptying the list so that WatchUser() below registers them afresh.
                int[] previouslyWatched;

                lock (userLock)
                {
                    previouslyWatched = watchingUsers.ToArray();
                    watchingUsers.Clear();
                }

                // success
                isConnected = true;

                // resubscribe to watched users
                foreach (int watchedId in previouslyWatched)
                {
                    WatchUser(watchedId);
                }

                // re-send state in case it wasn't received
                if (isPlaying)
                {
                    beginPlaying();
                }

                break;
            }
            catch (Exception connectError)
            {
                Logger.Log($"Spectator client connection error: {connectError}", LoggingTarget.Network);
                await Task.Delay(5000);
            }
        }
    }
}
/// <summary>
/// Hub callback: a user began a play session.
/// Adds the user to <c>playingUsers</c> (if not already present) and raises <c>OnUserBeganPlaying</c>.
/// </summary>
/// <param name="userId">The ID of the user that began playing.</param>
/// <param name="state">The state supplied with the notification; forwarded to subscribers as-is.</param>
/// <returns>An already-completed task.</returns>
Task ISpectatorClient.UserBeganPlaying(int userId, SpectatorState state)
{
    bool alreadyTracked = playingUsers.Contains(userId);

    if (!alreadyTracked)
    {
        playingUsers.Add(userId);
    }

    OnUserBeganPlaying?.Invoke(userId, state);

    return Task.CompletedTask;
}
/// <summary>
/// Hub callback: a user finished a play session.
/// Removes the user from <c>playingUsers</c> and raises <c>OnUserFinishedPlaying</c>.
/// </summary>
/// <param name="userId">The ID of the user that finished playing.</param>
/// <param name="state">The state supplied with the notification; forwarded to subscribers as-is.</param>
/// <returns>An already-completed task.</returns>
Task ISpectatorClient.UserFinishedPlaying(int userId, SpectatorState state)
{
    // Remove() is a no-op when the user is not in the list.
    playingUsers.Remove(userId);

    var subscribers = OnUserFinishedPlaying;

    if (subscribers != null)
    {
        subscribers.Invoke(userId, state);
    }

    return Task.CompletedTask;
}
/// <summary>
/// Hub callback: a watched user sent a bundle of frames. Raises <c>OnNewFrames</c> with the bundle.
/// </summary>
/// <param name="userId">The ID of the user the frames belong to.</param>
/// <param name="data">The received frame bundle; forwarded to subscribers as-is.</param>
/// <returns>An already-completed task.</returns>
Task ISpectatorClient.UserSentFrames(int userId, FrameDataBundle data)
{
    var subscribers = OnNewFrames;

    if (subscribers != null)
    {
        subscribers.Invoke(userId, data);
    }

    return Task.CompletedTask;
}
/// <summary>
/// Marks the start of a local play session, captures the state describing it and (if connected)
/// announces the session to the server.
/// </summary>
/// <param name="beatmap">The gameplay beatmap; its online beatmap ID and playable beatmap are captured.</param>
/// <param name="score">The score being played; its <c>ScoreInfo</c> accompanies subsequent frame bundles.</param>
/// <exception cref="InvalidOperationException">Thrown when a play session is already in progress.</exception>
public void BeginPlaying(GameplayBeatmap beatmap, Score score)
{
    if (isPlaying)
        throw new InvalidOperationException($"Cannot invoke {nameof(BeginPlaying)} when already playing");

    // transfer state at point of beginning play
    currentState.BeatmapID = beatmap.BeatmapInfo.OnlineBeatmapID;
    currentState.RulesetID = currentRuleset.Value.ID;

    // Materialise the projection now. A bare Select() is lazily evaluated, so every later enumeration
    // (e.g. each time the state is sent) would re-create the APIMods from the live Mod instances
    // instead of using a snapshot from this point in time.
    currentState.Mods = currentMods.Value.Select(m => new APIMod(m)).ToArray();

    currentBeatmap = beatmap.PlayableBeatmap;
    currentScore = score;

    // Only flag the session as started once the state above is fully populated: if reading the beatmap/ruleset
    // throws, the client must not be left stuck in a "playing" state, and the reconnection path
    // (which re-sends the state whenever isPlaying is set) must never observe half-written state.
    isPlaying = true;

    beginPlaying();
}
/// <summary>
/// Sends the current play state to the server as a <c>BeginPlaySession</c> call.
/// Does nothing while disconnected. The returned send task is not awaited.
/// </summary>
private void beginPlaying()
{
    Debug.Assert(isPlaying);

    if (isConnected)
    {
        connection.SendAsync(nameof(ISpectatorServer.BeginPlaySession), currentState);
    }
}
2020-10-22 14:27:04 +08:00
/// <summary>
/// Sends a bundle of frames to the server, remembering the send task in <c>lastSend</c>.
/// Does nothing (the bundle is discarded) while disconnected.
/// </summary>
/// <param name="data">The frame bundle to send.</param>
public void SendFrames(FrameDataBundle data)
{
    if (isConnected)
    {
        lastSend = connection.SendAsync(nameof(ISpectatorServer.SendFrameData), data);
    }
}
2020-10-22 16:29:38 +08:00
/// <summary>
/// Marks the end of the local play session and (if connected) notifies the server with an
/// <c>EndPlaySession</c> call carrying the current state. The send task is not awaited.
/// </summary>
public void EndPlaying()
{
    // local bookkeeping is always reset, regardless of connection state.
    isPlaying = false;
    currentBeatmap = null;

    if (isConnected)
    {
        connection.SendAsync(nameof(ISpectatorServer.EndPlaySession), currentState);
    }
}
/// <summary>
/// Registers interest in a user's play sessions. The user is remembered locally and, if connected,
/// a <c>StartWatchingUser</c> call is sent to the server. Requests for an already-watched user are ignored.
/// </summary>
/// <param name="userId">The ID of the user to watch.</param>
public virtual void WatchUser(int userId)
{
    bool notifyServer;

    lock (userLock)
    {
        // already being watched; nothing to record and nothing to send.
        if (watchingUsers.Contains(userId))
            return;

        watchingUsers.Add(userId);

        // the connection state is sampled while still holding the lock.
        notifyServer = isConnected;
    }

    if (notifyServer)
    {
        connection.SendAsync(nameof(ISpectatorServer.StartWatchingUser), userId);
    }
}
2020-10-22 18:17:19 +08:00
/// <summary>
/// Removes a user from the locally watched set and, if connected, sends an <c>EndWatchingUser</c>
/// call to the server. The call is sent whether or not the user was actually being watched.
/// </summary>
/// <param name="userId">The ID of the user to stop watching.</param>
public void StopWatchingUser(int userId)
{
    bool notifyServer;

    lock (userLock)
    {
        watchingUsers.Remove(userId);

        // the connection state is sampled while still holding the lock.
        notifyServer = isConnected;
    }

    if (notifyServer)
    {
        connection.SendAsync(nameof(ISpectatorServer.EndWatchingUser), userId);
    }
}
// Frames converted by HandleFrame() which have not yet been bundled and sent.
private readonly Queue<LegacyReplayFrame> pendingFrames = new Queue<LegacyReplayFrame>();
// Value of Time.Current when the last bundle was handed to SendFrames(); compared against TIME_BETWEEN_SENDS in Update().
private double lastSendTime;
// Task of the most recent frame send (null until the first send while connected); a new bundle is not sent while it is incomplete.
private Task lastSend;
// Once more than this many frames are queued, HandleFrame() attempts a send immediately rather than waiting for Update().
private const int max_pending_frames = 30;
/// <summary>
/// Per-frame update: flushes queued frames once more than <see cref="TIME_BETWEEN_SENDS"/> milliseconds
/// have elapsed since the previous flush.
/// </summary>
protected override void Update()
{
    base.Update();

    // nothing queued, nothing to flush.
    if (pendingFrames.Count == 0)
        return;

    double sinceLastSend = Time.Current - lastSendTime;

    if (sinceLastSend > TIME_BETWEEN_SENDS)
    {
        purgePendingFrames();
    }
}
/// <summary>
/// Queues a replay frame for sending. Only frames implementing <c>IConvertibleReplayFrame</c> are queued
/// (after conversion to their legacy form against the current beatmap); other frames are ignored.
/// A flush is attempted immediately once the queue exceeds <c>max_pending_frames</c>.
/// </summary>
/// <param name="frame">The frame to handle.</param>
public void HandleFrame(ReplayFrame frame)
{
    if (frame is IConvertibleReplayFrame convertible)
    {
        var legacyFrame = convertible.ToLegacy(currentBeatmap);
        pendingFrames.Enqueue(legacyFrame);
    }

    bool queueOverflowing = pendingFrames.Count > max_pending_frames;

    if (queueOverflowing)
    {
        purgePendingFrames();
    }
}
/// <summary>
/// Bundles every queued frame together with the current score's <c>ScoreInfo</c>, hands the bundle to
/// <c>SendFrames</c>, and records the time of the flush. Skipped entirely (leaving the queue intact)
/// while the previous send has not completed.
/// </summary>
private void purgePendingFrames()
{
    // wait for the previous send to complete before dispatching another bundle.
    bool sendInProgress = lastSend != null && !lastSend.IsCompleted;

    if (sendInProgress)
        return;

    // drain the queue into a single batch.
    var batch = pendingFrames.ToArray();
    pendingFrames.Clear();

    Debug.Assert(currentScore != null);

    var bundle = new FrameDataBundle(currentScore.ScoreInfo, batch);
    SendFrames(bundle);

    lastSendTime = Time.Current;
}
}
}