mirror of
https://github.com/ppy/osu.git
synced 2026-05-13 19:54:15 +08:00
Implement WebSocket server skeleton for external integrations (#37335)
- Supersedes / closes https://github.com/ppy/osu/pull/18129. Reasons I didn't use that PR are hopefully obvious upon comparing diffs but I can elaborate if they are not. - Single metric included for demonstration purposes. - Do not want to talk about further schema design at this time. - Specify `OSU_WEBSOCKET_SERVER=1` envvar to enable. - Can test consumption with [this five minute html job](https://github.com/user-attachments/files/26839923/index.html) (works even as a standalone file opened in browser, no CORS bs!) - There's a lot of inline comments, go read them. There are many WTFs because the .NET frozen websocket API is weird and stanky and reeks of the year 2007. The inline comments attempt to explain.
This commit is contained in:
committed by
GitHub
Unverified
parent
a7ac628e94
commit
9a2846539f
@@ -0,0 +1,13 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using Newtonsoft.Json;
|
||||
|
||||
namespace osu.Desktop.IPC.Messages
|
||||
{
|
||||
public class HitCountMessage : OsuWebSocketMessage
|
||||
{
|
||||
[JsonProperty("new_hits")]
|
||||
public long NewHits { get; init; }
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using Newtonsoft.Json;
|
||||
using osu.Framework.Extensions.TypeExtensions;
|
||||
|
||||
namespace osu.Desktop.IPC.Messages
|
||||
{
|
||||
public abstract class OsuWebSocketMessage
|
||||
{
|
||||
[JsonProperty("type")]
|
||||
public string Type { get; }
|
||||
|
||||
protected OsuWebSocketMessage()
|
||||
{
|
||||
Type = GetType().ReadableName();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using osu.Desktop.IPC.Messages;
|
||||
using osu.Framework.Allocation;
|
||||
using osu.Framework.Bindables;
|
||||
using osu.Framework.Extensions;
|
||||
using osu.Framework.Graphics;
|
||||
using osu.Framework.Logging;
|
||||
using osu.Game.Configuration;
|
||||
using osu.Game.IPC;
|
||||
using osu.Game.Online.Multiplayer;
|
||||
using osu.Game.Rulesets.Scoring;
|
||||
using osu.Game.Scoring;
|
||||
using JsonConvert = Newtonsoft.Json.JsonConvert;
|
||||
|
||||
namespace osu.Desktop.IPC
|
||||
{
|
||||
public partial class OsuWebSocketProvider : Component
|
||||
{
|
||||
private WebSocketServer? server;
|
||||
private readonly Bindable<ScoreInfo> lastLocalScore = new Bindable<ScoreInfo>();
|
||||
|
||||
[BackgroundDependencyLoader]
|
||||
private void load(SessionStatics sessionStatics)
|
||||
{
|
||||
server = new WebSocketServer(49727);
|
||||
server.StartAsync().FireAndForget(onError: ex => Logger.Error(ex, "Failed to start websocket"));
|
||||
|
||||
sessionStatics.BindWith(Static.LastLocalUserScore, lastLocalScore);
|
||||
}
|
||||
|
||||
protected override void LoadComplete()
|
||||
{
|
||||
base.LoadComplete();
|
||||
|
||||
lastLocalScore.BindValueChanged(val =>
|
||||
{
|
||||
if (val.NewValue == null)
|
||||
return;
|
||||
|
||||
if (server?.IsRunning != true)
|
||||
return;
|
||||
|
||||
var msg = new HitCountMessage { NewHits = val.NewValue.Statistics.Where(kv => kv.Key.IsBasic() && kv.Key.IsHit()).Sum(kv => kv.Value) };
|
||||
broadcast(msg);
|
||||
});
|
||||
}
|
||||
|
||||
private void broadcast(OsuWebSocketMessage message)
|
||||
{
|
||||
if (server?.IsRunning != true)
|
||||
return;
|
||||
|
||||
string messageString = JsonConvert.SerializeObject(message);
|
||||
server.BroadcastAsync(messageString).FireAndForget();
|
||||
}
|
||||
|
||||
protected override void Dispose(bool isDisposing)
|
||||
{
|
||||
base.Dispose(isDisposing);
|
||||
|
||||
if (server?.IsRunning == true)
|
||||
{
|
||||
var cts = new CancellationTokenSource();
|
||||
cts.CancelAfter(TimeSpan.FromSeconds(10));
|
||||
server.StopAsync(cts.Token).WaitSafely();
|
||||
server = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ using System.IO;
|
||||
using System.Reflection;
|
||||
using System.Runtime.Versioning;
|
||||
using Microsoft.Win32;
|
||||
using osu.Desktop.IPC;
|
||||
using osu.Desktop.Performance;
|
||||
using osu.Desktop.Security;
|
||||
using osu.Framework.Platform;
|
||||
@@ -35,6 +36,8 @@ namespace osu.Desktop
|
||||
|
||||
public bool IsFirstRun { get; init; }
|
||||
|
||||
public bool EnableWebSocketServer { get; init; }
|
||||
|
||||
public OsuGameDesktop(string[]? args = null)
|
||||
: base(args)
|
||||
{
|
||||
@@ -148,6 +151,9 @@ namespace osu.Desktop
|
||||
|
||||
osuSchemeLinkIPCChannel = new OsuSchemeLinkIPCChannel(Host, this);
|
||||
archiveImportIPCChannel = new ArchiveImportIPCChannel(Host, this);
|
||||
|
||||
if (EnableWebSocketServer)
|
||||
Add(new OsuWebSocketProvider());
|
||||
}
|
||||
|
||||
public override void SetHost(GameHost host)
|
||||
|
||||
@@ -140,7 +140,8 @@ namespace osu.Desktop
|
||||
{
|
||||
host.Run(new OsuGameDesktop(args)
|
||||
{
|
||||
IsFirstRun = isFirstRun
|
||||
IsFirstRun = isFirstRun,
|
||||
EnableWebSocketServer = Environment.GetEnvironmentVariable("OSU_WEBSOCKET_SERVER") == "1",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System;
|
||||
using System.Net.WebSockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using osu.Game.IPC;
|
||||
|
||||
namespace osu.Game.Tests.IPC
|
||||
{
|
||||
public sealed class WebSocketClient : IDisposable
|
||||
{
|
||||
public event Action<string>? MessageReceived;
|
||||
public event Action? Closed;
|
||||
|
||||
private readonly int port;
|
||||
private WebSocketChannel? channel;
|
||||
|
||||
public WebSocketClient(int port)
|
||||
{
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public async Task Start(CancellationToken cancellationToken = default)
|
||||
{
|
||||
var webSocket = new ClientWebSocket();
|
||||
await webSocket.ConnectAsync(new Uri($@"ws://localhost:{port}/"), cancellationToken);
|
||||
channel = new WebSocketChannel(webSocket);
|
||||
channel.MessageReceived += msg => MessageReceived?.Invoke(msg);
|
||||
channel.ClosedPrematurely += () => Closed?.Invoke();
|
||||
channel.Start(cancellationToken);
|
||||
}
|
||||
|
||||
public async Task SendAsync(string message)
|
||||
{
|
||||
if (channel == null)
|
||||
throw new InvalidOperationException($@"Must {nameof(Start)} first.");
|
||||
|
||||
await channel.SendAsync(message);
|
||||
}
|
||||
|
||||
public async Task StopAsync(CancellationToken stoppingToken = default)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (channel != null)
|
||||
await channel.StopAsync(stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// has to be caught manually because outer task isn't accepting `stoppingToken`.
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
channel?.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,274 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using NUnit.Framework;
|
||||
using osu.Game.IPC;
|
||||
using osu.Game.Online.Multiplayer;
|
||||
|
||||
namespace osu.Game.Tests.IPC
|
||||
{
|
||||
[TestFixture]
|
||||
public class WebSocketTest
|
||||
{
|
||||
[Test]
|
||||
public async Task TestClientInitiatedDuplexCommunication()
|
||||
{
|
||||
const int port = 54321;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
var client = new WebSocketClient(port);
|
||||
|
||||
var duplexComplete = new ManualResetEventSlim(false);
|
||||
|
||||
server.MessageReceived += (clientId, msg) =>
|
||||
{
|
||||
if (msg != "PING")
|
||||
return;
|
||||
|
||||
// ReSharper disable once AccessToDisposedClosure
|
||||
server.SendAsync(clientId, "PONG").FireAndForget();
|
||||
};
|
||||
client.MessageReceived += msg =>
|
||||
{
|
||||
if (msg != "PONG")
|
||||
return;
|
||||
|
||||
duplexComplete.Set();
|
||||
};
|
||||
|
||||
await server.StartAsync();
|
||||
await client.Start();
|
||||
|
||||
await client.SendAsync("PING");
|
||||
Assert.That(duplexComplete.Wait(10_000));
|
||||
|
||||
await client.StopAsync();
|
||||
await server.StopAsync();
|
||||
|
||||
client.Dispose();
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestServerInitiatedDuplexCommunication()
|
||||
{
|
||||
const int port = 54321;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
var client = new WebSocketClient(port);
|
||||
|
||||
var clientConnected = new ManualResetEventSlim();
|
||||
var duplexComplete = new ManualResetEventSlim();
|
||||
|
||||
client.MessageReceived += msg =>
|
||||
{
|
||||
if (msg != "PING")
|
||||
return;
|
||||
|
||||
// ReSharper disable once AccessToDisposedClosure
|
||||
client.SendAsync("PONG").FireAndForget();
|
||||
};
|
||||
server.ClientConnected += _ => clientConnected.Set();
|
||||
server.MessageReceived += (_, msg) =>
|
||||
{
|
||||
if (msg != "PONG")
|
||||
return;
|
||||
|
||||
duplexComplete.Set();
|
||||
};
|
||||
|
||||
await server.StartAsync();
|
||||
await client.Start();
|
||||
Assert.That(clientConnected.Wait(10_000));
|
||||
|
||||
await server.SendAsync(1, "PING");
|
||||
Assert.That(duplexComplete.Wait(10_000));
|
||||
|
||||
await client.StopAsync();
|
||||
await server.StopAsync();
|
||||
|
||||
client.Dispose();
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestServerBroadcast()
|
||||
{
|
||||
const int port = 54321;
|
||||
const int client_count = 5;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
var clients = new List<WebSocketClient>(client_count);
|
||||
var connectionCountdown = new CountdownEvent(client_count);
|
||||
var receiptCountdown = new CountdownEvent(client_count);
|
||||
|
||||
for (int i = 0; i < client_count; ++i)
|
||||
{
|
||||
var client = new WebSocketClient(port);
|
||||
client.MessageReceived += msg =>
|
||||
{
|
||||
if (msg != "HI ALL")
|
||||
return;
|
||||
|
||||
receiptCountdown.Signal();
|
||||
};
|
||||
clients.Add(client);
|
||||
}
|
||||
|
||||
server.ClientConnected += _ => connectionCountdown.Signal();
|
||||
|
||||
await server.StartAsync();
|
||||
|
||||
foreach (var client in clients)
|
||||
await client.Start();
|
||||
Assert.That(connectionCountdown.Wait(10_000));
|
||||
|
||||
await server.BroadcastAsync("HI ALL");
|
||||
Assert.That(receiptCountdown.Wait(10_000));
|
||||
|
||||
foreach (var client in clients)
|
||||
{
|
||||
await client.StopAsync();
|
||||
client.Dispose();
|
||||
}
|
||||
|
||||
await server.StopAsync();
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestClientSoftAborts()
|
||||
{
|
||||
const int port = 54321;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
var client = new WebSocketClient(port);
|
||||
|
||||
await server.StartAsync();
|
||||
await client.Start();
|
||||
|
||||
await client.StopAsync();
|
||||
client.Dispose();
|
||||
|
||||
await server.StopAsync();
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestClientHardAborts()
|
||||
{
|
||||
const int port = 54321;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
var client = new WebSocketClient(port);
|
||||
|
||||
await server.StartAsync();
|
||||
await client.Start();
|
||||
|
||||
await client.StopAsync(new CancellationToken(true));
|
||||
client.Dispose();
|
||||
|
||||
await server.StopAsync();
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestServerSoftAborts()
|
||||
{
|
||||
const int port = 54321;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
var client = new WebSocketClient(port);
|
||||
|
||||
await server.StartAsync();
|
||||
await client.Start();
|
||||
|
||||
await server.StopAsync();
|
||||
server.Dispose();
|
||||
|
||||
await client.StopAsync();
|
||||
client.Dispose();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestServerHardAborts()
|
||||
{
|
||||
const int port = 54321;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
var client = new WebSocketClient(port);
|
||||
|
||||
await server.StartAsync();
|
||||
await client.Start();
|
||||
|
||||
await server.StopAsync(new CancellationToken(true));
|
||||
server.Dispose();
|
||||
|
||||
await client.StopAsync();
|
||||
client.Dispose();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestClientMessageTooLong()
|
||||
{
|
||||
const int port = 54321;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
var client = new WebSocketClient(port);
|
||||
|
||||
var clientClosed = new ManualResetEventSlim();
|
||||
client.Closed += clientClosed.Set;
|
||||
|
||||
await server.StartAsync();
|
||||
await client.Start();
|
||||
|
||||
await client.SendAsync(new string('0', 9999));
|
||||
Assert.That(clientClosed.Wait(10_000));
|
||||
await client.StopAsync();
|
||||
client.Dispose();
|
||||
|
||||
var client2 = new WebSocketClient(port);
|
||||
|
||||
var duplexComplete = new ManualResetEventSlim();
|
||||
server.MessageReceived += (clientId, msg) =>
|
||||
{
|
||||
if (msg != "PING")
|
||||
return;
|
||||
|
||||
// ReSharper disable once AccessToDisposedClosure
|
||||
server.SendAsync(clientId, "PONG").FireAndForget();
|
||||
};
|
||||
client2.MessageReceived += msg =>
|
||||
{
|
||||
if (msg != "PONG")
|
||||
return;
|
||||
|
||||
duplexComplete.Set();
|
||||
};
|
||||
|
||||
await client2.Start();
|
||||
await client2.SendAsync("PING");
|
||||
Assert.That(duplexComplete.Wait(10000));
|
||||
|
||||
await client2.StopAsync();
|
||||
await server.StopAsync();
|
||||
|
||||
client2.Dispose();
|
||||
server.Dispose();
|
||||
}
|
||||
|
||||
[Test]
|
||||
public async Task TestStartStopServerWithoutReceivingClients()
|
||||
{
|
||||
const int port = 54321;
|
||||
|
||||
var server = new WebSocketServer(port);
|
||||
await server.StartAsync();
|
||||
await server.StopAsync();
|
||||
server.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,169 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace osu.Game.IPC
|
||||
{
|
||||
/// <summary>
|
||||
/// Represents a WebSocket-based communication channel.
|
||||
/// Only supports UTF-8 string-based messages, of maximum size of <see cref="max_message_size"/> bytes.
|
||||
/// </summary>
|
||||
public sealed class WebSocketChannel : IDisposable
|
||||
{
|
||||
public event Action<string>? MessageReceived;
|
||||
public event Action? ClosedPrematurely;
|
||||
|
||||
private const int max_message_size = 4096; // bytes
|
||||
|
||||
private readonly byte[] receiveBuffer = new byte[max_message_size];
|
||||
private int currentBufferPosition;
|
||||
|
||||
private readonly WebSocket webSocket;
|
||||
private Task? readWriteTask;
|
||||
private readonly CancellationTokenSource runningTokenSource = new CancellationTokenSource();
|
||||
private bool isDisposed;
|
||||
|
||||
public WebSocketChannel(WebSocket webSocket)
|
||||
{
|
||||
this.webSocket = webSocket;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Starts the channel.
|
||||
/// </summary>
|
||||
/// <param name="cancellationToken">Use this to abort the start.</param>
|
||||
public void Start(CancellationToken cancellationToken)
|
||||
{
|
||||
if (readWriteTask?.Status >= TaskStatus.Running)
|
||||
throw new InvalidOperationException($@"Cannot {nameof(Start)} more than once.");
|
||||
|
||||
readWriteTask = Task.Run(readWriteLoop, cancellationToken);
|
||||
}
|
||||
|
||||
private async Task readWriteLoop()
|
||||
{
|
||||
var token = runningTokenSource.Token;
|
||||
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
ValueWebSocketReceiveResult result;
|
||||
|
||||
try
|
||||
{
|
||||
result = await webSocket.ReceiveAsync(receiveBuffer.AsMemory(currentBufferPosition), token).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// normal when `token` is cancelled.
|
||||
// at this point the websocket will have entered `Aborted` state on its own, so no further clean-up can be done.
|
||||
return;
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// could throw something like `WebSocketException`s from the other side hard-aborting.
|
||||
ClosedPrematurely?.Invoke();
|
||||
return;
|
||||
}
|
||||
|
||||
currentBufferPosition += result.Count;
|
||||
|
||||
if (webSocket.State > WebSocketState.Open)
|
||||
{
|
||||
if (webSocket.State == WebSocketState.CloseReceived)
|
||||
{
|
||||
try
|
||||
{
|
||||
// attempt to complete the close handshake nicely.
|
||||
await webSocket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, @"Received close request", token).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// an attempt was made, and failed. bail.
|
||||
}
|
||||
}
|
||||
|
||||
ClosedPrematurely?.Invoke();
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.MessageType == WebSocketMessageType.Binary)
|
||||
{
|
||||
// see https://github.com/dotnet/runtime/issues/81762#issuecomment-1421029475 for difference between `CloseAsync()` and `CloseOutputAsync()`.
|
||||
// there is basically no incentive to use `CloseAsync()` in these error scenarios. the point is to drop the errant peer on the floor immediately.
|
||||
await webSocket.CloseOutputAsync(WebSocketCloseStatus.InvalidMessageType, @"Binary messages are not supported.", token).ConfigureAwait(false);
|
||||
ClosedPrematurely?.Invoke();
|
||||
return;
|
||||
}
|
||||
|
||||
if (currentBufferPosition >= max_message_size)
|
||||
{
|
||||
await webSocket.CloseOutputAsync(WebSocketCloseStatus.MessageTooBig, $@"Exceeded maximum message size of {max_message_size} bytes.", token).ConfigureAwait(false);
|
||||
ClosedPrematurely?.Invoke();
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.EndOfMessage)
|
||||
{
|
||||
string message;
|
||||
|
||||
try
|
||||
{
|
||||
message = Encoding.UTF8.GetString(receiveBuffer, 0, currentBufferPosition);
|
||||
}
|
||||
catch (ArgumentException)
|
||||
{
|
||||
await webSocket.CloseOutputAsync(WebSocketCloseStatus.InvalidPayloadData, @"UTF-8 encoded strings expected.", token).ConfigureAwait(false);
|
||||
ClosedPrematurely?.Invoke();
|
||||
return;
|
||||
}
|
||||
|
||||
MessageReceived?.Invoke(message);
|
||||
Array.Fill(receiveBuffer, (byte)0, 0, currentBufferPosition);
|
||||
currentBufferPosition = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async Task SendAsync(string message)
|
||||
{
|
||||
if (readWriteTask == null)
|
||||
throw new InvalidOperationException($@"Must {nameof(Start)} first.");
|
||||
|
||||
byte[] bytes = Encoding.UTF8.GetBytes(message);
|
||||
await webSocket.SendAsync(bytes, WebSocketMessageType.Text, true, CancellationToken.None).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stops the channel.
|
||||
/// </summary>
|
||||
/// <param name="stoppingToken">Cancel this to transition from a graceful shutdown to a forced shutdown.</param>
|
||||
public async Task StopAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
if (isDisposed)
|
||||
return;
|
||||
|
||||
await runningTokenSource.CancelAsync().ConfigureAwait(false);
|
||||
|
||||
if (readWriteTask != null)
|
||||
await readWriteTask.WaitAsync(stoppingToken).ConfigureAwait(false);
|
||||
|
||||
if (stoppingToken.IsCancellationRequested)
|
||||
webSocket.Abort();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (isDisposed)
|
||||
return;
|
||||
|
||||
isDisposed = true;
|
||||
webSocket.Dispose();
|
||||
runningTokenSource.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,280 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Diagnostics;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.WebSockets;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using osu.Framework.Logging;
|
||||
|
||||
namespace osu.Game.IPC
|
||||
{
|
||||
/// <summary>
|
||||
/// Implements a WebSocket server to be used for external integrations such as streaming overlays.
|
||||
/// The server can only listen on <c>localhost</c>, on the port given in the constructor.
|
||||
/// Only UTF-8 string-based messages are supported. Binary messages are not supported.
|
||||
/// String-based messages must not exceed <see cref="WebSocketChannel.max_message_size"/> bytes.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// This implementation uses <see cref="HttpListener"/> internally.
|
||||
/// This is a frozen .NET API as per https://github.com/dotnet/runtime/issues/63941#issuecomment-1205259894.
|
||||
/// The reason of using this API instead of ASP.NET directly via frameworks like SignalR are as follows:
|
||||
/// <list type="bullet">
|
||||
/// <item>
|
||||
/// This is intended to be a <b>simple</b> server.
|
||||
/// There are no reliability guarantees, no delivery guarantees, no authorisation.
|
||||
/// The operation of this server is <b>best-effort</b>.
|
||||
/// Due to this, ASP.NET is surplus to requirements.
|
||||
/// </item>
|
||||
/// <item>Including ASP.NET wholesale would have a negative impact on binary size.</item>
|
||||
/// <item>
|
||||
/// Using ASP.NET could expose end users' PCs to having things enabled that shouldn't be enabled via little-known configuration toggles.
|
||||
/// One pertinent example is the <c>ASPNETCORE_URLS</c> environment variable which silently changes which endpoints an ASP.NET service listens on.
|
||||
/// </item>
|
||||
/// <item>
|
||||
/// ASP.NET does not generally fit into the paradigm of being <i>part</i> of an application.
|
||||
/// The way ASP.NET apps are structured, is that they generally <i>take over</i> the functioning of an application.
|
||||
/// Therefore, there is not necessarily a given that ASP.NET bundled inside the client will fully stop functioning even when explicitly asked.
|
||||
/// </item>
|
||||
/// </list>
|
||||
/// </remarks>
|
||||
public sealed class WebSocketServer : IDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// Whether the server is currently running and listening for connection requests.
|
||||
/// </summary>
|
||||
public bool IsRunning => handleRequestTask != null && !runningTokenSource.IsCancellationRequested;
|
||||
|
||||
/// <summary>
|
||||
/// Invoked when a client is connected.
|
||||
/// The argument is the assigned ID of the client.
|
||||
/// </summary>
|
||||
public event Action<int>? ClientConnected;
|
||||
|
||||
/// <summary>
|
||||
/// Invoked when a message is received.
|
||||
/// The first argument is the ID of the sender; the second is the content of the received message.
|
||||
/// </summary>
|
||||
public event Action<int, string>? MessageReceived;
|
||||
|
||||
private readonly object syncRoot = new object();
|
||||
|
||||
private readonly string prefix;
|
||||
private readonly Logger logger;
|
||||
|
||||
private HttpListener? listener;
|
||||
private readonly ManualResetEventSlim contextResetEvent = new ManualResetEventSlim();
|
||||
private Task? handleRequestTask;
|
||||
|
||||
private int channelCounter;
|
||||
private readonly ConcurrentDictionary<int, WebSocketChannel> channels = new ConcurrentDictionary<int, WebSocketChannel>();
|
||||
|
||||
private readonly CancellationTokenSource runningTokenSource = new CancellationTokenSource();
|
||||
private bool isDisposed;
|
||||
|
||||
public WebSocketServer(int port)
|
||||
{
|
||||
// Restricting to only providing a port is intentional for several reasons:
|
||||
// - Use of HTTP (no efforts are taken to make HTTPS work).
|
||||
// - Attack surface reduction (doesn't accidentally listen on all interfaces, potentially getting hit by something external).
|
||||
// Some users with setups that use a second "streaming PC" or similar will complain. They can set up proxies at their own peril.
|
||||
prefix = $@"http://localhost:{port}/";
|
||||
|
||||
logger = Logger.GetLogger(@"websocket");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Starts the server.
|
||||
/// </summary>
|
||||
/// <param name="cancellationToken">Use this to cancel start-up.</param>
|
||||
public Task StartAsync(CancellationToken cancellationToken = default) => Task.Run(() =>
|
||||
{
|
||||
lock (syncRoot)
|
||||
{
|
||||
if (listener != null)
|
||||
throw new InvalidOperationException($@"Cannot call {nameof(StartAsync)} multiple times.");
|
||||
|
||||
listener = new HttpListener();
|
||||
listener.Prefixes.Add(prefix);
|
||||
listener.Start();
|
||||
handleRequestTask = Task.Run(handleRequests, cancellationToken);
|
||||
logger.Add($@"Listening on {prefix}.");
|
||||
}
|
||||
}, cancellationToken);
|
||||
|
||||
private async Task handleRequests()
|
||||
{
|
||||
Debug.Assert(listener != null);
|
||||
|
||||
while (!runningTokenSource.IsCancellationRequested)
|
||||
{
|
||||
HttpListenerContext? context = null;
|
||||
|
||||
// `listener.GetContextAsync()` exists but is unusable here without ugly hacks.
|
||||
// as per source inspection, it is a thin wrapper over `{Begin,End}GetContext()`.
|
||||
// the problem with that is that the method is *hard-blocking* and *does not accept cancellation*.
|
||||
// therefore, if it's called in a processing loop like this
|
||||
// that we are expecting to be able to cut short at any moment's notice to shut things down,
|
||||
// it's not going to yield and will keep waiting forever.
|
||||
// a `listener.Stop()` from another thread does cut the call short, but also ends up in an unclean termination.
|
||||
// what "unclean termination" means here depends on the OS we're running on
|
||||
// (different exceptions are observed on macOS and Windows, at least).
|
||||
// therefore use the old asynchronous paradigm with manual signalling when the context is available.
|
||||
contextResetEvent.Reset();
|
||||
listener.BeginGetContext(iar =>
|
||||
{
|
||||
try
|
||||
{
|
||||
context = ((HttpListener)iar.AsyncState!).EndGetContext(iar);
|
||||
contextResetEvent.Set();
|
||||
}
|
||||
catch (HttpListenerException ex) when (ex.ErrorCode == 995)
|
||||
{
|
||||
// occurs on Windows when the listener is stopped.
|
||||
}
|
||||
}, listener);
|
||||
WaitHandle.WaitAny([contextResetEvent.WaitHandle, runningTokenSource.Token.WaitHandle]);
|
||||
|
||||
// either we have a context to use, or the cancellation fired.
|
||||
// if it's the latter, terminate processing loop.
|
||||
if (runningTokenSource.IsCancellationRequested)
|
||||
return;
|
||||
|
||||
Debug.Assert(context != null);
|
||||
|
||||
var request = context.Request;
|
||||
var response = context.Response;
|
||||
|
||||
if (!request.IsWebSocketRequest)
|
||||
{
|
||||
logger.Add($@"Received non-websocket request from {request.RemoteEndPoint}. Requesting upgrade.");
|
||||
response.StatusCode = (int)HttpStatusCode.UpgradeRequired;
|
||||
response.Headers.Add(HttpRequestHeader.Upgrade, @"websocket");
|
||||
response.Close();
|
||||
continue;
|
||||
}
|
||||
|
||||
HttpListenerWebSocketContext wsContext;
|
||||
|
||||
try
|
||||
{
|
||||
wsContext = await context.AcceptWebSocketAsync(null).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.Add($@"Failed to accept websocket connection from {request.RemoteEndPoint}.", LogLevel.Error, ex);
|
||||
continue;
|
||||
}
|
||||
|
||||
int channelId = Interlocked.Increment(ref channelCounter);
|
||||
var wsChannel = new WebSocketChannel(wsContext.WebSocket);
|
||||
channels[channelId] = wsChannel;
|
||||
wsChannel.MessageReceived += msg => MessageReceived?.Invoke(channelId, msg);
|
||||
wsChannel.ClosedPrematurely += () => onChannelClosed(channelId);
|
||||
wsChannel.Start(runningTokenSource.Token);
|
||||
logger.Add($@"Accepted websocket connection from {request.RemoteEndPoint} as client #{channelId}.");
|
||||
ClientConnected?.Invoke(channelId);
|
||||
}
|
||||
}
|
||||
|
||||
private void onChannelClosed(int channelId)
|
||||
{
|
||||
if (channels.TryRemove(channelId, out var channel))
|
||||
channel.Dispose();
|
||||
logger.Add($@"Connection with client #{channelId} closed.");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends <paramref name="message"/> to the specific client with the given <paramref name="clientId"/>.
|
||||
/// </summary>
|
||||
/// <exception cref="ArgumentException"><paramref name="clientId"/> is not known.</exception>
|
||||
public async Task SendAsync(int clientId, string message)
|
||||
{
|
||||
if (!channels.TryGetValue(clientId, out var channel))
|
||||
throw new ArgumentException($@"Client {clientId} is not known.");
|
||||
|
||||
logger.Add($@"Sending to client {clientId}: {message}");
|
||||
await channel.SendAsync(message).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends <paramref name="message"/> to all connected clients.
|
||||
/// </summary>
|
||||
public Task BroadcastAsync(string message)
|
||||
{
|
||||
logger.Add($@"Broadcasting to all clients: {message}");
|
||||
return Task.WhenAll(channels.Values.Select(ch => ch.SendAsync(message)).ToArray());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stops the server.
|
||||
/// </summary>
|
||||
/// <param name="stoppingToken">Cancel this to transition from a graceful shutdown to a forced shutdown.</param>
|
||||
public Task StopAsync(CancellationToken stoppingToken = default) => Task.Run(async () =>
|
||||
{
|
||||
if (isDisposed)
|
||||
return;
|
||||
|
||||
logger.Add(@"Stopping websocket server...");
|
||||
|
||||
// of note, ordering here is important - the token is supposed to be cancelled *before* the listener is stopped.
|
||||
// see `readWriteTask()` and the treatment of early cancellation for answer why.
|
||||
await runningTokenSource.CancelAsync().ConfigureAwait(false);
|
||||
|
||||
if (handleRequestTask != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
await handleRequestTask.WaitAsync(stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// has to be caught manually because outer task isn't accepting `stoppingToken`.
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
listener?.Stop();
|
||||
}
|
||||
catch (ObjectDisposedException)
|
||||
{
|
||||
// observed to intermittently fire on unices in unclear circumstances. tragic, but also irrelevant at this point. the point is to stop.
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await Task.WhenAll(channels.Values.Select(ch => ch.StopAsync(stoppingToken)).ToArray()).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// has to be caught manually because outer task isn't accepting `stoppingToken`.
|
||||
}
|
||||
|
||||
logger.Add(@"Websocket server stopped.");
|
||||
}, CancellationToken.None); // we always want this task to start running. passing `stoppingToken` here would mean potentially never even scheduling it for execution.
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (isDisposed)
|
||||
return;
|
||||
|
||||
isDisposed = true;
|
||||
|
||||
// no clue why this isn't accessible without casting.
|
||||
// sidebar: `Stop()` unregisters addresses on Windows, but `Abort()` doesn't!
|
||||
// this `Dispose()` implementation calls the former.
|
||||
(listener as IDisposable)?.Dispose();
|
||||
|
||||
foreach (var channel in channels.Values)
|
||||
channel.Dispose();
|
||||
|
||||
runningTokenSource.Dispose();
|
||||
contextResetEvent.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user