1
0
mirror of https://github.com/ppy/osu.git synced 2024-12-15 08:13:31 +08:00

Merge pull request #10867 from smoogipoo/refactor-user-request

Refactor user caching to improve readability and fix threadsafety issues
This commit is contained in:
Dean Herbert 2020-11-17 14:20:45 +09:00 committed by GitHub
commit d7b6e4a32d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,7 +1,6 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence. // Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text. // See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
@ -15,103 +14,93 @@ namespace osu.Game.Database
{ {
public class UserLookupCache : MemoryCachingComponent<int, User> public class UserLookupCache : MemoryCachingComponent<int, User>
{ {
private readonly HashSet<int> nextTaskIDs = new HashSet<int>();
[Resolved] [Resolved]
private IAPIProvider api { get; set; } private IAPIProvider api { get; set; }
private readonly object taskAssignmentLock = new object();
private Task<List<User>> pendingRequest;
/// <summary>
/// Whether <see cref="pendingRequest"/> has already grabbed its IDs.
/// </summary>
private bool pendingRequestConsumedIDs;
public Task<User> GetUserAsync(int userId, CancellationToken token = default) => GetAsync(userId, token); public Task<User> GetUserAsync(int userId, CancellationToken token = default) => GetAsync(userId, token);
protected override async Task<User> ComputeValueAsync(int lookup, CancellationToken token = default) protected override async Task<User> ComputeValueAsync(int lookup, CancellationToken token = default)
{ => await queryUser(lookup);
var users = await getQueryTaskForUser(lookup);
return users.FirstOrDefault(u => u.Id == lookup);
}
/// <summary> private readonly Queue<(int id, TaskCompletionSource<User>)> pendingUserTasks = new Queue<(int, TaskCompletionSource<User>)>();
/// Return the task responsible for fetching the provided user. private Task pendingRequestTask;
/// This may be part of a larger batch lookup to reduce web requests. private readonly object taskAssignmentLock = new object();
/// </summary>
/// <param name="userId">The user to lookup.</param> private Task<User> queryUser(int userId)
/// <returns>The task responsible for the lookup.</returns>
private Task<List<User>> getQueryTaskForUser(int userId)
{ {
lock (taskAssignmentLock) lock (taskAssignmentLock)
{ {
nextTaskIDs.Add(userId); var tcs = new TaskCompletionSource<User>();
// if there's a pending request which hasn't been started yet (and is not yet full), we can wait on it. // Add to the queue.
if (pendingRequest != null && !pendingRequestConsumedIDs && nextTaskIDs.Count < 50) pendingUserTasks.Enqueue((userId, tcs));
return pendingRequest;
return queueNextTask(nextLookup); // Create a request task if there's not already one.
if (pendingRequestTask == null)
createNewTask();
return tcs.Task;
} }
}
List<User> nextLookup() private void performLookup()
{
// contains at most 50 unique user IDs from userTasks, which is used to perform the lookup.
var userTasks = new Dictionary<int, List<TaskCompletionSource<User>>>();
// Grab at most 50 unique user IDs from the queue.
lock (taskAssignmentLock)
{ {
int[] lookupItems; while (pendingUserTasks.Count > 0 && userTasks.Count < 50)
lock (taskAssignmentLock)
{ {
pendingRequestConsumedIDs = true; (int id, TaskCompletionSource<User> task) next = pendingUserTasks.Dequeue();
lookupItems = nextTaskIDs.ToArray();
nextTaskIDs.Clear();
if (lookupItems.Length == 0) // Perform a secondary check for existence, in case the user was queried in a previous batch.
if (CheckExists(next.id, out var existing))
next.task.SetResult(existing);
else
{ {
queueNextTask(null); if (userTasks.TryGetValue(next.id, out var tasks))
return new List<User>(); tasks.Add(next.task);
else
userTasks[next.id] = new List<TaskCompletionSource<User>> { next.task };
} }
} }
var request = new GetUsersRequest(lookupItems);
// rather than queueing, we maintain our own single-threaded request stream.
api.Perform(request);
return request.Result?.Users;
} }
}
/// <summary> // Query the users.
/// Queues new work at the end of the current work tasks. var request = new GetUsersRequest(userTasks.Keys.ToArray());
/// Ensures the provided work is eventually run.
/// </summary> // rather than queueing, we maintain our own single-threaded request stream.
/// <param name="work">The work to run. Can be null to signify the end of available work.</param> api.Perform(request);
/// <returns>The task tracking this work.</returns>
private Task<List<User>> queueNextTask(Func<List<User>> work) // Create a new request task if there's still more users to query.
{
lock (taskAssignmentLock) lock (taskAssignmentLock)
{ {
if (work == null) pendingRequestTask = null;
{ if (pendingUserTasks.Count > 0)
pendingRequest = null; createNewTask();
pendingRequestConsumedIDs = false; }
}
else if (pendingRequest == null)
{
// special case for the first request ever.
pendingRequest = Task.Run(work);
pendingRequestConsumedIDs = false;
}
else
{
// append the new request on to the last to be executed.
pendingRequest = pendingRequest.ContinueWith(_ => work());
pendingRequestConsumedIDs = false;
}
return pendingRequest; foreach (var user in request.Result.Users)
{
if (userTasks.TryGetValue(user.Id, out var tasks))
{
foreach (var task in tasks)
task.SetResult(user);
userTasks.Remove(user.Id);
}
}
// if any tasks remain which were not satisfied, return null.
foreach (var tasks in userTasks.Values)
{
foreach (var task in tasks)
task.SetResult(null);
} }
} }
private void createNewTask() => pendingRequestTask = Task.Run(performLookup);
} }
} }