mirror of
https://github.com/ppy/osu.git
synced 2025-01-12 15:22:55 +08:00
Merge pull request #26724 from bdach/decouple-notification-websocket-from-chat
Decouple notifications websocket handling from chat operations
This commit is contained in:
commit
48fc0545c2
@ -75,8 +75,6 @@ namespace osu.Game.Tests.Chat
|
|||||||
return false;
|
return false;
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
AddUntilStep("wait for notifications client", () => channelManager.NotificationsConnected);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
[Test]
|
[Test]
|
||||||
|
@ -21,7 +21,7 @@ using osu.Game.Configuration;
|
|||||||
using osu.Game.Localisation;
|
using osu.Game.Localisation;
|
||||||
using osu.Game.Online.API.Requests;
|
using osu.Game.Online.API.Requests;
|
||||||
using osu.Game.Online.API.Requests.Responses;
|
using osu.Game.Online.API.Requests.Responses;
|
||||||
using osu.Game.Online.Notifications;
|
using osu.Game.Online.Chat;
|
||||||
using osu.Game.Online.Notifications.WebSocket;
|
using osu.Game.Online.Notifications.WebSocket;
|
||||||
using osu.Game.Users;
|
using osu.Game.Users;
|
||||||
|
|
||||||
@ -55,6 +55,8 @@ namespace osu.Game.Online.API
|
|||||||
public IBindable<UserActivity> Activity => activity;
|
public IBindable<UserActivity> Activity => activity;
|
||||||
public IBindable<UserStatistics> Statistics => statistics;
|
public IBindable<UserStatistics> Statistics => statistics;
|
||||||
|
|
||||||
|
public INotificationsClient NotificationsClient { get; }
|
||||||
|
|
||||||
public Language Language => game.CurrentLanguage.Value;
|
public Language Language => game.CurrentLanguage.Value;
|
||||||
|
|
||||||
private Bindable<APIUser> localUser { get; } = new Bindable<APIUser>(createGuestUser());
|
private Bindable<APIUser> localUser { get; } = new Bindable<APIUser>(createGuestUser());
|
||||||
@ -82,6 +84,7 @@ namespace osu.Game.Online.API
|
|||||||
|
|
||||||
APIEndpointUrl = endpointConfiguration.APIEndpointUrl;
|
APIEndpointUrl = endpointConfiguration.APIEndpointUrl;
|
||||||
WebsiteRootUrl = endpointConfiguration.WebsiteRootUrl;
|
WebsiteRootUrl = endpointConfiguration.WebsiteRootUrl;
|
||||||
|
NotificationsClient = new WebSocketNotificationsClientConnector(this);
|
||||||
|
|
||||||
authentication = new OAuth(endpointConfiguration.APIClientID, endpointConfiguration.APIClientSecret, APIEndpointUrl);
|
authentication = new OAuth(endpointConfiguration.APIClientID, endpointConfiguration.APIClientSecret, APIEndpointUrl);
|
||||||
log = Logger.GetLogger(LoggingTarget.Network);
|
log = Logger.GetLogger(LoggingTarget.Network);
|
||||||
@ -324,8 +327,7 @@ namespace osu.Game.Online.API
|
|||||||
public IHubClientConnector GetHubConnector(string clientName, string endpoint, bool preferMessagePack) =>
|
public IHubClientConnector GetHubConnector(string clientName, string endpoint, bool preferMessagePack) =>
|
||||||
new HubClientConnector(clientName, endpoint, this, versionHash, preferMessagePack);
|
new HubClientConnector(clientName, endpoint, this, versionHash, preferMessagePack);
|
||||||
|
|
||||||
public NotificationsClientConnector GetNotificationsConnector() =>
|
public IChatClient GetChatClient() => new WebSocketChatClient(this);
|
||||||
new WebSocketNotificationsClientConnector(this);
|
|
||||||
|
|
||||||
public RegistrationRequest.RegistrationRequestErrors CreateAccount(string email, string username, string password)
|
public RegistrationRequest.RegistrationRequestErrors CreateAccount(string email, string username, string password)
|
||||||
{
|
{
|
||||||
|
@ -8,7 +8,8 @@ using osu.Framework.Bindables;
|
|||||||
using osu.Framework.Graphics;
|
using osu.Framework.Graphics;
|
||||||
using osu.Game.Localisation;
|
using osu.Game.Localisation;
|
||||||
using osu.Game.Online.API.Requests.Responses;
|
using osu.Game.Online.API.Requests.Responses;
|
||||||
using osu.Game.Online.Notifications;
|
using osu.Game.Online.Chat;
|
||||||
|
using osu.Game.Online.Notifications.WebSocket;
|
||||||
using osu.Game.Tests;
|
using osu.Game.Tests;
|
||||||
using osu.Game.Users;
|
using osu.Game.Users;
|
||||||
|
|
||||||
@ -30,6 +31,9 @@ namespace osu.Game.Online.API
|
|||||||
|
|
||||||
public Bindable<UserStatistics?> Statistics { get; } = new Bindable<UserStatistics?>();
|
public Bindable<UserStatistics?> Statistics { get; } = new Bindable<UserStatistics?>();
|
||||||
|
|
||||||
|
public DummyNotificationsClient NotificationsClient { get; } = new DummyNotificationsClient();
|
||||||
|
INotificationsClient IAPIProvider.NotificationsClient => NotificationsClient;
|
||||||
|
|
||||||
public Language Language => Language.en;
|
public Language Language => Language.en;
|
||||||
|
|
||||||
public string AccessToken => "token";
|
public string AccessToken => "token";
|
||||||
@ -144,7 +148,7 @@ namespace osu.Game.Online.API
|
|||||||
|
|
||||||
public IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack) => null;
|
public IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack) => null;
|
||||||
|
|
||||||
public NotificationsClientConnector GetNotificationsConnector() => new PollingNotificationsClientConnector(this);
|
public IChatClient GetChatClient() => new TestChatClientConnector(this);
|
||||||
|
|
||||||
public RegistrationRequest.RegistrationRequestErrors? CreateAccount(string email, string username, string password)
|
public RegistrationRequest.RegistrationRequestErrors? CreateAccount(string email, string username, string password)
|
||||||
{
|
{
|
||||||
|
@ -6,7 +6,8 @@ using System.Threading.Tasks;
|
|||||||
using osu.Framework.Bindables;
|
using osu.Framework.Bindables;
|
||||||
using osu.Game.Localisation;
|
using osu.Game.Localisation;
|
||||||
using osu.Game.Online.API.Requests.Responses;
|
using osu.Game.Online.API.Requests.Responses;
|
||||||
using osu.Game.Online.Notifications;
|
using osu.Game.Online.Chat;
|
||||||
|
using osu.Game.Online.Notifications.WebSocket;
|
||||||
using osu.Game.Users;
|
using osu.Game.Users;
|
||||||
|
|
||||||
namespace osu.Game.Online.API
|
namespace osu.Game.Online.API
|
||||||
@ -130,9 +131,14 @@ namespace osu.Game.Online.API
|
|||||||
IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack = true);
|
IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack = true);
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Constructs a new <see cref="NotificationsClientConnector"/>.
|
/// Accesses the <see cref="INotificationsClient"/> used to receive asynchronous notifications from web.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
NotificationsClientConnector GetNotificationsConnector();
|
INotificationsClient NotificationsClient { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates a <see cref="IChatClient"/> instance to use in order to chat.
|
||||||
|
/// </summary>
|
||||||
|
IChatClient GetChatClient();
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Create a new user account. This is a blocking operation.
|
/// Create a new user account. This is a blocking operation.
|
||||||
|
@ -16,7 +16,6 @@ using osu.Game.Database;
|
|||||||
using osu.Game.Online.API;
|
using osu.Game.Online.API;
|
||||||
using osu.Game.Online.API.Requests;
|
using osu.Game.Online.API.Requests;
|
||||||
using osu.Game.Online.API.Requests.Responses;
|
using osu.Game.Online.API.Requests.Responses;
|
||||||
using osu.Game.Online.Notifications;
|
|
||||||
using osu.Game.Overlays.Chat.Listing;
|
using osu.Game.Overlays.Chat.Listing;
|
||||||
|
|
||||||
namespace osu.Game.Online.Chat
|
namespace osu.Game.Online.Chat
|
||||||
@ -64,13 +63,8 @@ namespace osu.Game.Online.Chat
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public IBindableList<Channel> AvailableChannels => availableChannels;
|
public IBindableList<Channel> AvailableChannels => availableChannels;
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Whether the client responsible for channel notifications is connected.
|
|
||||||
/// </summary>
|
|
||||||
public bool NotificationsConnected => connector.IsConnected.Value;
|
|
||||||
|
|
||||||
private readonly IAPIProvider api;
|
private readonly IAPIProvider api;
|
||||||
private readonly NotificationsClientConnector connector;
|
private readonly IChatClient chatClient;
|
||||||
|
|
||||||
[Resolved]
|
[Resolved]
|
||||||
private UserLookupCache users { get; set; }
|
private UserLookupCache users { get; set; }
|
||||||
@ -85,7 +79,7 @@ namespace osu.Game.Online.Chat
|
|||||||
{
|
{
|
||||||
this.api = api;
|
this.api = api;
|
||||||
|
|
||||||
connector = api.GetNotificationsConnector();
|
chatClient = api.GetChatClient();
|
||||||
|
|
||||||
CurrentChannel.ValueChanged += currentChannelChanged;
|
CurrentChannel.ValueChanged += currentChannelChanged;
|
||||||
}
|
}
|
||||||
@ -93,15 +87,11 @@ namespace osu.Game.Online.Chat
|
|||||||
[BackgroundDependencyLoader]
|
[BackgroundDependencyLoader]
|
||||||
private void load()
|
private void load()
|
||||||
{
|
{
|
||||||
connector.ChannelJoined += ch => Schedule(() => joinChannel(ch));
|
chatClient.ChannelJoined += ch => Schedule(() => joinChannel(ch));
|
||||||
|
chatClient.ChannelParted += ch => Schedule(() => leaveChannel(getChannel(ch), false));
|
||||||
connector.ChannelParted += ch => Schedule(() => leaveChannel(getChannel(ch), false));
|
chatClient.NewMessages += msgs => Schedule(() => addMessages(msgs));
|
||||||
|
chatClient.PresenceReceived += () => Schedule(initializeChannels);
|
||||||
connector.NewMessages += msgs => Schedule(() => addMessages(msgs));
|
chatClient.RequestPresence();
|
||||||
|
|
||||||
connector.PresenceReceived += () => Schedule(initializeChannels);
|
|
||||||
|
|
||||||
connector.Start();
|
|
||||||
|
|
||||||
apiState.BindTo(api.State);
|
apiState.BindTo(api.State);
|
||||||
apiState.BindValueChanged(_ => SendAck(), true);
|
apiState.BindValueChanged(_ => SendAck(), true);
|
||||||
@ -655,7 +645,7 @@ namespace osu.Game.Online.Chat
|
|||||||
protected override void Dispose(bool isDisposing)
|
protected override void Dispose(bool isDisposing)
|
||||||
{
|
{
|
||||||
base.Dispose(isDisposing);
|
base.Dispose(isDisposing);
|
||||||
connector?.Dispose();
|
chatClient?.Dispose();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
39
osu.Game/Online/Chat/IChatClient.cs
Normal file
39
osu.Game/Online/Chat/IChatClient.cs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||||
|
// See the LICENCE file in the repository root for full licence text.
|
||||||
|
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
|
namespace osu.Game.Online.Chat
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Interface for consuming online chat.
|
||||||
|
/// </summary>
|
||||||
|
public interface IChatClient : IDisposable
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Fired when a <see cref="Channel"/> has been joined.
|
||||||
|
/// </summary>
|
||||||
|
event Action<Channel>? ChannelJoined;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fired when a <see cref="Channel"/> has been parted.
|
||||||
|
/// </summary>
|
||||||
|
event Action<Channel>? ChannelParted;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fired when new <see cref="Message"/>s have arrived from the server.
|
||||||
|
/// </summary>
|
||||||
|
event Action<List<Message>>? NewMessages;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Requests presence information from the server.
|
||||||
|
/// </summary>
|
||||||
|
void RequestPresence();
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Fired when the initial user presence information has been received.
|
||||||
|
/// </summary>
|
||||||
|
event Action? PresenceReceived;
|
||||||
|
}
|
||||||
|
}
|
144
osu.Game/Online/Chat/WebSocketChatClient.cs
Normal file
144
osu.Game/Online/Chat/WebSocketChatClient.cs
Normal file
@ -0,0 +1,144 @@
|
|||||||
|
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||||
|
// See the LICENCE file in the repository root for full licence text.
|
||||||
|
|
||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Diagnostics;
|
||||||
|
using Newtonsoft.Json;
|
||||||
|
using osu.Framework.Bindables;
|
||||||
|
using osu.Framework.Extensions;
|
||||||
|
using osu.Framework.Logging;
|
||||||
|
using osu.Game.Online.API;
|
||||||
|
using osu.Game.Online.API.Requests;
|
||||||
|
using osu.Game.Online.Notifications.WebSocket;
|
||||||
|
|
||||||
|
namespace osu.Game.Online.Chat
|
||||||
|
{
|
||||||
|
public class WebSocketChatClient : IChatClient
|
||||||
|
{
|
||||||
|
public event Action<Channel>? ChannelJoined;
|
||||||
|
public event Action<Channel>? ChannelParted;
|
||||||
|
public event Action<List<Message>>? NewMessages;
|
||||||
|
public event Action? PresenceReceived;
|
||||||
|
|
||||||
|
private readonly IAPIProvider api;
|
||||||
|
private readonly INotificationsClient client;
|
||||||
|
private readonly ConcurrentDictionary<long, Channel> channelsMap = new ConcurrentDictionary<long, Channel>();
|
||||||
|
|
||||||
|
public WebSocketChatClient(IAPIProvider api)
|
||||||
|
{
|
||||||
|
this.api = api;
|
||||||
|
client = api.NotificationsClient;
|
||||||
|
client.IsConnected.BindValueChanged(start, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void start(ValueChangedEvent<bool> connected)
|
||||||
|
{
|
||||||
|
if (!connected.NewValue)
|
||||||
|
return;
|
||||||
|
|
||||||
|
client.MessageReceived += onMessageReceived;
|
||||||
|
client.SendAsync(new StartChatRequest()).WaitSafely();
|
||||||
|
RequestPresence();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void RequestPresence()
|
||||||
|
{
|
||||||
|
var fetchReq = new GetUpdatesRequest(0);
|
||||||
|
|
||||||
|
fetchReq.Success += updates =>
|
||||||
|
{
|
||||||
|
if (updates?.Presence != null)
|
||||||
|
{
|
||||||
|
foreach (var channel in updates.Presence)
|
||||||
|
joinChannel(channel);
|
||||||
|
|
||||||
|
handleMessages(updates.Messages);
|
||||||
|
}
|
||||||
|
|
||||||
|
PresenceReceived?.Invoke();
|
||||||
|
};
|
||||||
|
|
||||||
|
api.Queue(fetchReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onMessageReceived(SocketMessage message)
|
||||||
|
{
|
||||||
|
switch (message.Event)
|
||||||
|
{
|
||||||
|
case @"chat.channel.join":
|
||||||
|
Debug.Assert(message.Data != null);
|
||||||
|
|
||||||
|
Channel? joinedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
|
||||||
|
Debug.Assert(joinedChannel != null);
|
||||||
|
|
||||||
|
joinChannel(joinedChannel);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case @"chat.channel.part":
|
||||||
|
Debug.Assert(message.Data != null);
|
||||||
|
|
||||||
|
Channel? partedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
|
||||||
|
Debug.Assert(partedChannel != null);
|
||||||
|
|
||||||
|
partChannel(partedChannel);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case @"chat.message.new":
|
||||||
|
Debug.Assert(message.Data != null);
|
||||||
|
|
||||||
|
NewChatMessageData? messageData = JsonConvert.DeserializeObject<NewChatMessageData>(message.Data.ToString());
|
||||||
|
Debug.Assert(messageData != null);
|
||||||
|
|
||||||
|
foreach (var msg in messageData.Messages)
|
||||||
|
postToChannel(msg);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void postToChannel(Message message)
|
||||||
|
{
|
||||||
|
if (channelsMap.TryGetValue(message.ChannelId, out Channel? channel))
|
||||||
|
{
|
||||||
|
joinChannel(channel);
|
||||||
|
NewMessages?.Invoke(new List<Message> { message });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var req = new GetChannelRequest(message.ChannelId);
|
||||||
|
|
||||||
|
req.Success += response =>
|
||||||
|
{
|
||||||
|
joinChannel(channelsMap[message.ChannelId] = response.Channel);
|
||||||
|
NewMessages?.Invoke(new List<Message> { message });
|
||||||
|
};
|
||||||
|
req.Failure += ex => Logger.Error(ex, "Failed to join channel");
|
||||||
|
|
||||||
|
api.Queue(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void joinChannel(Channel ch)
|
||||||
|
{
|
||||||
|
ch.Joined.Value = true;
|
||||||
|
ChannelJoined?.Invoke(ch);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void partChannel(Channel channel) => ChannelParted?.Invoke(channel);
|
||||||
|
|
||||||
|
private void handleMessages(List<Message>? messages)
|
||||||
|
{
|
||||||
|
if (messages == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
NewMessages?.Invoke(messages);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
client.IsConnected.ValueChanged -= start;
|
||||||
|
client.MessageReceived -= onMessageReceived;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,42 +0,0 @@
|
|||||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
|
||||||
// See the LICENCE file in the repository root for full licence text.
|
|
||||||
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using osu.Game.Online.API;
|
|
||||||
using osu.Game.Online.Chat;
|
|
||||||
|
|
||||||
namespace osu.Game.Online.Notifications
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// An abstract connector or <see cref="NotificationsClient"/>s.
|
|
||||||
/// </summary>
|
|
||||||
public abstract class NotificationsClientConnector : PersistentEndpointClientConnector
|
|
||||||
{
|
|
||||||
public event Action<Channel>? ChannelJoined;
|
|
||||||
public event Action<Channel>? ChannelParted;
|
|
||||||
public event Action<List<Message>>? NewMessages;
|
|
||||||
public event Action? PresenceReceived;
|
|
||||||
|
|
||||||
protected NotificationsClientConnector(IAPIProvider api)
|
|
||||||
: base(api)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
protected sealed override async Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
var client = await BuildNotificationClientAsync(cancellationToken).ConfigureAwait(false);
|
|
||||||
|
|
||||||
client.ChannelJoined = c => ChannelJoined?.Invoke(c);
|
|
||||||
client.ChannelParted = c => ChannelParted?.Invoke(c);
|
|
||||||
client.NewMessages = m => NewMessages?.Invoke(m);
|
|
||||||
client.PresenceReceived = () => PresenceReceived?.Invoke();
|
|
||||||
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken);
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,29 @@
|
|||||||
|
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||||
|
// See the LICENCE file in the repository root for full licence text.
|
||||||
|
|
||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using osu.Framework.Bindables;
|
||||||
|
|
||||||
|
namespace osu.Game.Online.Notifications.WebSocket
|
||||||
|
{
|
||||||
|
public class DummyNotificationsClient : INotificationsClient
|
||||||
|
{
|
||||||
|
public IBindable<bool> IsConnected => new BindableBool(true);
|
||||||
|
|
||||||
|
public event Action<SocketMessage>? MessageReceived;
|
||||||
|
|
||||||
|
public Func<SocketMessage, bool>? HandleMessage;
|
||||||
|
|
||||||
|
public Task SendAsync(SocketMessage message, CancellationToken? cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (HandleMessage?.Invoke(message) != true)
|
||||||
|
throw new InvalidOperationException($@"{nameof(DummyNotificationsClient)} cannot process this message.");
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Receive(SocketMessage message) => MessageReceived?.Invoke(message);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,31 @@
|
|||||||
|
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||||
|
// See the LICENCE file in the repository root for full licence text.
|
||||||
|
|
||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using osu.Framework.Bindables;
|
||||||
|
|
||||||
|
namespace osu.Game.Online.Notifications.WebSocket
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// A client for asynchronous notifications sent by osu-web.
|
||||||
|
/// </summary>
|
||||||
|
public interface INotificationsClient
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Whether this <see cref="INotificationsClient"/> is currently connected to a server.
|
||||||
|
/// </summary>
|
||||||
|
IBindable<bool> IsConnected { get; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Invoked when a new <see cref="SocketMessage"/> arrives for this client.
|
||||||
|
/// </summary>
|
||||||
|
event Action<SocketMessage>? MessageReceived;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sends a <see cref="SocketMessage"/> to the notification server.
|
||||||
|
/// </summary>
|
||||||
|
Task SendAsync(SocketMessage message, CancellationToken? cancellationToken = default);
|
||||||
|
}
|
||||||
|
}
|
@ -2,7 +2,6 @@
|
|||||||
// See the LICENCE file in the repository root for full licence text.
|
// See the LICENCE file in the repository root for full licence text.
|
||||||
|
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Concurrent;
|
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
using System.Net;
|
using System.Net;
|
||||||
using System.Net.WebSockets;
|
using System.Net.WebSockets;
|
||||||
@ -12,23 +11,20 @@ using System.Threading.Tasks;
|
|||||||
using Newtonsoft.Json;
|
using Newtonsoft.Json;
|
||||||
using osu.Framework.Extensions.TypeExtensions;
|
using osu.Framework.Extensions.TypeExtensions;
|
||||||
using osu.Framework.Logging;
|
using osu.Framework.Logging;
|
||||||
using osu.Game.Online.API;
|
|
||||||
using osu.Game.Online.API.Requests;
|
|
||||||
using osu.Game.Online.Chat;
|
|
||||||
|
|
||||||
namespace osu.Game.Online.Notifications.WebSocket
|
namespace osu.Game.Online.Notifications.WebSocket
|
||||||
{
|
{
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// A notifications client which receives events via a websocket.
|
/// A notifications client which receives events via a websocket.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class WebSocketNotificationsClient : NotificationsClient
|
public class WebSocketNotificationsClient : PersistentEndpointClient
|
||||||
{
|
{
|
||||||
|
public event Action<SocketMessage>? MessageReceived;
|
||||||
|
|
||||||
private readonly ClientWebSocket socket;
|
private readonly ClientWebSocket socket;
|
||||||
private readonly string endpoint;
|
private readonly string endpoint;
|
||||||
private readonly ConcurrentDictionary<long, Channel> channelsMap = new ConcurrentDictionary<long, Channel>();
|
|
||||||
|
|
||||||
public WebSocketNotificationsClient(ClientWebSocket socket, string endpoint, IAPIProvider api)
|
public WebSocketNotificationsClient(ClientWebSocket socket, string endpoint)
|
||||||
: base(api)
|
|
||||||
{
|
{
|
||||||
this.socket = socket;
|
this.socket = socket;
|
||||||
this.endpoint = endpoint;
|
this.endpoint = endpoint;
|
||||||
@ -37,11 +33,7 @@ namespace osu.Game.Online.Notifications.WebSocket
|
|||||||
public override async Task ConnectAsync(CancellationToken cancellationToken)
|
public override async Task ConnectAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
await socket.ConnectAsync(new Uri(endpoint), cancellationToken).ConfigureAwait(false);
|
await socket.ConnectAsync(new Uri(endpoint), cancellationToken).ConfigureAwait(false);
|
||||||
await sendMessage(new StartChatRequest(), CancellationToken.None).ConfigureAwait(false);
|
|
||||||
|
|
||||||
runReadLoop(cancellationToken);
|
runReadLoop(cancellationToken);
|
||||||
|
|
||||||
await base.ConnectAsync(cancellationToken).ConfigureAwait(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runReadLoop(CancellationToken cancellationToken) => Task.Run(async () =>
|
private void runReadLoop(CancellationToken cancellationToken) => Task.Run(async () =>
|
||||||
@ -73,7 +65,7 @@ namespace osu.Game.Online.Notifications.WebSocket
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
await onMessageReceivedAsync(message).ConfigureAwait(false);
|
MessageReceived?.Invoke(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
@ -105,69 +97,12 @@ namespace osu.Game.Online.Notifications.WebSocket
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task sendMessage(SocketMessage message, CancellationToken cancellationToken)
|
public async Task SendAsync(SocketMessage message, CancellationToken? cancellationToken = default)
|
||||||
{
|
{
|
||||||
if (socket.State != WebSocketState.Open)
|
if (socket.State != WebSocketState.Open)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
await socket.SendAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
|
await socket.SendAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), WebSocketMessageType.Text, true, cancellationToken ?? CancellationToken.None).ConfigureAwait(false);
|
||||||
}
|
|
||||||
|
|
||||||
private async Task onMessageReceivedAsync(SocketMessage message)
|
|
||||||
{
|
|
||||||
switch (message.Event)
|
|
||||||
{
|
|
||||||
case @"chat.channel.join":
|
|
||||||
Debug.Assert(message.Data != null);
|
|
||||||
|
|
||||||
Channel? joinedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
|
|
||||||
Debug.Assert(joinedChannel != null);
|
|
||||||
|
|
||||||
HandleChannelJoined(joinedChannel);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case @"chat.channel.part":
|
|
||||||
Debug.Assert(message.Data != null);
|
|
||||||
|
|
||||||
Channel? partedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
|
|
||||||
Debug.Assert(partedChannel != null);
|
|
||||||
|
|
||||||
HandleChannelParted(partedChannel);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case @"chat.message.new":
|
|
||||||
Debug.Assert(message.Data != null);
|
|
||||||
|
|
||||||
NewChatMessageData? messageData = JsonConvert.DeserializeObject<NewChatMessageData>(message.Data.ToString());
|
|
||||||
Debug.Assert(messageData != null);
|
|
||||||
|
|
||||||
foreach (var msg in messageData.Messages)
|
|
||||||
HandleChannelJoined(await getChannel(msg.ChannelId).ConfigureAwait(false));
|
|
||||||
|
|
||||||
HandleMessages(messageData.Messages);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<Channel> getChannel(long channelId)
|
|
||||||
{
|
|
||||||
if (channelsMap.TryGetValue(channelId, out Channel? channel))
|
|
||||||
return channel;
|
|
||||||
|
|
||||||
var tsc = new TaskCompletionSource<Channel>();
|
|
||||||
var req = new GetChannelRequest(channelId);
|
|
||||||
|
|
||||||
req.Success += response =>
|
|
||||||
{
|
|
||||||
channelsMap[channelId] = response.Channel;
|
|
||||||
tsc.SetResult(response.Channel);
|
|
||||||
};
|
|
||||||
|
|
||||||
req.Failure += ex => tsc.SetException(ex);
|
|
||||||
|
|
||||||
API.Queue(req);
|
|
||||||
|
|
||||||
return await tsc.Task.ConfigureAwait(false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public override async ValueTask DisposeAsync()
|
public override async ValueTask DisposeAsync()
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||||
// See the LICENCE file in the repository root for full licence text.
|
// See the LICENCE file in the repository root for full licence text.
|
||||||
|
|
||||||
|
using System;
|
||||||
using System.Net;
|
using System.Net;
|
||||||
using System.Net.WebSockets;
|
using System.Net.WebSockets;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
@ -13,17 +14,20 @@ namespace osu.Game.Online.Notifications.WebSocket
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// A connector for <see cref="WebSocketNotificationsClient"/>s that receive events via a websocket.
|
/// A connector for <see cref="WebSocketNotificationsClient"/>s that receive events via a websocket.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public class WebSocketNotificationsClientConnector : NotificationsClientConnector
|
public class WebSocketNotificationsClientConnector : PersistentEndpointClientConnector, INotificationsClient
|
||||||
{
|
{
|
||||||
|
public event Action<SocketMessage>? MessageReceived;
|
||||||
|
|
||||||
private readonly IAPIProvider api;
|
private readonly IAPIProvider api;
|
||||||
|
|
||||||
public WebSocketNotificationsClientConnector(IAPIProvider api)
|
public WebSocketNotificationsClientConnector(IAPIProvider api)
|
||||||
: base(api)
|
: base(api)
|
||||||
{
|
{
|
||||||
this.api = api;
|
this.api = api;
|
||||||
|
Start();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected override async Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken)
|
protected override async Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var tcs = new TaskCompletionSource<string>();
|
var tcs = new TaskCompletionSource<string>();
|
||||||
|
|
||||||
@ -40,7 +44,17 @@ namespace osu.Game.Online.Notifications.WebSocket
|
|||||||
if (socket.Options.Proxy != null)
|
if (socket.Options.Proxy != null)
|
||||||
socket.Options.Proxy.Credentials = CredentialCache.DefaultCredentials;
|
socket.Options.Proxy.Credentials = CredentialCache.DefaultCredentials;
|
||||||
|
|
||||||
return new WebSocketNotificationsClient(socket, endpoint, api);
|
var client = new WebSocketNotificationsClient(socket, endpoint);
|
||||||
|
client.MessageReceived += msg => MessageReceived?.Invoke(msg);
|
||||||
|
return client;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task SendAsync(SocketMessage message, CancellationToken? cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (CurrentConnection is not WebSocketNotificationsClient webSocketClient)
|
||||||
|
return Task.CompletedTask;
|
||||||
|
|
||||||
|
return webSocketClient.SendAsync(message, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,34 +6,39 @@ using System.Collections.Generic;
|
|||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
|
using osu.Game.Online;
|
||||||
using osu.Game.Online.API;
|
using osu.Game.Online.API;
|
||||||
using osu.Game.Online.API.Requests;
|
using osu.Game.Online.API.Requests;
|
||||||
using osu.Game.Online.Chat;
|
using osu.Game.Online.Chat;
|
||||||
|
|
||||||
namespace osu.Game.Online.Notifications
|
namespace osu.Game.Tests
|
||||||
{
|
{
|
||||||
/// <summary>
|
public class PollingChatClient : PersistentEndpointClient
|
||||||
/// An abstract client which receives notification-related events (chat/notifications).
|
|
||||||
/// </summary>
|
|
||||||
public abstract class NotificationsClient : PersistentEndpointClient
|
|
||||||
{
|
{
|
||||||
public Action<Channel>? ChannelJoined;
|
public event Action<Channel>? ChannelJoined;
|
||||||
public Action<Channel>? ChannelParted;
|
public event Action<List<Message>>? NewMessages;
|
||||||
public Action<List<Message>>? NewMessages;
|
public event Action? PresenceReceived;
|
||||||
public Action? PresenceReceived;
|
|
||||||
|
|
||||||
protected readonly IAPIProvider API;
|
private readonly IAPIProvider api;
|
||||||
|
|
||||||
private long lastMessageId;
|
private long lastMessageId;
|
||||||
|
|
||||||
protected NotificationsClient(IAPIProvider api)
|
public PollingChatClient(IAPIProvider api)
|
||||||
{
|
{
|
||||||
API = api;
|
this.api = api;
|
||||||
}
|
}
|
||||||
|
|
||||||
public override Task ConnectAsync(CancellationToken cancellationToken)
|
public override Task ConnectAsync(CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
API.Queue(CreateInitialFetchRequest(0));
|
Task.Run(async () =>
|
||||||
|
{
|
||||||
|
while (!cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
await api.PerformAsync(CreateInitialFetchRequest()).ConfigureAwait(true);
|
||||||
|
await Task.Delay(1000, cancellationToken).ConfigureAwait(true);
|
||||||
|
}
|
||||||
|
}, cancellationToken);
|
||||||
|
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,11 +51,11 @@ namespace osu.Game.Online.Notifications
|
|||||||
if (updates?.Presence != null)
|
if (updates?.Presence != null)
|
||||||
{
|
{
|
||||||
foreach (var channel in updates.Presence)
|
foreach (var channel in updates.Presence)
|
||||||
HandleChannelJoined(channel);
|
handleChannelJoined(channel);
|
||||||
|
|
||||||
//todo: handle left channels
|
//todo: handle left channels
|
||||||
|
|
||||||
HandleMessages(updates.Messages);
|
handleMessages(updates.Messages);
|
||||||
}
|
}
|
||||||
|
|
||||||
PresenceReceived?.Invoke();
|
PresenceReceived?.Invoke();
|
||||||
@ -59,15 +64,13 @@ namespace osu.Game.Online.Notifications
|
|||||||
return fetchReq;
|
return fetchReq;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void HandleChannelJoined(Channel channel)
|
private void handleChannelJoined(Channel channel)
|
||||||
{
|
{
|
||||||
channel.Joined.Value = true;
|
channel.Joined.Value = true;
|
||||||
ChannelJoined?.Invoke(channel);
|
ChannelJoined?.Invoke(channel);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void HandleChannelParted(Channel channel) => ChannelParted?.Invoke(channel);
|
private void handleMessages(List<Message>? messages)
|
||||||
|
|
||||||
protected void HandleMessages(List<Message>? messages)
|
|
||||||
{
|
{
|
||||||
if (messages == null)
|
if (messages == null)
|
||||||
return;
|
return;
|
@ -1,35 +0,0 @@
|
|||||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
|
||||||
// See the LICENCE file in the repository root for full licence text.
|
|
||||||
|
|
||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using osu.Game.Online.API;
|
|
||||||
using osu.Game.Online.Notifications;
|
|
||||||
|
|
||||||
namespace osu.Game.Tests
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// A notifications client which polls for new messages every second.
|
|
||||||
/// </summary>
|
|
||||||
public class PollingNotificationsClient : NotificationsClient
|
|
||||||
{
|
|
||||||
public PollingNotificationsClient(IAPIProvider api)
|
|
||||||
: base(api)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
public override Task ConnectAsync(CancellationToken cancellationToken)
|
|
||||||
{
|
|
||||||
Task.Run(async () =>
|
|
||||||
{
|
|
||||||
while (!cancellationToken.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
await API.PerformAsync(CreateInitialFetchRequest()).ConfigureAwait(true);
|
|
||||||
await Task.Delay(1000, cancellationToken).ConfigureAwait(true);
|
|
||||||
}
|
|
||||||
}, cancellationToken);
|
|
||||||
|
|
||||||
return Task.CompletedTask;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,24 +0,0 @@
|
|||||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
|
||||||
// See the LICENCE file in the repository root for full licence text.
|
|
||||||
|
|
||||||
using System.Threading;
|
|
||||||
using System.Threading.Tasks;
|
|
||||||
using osu.Game.Online.API;
|
|
||||||
using osu.Game.Online.Notifications;
|
|
||||||
|
|
||||||
namespace osu.Game.Tests
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// A connector for <see cref="PollingNotificationsClient"/>s that poll for new messages.
|
|
||||||
/// </summary>
|
|
||||||
public class PollingNotificationsClientConnector : NotificationsClientConnector
|
|
||||||
{
|
|
||||||
public PollingNotificationsClientConnector(IAPIProvider api)
|
|
||||||
: base(api)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
protected override Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken)
|
|
||||||
=> Task.FromResult((NotificationsClient)new PollingNotificationsClient(API));
|
|
||||||
}
|
|
||||||
}
|
|
49
osu.Game/Tests/TestChatClientConnector.cs
Normal file
49
osu.Game/Tests/TestChatClientConnector.cs
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||||
|
// See the LICENCE file in the repository root for full licence text.
|
||||||
|
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using osu.Game.Online;
|
||||||
|
using osu.Game.Online.API;
|
||||||
|
using osu.Game.Online.Chat;
|
||||||
|
|
||||||
|
namespace osu.Game.Tests
|
||||||
|
{
|
||||||
|
public class TestChatClientConnector : PersistentEndpointClientConnector, IChatClient
|
||||||
|
{
|
||||||
|
public event Action<Channel>? ChannelJoined;
|
||||||
|
|
||||||
|
public event Action<Channel>? ChannelParted
|
||||||
|
{
|
||||||
|
add { }
|
||||||
|
remove { }
|
||||||
|
}
|
||||||
|
|
||||||
|
public event Action<List<Message>>? NewMessages;
|
||||||
|
public event Action? PresenceReceived;
|
||||||
|
|
||||||
|
public void RequestPresence()
|
||||||
|
{
|
||||||
|
// don't really need to do anything special if we poll every second anyway.
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestChatClientConnector(IAPIProvider api)
|
||||||
|
: base(api)
|
||||||
|
{
|
||||||
|
Start();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected sealed override Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var client = new PollingChatClient(API);
|
||||||
|
|
||||||
|
client.ChannelJoined += c => ChannelJoined?.Invoke(c);
|
||||||
|
client.NewMessages += m => NewMessages?.Invoke(m);
|
||||||
|
client.PresenceReceived += () => PresenceReceived?.Invoke();
|
||||||
|
|
||||||
|
return Task.FromResult<PersistentEndpointClient>(client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user