mirror of
https://github.com/ppy/osu.git
synced 2024-12-14 09:32:55 +08:00
Merge pull request #16200 from bdach/lookup-cache-racing
Fix online lookup caches potentially getting stuck
This commit is contained in:
commit
c71942a350
@ -6,20 +6,13 @@ using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using JetBrains.Annotations;
|
||||
using osu.Framework.Allocation;
|
||||
using osu.Game.Online.API;
|
||||
using osu.Game.Online.API.Requests;
|
||||
using osu.Game.Online.API.Requests.Responses;
|
||||
|
||||
namespace osu.Game.Database
|
||||
{
|
||||
// This class is based on `UserLookupCache` which is well tested.
|
||||
// If modifications are to be made here, a base abstract implementation should likely be created and shared between the two.
|
||||
public class BeatmapLookupCache : MemoryCachingComponent<int, APIBeatmap>
|
||||
public class BeatmapLookupCache : OnlineLookupCache<int, APIBeatmap, GetBeatmapsRequest>
|
||||
{
|
||||
[Resolved]
|
||||
private IAPIProvider api { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Perform an API lookup on the specified beatmap, populating a <see cref="APIBeatmap"/> model.
|
||||
/// </summary>
|
||||
@ -27,7 +20,7 @@ namespace osu.Game.Database
|
||||
/// <param name="token">An optional cancellation token.</param>
|
||||
/// <returns>The populated beatmap, or null if the beatmap does not exist or the request could not be satisfied.</returns>
|
||||
[ItemCanBeNull]
|
||||
public Task<APIBeatmap> GetBeatmapAsync(int beatmapId, CancellationToken token = default) => GetAsync(beatmapId, token);
|
||||
public Task<APIBeatmap> GetBeatmapAsync(int beatmapId, CancellationToken token = default) => LookupAsync(beatmapId, token);
|
||||
|
||||
/// <summary>
|
||||
/// Perform an API lookup on the specified beatmaps, populating a <see cref="APIBeatmap"/> model.
|
||||
@ -35,115 +28,10 @@ namespace osu.Game.Database
|
||||
/// <param name="beatmapIds">The beatmaps to lookup.</param>
|
||||
/// <param name="token">An optional cancellation token.</param>
|
||||
/// <returns>The populated beatmaps. May include null results for failed retrievals.</returns>
|
||||
public Task<APIBeatmap[]> GetBeatmapsAsync(int[] beatmapIds, CancellationToken token = default)
|
||||
{
|
||||
var beatmapLookupTasks = new List<Task<APIBeatmap>>();
|
||||
public Task<APIBeatmap[]> GetBeatmapsAsync(int[] beatmapIds, CancellationToken token = default) => LookupAsync(beatmapIds, token);
|
||||
|
||||
foreach (int u in beatmapIds)
|
||||
{
|
||||
beatmapLookupTasks.Add(GetBeatmapAsync(u, token).ContinueWith(task =>
|
||||
{
|
||||
if (!task.IsCompletedSuccessfully)
|
||||
return null;
|
||||
protected override GetBeatmapsRequest CreateRequest(IEnumerable<int> ids) => new GetBeatmapsRequest(ids.ToArray());
|
||||
|
||||
return task.Result;
|
||||
}, token));
|
||||
}
|
||||
|
||||
return Task.WhenAll(beatmapLookupTasks);
|
||||
}
|
||||
|
||||
protected override async Task<APIBeatmap> ComputeValueAsync(int lookup, CancellationToken token = default)
|
||||
=> await queryBeatmap(lookup).ConfigureAwait(false);
|
||||
|
||||
private readonly Queue<(int id, TaskCompletionSource<APIBeatmap>)> pendingBeatmapTasks = new Queue<(int, TaskCompletionSource<APIBeatmap>)>();
|
||||
private Task pendingRequestTask;
|
||||
private readonly object taskAssignmentLock = new object();
|
||||
|
||||
private Task<APIBeatmap> queryBeatmap(int beatmapId)
|
||||
{
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<APIBeatmap>();
|
||||
|
||||
// Add to the queue.
|
||||
pendingBeatmapTasks.Enqueue((beatmapId, tcs));
|
||||
|
||||
// Create a request task if there's not already one.
|
||||
if (pendingRequestTask == null)
|
||||
createNewTask();
|
||||
|
||||
return tcs.Task;
|
||||
}
|
||||
}
|
||||
|
||||
private void performLookup()
|
||||
{
|
||||
// contains at most 50 unique beatmap IDs from beatmapTasks, which is used to perform the lookup.
|
||||
var beatmapTasks = new Dictionary<int, List<TaskCompletionSource<APIBeatmap>>>();
|
||||
|
||||
// Grab at most 50 unique beatmap IDs from the queue.
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
while (pendingBeatmapTasks.Count > 0 && beatmapTasks.Count < 50)
|
||||
{
|
||||
(int id, TaskCompletionSource<APIBeatmap> task) next = pendingBeatmapTasks.Dequeue();
|
||||
|
||||
// Perform a secondary check for existence, in case the beatmap was queried in a previous batch.
|
||||
if (CheckExists(next.id, out var existing))
|
||||
next.task.SetResult(existing);
|
||||
else
|
||||
{
|
||||
if (beatmapTasks.TryGetValue(next.id, out var tasks))
|
||||
tasks.Add(next.task);
|
||||
else
|
||||
beatmapTasks[next.id] = new List<TaskCompletionSource<APIBeatmap>> { next.task };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (beatmapTasks.Count == 0)
|
||||
return;
|
||||
|
||||
// Query the beatmaps.
|
||||
var request = new GetBeatmapsRequest(beatmapTasks.Keys.ToArray());
|
||||
|
||||
// rather than queueing, we maintain our own single-threaded request stream.
|
||||
// todo: we probably want retry logic here.
|
||||
api.Perform(request);
|
||||
|
||||
// Create a new request task if there's still more beatmaps to query.
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
pendingRequestTask = null;
|
||||
if (pendingBeatmapTasks.Count > 0)
|
||||
createNewTask();
|
||||
}
|
||||
|
||||
List<APIBeatmap> foundBeatmaps = request.Response?.Beatmaps;
|
||||
|
||||
if (foundBeatmaps != null)
|
||||
{
|
||||
foreach (var beatmap in foundBeatmaps)
|
||||
{
|
||||
if (beatmapTasks.TryGetValue(beatmap.OnlineID, out var tasks))
|
||||
{
|
||||
foreach (var task in tasks)
|
||||
task.SetResult(beatmap);
|
||||
|
||||
beatmapTasks.Remove(beatmap.OnlineID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if any tasks remain which were not satisfied, return null.
|
||||
foreach (var tasks in beatmapTasks.Values)
|
||||
{
|
||||
foreach (var task in tasks)
|
||||
task.SetResult(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void createNewTask() => pendingRequestTask = Task.Run(performLookup);
|
||||
protected override IEnumerable<APIBeatmap> RetrieveResults(GetBeatmapsRequest request) => request.Response?.Beatmaps;
|
||||
}
|
||||
}
|
||||
|
170
osu.Game/Database/OnlineLookupCache.cs
Normal file
170
osu.Game/Database/OnlineLookupCache.cs
Normal file
@ -0,0 +1,170 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using JetBrains.Annotations;
|
||||
using osu.Framework.Allocation;
|
||||
using osu.Game.Online.API;
|
||||
|
||||
namespace osu.Game.Database
|
||||
{
|
||||
public abstract class OnlineLookupCache<TLookup, TValue, TRequest> : MemoryCachingComponent<TLookup, TValue>
|
||||
where TLookup : IEquatable<TLookup>
|
||||
where TValue : class, IHasOnlineID<TLookup>
|
||||
where TRequest : APIRequest
|
||||
{
|
||||
[Resolved]
|
||||
private IAPIProvider api { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Creates an <see cref="APIRequest"/> to retrieve the values for a given collection of <typeparamref name="TLookup"/>s.
|
||||
/// </summary>
|
||||
/// <param name="ids">The IDs to perform the lookup with.</param>
|
||||
protected abstract TRequest CreateRequest(IEnumerable<TLookup> ids);
|
||||
|
||||
/// <summary>
|
||||
/// Retrieves a list of <typeparamref name="TValue"/>s from a successful <typeparamref name="TRequest"/> created by <see cref="CreateRequest"/>.
|
||||
/// </summary>
|
||||
[CanBeNull]
|
||||
protected abstract IEnumerable<TValue> RetrieveResults(TRequest request);
|
||||
|
||||
/// <summary>
|
||||
/// Perform a lookup using the specified <paramref name="id"/>, populating a <typeparamref name="TValue"/>.
|
||||
/// </summary>
|
||||
/// <param name="id">The ID to lookup.</param>
|
||||
/// <param name="token">An optional cancellation token.</param>
|
||||
/// <returns>The populated <typeparamref name="TValue"/>, or null if the value does not exist or the request could not be satisfied.</returns>
|
||||
[ItemCanBeNull]
|
||||
protected Task<TValue> LookupAsync(TLookup id, CancellationToken token = default) => GetAsync(id, token);
|
||||
|
||||
/// <summary>
|
||||
/// Perform an API lookup on the specified <paramref name="ids"/>, populating a <typeparamref name="TValue"/>.
|
||||
/// </summary>
|
||||
/// <param name="ids">The IDs to lookup.</param>
|
||||
/// <param name="token">An optional cancellation token.</param>
|
||||
/// <returns>The populated values. May include null results for failed retrievals.</returns>
|
||||
protected Task<TValue[]> LookupAsync(TLookup[] ids, CancellationToken token = default)
|
||||
{
|
||||
var lookupTasks = new List<Task<TValue>>();
|
||||
|
||||
foreach (var id in ids)
|
||||
{
|
||||
lookupTasks.Add(LookupAsync(id, token).ContinueWith(task =>
|
||||
{
|
||||
if (!task.IsCompletedSuccessfully)
|
||||
return null;
|
||||
|
||||
return task.Result;
|
||||
}, token));
|
||||
}
|
||||
|
||||
return Task.WhenAll(lookupTasks);
|
||||
}
|
||||
|
||||
// cannot be sealed due to test usages (see TestUserLookupCache).
|
||||
protected override async Task<TValue> ComputeValueAsync(TLookup lookup, CancellationToken token = default)
|
||||
=> await queryValue(lookup).ConfigureAwait(false);
|
||||
|
||||
private readonly Queue<(TLookup id, TaskCompletionSource<TValue>)> pendingTasks = new Queue<(TLookup, TaskCompletionSource<TValue>)>();
|
||||
private Task pendingRequestTask;
|
||||
private readonly object taskAssignmentLock = new object();
|
||||
|
||||
private Task<TValue> queryValue(TLookup id)
|
||||
{
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<TValue>();
|
||||
|
||||
// Add to the queue.
|
||||
pendingTasks.Enqueue((id, tcs));
|
||||
|
||||
// Create a request task if there's not already one.
|
||||
if (pendingRequestTask == null)
|
||||
createNewTask();
|
||||
|
||||
return tcs.Task;
|
||||
}
|
||||
}
|
||||
|
||||
private void performLookup()
|
||||
{
|
||||
// contains at most 50 unique IDs from tasks, which is used to perform the lookup.
|
||||
var nextTaskBatch = new Dictionary<TLookup, List<TaskCompletionSource<TValue>>>();
|
||||
|
||||
// Grab at most 50 unique IDs from the queue.
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
while (pendingTasks.Count > 0 && nextTaskBatch.Count < 50)
|
||||
{
|
||||
(TLookup id, TaskCompletionSource<TValue> task) next = pendingTasks.Dequeue();
|
||||
|
||||
// Perform a secondary check for existence, in case the value was queried in a previous batch.
|
||||
if (CheckExists(next.id, out var existing))
|
||||
next.task.SetResult(existing);
|
||||
else
|
||||
{
|
||||
if (nextTaskBatch.TryGetValue(next.id, out var tasks))
|
||||
tasks.Add(next.task);
|
||||
else
|
||||
nextTaskBatch[next.id] = new List<TaskCompletionSource<TValue>> { next.task };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (nextTaskBatch.Count == 0)
|
||||
{
|
||||
finishPendingTask();
|
||||
return;
|
||||
}
|
||||
|
||||
// Query the values.
|
||||
var request = CreateRequest(nextTaskBatch.Keys.ToArray());
|
||||
|
||||
// rather than queueing, we maintain our own single-threaded request stream.
|
||||
// todo: we probably want retry logic here.
|
||||
api.Perform(request);
|
||||
|
||||
finishPendingTask();
|
||||
|
||||
var foundValues = RetrieveResults(request);
|
||||
|
||||
if (foundValues != null)
|
||||
{
|
||||
foreach (var value in foundValues)
|
||||
{
|
||||
if (nextTaskBatch.TryGetValue(value.OnlineID, out var tasks))
|
||||
{
|
||||
foreach (var task in tasks)
|
||||
task.SetResult(value);
|
||||
|
||||
nextTaskBatch.Remove(value.OnlineID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if any tasks remain which were not satisfied, return null.
|
||||
foreach (var tasks in nextTaskBatch.Values)
|
||||
{
|
||||
foreach (var task in tasks)
|
||||
task.SetResult(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void finishPendingTask()
|
||||
{
|
||||
// Create a new request task if there's still more values to query.
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
pendingRequestTask = null;
|
||||
if (pendingTasks.Count > 0)
|
||||
createNewTask();
|
||||
}
|
||||
}
|
||||
|
||||
private void createNewTask() => pendingRequestTask = Task.Run(performLookup);
|
||||
}
|
||||
}
|
@ -6,18 +6,13 @@ using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using JetBrains.Annotations;
|
||||
using osu.Framework.Allocation;
|
||||
using osu.Game.Online.API;
|
||||
using osu.Game.Online.API.Requests;
|
||||
using osu.Game.Online.API.Requests.Responses;
|
||||
|
||||
namespace osu.Game.Database
|
||||
{
|
||||
public class UserLookupCache : MemoryCachingComponent<int, APIUser>
|
||||
public class UserLookupCache : OnlineLookupCache<int, APIUser, GetUsersRequest>
|
||||
{
|
||||
[Resolved]
|
||||
private IAPIProvider api { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Perform an API lookup on the specified user, populating a <see cref="APIUser"/> model.
|
||||
/// </summary>
|
||||
@ -25,7 +20,7 @@ namespace osu.Game.Database
|
||||
/// <param name="token">An optional cancellation token.</param>
|
||||
/// <returns>The populated user, or null if the user does not exist or the request could not be satisfied.</returns>
|
||||
[ItemCanBeNull]
|
||||
public Task<APIUser> GetUserAsync(int userId, CancellationToken token = default) => GetAsync(userId, token);
|
||||
public Task<APIUser> GetUserAsync(int userId, CancellationToken token = default) => LookupAsync(userId, token);
|
||||
|
||||
/// <summary>
|
||||
/// Perform an API lookup on the specified users, populating a <see cref="APIUser"/> model.
|
||||
@ -33,115 +28,10 @@ namespace osu.Game.Database
|
||||
/// <param name="userIds">The users to lookup.</param>
|
||||
/// <param name="token">An optional cancellation token.</param>
|
||||
/// <returns>The populated users. May include null results for failed retrievals.</returns>
|
||||
public Task<APIUser[]> GetUsersAsync(int[] userIds, CancellationToken token = default)
|
||||
{
|
||||
var userLookupTasks = new List<Task<APIUser>>();
|
||||
public Task<APIUser[]> GetUsersAsync(int[] userIds, CancellationToken token = default) => LookupAsync(userIds, token);
|
||||
|
||||
foreach (int u in userIds)
|
||||
{
|
||||
userLookupTasks.Add(GetUserAsync(u, token).ContinueWith(task =>
|
||||
{
|
||||
if (!task.IsCompletedSuccessfully)
|
||||
return null;
|
||||
protected override GetUsersRequest CreateRequest(IEnumerable<int> ids) => new GetUsersRequest(ids.ToArray());
|
||||
|
||||
return task.Result;
|
||||
}, token));
|
||||
}
|
||||
|
||||
return Task.WhenAll(userLookupTasks);
|
||||
}
|
||||
|
||||
protected override async Task<APIUser> ComputeValueAsync(int lookup, CancellationToken token = default)
|
||||
=> await queryUser(lookup).ConfigureAwait(false);
|
||||
|
||||
private readonly Queue<(int id, TaskCompletionSource<APIUser>)> pendingUserTasks = new Queue<(int, TaskCompletionSource<APIUser>)>();
|
||||
private Task pendingRequestTask;
|
||||
private readonly object taskAssignmentLock = new object();
|
||||
|
||||
private Task<APIUser> queryUser(int userId)
|
||||
{
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<APIUser>();
|
||||
|
||||
// Add to the queue.
|
||||
pendingUserTasks.Enqueue((userId, tcs));
|
||||
|
||||
// Create a request task if there's not already one.
|
||||
if (pendingRequestTask == null)
|
||||
createNewTask();
|
||||
|
||||
return tcs.Task;
|
||||
}
|
||||
}
|
||||
|
||||
private void performLookup()
|
||||
{
|
||||
// contains at most 50 unique user IDs from userTasks, which is used to perform the lookup.
|
||||
var userTasks = new Dictionary<int, List<TaskCompletionSource<APIUser>>>();
|
||||
|
||||
// Grab at most 50 unique user IDs from the queue.
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
while (pendingUserTasks.Count > 0 && userTasks.Count < 50)
|
||||
{
|
||||
(int id, TaskCompletionSource<APIUser> task) next = pendingUserTasks.Dequeue();
|
||||
|
||||
// Perform a secondary check for existence, in case the user was queried in a previous batch.
|
||||
if (CheckExists(next.id, out var existing))
|
||||
next.task.SetResult(existing);
|
||||
else
|
||||
{
|
||||
if (userTasks.TryGetValue(next.id, out var tasks))
|
||||
tasks.Add(next.task);
|
||||
else
|
||||
userTasks[next.id] = new List<TaskCompletionSource<APIUser>> { next.task };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (userTasks.Count == 0)
|
||||
return;
|
||||
|
||||
// Query the users.
|
||||
var request = new GetUsersRequest(userTasks.Keys.ToArray());
|
||||
|
||||
// rather than queueing, we maintain our own single-threaded request stream.
|
||||
// todo: we probably want retry logic here.
|
||||
api.Perform(request);
|
||||
|
||||
// Create a new request task if there's still more users to query.
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
pendingRequestTask = null;
|
||||
if (pendingUserTasks.Count > 0)
|
||||
createNewTask();
|
||||
}
|
||||
|
||||
List<APIUser> foundUsers = request.Response?.Users;
|
||||
|
||||
if (foundUsers != null)
|
||||
{
|
||||
foreach (var user in foundUsers)
|
||||
{
|
||||
if (userTasks.TryGetValue(user.Id, out var tasks))
|
||||
{
|
||||
foreach (var task in tasks)
|
||||
task.SetResult(user);
|
||||
|
||||
userTasks.Remove(user.Id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if any tasks remain which were not satisfied, return null.
|
||||
foreach (var tasks in userTasks.Values)
|
||||
{
|
||||
foreach (var task in tasks)
|
||||
task.SetResult(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void createNewTask() => pendingRequestTask = Task.Run(performLookup);
|
||||
protected override IEnumerable<APIUser> RetrieveResults(GetUsersRequest request) => request.Response?.Users;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user