1
0
mirror of https://github.com/ppy/osu.git synced 2025-02-13 15:03:13 +08:00

Merge pull request #20991 from smoogipoo/websocket-chat-2

Receive chat messages via notification websocket
This commit is contained in:
Dean Herbert 2022-11-12 18:18:39 +09:00 committed by GitHub
commit 8154eaafd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 807 additions and 108 deletions

View File

@ -55,6 +55,14 @@ namespace osu.Game.Tests.Chat
case MarkChannelAsReadRequest markRead: case MarkChannelAsReadRequest markRead:
handleMarkChannelAsReadRequest(markRead); handleMarkChannelAsReadRequest(markRead);
return true; return true;
case GetUpdatesRequest updatesRequest:
updatesRequest.TriggerSuccess(new GetUpdatesResponse
{
Messages = sentMessages.ToList(),
Presence = new List<Channel>()
});
return true;
} }
return false; return false;
@ -95,6 +103,7 @@ namespace osu.Game.Tests.Chat
}); });
AddStep("post message", () => channelManager.PostMessage("Something interesting")); AddStep("post message", () => channelManager.PostMessage("Something interesting"));
AddUntilStep("message postesd", () => !channel.Messages.Any(m => m is LocalMessage));
AddStep("post /help command", () => channelManager.PostCommand("help", channel)); AddStep("post /help command", () => channelManager.PostCommand("help", channel));
AddStep("post /me command with no action", () => channelManager.PostCommand("me", channel)); AddStep("post /me command with no action", () => channelManager.PostCommand("me", channel));
@ -115,7 +124,8 @@ namespace osu.Game.Tests.Chat
Content = request.Message.Content, Content = request.Message.Content,
Links = request.Message.Links, Links = request.Message.Links,
Timestamp = request.Message.Timestamp, Timestamp = request.Message.Timestamp,
Sender = request.Message.Sender Sender = request.Message.Sender,
Uuid = request.Message.Uuid
}; };
sentMessages.Add(message); sentMessages.Add(message);

View File

@ -46,6 +46,8 @@ namespace osu.Game.Tests.Visual.Online
availableChannels.Add(new Channel { Name = "#english" }); availableChannels.Add(new Channel { Name = "#english" });
availableChannels.Add(new Channel { Name = "#japanese" }); availableChannels.Add(new Channel { Name = "#japanese" });
Dependencies.Cache(chatManager); Dependencies.Cache(chatManager);
Add(chatManager);
} }
[SetUp] [SetUp]

View File

@ -56,7 +56,9 @@ namespace osu.Game.Tests.Visual.Online
protected override IReadOnlyDependencyContainer CreateChildDependencies(IReadOnlyDependencyContainer parent) protected override IReadOnlyDependencyContainer CreateChildDependencies(IReadOnlyDependencyContainer parent)
{ {
Add(channelManager = new ChannelManager(parent.Get<IAPIProvider>())); var api = parent.Get<IAPIProvider>();
Add(channelManager = new ChannelManager(api));
var dependencies = new DependencyContainer(base.CreateChildDependencies(parent)); var dependencies = new DependencyContainer(base.CreateChildDependencies(parent));

View File

@ -48,7 +48,7 @@ namespace osu.Game.Tournament.Components
if (manager == null) if (manager == null)
{ {
AddInternal(manager = new ChannelManager(api) { HighPollRate = { Value = true } }); AddInternal(manager = new ChannelManager(api));
Channel.BindTo(manager.CurrentChannel); Channel.BindTo(manager.CurrentChannel);
} }

View File

@ -20,6 +20,8 @@ using osu.Framework.Logging;
using osu.Game.Configuration; using osu.Game.Configuration;
using osu.Game.Online.API.Requests; using osu.Game.Online.API.Requests;
using osu.Game.Online.API.Requests.Responses; using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Notifications;
using osu.Game.Online.Notifications.WebSocket;
using osu.Game.Users; using osu.Game.Users;
namespace osu.Game.Online.API namespace osu.Game.Online.API
@ -299,6 +301,9 @@ namespace osu.Game.Online.API
public IHubClientConnector GetHubConnector(string clientName, string endpoint, bool preferMessagePack) => public IHubClientConnector GetHubConnector(string clientName, string endpoint, bool preferMessagePack) =>
new HubClientConnector(clientName, endpoint, this, versionHash, preferMessagePack); new HubClientConnector(clientName, endpoint, this, versionHash, preferMessagePack);
public NotificationsClientConnector GetNotificationsConnector() =>
new WebSocketNotificationsClientConnector(this);
public RegistrationRequest.RegistrationRequestErrors CreateAccount(string email, string username, string password) public RegistrationRequest.RegistrationRequestErrors CreateAccount(string email, string username, string password)
{ {
Debug.Assert(State.Value == APIState.Offline); Debug.Assert(State.Value == APIState.Offline);

View File

@ -9,6 +9,8 @@ using System.Threading.Tasks;
using osu.Framework.Bindables; using osu.Framework.Bindables;
using osu.Framework.Graphics; using osu.Framework.Graphics;
using osu.Game.Online.API.Requests.Responses; using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Notifications;
using osu.Game.Tests;
using osu.Game.Users; using osu.Game.Users;
namespace osu.Game.Online.API namespace osu.Game.Online.API
@ -115,6 +117,8 @@ namespace osu.Game.Online.API
public IHubClientConnector GetHubConnector(string clientName, string endpoint, bool preferMessagePack) => null; public IHubClientConnector GetHubConnector(string clientName, string endpoint, bool preferMessagePack) => null;
public NotificationsClientConnector GetNotificationsConnector() => new PollingNotificationsClientConnector(this);
public RegistrationRequest.RegistrationRequestErrors CreateAccount(string email, string username, string password) public RegistrationRequest.RegistrationRequestErrors CreateAccount(string email, string username, string password)
{ {
Thread.Sleep(200); Thread.Sleep(200);

View File

@ -5,6 +5,7 @@ using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using osu.Framework.Bindables; using osu.Framework.Bindables;
using osu.Game.Online.API.Requests.Responses; using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Notifications;
using osu.Game.Users; using osu.Game.Users;
namespace osu.Game.Online.API namespace osu.Game.Online.API
@ -112,6 +113,11 @@ namespace osu.Game.Online.API
/// <param name="preferMessagePack">Whether to use MessagePack for serialisation if available on this platform.</param> /// <param name="preferMessagePack">Whether to use MessagePack for serialisation if available on this platform.</param>
IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack = true); IHubClientConnector? GetHubConnector(string clientName, string endpoint, bool preferMessagePack = true);
/// <summary>
/// Constructs a new <see cref="NotificationsClientConnector"/>.
/// </summary>
NotificationsClientConnector GetNotificationsConnector();
/// <summary> /// <summary>
/// Create a new user account. This is a blocking operation. /// Create a new user account. This is a blocking operation.
/// </summary> /// </summary>

View File

@ -0,0 +1,21 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Net.Http;
using osu.Framework.IO.Network;
using osu.Game.Online.API.Requests.Responses;
namespace osu.Game.Online.API.Requests
{
public class ChatAckRequest : APIRequest<ChatAckResponse>
{
protected override WebRequest CreateWebRequest()
{
var req = base.CreateWebRequest();
req.Method = HttpMethod.Post;
return req;
}
protected override string Target => "chat/ack";
}
}

View File

@ -28,6 +28,7 @@ namespace osu.Game.Online.API.Requests
req.AddParameter(@"target_id", user.Id.ToString()); req.AddParameter(@"target_id", user.Id.ToString());
req.AddParameter(@"message", message.Content); req.AddParameter(@"message", message.Content);
req.AddParameter(@"is_action", message.IsAction.ToString().ToLowerInvariant()); req.AddParameter(@"is_action", message.IsAction.ToString().ToLowerInvariant());
req.AddParameter(@"uuid", message.Uuid);
return req; return req;
} }

View File

@ -0,0 +1,19 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using osu.Game.Online.API.Requests.Responses;
namespace osu.Game.Online.API.Requests
{
public class GetChannelRequest : APIRequest<GetChannelResponse>
{
private readonly long channelId;
public GetChannelRequest(long channelId)
{
this.channelId = channelId;
}
protected override string Target => $"chat/channels/{channelId}";
}
}

View File

@ -0,0 +1,12 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using osu.Game.Online.API.Requests.Responses;
namespace osu.Game.Online.API.Requests
{
public class GetNotificationsRequest : APIRequest<APINotificationsBundle>
{
protected override string Target => @"notifications";
}
}

View File

@ -25,6 +25,7 @@ namespace osu.Game.Online.API.Requests
req.Method = HttpMethod.Post; req.Method = HttpMethod.Post;
req.AddParameter(@"is_action", Message.IsAction.ToString().ToLowerInvariant()); req.AddParameter(@"is_action", Message.IsAction.ToString().ToLowerInvariant());
req.AddParameter(@"message", Message.Content); req.AddParameter(@"message", Message.Content);
req.AddParameter(@"uuid", Message.Uuid);
return req; return req;
} }

View File

@ -0,0 +1,37 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace osu.Game.Online.API.Requests.Responses
{
[JsonObject(MemberSerialization.OptIn)]
public class APINotification
{
[JsonProperty(@"id")]
public long Id { get; set; }
[JsonProperty(@"name")]
public string Name { get; set; } = null!;
[JsonProperty(@"created_at")]
public DateTimeOffset? CreatedAt { get; set; }
[JsonProperty(@"object_type")]
public string ObjectType { get; set; } = null!;
[JsonProperty(@"object_id")]
public string ObjectId { get; set; } = null!;
[JsonProperty(@"source_user_id")]
public long? SourceUserId { get; set; }
[JsonProperty(@"is_read")]
public bool IsRead { get; set; }
[JsonProperty(@"details")]
public JObject? Details { get; set; }
}
}

View File

@ -0,0 +1,20 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using Newtonsoft.Json;
namespace osu.Game.Online.API.Requests.Responses
{
[JsonObject(MemberSerialization.OptIn)]
public class APINotificationsBundle
{
[JsonProperty(@"has_more")]
public bool HasMore { get; set; }
[JsonProperty(@"notifications")]
public APINotification[] Notifications { get; set; } = null!;
[JsonProperty(@"notification_endpoint")]
public string Endpoint { get; set; } = null!;
}
}

View File

@ -0,0 +1,15 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Collections.Generic;
using Newtonsoft.Json;
namespace osu.Game.Online.API.Requests.Responses
{
[JsonObject(MemberSerialization.OptIn)]
public class ChatAckResponse
{
[JsonProperty("silences")]
public List<ChatSilence> Silences { get; set; } = null!;
}
}

View File

@ -0,0 +1,17 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using Newtonsoft.Json;
namespace osu.Game.Online.API.Requests.Responses
{
[JsonObject(MemberSerialization.OptIn)]
public class ChatSilence
{
[JsonProperty("id")]
public uint Id { get; set; }
[JsonProperty("user_id")]
public uint UserId { get; set; }
}
}

View File

@ -0,0 +1,19 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Collections.Generic;
using Newtonsoft.Json;
using osu.Game.Online.Chat;
namespace osu.Game.Online.API.Requests.Responses
{
[JsonObject(MemberSerialization.OptIn)]
public class GetChannelResponse
{
[JsonProperty(@"channel")]
public Channel Channel { get; set; } = null!;
[JsonProperty(@"users")]
public List<APIUser> Users { get; set; } = null!;
}
}

View File

@ -134,6 +134,14 @@ namespace osu.Game.Online.Chat
/// <param name="messages"></param> /// <param name="messages"></param>
public void AddNewMessages(params Message[] messages) public void AddNewMessages(params Message[] messages)
{ {
foreach (var m in messages)
{
LocalEchoMessage localEcho = pendingMessages.FirstOrDefault(local => local.Uuid == m.Uuid);
if (localEcho != null)
ReplaceMessage(localEcho, m);
}
messages = messages.Except(Messages).ToArray(); messages = messages.Except(Messages).ToArray();
if (messages.Length == 0) return; if (messages.Length == 0) return;
@ -171,6 +179,10 @@ namespace osu.Game.Online.Chat
throw new InvalidOperationException("Attempted to add the same message again"); throw new InvalidOperationException("Attempted to add the same message again");
Messages.Add(final); Messages.Add(final);
if (final.Id > LastMessageId)
LastMessageId = final.Id;
PendingMessageResolved?.Invoke(echo, final); PendingMessageResolved?.Invoke(echo, final);
} }

View File

@ -6,16 +6,17 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks;
using osu.Framework.Allocation; using osu.Framework.Allocation;
using osu.Framework.Bindables; using osu.Framework.Bindables;
using osu.Framework.Extensions; using osu.Framework.Extensions;
using osu.Framework.Graphics.Containers;
using osu.Framework.Logging; using osu.Framework.Logging;
using osu.Framework.Threading;
using osu.Game.Database; using osu.Game.Database;
using osu.Game.Input;
using osu.Game.Online.API; using osu.Game.Online.API;
using osu.Game.Online.API.Requests; using osu.Game.Online.API.Requests;
using osu.Game.Online.API.Requests.Responses; using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Notifications;
using osu.Game.Overlays.Chat.Listing; using osu.Game.Overlays.Chat.Listing;
namespace osu.Game.Online.Chat namespace osu.Game.Online.Chat
@ -23,7 +24,7 @@ namespace osu.Game.Online.Chat
/// <summary> /// <summary>
/// Manages everything channel related /// Manages everything channel related
/// </summary> /// </summary>
public class ChannelManager : PollingComponent, IChannelPostTarget public class ChannelManager : CompositeComponent, IChannelPostTarget
{ {
/// <summary> /// <summary>
/// The channels the player joins on startup /// The channels the player joins on startup
@ -64,43 +65,67 @@ namespace osu.Game.Online.Chat
public IBindableList<Channel> AvailableChannels => availableChannels; public IBindableList<Channel> AvailableChannels => availableChannels;
private readonly IAPIProvider api; private readonly IAPIProvider api;
private readonly NotificationsClientConnector connector;
[Resolved] [Resolved]
private UserLookupCache users { get; set; } private UserLookupCache users { get; set; }
public readonly BindableBool HighPollRate = new BindableBool(); private readonly IBindable<APIState> apiState = new Bindable<APIState>();
private bool channelsInitialised;
private readonly IBindable<bool> isIdle = new BindableBool(); private ScheduledDelegate scheduledAck;
public ChannelManager(IAPIProvider api) public ChannelManager(IAPIProvider api)
{ {
this.api = api; this.api = api;
connector = api.GetNotificationsConnector();
CurrentChannel.ValueChanged += currentChannelChanged; CurrentChannel.ValueChanged += currentChannelChanged;
} }
[BackgroundDependencyLoader(permitNulls: true)] [BackgroundDependencyLoader]
private void load(IdleTracker idleTracker) private void load()
{ {
HighPollRate.BindValueChanged(updatePollRate); connector.ChannelJoined += ch => Schedule(() => joinChannel(ch));
isIdle.BindValueChanged(updatePollRate, true);
if (idleTracker != null) connector.ChannelParted += ch => Schedule(() => LeaveChannel(getChannel(ch)));
isIdle.BindTo(idleTracker.IsIdle);
connector.NewMessages += msgs => Schedule(() => addMessages(msgs));
connector.PresenceReceived += () => Schedule(() =>
{
if (!channelsInitialised)
{
channelsInitialised = true;
// we want this to run after the first presence so we can see if the user is in any channels already.
initializeChannels();
}
});
connector.Start();
apiState.BindTo(api.State);
apiState.BindValueChanged(_ => performChatAckRequest(), true);
} }
private void updatePollRate(ValueChangedEvent<bool> valueChangedEvent) private void performChatAckRequest()
{ {
// Polling will eventually be replaced with websocket, but let's avoid doing these background operations as much as possible for now. if (apiState.Value != APIState.Online)
// The only loss will be delayed PM/message highlight notifications. return;
int millisecondsBetweenPolls = HighPollRate.Value ? 1000 : 60000;
if (isIdle.Value) scheduledAck?.Cancel();
millisecondsBetweenPolls *= 10;
if (TimeBetweenPolls.Value != millisecondsBetweenPolls) var req = new ChatAckRequest();
req.Success += _ => scheduleNextRequest();
req.Failure += _ => scheduleNextRequest();
api.Queue(req);
// Todo: Handle silences.
void scheduleNextRequest()
{ {
TimeBetweenPolls.Value = millisecondsBetweenPolls; scheduledAck?.Cancel();
Logger.Log($"Chat is now polling every {TimeBetweenPolls.Value} ms"); scheduledAck = Scheduler.AddDelayed(performChatAckRequest, 60000);
} }
} }
@ -181,7 +206,8 @@ namespace osu.Game.Online.Chat
Timestamp = DateTimeOffset.Now, Timestamp = DateTimeOffset.Now,
ChannelId = target.Id, ChannelId = target.Id,
IsAction = isAction, IsAction = isAction,
Content = text Content = text,
Uuid = Guid.NewGuid().ToString()
}; };
target.AddLocalEcho(message); target.AddLocalEcho(message);
@ -191,13 +217,7 @@ namespace osu.Game.Online.Chat
{ {
var createNewPrivateMessageRequest = new CreateNewPrivateMessageRequest(target.Users.First(), message); var createNewPrivateMessageRequest = new CreateNewPrivateMessageRequest(target.Users.First(), message);
createNewPrivateMessageRequest.Success += createRes => createNewPrivateMessageRequest.Success += _ => dequeueAndRun();
{
target.Id = createRes.ChannelID;
target.ReplaceMessage(message, createRes.Message);
dequeueAndRun();
};
createNewPrivateMessageRequest.Failure += exception => createNewPrivateMessageRequest.Failure += exception =>
{ {
handlePostException(exception); handlePostException(exception);
@ -211,12 +231,7 @@ namespace osu.Game.Online.Chat
var req = new PostMessageRequest(message); var req = new PostMessageRequest(message);
req.Success += m => req.Success += m => dequeueAndRun();
{
target.ReplaceMessage(message, m);
dequeueAndRun();
};
req.Failure += exception => req.Failure += exception =>
{ {
handlePostException(exception); handlePostException(exception);
@ -328,7 +343,7 @@ namespace osu.Game.Online.Chat
} }
} }
private void handleChannelMessages(IEnumerable<Message> messages) private void addMessages(List<Message> messages)
{ {
var channels = JoinedChannels.ToList(); var channels = JoinedChannels.ToList();
@ -376,7 +391,7 @@ namespace osu.Game.Online.Chat
var fetchInitialMsgReq = new GetMessagesRequest(channel); var fetchInitialMsgReq = new GetMessagesRequest(channel);
fetchInitialMsgReq.Success += messages => fetchInitialMsgReq.Success += messages =>
{ {
handleChannelMessages(messages); addMessages(messages);
channel.MessagesLoaded = true; // this will mark the channel as having received messages even if there were none. channel.MessagesLoaded = true; // this will mark the channel as having received messages even if there were none.
}; };
@ -395,7 +410,13 @@ namespace osu.Game.Online.Chat
{ {
Channel found = null; Channel found = null;
bool lookupCondition(Channel ch) => lookup.Id > 0 ? ch.Id == lookup.Id : lookup.Name == ch.Name; bool lookupCondition(Channel ch)
{
if (ch.Id > 0 && lookup.Id > 0)
return ch.Id == lookup.Id;
return ch.Name == lookup.Name;
}
var available = AvailableChannels.FirstOrDefault(lookupCondition); var available = AvailableChannels.FirstOrDefault(lookupCondition);
if (available != null) if (available != null)
@ -415,6 +436,12 @@ namespace osu.Game.Online.Chat
if (foundSelf != null) if (foundSelf != null)
found.Users.Remove(foundSelf); found.Users.Remove(foundSelf);
} }
else
{
found.Id = lookup.Id;
found.Name = lookup.Name;
found.LastMessageId = Math.Max(found.LastMessageId ?? 0, lookup.LastMessageId ?? 0);
}
if (joined == null && addToJoined) joinedChannels.Add(found); if (joined == null && addToJoined) joinedChannels.Add(found);
if (available == null && addToAvailable) availableChannels.Add(found); if (available == null && addToAvailable) availableChannels.Add(found);
@ -464,7 +491,7 @@ namespace osu.Game.Online.Chat
{ {
channel.Id = resChannel.ChannelID.Value; channel.Id = resChannel.ChannelID.Value;
handleChannelMessages(resChannel.RecentMessages); addMessages(resChannel.RecentMessages);
channel.MessagesLoaded = true; // this will mark the channel as having received messages even if there were none. channel.MessagesLoaded = true; // this will mark the channel as having received messages even if there were none.
} }
}; };
@ -574,57 +601,6 @@ namespace osu.Game.Online.Chat
} }
} }
private long lastMessageId;
private bool channelsInitialised;
protected override Task Poll()
{
if (!api.IsLoggedIn)
return base.Poll();
var fetchReq = new GetUpdatesRequest(lastMessageId);
var tcs = new TaskCompletionSource<bool>();
fetchReq.Success += updates =>
{
if (updates?.Presence != null)
{
foreach (var channel in updates.Presence)
{
// we received this from the server so should mark the channel already joined.
channel.Joined.Value = true;
joinChannel(channel);
}
//todo: handle left channels
handleChannelMessages(updates.Messages);
foreach (var group in updates.Messages.GroupBy(m => m.ChannelId))
JoinedChannels.FirstOrDefault(c => c.Id == group.Key)?.AddNewMessages(group.ToArray());
lastMessageId = updates.Messages.LastOrDefault()?.Id ?? lastMessageId;
}
if (!channelsInitialised)
{
channelsInitialised = true;
// we want this to run after the first presence so we can see if the user is in any channels already.
initializeChannels();
}
tcs.SetResult(true);
};
fetchReq.Failure += _ => tcs.SetResult(false);
api.Queue(fetchReq);
return tcs.Task;
}
/// <summary> /// <summary>
/// Marks the <paramref name="channel"/> as read /// Marks the <paramref name="channel"/> as read
/// </summary> /// </summary>
@ -646,6 +622,12 @@ namespace osu.Game.Online.Chat
api.Queue(req); api.Queue(req);
} }
protected override void Dispose(bool isDisposing)
{
base.Dispose(isDisposing);
connector?.Dispose();
}
} }
/// <summary> /// <summary>

View File

@ -30,6 +30,19 @@ namespace osu.Game.Online.Chat
[JsonProperty(@"sender")] [JsonProperty(@"sender")]
public APIUser Sender; public APIUser Sender;
[JsonProperty(@"sender_id")]
public int SenderId
{
get => Sender?.Id ?? 0;
set => Sender = new APIUser { Id = value };
}
/// <summary>
/// A unique identifier for this message. Sent to and from osu!web to use for deduplication.
/// </summary>
[JsonProperty(@"uuid")]
public string Uuid { get; set; } = string.Empty;
[JsonConstructor] [JsonConstructor]
public Message() public Message()
{ {

View File

@ -49,6 +49,9 @@ namespace osu.Game.Online
this.api = api; this.api = api;
this.versionHash = versionHash; this.versionHash = versionHash;
this.preferMessagePack = preferMessagePack; this.preferMessagePack = preferMessagePack;
// Automatically start these connections.
Start();
} }
protected override Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken) protected override Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken)

View File

@ -0,0 +1,76 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.API.Requests;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications
{
/// <summary>
/// An abstract client which receives notification-related events (chat/notifications).
/// </summary>
public abstract class NotificationsClient : PersistentEndpointClient
{
public Action<Channel>? ChannelJoined;
public Action<Channel>? ChannelParted;
public Action<List<Message>>? NewMessages;
public Action? PresenceReceived;
protected readonly IAPIProvider API;
private long lastMessageId;
protected NotificationsClient(IAPIProvider api)
{
API = api;
}
public override Task ConnectAsync(CancellationToken cancellationToken)
{
API.Queue(CreateFetchMessagesRequest(0));
return Task.CompletedTask;
}
protected APIRequest CreateFetchMessagesRequest(long? lastMessageId = null)
{
var fetchReq = new GetUpdatesRequest(lastMessageId ?? this.lastMessageId);
fetchReq.Success += updates =>
{
if (updates?.Presence != null)
{
foreach (var channel in updates.Presence)
HandleChannelJoined(channel);
//todo: handle left channels
HandleMessages(updates.Messages);
}
PresenceReceived?.Invoke();
};
return fetchReq;
}
protected void HandleChannelJoined(Channel channel)
{
channel.Joined.Value = true;
ChannelJoined?.Invoke(channel);
}
protected void HandleChannelParted(Channel channel) => ChannelParted?.Invoke(channel);
protected void HandleMessages(List<Message> messages)
{
NewMessages?.Invoke(messages);
lastMessageId = Math.Max(lastMessageId, messages.LastOrDefault()?.Id ?? 0);
}
}
}

View File

@ -0,0 +1,42 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications
{
/// <summary>
/// An abstract connector or <see cref="NotificationsClient"/>s.
/// </summary>
public abstract class NotificationsClientConnector : PersistentEndpointClientConnector
{
public event Action<Channel>? ChannelJoined;
public event Action<Channel>? ChannelParted;
public event Action<List<Message>>? NewMessages;
public event Action? PresenceReceived;
protected NotificationsClientConnector(IAPIProvider api)
: base(api)
{
}
protected sealed override async Task<PersistentEndpointClient> BuildConnectionAsync(CancellationToken cancellationToken)
{
var client = await BuildNotificationClientAsync(cancellationToken);
client.ChannelJoined = c => ChannelJoined?.Invoke(c);
client.ChannelParted = c => ChannelParted?.Invoke(c);
client.NewMessages = m => NewMessages?.Invoke(m);
client.PresenceReceived = () => PresenceReceived?.Invoke();
return client;
}
protected abstract Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken);
}
}

View File

@ -0,0 +1,19 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using Newtonsoft.Json;
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A websocket message notifying the server that the client no longer wants to receive chat messages.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class EndChatRequest : SocketMessage
{
public EndChatRequest()
{
Event = @"chat.end";
}
}
}

View File

@ -0,0 +1,32 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using Newtonsoft.Json;
using osu.Game.Online.API.Requests.Responses;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A websocket message sent from the server when new messages arrive.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class NewChatMessageData
{
[JsonProperty(@"messages")]
public List<Message> Messages { get; set; } = null!;
[JsonProperty(@"users")]
private List<APIUser> users { get; set; } = null!;
[OnDeserialized]
private void onDeserialised(StreamingContext context)
{
foreach (var m in Messages)
m.Sender = users.Single(u => u.OnlineID == m.SenderId);
}
}
}

View File

@ -0,0 +1,24 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A websocket message, sent either from the client or server.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class SocketMessage
{
[JsonProperty(@"event")]
public string Event { get; set; } = null!;
[JsonProperty(@"data")]
public JObject? Data { get; set; }
[JsonProperty(@"error")]
public string? Error { get; set; }
}
}

View File

@ -0,0 +1,19 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using Newtonsoft.Json;
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A websocket message notifying the server that the client wants to receive chat messages.
/// </summary>
[JsonObject(MemberSerialization.OptIn)]
public class StartChatRequest : SocketMessage
{
public StartChatRequest()
{
Event = @"chat.start";
}
}
}

View File

@ -0,0 +1,180 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using osu.Framework.Extensions.TypeExtensions;
using osu.Framework.Logging;
using osu.Game.Online.API;
using osu.Game.Online.API.Requests;
using osu.Game.Online.Chat;
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A notifications client which receives events via a websocket.
/// </summary>
public class WebSocketNotificationsClient : NotificationsClient
{
private readonly ClientWebSocket socket;
private readonly string endpoint;
private readonly ConcurrentDictionary<long, Channel> channelsMap = new ConcurrentDictionary<long, Channel>();
public WebSocketNotificationsClient(ClientWebSocket socket, string endpoint, IAPIProvider api)
: base(api)
{
this.socket = socket;
this.endpoint = endpoint;
}
public override async Task ConnectAsync(CancellationToken cancellationToken)
{
await socket.ConnectAsync(new Uri(endpoint), cancellationToken).ConfigureAwait(false);
await sendMessage(new StartChatRequest(), CancellationToken.None);
runReadLoop(cancellationToken);
await base.ConnectAsync(cancellationToken);
}
private void runReadLoop(CancellationToken cancellationToken) => Task.Run(async () =>
{
byte[] buffer = new byte[1024];
StringBuilder messageResult = new StringBuilder();
while (!cancellationToken.IsCancellationRequested)
{
try
{
WebSocketReceiveResult result = await socket.ReceiveAsync(buffer, cancellationToken);
switch (result.MessageType)
{
case WebSocketMessageType.Text:
messageResult.Append(Encoding.UTF8.GetString(buffer[..result.Count]));
if (result.EndOfMessage)
{
SocketMessage? message = JsonConvert.DeserializeObject<SocketMessage>(messageResult.ToString());
messageResult.Clear();
Debug.Assert(message != null);
if (message.Error != null)
{
Logger.Log($"{GetType().ReadableName()} error: {message.Error}", LoggingTarget.Network);
break;
}
Logger.Log($"{GetType().ReadableName()} handling event: {message.Event}");
await onMessageReceivedAsync(message);
}
break;
case WebSocketMessageType.Binary:
throw new NotImplementedException("Binary message type not supported.");
case WebSocketMessageType.Close:
throw new Exception("Connection closed by remote host.");
}
}
catch (Exception ex)
{
await InvokeClosed(ex);
return;
}
}
}, cancellationToken);
private async Task closeAsync()
{
try
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, @"Disconnecting", CancellationToken.None).ConfigureAwait(false);
}
catch
{
// Closure can fail if the connection is aborted. Don't really care since it's disposed anyway.
}
}
private async Task sendMessage(SocketMessage message, CancellationToken cancellationToken)
{
if (socket.State != WebSocketState.Open)
return;
await socket.SendAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)), WebSocketMessageType.Text, true, cancellationToken);
}
private async Task onMessageReceivedAsync(SocketMessage message)
{
switch (message.Event)
{
case @"chat.channel.join":
Debug.Assert(message.Data != null);
Channel? joinedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
Debug.Assert(joinedChannel != null);
HandleChannelJoined(joinedChannel);
break;
case @"chat.channel.part":
Debug.Assert(message.Data != null);
Channel? partedChannel = JsonConvert.DeserializeObject<Channel>(message.Data.ToString());
Debug.Assert(partedChannel != null);
HandleChannelParted(partedChannel);
break;
case @"chat.message.new":
Debug.Assert(message.Data != null);
NewChatMessageData? messageData = JsonConvert.DeserializeObject<NewChatMessageData>(message.Data.ToString());
Debug.Assert(messageData != null);
foreach (var msg in messageData.Messages)
HandleChannelJoined(await getChannel(msg.ChannelId));
HandleMessages(messageData.Messages);
break;
}
}
private async Task<Channel> getChannel(long channelId)
{
if (channelsMap.TryGetValue(channelId, out Channel channel))
return channel;
var tsc = new TaskCompletionSource<Channel>();
var req = new GetChannelRequest(channelId);
req.Success += response =>
{
channelsMap[channelId] = response.Channel;
tsc.SetResult(response.Channel);
};
req.Failure += ex => tsc.SetException(ex);
API.Queue(req);
return await tsc.Task;
}
public override async ValueTask DisposeAsync()
{
await base.DisposeAsync();
await closeAsync();
socket.Dispose();
}
}
}

View File

@ -0,0 +1,46 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Net;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.API.Requests;
namespace osu.Game.Online.Notifications.WebSocket
{
/// <summary>
/// A connector for <see cref="WebSocketNotificationsClient"/>s that receive events via a websocket.
/// </summary>
public class WebSocketNotificationsClientConnector : NotificationsClientConnector
{
private readonly IAPIProvider api;
public WebSocketNotificationsClientConnector(IAPIProvider api)
: base(api)
{
this.api = api;
}
protected override async Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<string>();
var req = new GetNotificationsRequest();
req.Success += bundle => tcs.SetResult(bundle.Endpoint);
req.Failure += ex => tcs.SetException(ex);
api.Queue(req);
string endpoint = await tcs.Task;
ClientWebSocket socket = new ClientWebSocket();
socket.Options.SetRequestHeader(@"Authorization", @$"Bearer {api.AccessToken}");
socket.Options.Proxy = WebRequest.DefaultWebProxy;
if (socket.Options.Proxy != null)
socket.Options.Proxy.Credentials = CredentialCache.DefaultCredentials;
return new WebSocketNotificationsClient(socket, endpoint, api);
}
}
}

View File

@ -23,11 +23,13 @@ namespace osu.Game.Online
/// </summary> /// </summary>
public PersistentEndpointClient? CurrentConnection { get; private set; } public PersistentEndpointClient? CurrentConnection { get; private set; }
protected readonly IAPIProvider API;
private readonly IBindable<APIState> apiState = new Bindable<APIState>();
private readonly Bindable<bool> isConnected = new Bindable<bool>(); private readonly Bindable<bool> isConnected = new Bindable<bool>();
private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1); private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1);
private CancellationTokenSource connectCancelSource = new CancellationTokenSource(); private CancellationTokenSource connectCancelSource = new CancellationTokenSource();
private bool started;
private readonly IBindable<APIState> apiState = new Bindable<APIState>();
/// <summary> /// <summary>
/// Constructs a new <see cref="PersistentEndpointClientConnector"/>. /// Constructs a new <see cref="PersistentEndpointClientConnector"/>.
@ -35,8 +37,20 @@ namespace osu.Game.Online
/// <param name="api"> An API provider used to react to connection state changes.</param> /// <param name="api"> An API provider used to react to connection state changes.</param>
protected PersistentEndpointClientConnector(IAPIProvider api) protected PersistentEndpointClientConnector(IAPIProvider api)
{ {
API = api;
apiState.BindTo(api.State); apiState.BindTo(api.State);
}
/// <summary>
/// Attempts to connect and begins processing messages from the remote endpoint.
/// </summary>
public void Start()
{
if (started)
return;
apiState.BindValueChanged(_ => Task.Run(connectIfPossible), true); apiState.BindValueChanged(_ => Task.Run(connectIfPossible), true);
started = true;
} }
public Task Reconnect() public Task Reconnect()

View File

@ -910,19 +910,6 @@ namespace osu.Game
loadComponentSingleFile(new BackgroundBeatmapProcessor(), Add); loadComponentSingleFile(new BackgroundBeatmapProcessor(), Add);
chatOverlay.State.BindValueChanged(_ => updateChatPollRate());
// Multiplayer modes need to increase poll rate temporarily.
API.Activity.BindValueChanged(_ => updateChatPollRate(), true);
void updateChatPollRate()
{
channelManager.HighPollRate.Value =
chatOverlay.State.Value == Visibility.Visible
|| API.Activity.Value is UserActivity.InLobby
|| API.Activity.Value is UserActivity.InMultiplayerGame
|| API.Activity.Value is UserActivity.SpectatingMultiplayerGame;
}
Add(difficultyRecommender); Add(difficultyRecommender);
Add(externalLinkOpener = new ExternalLinkOpener()); Add(externalLinkOpener = new ExternalLinkOpener());
Add(new MusicKeyBindingHandler()); Add(new MusicKeyBindingHandler());

View File

@ -0,0 +1,35 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.Notifications;
namespace osu.Game.Tests
{
/// <summary>
/// A notifications client which polls for new messages every second.
/// </summary>
public class PollingNotificationsClient : NotificationsClient
{
public PollingNotificationsClient(IAPIProvider api)
: base(api)
{
}
public override Task ConnectAsync(CancellationToken cancellationToken)
{
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
await API.PerformAsync(CreateFetchMessagesRequest());
await Task.Delay(1000, cancellationToken);
}
}, cancellationToken);
return Task.CompletedTask;
}
}
}

View File

@ -0,0 +1,24 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System.Threading;
using System.Threading.Tasks;
using osu.Game.Online.API;
using osu.Game.Online.Notifications;
namespace osu.Game.Tests
{
/// <summary>
/// A connector for <see cref="PollingNotificationsClient"/>s that poll for new messages.
/// </summary>
public class PollingNotificationsClientConnector : NotificationsClientConnector
{
public PollingNotificationsClientConnector(IAPIProvider api)
: base(api)
{
}
protected override Task<NotificationsClient> BuildNotificationClientAsync(CancellationToken cancellationToken)
=> Task.FromResult((NotificationsClient)new PollingNotificationsClient(API));
}
}