// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. // See the LICENCE file in the repository root for full licence text. using System; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.SignalR.Client; using osu.Framework.Allocation; using osu.Framework.Bindables; using osu.Framework.Logging; using osu.Game.Configuration; using osu.Game.Online.API; using osu.Game.Online.API.Requests.Responses; using osu.Game.Users; namespace osu.Game.Online.Metadata { public partial class OnlineMetadataClient : MetadataClient { public override IBindable IsConnected { get; } = new Bindable(); public override IBindable IsWatchingUserPresence => isWatchingUserPresence; private readonly BindableBool isWatchingUserPresence = new BindableBool(); // ReSharper disable once InconsistentlySynchronizedField public override IBindableDictionary UserStates => userStates; private readonly BindableDictionary userStates = new BindableDictionary(); private readonly string endpoint; private IHubClientConnector? connector; private Bindable lastQueueId = null!; private IBindable localUser = null!; private IBindable userActivity = null!; private IBindable? userStatus; private HubConnection? connection => connector?.CurrentConnection; public OnlineMetadataClient(EndpointConfiguration endpoints) { endpoint = endpoints.MetadataEndpointUrl; } [BackgroundDependencyLoader] private void load(IAPIProvider api, OsuConfigManager config) { // Importantly, we are intentionally not using MessagePack here to correctly support derived class serialization. // More information on the limitations / reasoning can be found in osu-server-spectator's initialisation code. connector = api.GetHubConnector(nameof(OnlineMetadataClient), endpoint, false); if (connector != null) { connector.ConfigureConnection = connection => { // this is kind of SILLY // https://github.com/dotnet/aspnetcore/issues/15198 connection.On(nameof(IMetadataClient.BeatmapSetsUpdated), ((IMetadataClient)this).BeatmapSetsUpdated); connection.On(nameof(IMetadataClient.UserPresenceUpdated), ((IMetadataClient)this).UserPresenceUpdated); }; IsConnected.BindTo(connector.IsConnected); IsConnected.BindValueChanged(isConnectedChanged, true); } lastQueueId = config.GetBindable(OsuSetting.LastProcessedMetadataId); localUser = api.LocalUser.GetBoundCopy(); userActivity = api.Activity.GetBoundCopy()!; } protected override void LoadComplete() { base.LoadComplete(); localUser.BindValueChanged(_ => { if (localUser.Value is not GuestUser) { userStatus = localUser.Value.Status.GetBoundCopy(); userStatus.BindValueChanged(status => UpdateStatus(status.NewValue), true); } else userStatus = null; }, true); userActivity.BindValueChanged(activity => { if (localUser.Value is not GuestUser) UpdateActivity(activity.NewValue); }, true); } private bool catchingUp; private void isConnectedChanged(ValueChangedEvent connected) { if (!connected.NewValue) { isWatchingUserPresence.Value = false; userStates.Clear(); return; } if (localUser.Value is not GuestUser) { UpdateActivity(userActivity.Value); UpdateStatus(userStatus?.Value); } if (lastQueueId.Value >= 0) { catchingUp = true; Task.Run(async () => { try { while (true) { Logger.Log($"Requesting catch-up from {lastQueueId.Value}"); var catchUpChanges = await GetChangesSince(lastQueueId.Value).ConfigureAwait(true); lastQueueId.Value = catchUpChanges.LastProcessedQueueID; if (catchUpChanges.BeatmapSetIDs.Length == 0) { Logger.Log($"Catch-up complete at {lastQueueId.Value}"); break; } await ProcessChanges(catchUpChanges.BeatmapSetIDs).ConfigureAwait(true); } } catch (Exception e) { Logger.Log($"Error while processing catch-up of metadata ({e.Message})"); } finally { catchingUp = false; } }); } } public override async Task BeatmapSetsUpdated(BeatmapUpdates updates) { Logger.Log($"Received beatmap updates {updates.BeatmapSetIDs.Length} updates with last id {updates.LastProcessedQueueID}"); // If we're still catching up, avoid updating the last ID as it will interfere with catch-up efforts. if (!catchingUp) lastQueueId.Value = updates.LastProcessedQueueID; await ProcessChanges(updates.BeatmapSetIDs).ConfigureAwait(false); } public override Task GetChangesSince(int queueId) { if (connector?.IsConnected.Value != true) return Task.FromCanceled(default); Logger.Log($"Requesting any changes since last known queue id {queueId}"); Debug.Assert(connection != null); return connection.InvokeAsync(nameof(IMetadataServer.GetChangesSince), queueId); } public override Task UpdateActivity(UserActivity? activity) { if (connector?.IsConnected.Value != true) return Task.FromCanceled(new CancellationToken(true)); Debug.Assert(connection != null); return connection.InvokeAsync(nameof(IMetadataServer.UpdateActivity), activity); } public override Task UpdateStatus(UserStatus? status) { if (connector?.IsConnected.Value != true) return Task.FromCanceled(new CancellationToken(true)); Debug.Assert(connection != null); return connection.InvokeAsync(nameof(IMetadataServer.UpdateStatus), status); } public override Task UserPresenceUpdated(int userId, UserPresence? presence) { lock (userStates) { if (presence != null) userStates[userId] = presence.Value; else userStates.Remove(userId); } return Task.CompletedTask; } public override async Task BeginWatchingUserPresence() { if (connector?.IsConnected.Value != true) throw new OperationCanceledException(); Debug.Assert(connection != null); await connection.InvokeAsync(nameof(IMetadataServer.BeginWatchingUserPresence)).ConfigureAwait(false); isWatchingUserPresence.Value = true; } public override async Task EndWatchingUserPresence() { try { if (connector?.IsConnected.Value != true) throw new OperationCanceledException(); // must happen synchronously before any remote calls to avoid misordering. userStates.Clear(); Debug.Assert(connection != null); await connection.InvokeAsync(nameof(IMetadataServer.EndWatchingUserPresence)).ConfigureAwait(false); } finally { isWatchingUserPresence.Value = false; } } public override async Task DisconnectRequested() { await base.DisconnectRequested().ConfigureAwait(false); await EndWatchingUserPresence().ConfigureAwait(false); } protected override void Dispose(bool isDisposing) { base.Dispose(isDisposing); connector?.Dispose(); } } }