1
0
mirror of https://github.com/ppy/osu.git synced 2024-11-11 09:27:29 +08:00

Generalise + add polling-style for usage in tests

This commit is contained in:
Dan Balasescu 2022-10-28 17:53:28 +09:00
parent efa8256911
commit 527b1d9db1
15 changed files with 357 additions and 223 deletions

View File

@ -17,7 +17,7 @@ namespace osu.Game.Online
Connection.Closed += InvokeClosed;
}
public override Task StartAsync(CancellationToken cancellationToken) => Connection.StartAsync(cancellationToken);
public override Task ConnectAsync(CancellationToken cancellationToken) => Connection.StartAsync(cancellationToken);
public override async ValueTask DisposeAsync()
{

View File

@ -2,112 +2,95 @@
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Diagnostics;
using System.Net.WebSockets;
using System.Text;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using osu.Framework.Extensions.TypeExtensions;
using osu.Framework.Logging;
using osu.Game.Online.API;
using osu.Game.Online.API.Requests;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications
{
public partial class NotificationsClient : SocketClient
/// <summary>
/// An abstract client which receives notification-related events (chat/notifications).
/// </summary>
public abstract class NotificationsClient : SocketClient
{
private readonly ClientWebSocket socket;
private readonly string endpoint;
public Action<Channel>? ChannelJoined;
public Action<List<Message>>? NewMessages;
public Action? PresenceReceived;
private readonly IAPIProvider api;
public NotificationsClient(ClientWebSocket socket, string endpoint, IAPIProvider api)
private bool enableChat;
private long lastMessageId;
protected NotificationsClient(IAPIProvider api)
{
this.socket = socket;
this.endpoint = endpoint;
this.api = api;
}
public override async Task StartAsync(CancellationToken cancellationToken)
public bool EnableChat
{
await socket.ConnectAsync(new Uri(endpoint), cancellationToken).ConfigureAwait(false);
await onConnectedAsync();
runReadLoop(cancellationToken);
}
private void runReadLoop(CancellationToken cancellationToken) => Task.Run((Func<Task>)(async () =>
{
byte[] buffer = new byte[1024];
StringBuilder messageResult = new StringBuilder();
while (!cancellationToken.IsCancellationRequested)
get => enableChat;
set
{
try
{
WebSocketReceiveResult result = await socket.ReceiveAsync(buffer, cancellationToken);
switch (result.MessageType)
{
case WebSocketMessageType.Text:
messageResult.Append(Encoding.UTF8.GetString(buffer[..result.Count]));
if (result.EndOfMessage)
{
SocketMessage? message = JsonConvert.DeserializeObject<SocketMessage>(messageResult.ToString());
messageResult.Clear();
Debug.Assert(message != null);
if (message.Error != null)
{
Logger.Log($"{GetType().ReadableName()} error: {message.Error}", LoggingTarget.Network);
break;
}
await onMessageReceivedAsync(message);
}
break;
case WebSocketMessageType.Binary:
throw new NotImplementedException();
case WebSocketMessageType.Close:
throw new Exception("Connection closed by remote host.");
}
}
catch (Exception ex)
{
await InvokeClosed(ex);
if (enableChat == value)
return;
enableChat = value;
if (EnableChat)
Task.Run(StartChatAsync);
}
}
public override async Task ConnectAsync(CancellationToken cancellationToken)
{
if (EnableChat)
await StartChatAsync();
}
protected virtual Task StartChatAsync()
{
api.Queue(CreateFetchMessagesRequest(0));
return Task.CompletedTask;
}
protected APIRequest CreateFetchMessagesRequest(long? lastMessageId = null)
{
var fetchReq = new GetUpdatesRequest(lastMessageId ?? this.lastMessageId);
fetchReq.Success += updates =>
{
if (updates?.Presence != null)
{
foreach (var channel in updates.Presence)
HandleJoinedChannel(channel);
//todo: handle left channels
HandleMessages(updates.Messages);
}
}
}), cancellationToken);
private async Task closeAsync()
{
try
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, @"Disconnecting", CancellationToken.None).ConfigureAwait(false);
}
catch
{
// Closure can fail if the connection is aborted. Don't really care since it's disposed anyway.
}
PresenceReceived?.Invoke();
};
return fetchReq;
}
private async Task sendMessage(SocketMessage message, CancellationToken cancellationToken)
protected void HandleJoinedChannel(Channel channel)
{
if (socket.State != WebSocketState.Open)
return;
await socket.SendAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), WebSocketMessageType.Text, true, cancellationToken);
// we received this from the server so should mark the channel already joined.
channel.Joined.Value = true;
ChannelJoined?.Invoke(channel);
}
public override async ValueTask DisposeAsync()
protected void HandleMessages(List<Message> messages)
{
await base.DisposeAsync();
await closeAsync();
socket.Dispose();
NewMessages?.Invoke(messages);
lastMessageId = Math.Max(lastMessageId, messages.LastOrDefault()?.Id ?? 0);
}
}
}

View File

@ -3,29 +3,27 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.API.Requests;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications
{
public class NotificationsClientConnector : SocketClientConnector
/// <summary>
/// An abstract connector or <see cref="NotificationsClient"/>s.
/// </summary>
public abstract class NotificationsClientConnector : SocketClientConnector
{
public event Action<Channel>? ChannelJoined;
public event Action<List<Message>>? NewMessages;
public event Action? PresenceReceived;
private readonly IAPIProvider api;
private bool chatStarted;
public NotificationsClientConnector(IAPIProvider api)
protected NotificationsClientConnector(IAPIProvider api)
: base(api)
{
this.api = api;
}
public void StartChat()
@ -36,30 +34,18 @@ namespace osu.Game.Online.Notifications
client.EnableChat = true;
}
protected override async Task<SocketClient> BuildConnectionAsync(CancellationToken cancellationToken)
protected sealed override async Task<SocketClient> BuildConnectionAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<string>();
var client = await BuildNotificationClientAsync(cancellationToken);
var req = new GetNotificationsRequest();
req.Success += bundle => tcs.SetResult(bundle.Endpoint);
req.Failure += ex => tcs.SetException(ex);
api.Queue(req);
client.ChannelJoined = c => ChannelJoined?.Invoke(c);
client.NewMessages = m => NewMessages?.Invoke(m);
client.PresenceReceived = () => PresenceReceived?.Invoke();
client.EnableChat = chatStarted;
string endpoint = await tcs.Task;
ClientWebSocket socket = new ClientWebSocket();
socket.Options.SetRequestHeader(@"Authorization", @$"Bearer {api.AccessToken}");
socket.Options.Proxy = WebRequest.DefaultWebProxy;
if (socket.Options.Proxy != null)
socket.Options.Proxy.Credentials = CredentialCache.DefaultCredentials;
return new NotificationsClient(socket, endpoint, api)
{
ChannelJoined = c => ChannelJoined?.Invoke(c),
NewMessages = m => NewMessages?.Invoke(m),
PresenceReceived = () => PresenceReceived?.Invoke(),
EnableChat = chatStarted
};
return client;
}
protected abstract Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken);
}
}

View File

@ -1,105 +0,0 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using osu.Game.Online.API.Requests;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications
{
public partial class NotificationsClient
{
public Action<Channel>? ChannelJoined;
public Action<List<Message>>? NewMessages;
public Action? PresenceReceived;
private bool enableChat;
private long lastMessageId;
public bool EnableChat
{
get => enableChat;
set
{
if (enableChat == value)
return;
enableChat = value;
Task.Run(startChatIfEnabledAsync);
}
}
private async Task onConnectedAsync()
{
await startChatIfEnabledAsync();
}
private async Task startChatIfEnabledAsync()
{
if (!EnableChat)
return;
await sendMessage(new StartChatRequest(), CancellationToken.None);
var fetchReq = new GetUpdatesRequest(lastMessageId);
fetchReq.Success += updates =>
{
if (updates?.Presence != null)
{
foreach (var channel in updates.Presence)
handleJoinedChannel(channel);
//todo: handle left channels
handleMessages(updates.Messages);
}
PresenceReceived?.Invoke();
};
api.Queue(fetchReq);
}
private Task onMessageReceivedAsync(SocketMessage message)
{
switch (message.Event)
{
case @"chat.message.new":
Debug.Assert(message.Data != null);
NewChatMessageData? messageData = JsonConvert.DeserializeObject<NewChatMessageData>(message.Data.ToString());
Debug.Assert(messageData != null);
List<Message> messages = messageData.Messages.Where(m => m.Sender.OnlineID != api.LocalUser.Value.OnlineID).ToList();
foreach (var msg in messages)
handleJoinedChannel(new Channel(msg.Sender) { Id = msg.ChannelId });
handleMessages(messages);
break;
}
return Task.CompletedTask;
}
private void handleJoinedChannel(Channel channel)
{
// we received this from the server so should mark the channel already joined.
channel.Joined.Value = true;
ChannelJoined?.Invoke(channel);
}
private void handleMessages(List<Message> messages)
{
NewMessages?.Invoke(messages);
lastMessageId = messages.LastOrDefault()?.Id ?? lastMessageId;
}
}
}

View File

@ -0,0 +1,37 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
namespace osu.Game.Online.Notifications.Polling
{
/// <summary>
/// A notifications client which polls for new messages every second.
/// </summary>
public class PollingNotificationsClient : NotificationsClient
{
private readonly IAPIProvider api;
public PollingNotificationsClient(IAPIProvider api)
: base(api)
{
this.api = api;
}
public override Task ConnectAsync(CancellationToken cancellationToken)
{
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
await api.PerformAsync(CreateFetchMessagesRequest());
await Task.Delay(1000, cancellationToken);
}
}, cancellationToken);
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,26 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
namespace osu.Game.Online.Notifications.Polling
{
/// <summary>
/// A connector for <see cref="PollingNotificationsClient"/>s that poll for new messages.
/// </summary>
public class PollingNotificationsClientConnector : NotificationsClientConnector
{
private readonly IAPIProvider api;
public PollingNotificationsClientConnector(IAPIProvider api)
: base(api)
{
this.api = api;
}
protected override Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken)
=> Task.FromResult((NotificationsClient)new PollingNotificationsClient(api));
}
}

View File

@ -3,8 +3,11 @@
using Newtonsoft.Json;
namespace osu.Game.Online.Notifications
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A websocket message notifying the server that the client no longer wants to receive chat messages.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class EndChatRequest : SocketMessage
{

View File

@ -8,8 +8,11 @@ using Newtonsoft.Json;
using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A websocket message sent from the server when new messages arrive.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class NewChatMessageData
{

View File

@ -4,8 +4,11 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace osu.Game.Online.Notifications
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A websocket message, sent either from the client or server.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class SocketMessage
{

View File

@ -3,8 +3,11 @@
using Newtonsoft.Json;
namespace osu.Game.Online.Notifications
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A websocket message notifying the server that the client wants to receive chat messages.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class StartChatRequest : SocketMessage
{

View File

@ -0,0 +1,148 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using osu.Framework.Extensions.TypeExtensions;
using osu.Framework.Logging;
using osu.Game.Online.API;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A notifications client which receives events via a websocket.
/// </summary>
public class WebSocketNotificationsClient : NotificationsClient
{
private readonly ClientWebSocket socket;
private readonly string endpoint;
private readonly IAPIProvider api;
public WebSocketNotificationsClient(ClientWebSocket socket, string endpoint, IAPIProvider api)
: base(api)
{
this.socket = socket;
this.endpoint = endpoint;
this.api = api;
}
public override async Task ConnectAsync(CancellationToken cancellationToken)
{
await socket.ConnectAsync(new Uri(endpoint), cancellationToken).ConfigureAwait(false);
runReadLoop(cancellationToken);
await base.ConnectAsync(cancellationToken);
}
protected override async Task StartChatAsync()
{
await sendMessage(new StartChatRequest(), CancellationToken.None);
await base.StartChatAsync();
}
private void runReadLoop(CancellationToken cancellationToken) => Task.Run((Func<Task>)(async () =>
{
byte[] buffer = new byte[1024];
StringBuilder messageResult = new StringBuilder();
while (!cancellationToken.IsCancellationRequested)
{
try
{
WebSocketReceiveResult result = await socket.ReceiveAsync(buffer, cancellationToken);
switch (result.MessageType)
{
case WebSocketMessageType.Text:
messageResult.Append(Encoding.UTF8.GetString(buffer[..result.Count]));
if (result.EndOfMessage)
{
SocketMessage? message = JsonConvert.DeserializeObject<SocketMessage>(messageResult.ToString());
messageResult.Clear();
Debug.Assert(message != null);
if (message.Error != null)
{
Logger.Log($"{GetType().ReadableName()} error: {message.Error}", LoggingTarget.Network);
break;
}
await onMessageReceivedAsync(message);
}
break;
case WebSocketMessageType.Binary:
throw new NotImplementedException();
case WebSocketMessageType.Close:
throw new Exception("Connection closed by remote host.");
}
}
catch (Exception ex)
{
await InvokeClosed(ex);
return;
}
}
}), cancellationToken);
private async Task closeAsync()
{
try
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, @"Disconnecting", CancellationToken.None).ConfigureAwait(false);
}
catch
{
// Closure can fail if the connection is aborted. Don't really care since it's disposed anyway.
}
}
private async Task sendMessage(SocketMessage message, CancellationToken cancellationToken)
{
if (socket.State != WebSocketState.Open)
return;
await socket.SendAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), WebSocketMessageType.Text, true, cancellationToken);
}
private Task onMessageReceivedAsync(SocketMessage message)
{
switch (message.Event)
{
case @"chat.message.new":
Debug.Assert(message.Data != null);
NewChatMessageData? messageData = JsonConvert.DeserializeObject<NewChatMessageData>(message.Data.ToString());
Debug.Assert(messageData != null);
List<Message> messages = messageData.Messages.Where(m => m.Sender.OnlineID != api.LocalUser.Value.OnlineID).ToList();
foreach (var msg in messages)
HandleJoinedChannel(new Channel(msg.Sender) { Id = msg.ChannelId });
HandleMessages(messages);
break;
}
return Task.CompletedTask;
}
public override async ValueTask DisposeAsync()
{
await base.DisposeAsync();
await closeAsync();
socket.Dispose();
}
}
}

View File

@ -0,0 +1,46 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Net;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.API.Requests;
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A connector for <see cref="WebSocketNotificationsClient"/>s that receive events via a websocket.
/// </summary>
public class WebSocketNotificationsClientConnector : NotificationsClientConnector
{
private readonly IAPIProvider api;
public WebSocketNotificationsClientConnector(IAPIProvider api)
: base(api)
{
this.api = api;
}
protected override async Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<string>();
var req = new GetNotificationsRequest();
req.Success += bundle => tcs.SetResult(bundle.Endpoint);
req.Failure += ex => tcs.SetException(ex);
api.Queue(req);
string endpoint = await tcs.Task;
ClientWebSocket socket = new ClientWebSocket();
socket.Options.SetRequestHeader(@"Authorization", @$"Bearer {api.AccessToken}");
socket.Options.Proxy = WebRequest.DefaultWebProxy;
if (socket.Options.Proxy != null)
socket.Options.Proxy.Credentials = CredentialCache.DefaultCredentials;
return new WebSocketNotificationsClient(socket, endpoint, api);
}
}
}

View File

@ -13,7 +13,7 @@ namespace osu.Game.Online
protected Task InvokeClosed(Exception? exception) => Closed?.Invoke(exception) ?? Task.CompletedTask;
public abstract Task StartAsync(CancellationToken cancellationToken);
public abstract Task ConnectAsync(CancellationToken cancellationToken);
public virtual ValueTask DisposeAsync()
{

View File

@ -92,7 +92,7 @@ namespace osu.Game.Online
cancellationToken.ThrowIfCancellationRequested();
await CurrentConnection.StartAsync(cancellationToken).ConfigureAwait(false);
await CurrentConnection.ConnectAsync(cancellationToken).ConfigureAwait(false);
Logger.Log($"{ClientName} connected!", LoggingTarget.Network);
isConnected.Value = true;

View File

@ -45,6 +45,7 @@ using osu.Game.Online;
using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Chat;
using osu.Game.Online.Notifications;
using osu.Game.Online.Notifications.Polling;
using osu.Game.Overlays;
using osu.Game.Overlays.Music;
using osu.Game.Overlays.Notifications;
@ -882,7 +883,7 @@ namespace osu.Game
loadComponentSingleFile(dashboard = new DashboardOverlay(), overlayContent.Add, true);
loadComponentSingleFile(news = new NewsOverlay(), overlayContent.Add, true);
var rankingsOverlay = loadComponentSingleFile(new RankingsOverlay(), overlayContent.Add, true);
loadComponentSingleFile(notificationsClient = new NotificationsClientConnector(API), AddInternal, true);
loadComponentSingleFile(notificationsClient = new PollingNotificationsClientConnector(API), AddInternal, true);
loadComponentSingleFile(channelManager = new ChannelManager(API), AddInternal, true);
loadComponentSingleFile(chatOverlay = new ChatOverlay(), overlayContent.Add, true);
loadComponentSingleFile(new MessageNotifier(), AddInternal, true);