1
0
mirror of https://github.com/ppy/osu.git synced 2025-01-27 12:42:54 +08:00
This commit is contained in:
smoogipoo 2021-01-27 01:20:50 +09:00
parent 085115cba5
commit 248989b3eb
3 changed files with 51 additions and 28 deletions

View File

@ -4,6 +4,7 @@
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using NUnit.Framework; using NUnit.Framework;
using osu.Game.Extensions;
using osu.Game.Utils; using osu.Game.Utils;
namespace osu.Game.Tests.NonVisual namespace osu.Game.Tests.NonVisual
@ -13,14 +14,22 @@ namespace osu.Game.Tests.NonVisual
{ {
private TaskChain taskChain; private TaskChain taskChain;
private int currentTask; private int currentTask;
private CancellationTokenSource globalCancellationToken;
[SetUp] [SetUp]
public void Setup() public void Setup()
{ {
globalCancellationToken = new CancellationTokenSource();
taskChain = new TaskChain(); taskChain = new TaskChain();
currentTask = 0; currentTask = 0;
} }
[TearDown]
public void TearDown()
{
globalCancellationToken?.Cancel();
}
[Test] [Test]
public async Task TestChainedTasksRunSequentially() public async Task TestChainedTasksRunSequentially()
{ {
@ -65,17 +74,40 @@ namespace osu.Game.Tests.NonVisual
Assert.That(task3.task.Result, Is.EqualTo(2)); Assert.That(task3.task.Result, Is.EqualTo(2));
} }
[Test]
public async Task TestChainedTaskDoesNotCompleteBeforeChildTasks()
{
var mutex = new ManualResetEventSlim(false);
var task = taskChain.Add(async () =>
{
await Task.Run(() => mutex.Wait(globalCancellationToken.Token)).CatchUnobservedExceptions();
});
// Allow task to potentially complete
Thread.Sleep(1000);
Assert.That(task.IsCompleted, Is.False);
// Allow the task to complete.
mutex.Set();
await task;
}
private (Task<int> task, ManualResetEventSlim mutex, CancellationTokenSource cancellation) addTask() private (Task<int> task, ManualResetEventSlim mutex, CancellationTokenSource cancellation) addTask()
{ {
var mutex = new ManualResetEventSlim(false); var mutex = new ManualResetEventSlim(false);
var cancellationSource = new CancellationTokenSource();
var completionSource = new TaskCompletionSource<int>(); var completionSource = new TaskCompletionSource<int>();
var cancellationSource = new CancellationTokenSource();
var token = CancellationTokenSource.CreateLinkedTokenSource(cancellationSource.Token, globalCancellationToken.Token);
taskChain.Add(() => taskChain.Add(() =>
{ {
mutex.Wait(CancellationToken.None); mutex.Wait(globalCancellationToken.Token);
completionSource.SetResult(Interlocked.Increment(ref currentTask)); completionSource.SetResult(Interlocked.Increment(ref currentTask));
}, cancellationSource.Token); }, token.Token);
return (completionSource.Task, mutex, cancellationSource); return (completionSource.Task, mutex, cancellationSource);
} }

View File

@ -182,7 +182,7 @@ namespace osu.Game.Online.Multiplayer
return joinOrLeaveTaskChain.Add(async () => return joinOrLeaveTaskChain.Add(async () =>
{ {
await scheduledReset; await scheduledReset;
await LeaveRoomInternal().CatchUnobservedExceptions(); await LeaveRoomInternal();
}); });
} }

View File

@ -14,8 +14,8 @@ namespace osu.Game.Utils
/// </summary> /// </summary>
public class TaskChain public class TaskChain
{ {
private readonly object currentTaskLock = new object(); private readonly object finalTaskLock = new object();
private Task? currentTask; private Task? finalTask;
/// <summary> /// <summary>
/// Adds a new task to the end of this <see cref="TaskChain"/>. /// Adds a new task to the end of this <see cref="TaskChain"/>.
@ -23,31 +23,22 @@ namespace osu.Game.Utils
/// <param name="action">The action to be executed.</param> /// <param name="action">The action to be executed.</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> for this task. Does not affect further tasks in the chain.</param> /// <param name="cancellationToken">The <see cref="CancellationToken"/> for this task. Does not affect further tasks in the chain.</param>
/// <returns>The awaitable <see cref="Task"/>.</returns> /// <returns>The awaitable <see cref="Task"/>.</returns>
public Task Add(Action action, CancellationToken cancellationToken = default) public async Task Add(Action action, CancellationToken cancellationToken = default)
{ {
lock (currentTaskLock) Task? previousTask;
{ Task currentTask;
// Note: Attaching the cancellation token to the continuation could lead to re-ordering of tasks in the chain.
// Therefore, the cancellation token is not used to cancel the continuation but only the run of each task.
if (currentTask == null)
{
currentTask = Task.Run(() =>
{
cancellationToken.ThrowIfCancellationRequested();
action();
}, CancellationToken.None);
}
else
{
currentTask = currentTask.ContinueWith(_ =>
{
cancellationToken.ThrowIfCancellationRequested();
action();
}, CancellationToken.None);
}
return currentTask; lock (finalTaskLock)
{
previousTask = finalTask;
finalTask = currentTask = new Task(action, cancellationToken);
} }
if (previousTask != null)
await previousTask;
currentTask.Start();
await currentTask;
} }
} }
} }