From 1b1f4c9c09cf8669b957b07a4888cf47dcd67199 Mon Sep 17 00:00:00 2001 From: smoogipoo Date: Mon, 16 Nov 2020 20:35:22 +0900 Subject: [PATCH 1/4] Refactor user request to fix threadsafety issues --- osu.Game/Database/UserLookupCache.cs | 123 ++++++++++----------------- 1 file changed, 47 insertions(+), 76 deletions(-) diff --git a/osu.Game/Database/UserLookupCache.cs b/osu.Game/Database/UserLookupCache.cs index c85ad6d651..05ba9c882b 100644 --- a/osu.Game/Database/UserLookupCache.cs +++ b/osu.Game/Database/UserLookupCache.cs @@ -1,7 +1,6 @@ // Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. // See the LICENCE file in the repository root for full licence text. -using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -15,103 +14,75 @@ namespace osu.Game.Database { public class UserLookupCache : MemoryCachingComponent { - private readonly HashSet nextTaskIDs = new HashSet(); - [Resolved] private IAPIProvider api { get; set; } - private readonly object taskAssignmentLock = new object(); - - private Task> pendingRequest; - - /// - /// Whether has already grabbed its IDs. - /// - private bool pendingRequestConsumedIDs; - public Task GetUserAsync(int userId, CancellationToken token = default) => GetAsync(userId, token); protected override async Task ComputeValueAsync(int lookup, CancellationToken token = default) - { - var users = await getQueryTaskForUser(lookup); - return users.FirstOrDefault(u => u.Id == lookup); - } + => await queryUser(lookup); - /// - /// Return the task responsible for fetching the provided user. - /// This may be part of a larger batch lookup to reduce web requests. - /// - /// The user to lookup. - /// The task responsible for the lookup. - private Task> getQueryTaskForUser(int userId) + private readonly List<(int id, TaskCompletionSource)> pendingUserTasks = new List<(int, TaskCompletionSource)>(); + private Task pendingRequestTask; + private readonly object taskAssignmentLock = new object(); + + private Task queryUser(int userId) { lock (taskAssignmentLock) { - nextTaskIDs.Add(userId); + var tcs = new TaskCompletionSource(); - // if there's a pending request which hasn't been started yet (and is not yet full), we can wait on it. - if (pendingRequest != null && !pendingRequestConsumedIDs && nextTaskIDs.Count < 50) - return pendingRequest; + // Add to the queue. + pendingUserTasks.Add((userId, tcs)); - return queueNextTask(nextLookup); - } + // Create a request task if there's not already one. + if (pendingRequestTask == null) + createNewTask(); - List nextLookup() - { - int[] lookupItems; - - lock (taskAssignmentLock) - { - pendingRequestConsumedIDs = true; - lookupItems = nextTaskIDs.ToArray(); - nextTaskIDs.Clear(); - - if (lookupItems.Length == 0) - { - queueNextTask(null); - return new List(); - } - } - - var request = new GetUsersRequest(lookupItems); - - // rather than queueing, we maintain our own single-threaded request stream. - api.Perform(request); - - return request.Result?.Users; + return tcs.Task; } } - /// - /// Queues new work at the end of the current work tasks. - /// Ensures the provided work is eventually run. - /// - /// The work to run. Can be null to signify the end of available work. - /// The task tracking this work. - private Task> queueNextTask(Func> work) + private void performLookup() { + var userTasks = new List<(int id, TaskCompletionSource task)>(); + + // Grab at most 50 users from the queue. lock (taskAssignmentLock) { - if (work == null) + while (pendingUserTasks.Count > 0 && userTasks.Count < 50) { - pendingRequest = null; - pendingRequestConsumedIDs = false; - } - else if (pendingRequest == null) - { - // special case for the first request ever. - pendingRequest = Task.Run(work); - pendingRequestConsumedIDs = false; - } - else - { - // append the new request on to the last to be executed. - pendingRequest = pendingRequest.ContinueWith(_ => work()); - pendingRequestConsumedIDs = false; - } + (int id, TaskCompletionSource task) next = pendingUserTasks[^1]; - return pendingRequest; + pendingUserTasks.RemoveAt(pendingUserTasks.Count - 1); + + // Perform a secondary check for existence, in case the user was queried in a previous batch. + if (CheckExists(next.id, out var existing)) + next.task.SetResult(existing); + else + userTasks.Add(next); + } } + + // Query the users. + var request = new GetUsersRequest(userTasks.Select(t => t.id).ToArray()); + + // rather than queueing, we maintain our own single-threaded request stream. + api.Perform(request); + + // Create a new request task if there's still more users to query. + lock (taskAssignmentLock) + { + pendingRequestTask = null; + if (pendingUserTasks.Count > 0) + createNewTask(); + } + + // Notify of completion. + foreach (var (id, task) in userTasks) + task.SetResult(request.Result?.Users?.FirstOrDefault(u => u.Id == id)); } + + private void createNewTask() => pendingRequestTask = Task.Run(performLookup); } } From 87bf168718bd918fb90cb04306216ceae8d0a688 Mon Sep 17 00:00:00 2001 From: smoogipoo Date: Mon, 16 Nov 2020 20:52:51 +0900 Subject: [PATCH 2/4] Use queue instead of list --- osu.Game/Database/UserLookupCache.cs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/osu.Game/Database/UserLookupCache.cs b/osu.Game/Database/UserLookupCache.cs index 05ba9c882b..71413d076e 100644 --- a/osu.Game/Database/UserLookupCache.cs +++ b/osu.Game/Database/UserLookupCache.cs @@ -22,7 +22,7 @@ namespace osu.Game.Database protected override async Task ComputeValueAsync(int lookup, CancellationToken token = default) => await queryUser(lookup); - private readonly List<(int id, TaskCompletionSource)> pendingUserTasks = new List<(int, TaskCompletionSource)>(); + private readonly Queue<(int id, TaskCompletionSource)> pendingUserTasks = new Queue<(int, TaskCompletionSource)>(); private Task pendingRequestTask; private readonly object taskAssignmentLock = new object(); @@ -33,7 +33,7 @@ namespace osu.Game.Database var tcs = new TaskCompletionSource(); // Add to the queue. - pendingUserTasks.Add((userId, tcs)); + pendingUserTasks.Enqueue((userId, tcs)); // Create a request task if there's not already one. if (pendingRequestTask == null) @@ -52,9 +52,7 @@ namespace osu.Game.Database { while (pendingUserTasks.Count > 0 && userTasks.Count < 50) { - (int id, TaskCompletionSource task) next = pendingUserTasks[^1]; - - pendingUserTasks.RemoveAt(pendingUserTasks.Count - 1); + (int id, TaskCompletionSource task) next = pendingUserTasks.Dequeue(); // Perform a secondary check for existence, in case the user was queried in a previous batch. if (CheckExists(next.id, out var existing)) From 85b0f714670215fc85eec8215f393674ccfd062a Mon Sep 17 00:00:00 2001 From: smoogipoo Date: Mon, 16 Nov 2020 21:17:43 +0900 Subject: [PATCH 3/4] Handle duplicate user IDs within the same batch --- osu.Game/Database/UserLookupCache.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/osu.Game/Database/UserLookupCache.cs b/osu.Game/Database/UserLookupCache.cs index 71413d076e..ba7f2ad98e 100644 --- a/osu.Game/Database/UserLookupCache.cs +++ b/osu.Game/Database/UserLookupCache.cs @@ -45,12 +45,15 @@ namespace osu.Game.Database private void performLookup() { + // userTasks may exceed 50 elements, indicating the existence of duplicate user IDs. All duplicated user IDs must be fulfilled. + // userIds contains at most 50 unique user IDs from userTasks, which is used to perform the lookup. var userTasks = new List<(int id, TaskCompletionSource task)>(); + var userIds = new HashSet(); - // Grab at most 50 users from the queue. + // Grab at most 50 unique user IDs from the queue. lock (taskAssignmentLock) { - while (pendingUserTasks.Count > 0 && userTasks.Count < 50) + while (pendingUserTasks.Count > 0 && userIds.Count < 50) { (int id, TaskCompletionSource task) next = pendingUserTasks.Dequeue(); @@ -58,12 +61,15 @@ namespace osu.Game.Database if (CheckExists(next.id, out var existing)) next.task.SetResult(existing); else + { userTasks.Add(next); + userIds.Add(next.id); + } } } // Query the users. - var request = new GetUsersRequest(userTasks.Select(t => t.id).ToArray()); + var request = new GetUsersRequest(userIds.ToArray()); // rather than queueing, we maintain our own single-threaded request stream. api.Perform(request); From 009d666241fb7adfd1c1f11d7603478dbd9166b0 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Tue, 17 Nov 2020 10:57:11 +0900 Subject: [PATCH 4/4] Use dictionary to avoid linq overhead --- osu.Game/Database/UserLookupCache.cs | 36 +++++++++++++++++++--------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/osu.Game/Database/UserLookupCache.cs b/osu.Game/Database/UserLookupCache.cs index ba7f2ad98e..05d6930992 100644 --- a/osu.Game/Database/UserLookupCache.cs +++ b/osu.Game/Database/UserLookupCache.cs @@ -45,15 +45,13 @@ namespace osu.Game.Database private void performLookup() { - // userTasks may exceed 50 elements, indicating the existence of duplicate user IDs. All duplicated user IDs must be fulfilled. - // userIds contains at most 50 unique user IDs from userTasks, which is used to perform the lookup. - var userTasks = new List<(int id, TaskCompletionSource task)>(); - var userIds = new HashSet(); + // contains at most 50 unique user IDs from userTasks, which is used to perform the lookup. + var userTasks = new Dictionary>>(); // Grab at most 50 unique user IDs from the queue. lock (taskAssignmentLock) { - while (pendingUserTasks.Count > 0 && userIds.Count < 50) + while (pendingUserTasks.Count > 0 && userTasks.Count < 50) { (int id, TaskCompletionSource task) next = pendingUserTasks.Dequeue(); @@ -62,14 +60,16 @@ namespace osu.Game.Database next.task.SetResult(existing); else { - userTasks.Add(next); - userIds.Add(next.id); + if (userTasks.TryGetValue(next.id, out var tasks)) + tasks.Add(next.task); + else + userTasks[next.id] = new List> { next.task }; } } } // Query the users. - var request = new GetUsersRequest(userIds.ToArray()); + var request = new GetUsersRequest(userTasks.Keys.ToArray()); // rather than queueing, we maintain our own single-threaded request stream. api.Perform(request); @@ -82,9 +82,23 @@ namespace osu.Game.Database createNewTask(); } - // Notify of completion. - foreach (var (id, task) in userTasks) - task.SetResult(request.Result?.Users?.FirstOrDefault(u => u.Id == id)); + foreach (var user in request.Result.Users) + { + if (userTasks.TryGetValue(user.Id, out var tasks)) + { + foreach (var task in tasks) + task.SetResult(user); + + userTasks.Remove(user.Id); + } + } + + // if any tasks remain which were not satisfied, return null. + foreach (var tasks in userTasks.Values) + { + foreach (var task in tasks) + task.SetResult(null); + } } private void createNewTask() => pendingRequestTask = Task.Run(performLookup);