1
0
mirror of https://github.com/ppy/osu.git synced 2025-01-08 06:52:59 +08:00

Decouple notifications websocket handling from chat operations

This is a prerequisite for https://github.com/ppy/osu/pull/25480.

The `WebSocketNotificationsClient` was tightly coupled to chat specifics
making it difficult to use in the second factor verification flow.
This commit's goal is to separate the websocket connection and message
handling concerns from specific chat logic concerns.
This commit is contained in:
Bartłomiej Dach 2024-01-25 12:25:27 +01:00
parent baaf33d995
commit de52f0a80c
No known key found for this signature in database
16 changed files with 330 additions and 225 deletions

View File

@ -75,8 +75,6 @@ namespace osu.Game.Tests.Chat
return false; return false;
}; };
}); });
AddUntilStep("wait for notifications client", () => channelManager.NotificationsConnected);
} }
[Test] [Test]

View File

@ -21,7 +21,7 @@ using osu.Game.Configuration;
using osu.Game.Localisation; using osu.Game.Localisation;
using osu.Game.Online.API.Requests; using osu.Game.Online.API.Requests;
using osu.Game.Online.API.Requests.Responses; using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Notifications; using osu.Game.Online.Chat;
using osu.Game.Online.Notifications.WebSocket; using osu.Game.Online.Notifications.WebSocket;
using osu.Game.Users; using osu.Game.Users;
@ -55,6 +55,8 @@ namespace osu.Game.Online.API
public IBindable<UserActivity> Activity => activity; public IBindable<UserActivity> Activity => activity;
public IBindable<UserStatistics> Statistics => statistics; public IBindable<UserStatistics> Statistics => statistics;
public INotificationsClient NotificationsClient { get; }
public Language Language => game.CurrentLanguage.Value; public Language Language => game.CurrentLanguage.Value;
private Bindable<APIUser> localUser { get; } = new Bindable<APIUser>(createGuestUser()); private Bindable<APIUser> localUser { get; } = new Bindable<APIUser>(createGuestUser());
@ -82,6 +84,7 @@ namespace osu.Game.Online.API
APIEndpointUrl = endpointConfiguration.APIEndpointUrl; APIEndpointUrl = endpointConfiguration.APIEndpointUrl;
WebsiteRootUrl = endpointConfiguration.WebsiteRootUrl; WebsiteRootUrl = endpointConfiguration.WebsiteRootUrl;
NotificationsClient = new WebSocketNotificationsClientConnector(this);
authentication = new OAuth(endpointConfiguration.APIClientID, endpointConfiguration.APIClientSecret, APIEndpointUrl); authentication = new OAuth(endpointConfiguration.APIClientID, endpointConfiguration.APIClientSecret, APIEndpointUrl);
log = Logger.GetLogger(LoggingTarget.Network); log = Logger.GetLogger(LoggingTarget.Network);
@ -324,8 +327,7 @@ namespace osu.Game.Online.API
public IHubClientConnector GetHubConnector(string clientName, string endpoint, bool preferMessagePack) => public IHubClientConnector GetHubConnector(string clientName, string endpoint, bool preferMessagePack) =>
new HubClientConnector(clientName, endpoint, this, versionHash, preferMessagePack); new HubClientConnector(clientName, endpoint, this, versionHash, preferMessagePack);
public NotificationsClientConnector GetNotificationsConnector() => public IChatClient GetChatClient() => new WebSocketChatClient(this);
new WebSocketNotificationsClientConnector(this);
public RegistrationRequest.RegistrationRequestErrors CreateAccount(string email, string username, string password) public RegistrationRequest.RegistrationRequestErrors CreateAccount(string email, string username, string password)
{ {

View File

@ -8,7 +8,8 @@ using osu.Framework.Bindables;
using osu.Framework.Graphics; using osu.Framework.Graphics;
using osu.Game.Localisation; using osu.Game.Localisation;
using osu.Game.Online.API.Requests.Responses; using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Notifications; using osu.Game.Online.Chat;
using osu.Game.Online.Notifications.WebSocket;
using osu.Game.Tests; using osu.Game.Tests;
using osu.Game.Users; using osu.Game.Users;
@ -30,6 +31,9 @@ namespace osu.Game.Online.API
public Bindable<UserStatistics?> Statistics { get; } = new Bindable<UserStatistics?>(); public Bindable<UserStatistics?> Statistics { get; } = new Bindable<UserStatistics?>();
public DummyNotificationsClient NotificationsClient { get; } = new DummyNotificationsClient();
INotificationsClient IAPIProvider.NotificationsClient => NotificationsClient;
public Language Language => Language.en; public Language Language => Language.en;
public string AccessToken => "token"; public string AccessToken => "token";
@ -144,7 +148,7 @@ namespace osu.Game.Online.API
public IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack) => null; public IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack) => null;
public NotificationsClientConnector GetNotificationsConnector() => new PollingNotificationsClientConnector(this); public IChatClient GetChatClient() => new PollingChatClientConnector(this);
public RegistrationRequest.RegistrationRequestErrors? CreateAccount(string email, string username, string password) public RegistrationRequest.RegistrationRequestErrors? CreateAccount(string email, string username, string password)
{ {

View File

@ -6,7 +6,8 @@ using System.Threading.Tasks;
using osu.Framework.Bindables; using osu.Framework.Bindables;
using osu.Game.Localisation; using osu.Game.Localisation;
using osu.Game.Online.API.Requests.Responses; using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Notifications; using osu.Game.Online.Chat;
using osu.Game.Online.Notifications.WebSocket;
using osu.Game.Users; using osu.Game.Users;
namespace osu.Game.Online.API namespace osu.Game.Online.API
@ -129,10 +130,9 @@ namespace osu.Game.Online.API
/// <param name="preferMessagePack">Whether to use MessagePack for serialisation if available on this platform.</param> /// <param name="preferMessagePack">Whether to use MessagePack for serialisation if available on this platform.</param>
IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack = true); IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack = true);
/// <summary> INotificationsClient NotificationsClient { get; }
/// Constructs a new <see cref="NotificationsClientConnector"/>.
/// </summary> IChatClient GetChatClient();
NotificationsClientConnector GetNotificationsConnector();
/// <summary> /// <summary>
/// Create a new user account. This is a blocking operation. /// Create a new user account. This is a blocking operation.

View File

@ -16,7 +16,6 @@ using osu.Game.Database;
using osu.Game.Online.API; using osu.Game.Online.API;
using osu.Game.Online.API.Requests; using osu.Game.Online.API.Requests;
using osu.Game.Online.API.Requests.Responses; using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Notifications;
using osu.Game.Overlays.Chat.Listing; using osu.Game.Overlays.Chat.Listing;
namespace osu.Game.Online.Chat namespace osu.Game.Online.Chat
@ -64,13 +63,8 @@ namespace osu.Game.Online.Chat
/// </summary> /// </summary>
public IBindableList<Channel> AvailableChannels => availableChannels; public IBindableList<Channel> AvailableChannels => availableChannels;
/// <summary>
/// Whether the client responsible for channel notifications is connected.
/// </summary>
public bool NotificationsConnected => connector.IsConnected.Value;
private readonly IAPIProvider api; private readonly IAPIProvider api;
private readonly NotificationsClientConnector connector; private readonly IChatClient chatClient;
[Resolved] [Resolved]
private UserLookupCache users { get; set; } private UserLookupCache users { get; set; }
@ -85,7 +79,7 @@ namespace osu.Game.Online.Chat
{ {
this.api = api; this.api = api;
connector = api.GetNotificationsConnector(); chatClient = api.GetChatClient();
CurrentChannel.ValueChanged += currentChannelChanged; CurrentChannel.ValueChanged += currentChannelChanged;
} }
@ -93,15 +87,11 @@ namespace osu.Game.Online.Chat
[BackgroundDependencyLoader] [BackgroundDependencyLoader]
private void load() private void load()
{ {
connector.ChannelJoined += ch => Schedule(() => joinChannel(ch)); chatClient.ChannelJoined += ch => Schedule(() => joinChannel(ch));
chatClient.ChannelParted += ch => Schedule(() => leaveChannel(getChannel(ch), false));
connector.ChannelParted += ch => Schedule(() => leaveChannel(getChannel(ch), false)); chatClient.NewMessages += msgs => Schedule(() => addMessages(msgs));
chatClient.PresenceReceived += () => Schedule(initializeChannels);
connector.NewMessages += msgs => Schedule(() => addMessages(msgs)); chatClient.FetchInitialMessages();
connector.PresenceReceived += () => Schedule(initializeChannels);
connector.Start();
apiState.BindTo(api.State); apiState.BindTo(api.State);
apiState.BindValueChanged(_ => SendAck(), true); apiState.BindValueChanged(_ => SendAck(), true);
@ -655,7 +645,7 @@ namespace osu.Game.Online.Chat
protected override void Dispose(bool isDisposing) protected override void Dispose(bool isDisposing)
{ {
base.Dispose(isDisposing); base.Dispose(isDisposing);
connector?.Dispose(); chatClient?.Dispose();
} }
} }

View File

@ -0,0 +1,18 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
namespace osu.Game.Online.Chat
{
public interface IChatClient : IDisposable
{
event Action<Channel>? ChannelJoined;
event Action<Channel>? ChannelParted;
event Action<List<Message>>? NewMessages;
event Action? PresenceReceived;
void FetchInitialMessages();
}
}

View File

@ -0,0 +1,148 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using Newtonsoft.Json;
using osu.Framework.Bindables;
using osu.Framework.Extensions;
using osu.Framework.Logging;
using osu.Game.Online.API;
using osu.Game.Online.API.Requests;
using osu.Game.Online.Notifications.WebSocket;
namespace osu.Game.Online.Chat
{
public class WebSocketChatClient : IChatClient
{
public event Action<Channel>? ChannelJoined;
public event Action<Channel>? ChannelParted;
public event Action<List<Message>>? NewMessages;
public event Action? PresenceReceived;
private readonly IAPIProvider api;
private readonly INotificationsClient client;
private readonly ConcurrentDictionary<long, Channel> channelsMap = new ConcurrentDictionary<long, Channel>();
public WebSocketChatClient(IAPIProvider api)
{
this.api = api;
client = api.NotificationsClient;
client.IsConnected.BindValueChanged(start, true);
}
private void start(ValueChangedEvent<bool> connected)
{
if (!connected.NewValue)
return;
client.MessageReceived += onMessageReceived;
client.SendAsync(new StartChatRequest()).WaitSafely();
}
public void FetchInitialMessages()
{
api.Queue(createInitialFetchRequest());
}
private APIRequest createInitialFetchRequest()
{
var fetchReq = new GetUpdatesRequest(0);
fetchReq.Success += updates =>
{
if (updates?.Presence != null)
{
foreach (var channel in updates.Presence)
joinChannel(channel);
handleMessages(updates.Messages);
}
PresenceReceived?.Invoke();
};
return fetchReq;
}
private void onMessageReceived(SocketMessage message)
{
switch (message.Event)
{
case @"chat.channel.join":
Debug.Assert(message.Data != null);
Channel? joinedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
Debug.Assert(joinedChannel != null);
joinChannel(joinedChannel);
break;
case @"chat.channel.part":
Debug.Assert(message.Data != null);
Channel? partedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
Debug.Assert(partedChannel != null);
partChannel(partedChannel);
break;
case @"chat.message.new":
Debug.Assert(message.Data != null);
NewChatMessageData? messageData = JsonConvert.DeserializeObject<NewChatMessageData>(message.Data.ToString());
Debug.Assert(messageData != null);
foreach (var msg in messageData.Messages)
postToChannel(msg);
break;
}
}
private void postToChannel(Message message)
{
if (channelsMap.TryGetValue(message.ChannelId, out Channel? channel))
{
joinChannel(channel);
NewMessages?.Invoke(new List<Message> { message });
return;
}
var req = new GetChannelRequest(message.ChannelId);
req.Success += response =>
{
joinChannel(channelsMap[message.ChannelId] = response.Channel);
NewMessages?.Invoke(new List<Message> { message });
};
req.Failure += ex => Logger.Error(ex, "Failed to join channel");
api.Queue(req);
}
private void joinChannel(Channel ch)
{
ch.Joined.Value = true;
ChannelJoined?.Invoke(ch);
}
private void partChannel(Channel channel) => ChannelParted?.Invoke(channel);
private void handleMessages(List<Message>? messages)
{
if (messages == null)
return;
NewMessages?.Invoke(messages);
}
public void Dispose()
{
client.IsConnected.ValueChanged -= start;
client.MessageReceived -= onMessageReceived;
}
}
}

View File

@ -1,42 +0,0 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications
{
/// <summary>
/// An abstract connector or <see cref="NotificationsClient"/>s.
/// </summary>
public abstract class NotificationsClientConnector : PersistentEndpointClientConnector
{
public event Action<Channel>? ChannelJoined;
public event Action<Channel>? ChannelParted;
public event Action<List<Message>>? NewMessages;
public event Action? PresenceReceived;
protected NotificationsClientConnector(IAPIProvider api)
: base(api)
{
}
protected sealed override async Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken)
{
var client = await BuildNotificationClientAsync(cancellationToken).ConfigureAwait(false);
client.ChannelJoined = c => ChannelJoined?.Invoke(c);
client.ChannelParted = c => ChannelParted?.Invoke(c);
client.NewMessages = m => NewMessages?.Invoke(m);
client.PresenceReceived = () => PresenceReceived?.Invoke();
return client;
}
protected abstract Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken);
}
}

View File

@ -0,0 +1,29 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Threading;
using System.Threading.Tasks;
using osu.Framework.Bindables;
namespace osu.Game.Online.Notifications.WebSocket
{
public class DummyNotificationsClient : INotificationsClient
{
public IBindable<bool> IsConnected => new BindableBool(true);
public event Action<SocketMessage>? MessageReceived;
public Func<SocketMessage, bool>? HandleMessage;
public Task SendAsync(SocketMessage message, CancellationToken? cancellationToken = default)
{
if (HandleMessage?.Invoke(message) != true)
throw new InvalidOperationException($@"{nameof(DummyNotificationsClient)} cannot process this message.");
return Task.CompletedTask;
}
public void Receive(SocketMessage message) => MessageReceived?.Invoke(message);
}
}

View File

@ -0,0 +1,17 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Threading;
using System.Threading.Tasks;
using osu.Framework.Bindables;
namespace osu.Game.Online.Notifications.WebSocket
{
public interface INotificationsClient
{
IBindable<bool> IsConnected { get; }
event Action<SocketMessage>? MessageReceived;
Task SendAsync(SocketMessage message, CancellationToken? cancellationToken = default);
}
}

View File

@ -2,7 +2,6 @@
// See the LICENCE file in the repository root for full licence text. // See the LICENCE file in the repository root for full licence text.
using System; using System;
using System.Collections.Concurrent;
using System.Diagnostics; using System.Diagnostics;
using System.Net; using System.Net;
using System.Net.WebSockets; using System.Net.WebSockets;
@ -12,23 +11,20 @@ using System.Threading.Tasks;
using Newtonsoft.Json; using Newtonsoft.Json;
using osu.Framework.Extensions.TypeExtensions; using osu.Framework.Extensions.TypeExtensions;
using osu.Framework.Logging; using osu.Framework.Logging;
using osu.Game.Online.API;
using osu.Game.Online.API.Requests;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications.WebSocket namespace osu.Game.Online.Notifications.WebSocket
{ {
/// <summary> /// <summary>
/// A notifications client which receives events via a websocket. /// A notifications client which receives events via a websocket.
/// </summary> /// </summary>
public class WebSocketNotificationsClient : NotificationsClient public class WebSocketNotificationsClient : PersistentEndpointClient
{ {
public event Action<SocketMessage>? MessageReceived;
private readonly ClientWebSocket socket; private readonly ClientWebSocket socket;
private readonly string endpoint; private readonly string endpoint;
private readonly ConcurrentDictionary<long, Channel> channelsMap = new ConcurrentDictionary<long, Channel>();
public WebSocketNotificationsClient(ClientWebSocket socket, string endpoint, IAPIProvider api) public WebSocketNotificationsClient(ClientWebSocket socket, string endpoint)
: base(api)
{ {
this.socket = socket; this.socket = socket;
this.endpoint = endpoint; this.endpoint = endpoint;
@ -37,11 +33,7 @@ namespace osu.Game.Online.Notifications.WebSocket
public override async Task ConnectAsync(CancellationToken cancellationToken) public override async Task ConnectAsync(CancellationToken cancellationToken)
{ {
await socket.ConnectAsync(new Uri(endpoint), cancellationToken).ConfigureAwait(false); await socket.ConnectAsync(new Uri(endpoint), cancellationToken).ConfigureAwait(false);
await sendMessage(new StartChatRequest(), CancellationToken.None).ConfigureAwait(false);
runReadLoop(cancellationToken); runReadLoop(cancellationToken);
await base.ConnectAsync(cancellationToken).ConfigureAwait(false);
} }
private void runReadLoop(CancellationToken cancellationToken) => Task.Run(async () => private void runReadLoop(CancellationToken cancellationToken) => Task.Run(async () =>
@ -73,7 +65,7 @@ namespace osu.Game.Online.Notifications.WebSocket
break; break;
} }
await onMessageReceivedAsync(message).ConfigureAwait(false); MessageReceived?.Invoke(message);
} }
break; break;
@ -105,69 +97,12 @@ namespace osu.Game.Online.Notifications.WebSocket
} }
} }
private async Task sendMessage(SocketMessage message, CancellationToken cancellationToken) public async Task SendAsync(SocketMessage message, CancellationToken? cancellationToken = default)
{ {
if (socket.State != WebSocketState.Open) if (socket.State != WebSocketState.Open)
return; return;
await socket.SendAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); await socket.SendAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), WebSocketMessageType.Text, true, cancellationToken ?? CancellationToken.None).ConfigureAwait(false);
}
private async Task onMessageReceivedAsync(SocketMessage message)
{
switch (message.Event)
{
case @"chat.channel.join":
Debug.Assert(message.Data != null);
Channel? joinedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
Debug.Assert(joinedChannel != null);
HandleChannelJoined(joinedChannel);
break;
case @"chat.channel.part":
Debug.Assert(message.Data != null);
Channel? partedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
Debug.Assert(partedChannel != null);
HandleChannelParted(partedChannel);
break;
case @"chat.message.new":
Debug.Assert(message.Data != null);
NewChatMessageData? messageData = JsonConvert.DeserializeObject<NewChatMessageData>(message.Data.ToString());
Debug.Assert(messageData != null);
foreach (var msg in messageData.Messages)
HandleChannelJoined(await getChannel(msg.ChannelId).ConfigureAwait(false));
HandleMessages(messageData.Messages);
break;
}
}
private async Task<Channel> getChannel(long channelId)
{
if (channelsMap.TryGetValue(channelId, out Channel? channel))
return channel;
var tsc = new TaskCompletionSource<Channel>();
var req = new GetChannelRequest(channelId);
req.Success += response =>
{
channelsMap[channelId] = response.Channel;
tsc.SetResult(response.Channel);
};
req.Failure += ex => tsc.SetException(ex);
API.Queue(req);
return await tsc.Task.ConfigureAwait(false);
} }
public override async ValueTask DisposeAsync() public override async ValueTask DisposeAsync()

View File

@ -1,6 +1,7 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence. // Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text. // See the LICENCE file in the repository root for full licence text.
using System;
using System.Net; using System.Net;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Threading; using System.Threading;
@ -13,17 +14,20 @@ namespace osu.Game.Online.Notifications.WebSocket
/// <summary> /// <summary>
/// A connector for <see cref="WebSocketNotificationsClient"/>s that receive events via a websocket. /// A connector for <see cref="WebSocketNotificationsClient"/>s that receive events via a websocket.
/// </summary> /// </summary>
public class WebSocketNotificationsClientConnector : NotificationsClientConnector public class WebSocketNotificationsClientConnector : PersistentEndpointClientConnector, INotificationsClient
{ {
public event Action<SocketMessage>? MessageReceived;
private readonly IAPIProvider api; private readonly IAPIProvider api;
public WebSocketNotificationsClientConnector(IAPIProvider api) public WebSocketNotificationsClientConnector(IAPIProvider api)
: base(api) : base(api)
{ {
this.api = api; this.api = api;
Start();
} }
protected override async Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken) protected override async Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken)
{ {
var tcs = new TaskCompletionSource<string>(); var tcs = new TaskCompletionSource<string>();
@ -40,7 +44,17 @@ namespace osu.Game.Online.Notifications.WebSocket
if (socket.Options.Proxy != null) if (socket.Options.Proxy != null)
socket.Options.Proxy.Credentials = CredentialCache.DefaultCredentials; socket.Options.Proxy.Credentials = CredentialCache.DefaultCredentials;
return new WebSocketNotificationsClient(socket, endpoint, api); var client = new WebSocketNotificationsClient(socket, endpoint);
client.MessageReceived += msg => MessageReceived?.Invoke(msg);
return client;
}
public Task SendAsync(SocketMessage message, CancellationToken? cancellationToken = default)
{
if (CurrentConnection is not WebSocketNotificationsClient webSocketClient)
return Task.CompletedTask;
return webSocketClient.SendAsync(message, cancellationToken);
} }
} }
} }

View File

@ -6,34 +6,39 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using osu.Game.Online;
using osu.Game.Online.API; using osu.Game.Online.API;
using osu.Game.Online.API.Requests; using osu.Game.Online.API.Requests;
using osu.Game.Online.Chat; using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications namespace osu.Game.Tests
{ {
/// <summary> public class PollingChatClient : PersistentEndpointClient
/// An abstract client which receives notification-related events (chat/notifications).
/// </summary>
public abstract class NotificationsClient : PersistentEndpointClient
{ {
public Action<Channel>? ChannelJoined; public event Action<Channel>? ChannelJoined;
public Action<Channel>? ChannelParted; public event Action<List<Message>>? NewMessages;
public Action<List<Message>>? NewMessages; public event Action? PresenceReceived;
public Action? PresenceReceived;
protected readonly IAPIProvider API; private readonly IAPIProvider api;
private long lastMessageId; private long lastMessageId;
protected NotificationsClient(IAPIProvider api) public PollingChatClient(IAPIProvider api)
{ {
API = api; this.api = api;
} }
public override Task ConnectAsync(CancellationToken cancellationToken) public override Task ConnectAsync(CancellationToken cancellationToken)
{ {
API.Queue(CreateInitialFetchRequest(0)); Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
await api.PerformAsync(CreateInitialFetchRequest()).ConfigureAwait(true);
await Task.Delay(1000, cancellationToken).ConfigureAwait(true);
}
}, cancellationToken);
return Task.CompletedTask; return Task.CompletedTask;
} }
@ -46,11 +51,11 @@ namespace osu.Game.Online.Notifications
if (updates?.Presence != null) if (updates?.Presence != null)
{ {
foreach (var channel in updates.Presence) foreach (var channel in updates.Presence)
HandleChannelJoined(channel); handleChannelJoined(channel);
//todo: handle left channels //todo: handle left channels
HandleMessages(updates.Messages); handleMessages(updates.Messages);
} }
PresenceReceived?.Invoke(); PresenceReceived?.Invoke();
@ -59,15 +64,13 @@ namespace osu.Game.Online.Notifications
return fetchReq; return fetchReq;
} }
protected void HandleChannelJoined(Channel channel) private void handleChannelJoined(Channel channel)
{ {
channel.Joined.Value = true; channel.Joined.Value = true;
ChannelJoined?.Invoke(channel); ChannelJoined?.Invoke(channel);
} }
protected void HandleChannelParted(Channel channel) => ChannelParted?.Invoke(channel); private void handleMessages(List<Message>? messages)
protected void HandleMessages(List<Message>? messages)
{ {
if (messages == null) if (messages == null)
return; return;

View File

@ -0,0 +1,48 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online;
using osu.Game.Online.API;
using osu.Game.Online.Chat;
namespace osu.Game.Tests
{
public class PollingChatClientConnector : PersistentEndpointClientConnector, IChatClient
{
public event Action<Channel>? ChannelJoined;
public event Action<Channel>? ChannelParted
{
add { }
remove { }
}
public event Action<List<Message>>? NewMessages;
public event Action? PresenceReceived;
public void FetchInitialMessages()
{
// don't really need to do anything special if we poll every second anyway.
}
public PollingChatClientConnector(IAPIProvider api)
: base(api)
{
}
protected sealed override Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken)
{
var client = new PollingChatClient(API);
client.ChannelJoined += c => ChannelJoined?.Invoke(c);
client.NewMessages += m => NewMessages?.Invoke(m);
client.PresenceReceived += () => PresenceReceived?.Invoke();
return Task.FromResult<PersistentEndpointClient>(client);
}
}
}

View File

@ -1,35 +0,0 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.Notifications;
namespace osu.Game.Tests
{
/// <summary>
/// A notifications client which polls for new messages every second.
/// </summary>
public class PollingNotificationsClient : NotificationsClient
{
public PollingNotificationsClient(IAPIProvider api)
: base(api)
{
}
public override Task ConnectAsync(CancellationToken cancellationToken)
{
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
await API.PerformAsync(CreateInitialFetchRequest()).ConfigureAwait(true);
await Task.Delay(1000, cancellationToken).ConfigureAwait(true);
}
}, cancellationToken);
return Task.CompletedTask;
}
}
}

View File

@ -1,24 +0,0 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.Notifications;
namespace osu.Game.Tests
{
/// <summary>
/// A connector for <see cref="PollingNotificationsClient"/>s that poll for new messages.
/// </summary>
public class PollingNotificationsClientConnector : NotificationsClientConnector
{
public PollingNotificationsClientConnector(IAPIProvider api)
: base(api)
{
}
protected override Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken)
=> Task.FromResult((NotificationsClient)new PollingNotificationsClient(API));
}
}