1
0
mirror of https://github.com/ppy/osu.git synced 2025-01-15 12:42:54 +08:00

Simplify and combine concurrency of ArchiveModelManager

This commit is contained in:
Dean Herbert 2019-06-10 13:19:58 +09:00
parent 600503ec8e
commit b4d2d0bd0b
2 changed files with 57 additions and 116 deletions

View File

@ -2,7 +2,6 @@
// See the LICENCE file in the repository root for full licence text. // See the LICENCE file in the repository root for full licence text.
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
@ -111,7 +110,7 @@ namespace osu.Game.Beatmaps
validateOnlineIds(beatmapSet); validateOnlineIds(beatmapSet);
await Task.WhenAll(beatmapSet.Beatmaps.Select(b => updateQueue.Enqueue(new UpdateItem(b, cancellationToken)).Task).ToArray()); await Task.WhenAll(beatmapSet.Beatmaps.Select(b => updateQueue.Perform(b, cancellationToken)).ToArray());
} }
protected override void PreImport(BeatmapSetInfo beatmapSet) protected override void PreImport(BeatmapSetInfo beatmapSet)
@ -424,81 +423,24 @@ namespace osu.Game.Beatmaps
private class BeatmapUpdateQueue private class BeatmapUpdateQueue
{ {
private readonly IAPIProvider api; private readonly IAPIProvider api;
private readonly Queue<UpdateItem> queue = new Queue<UpdateItem>();
private int activeThreads; private readonly ThreadedTaskScheduler updateScheduler = new ThreadedTaskScheduler(4);
public BeatmapUpdateQueue(IAPIProvider api) public BeatmapUpdateQueue(IAPIProvider api)
{ {
this.api = api; this.api = api;
} }
public UpdateItem Enqueue(UpdateItem item) public Task Perform(BeatmapInfo beatmap, CancellationToken cancellationToken)
=> Task.Factory.StartNew(() => perform(beatmap, cancellationToken), cancellationToken, TaskCreationOptions.HideScheduler, updateScheduler);
private void perform(BeatmapInfo beatmap, CancellationToken cancellation)
{ {
lock (queue) if (cancellation.IsCancellationRequested)
{
queue.Enqueue(item);
if (activeThreads >= 16)
return item;
new Thread(runWork) { IsBackground = true }.Start();
activeThreads++;
}
return item;
}
private void runWork()
{
while (true)
{
UpdateItem toProcess;
lock (queue)
{
if (queue.Count == 0)
break;
toProcess = queue.Dequeue();
}
toProcess.PerformUpdate(api);
}
lock (queue)
activeThreads--;
}
}
private class UpdateItem
{
public Task Task => tcs.Task;
private readonly BeatmapInfo beatmap;
private readonly CancellationToken cancellationToken;
private readonly TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
public UpdateItem(BeatmapInfo beatmap, CancellationToken cancellationToken)
{
this.beatmap = beatmap;
this.cancellationToken = cancellationToken;
}
public void PerformUpdate(IAPIProvider api)
{
if (cancellationToken.IsCancellationRequested)
{
tcs.SetCanceled();
return; return;
}
if (api?.State != APIState.Online) if (api?.State != APIState.Online)
{
tcs.SetResult(false);
return; return;
}
Logger.Log("Attempting online lookup for the missing values...", LoggingTarget.Database); Logger.Log("Attempting online lookup for the missing values...", LoggingTarget.Database);
@ -512,17 +454,14 @@ namespace osu.Game.Beatmaps
beatmap.BeatmapSet.Status = res.BeatmapSet.Status; beatmap.BeatmapSet.Status = res.BeatmapSet.Status;
beatmap.BeatmapSet.OnlineBeatmapSetID = res.OnlineBeatmapSetID; beatmap.BeatmapSet.OnlineBeatmapSetID = res.OnlineBeatmapSetID;
beatmap.OnlineBeatmapID = res.OnlineBeatmapID; beatmap.OnlineBeatmapID = res.OnlineBeatmapID;
tcs.SetResult(true);
}; };
req.Failure += e => req.Failure += e =>
{ {
Logger.Log($"Failed ({e})", LoggingTarget.Database); Logger.Log($"Failed ({e})", LoggingTarget.Database);
tcs.SetResult(false);
}; };
// intentionally blocking to limit web request concurrency
req.Perform(api); req.Perform(api);
} }
} }

View File

@ -1,4 +1,4 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence. // Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text. // See the LICENCE file in the repository root for full licence text.
using System; using System;
@ -11,7 +11,6 @@ using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using osu.Framework; using osu.Framework;
using osu.Framework.Extensions; using osu.Framework.Extensions;
using osu.Framework.Extensions.TypeExtensions;
using osu.Framework.IO.File; using osu.Framework.IO.File;
using osu.Framework.Logging; using osu.Framework.Logging;
using osu.Framework.Platform; using osu.Framework.Platform;
@ -32,7 +31,7 @@ namespace osu.Game.Database
/// </summary> /// </summary>
/// <typeparam name="TModel">The model type.</typeparam> /// <typeparam name="TModel">The model type.</typeparam>
/// <typeparam name="TFileModel">The associated file join type.</typeparam> /// <typeparam name="TFileModel">The associated file join type.</typeparam>
public abstract class ArchiveModelManager<TModel, TFileModel> : ICanAcceptFiles public abstract class ArchiveModelManager<TModel, TFileModel> : ArchiveModelManager, ICanAcceptFiles
where TModel : class, IHasFiles<TFileModel>, IHasPrimaryKey, ISoftDelete where TModel : class, IHasFiles<TFileModel>, IHasPrimaryKey, ISoftDelete
where TFileModel : INamedFileInfo, new() where TFileModel : INamedFileInfo, new()
{ {
@ -112,11 +111,8 @@ namespace osu.Game.Database
a.Invoke(); a.Invoke();
} }
private readonly ThreadedTaskScheduler importScheduler;
protected ArchiveModelManager(Storage storage, IDatabaseContextFactory contextFactory, MutableDatabaseBackedStoreWithFileIncludes<TModel, TFileModel> modelStore, IIpcHost importHost = null) protected ArchiveModelManager(Storage storage, IDatabaseContextFactory contextFactory, MutableDatabaseBackedStoreWithFileIncludes<TModel, TFileModel> modelStore, IIpcHost importHost = null)
{ {
importScheduler = new ThreadedTaskScheduler(16, $"{GetType().ReadableName()}.Import");
ContextFactory = contextFactory; ContextFactory = contextFactory;
ModelStore = modelStore; ModelStore = modelStore;
@ -152,55 +148,55 @@ namespace osu.Game.Database
var term = $"{typeof(TModel).Name.Replace("Info", "").ToLower()}"; var term = $"{typeof(TModel).Name.Replace("Info", "").ToLower()}";
var tasks = new List<Task>();
int current = 0; int current = 0;
foreach (string path in paths) var imported = new List<TModel>();
await Task.WhenAll(paths.Select(path => Import(path, notification.CancellationToken).ContinueWith(t =>
{ {
tasks.Add(Import(path, notification.CancellationToken).ContinueWith(t => lock (notification)
{ {
lock (notification) current++;
{
current++;
notification.Text = $"Imported {current} of {paths.Length} {term}s"; notification.Text = $"Imported {current} of {paths.Length} {term}s";
notification.Progress = (float)current / paths.Length; notification.Progress = (float)current / paths.Length;
} }
if (t.Exception != null) if (t.Exception == null)
{ {
var e = t.Exception.InnerException ?? t.Exception; lock (imported)
Logger.Error(e, $@"Could not import ({Path.GetFileName(path)})"); imported.Add(t.Result);
} }
})); else
{
var e = t.Exception.InnerException ?? t.Exception;
Logger.Error(e, $@"Could not import ({Path.GetFileName(path)})");
}
})));
if (imported.Count == 0)
{
notification.Text = "Import failed!";
notification.State = ProgressNotificationState.Cancelled;
} }
else
{
notification.CompletionText = imported.Count == 1
? $"Imported {imported.First()}!"
: $"Imported {current} {term}s!";
await Task.WhenAll(tasks); if (imported.Count > 0 && PresentImport != null)
{
notification.CompletionText += " Click to view.";
notification.CompletionClickAction = () =>
{
PresentImport?.Invoke(imported);
return true;
};
}
// if (imported.Count == 0) notification.State = ProgressNotificationState.Completed;
// { }
// notification.Text = "Import failed!";
// notification.State = ProgressNotificationState.Cancelled;
// }
// else
// {
// notification.CompletionText = imported.Count == 1
// ? $"Imported {imported.First()}!"
// : $"Imported {current} {term}s!";
//
// if (imported.Count > 0 && PresentImport != null)
// {
// notification.CompletionText += " Click to view.";
// notification.CompletionClickAction = () =>
// {
// PresentImport?.Invoke(imported);
// return true;
// };
// }
//
// notification.State = ProgressNotificationState.Completed;
// }
} }
/// <summary> /// <summary>
@ -368,7 +364,7 @@ namespace osu.Game.Database
} }
return item; return item;
}, CancellationToken.None, TaskCreationOptions.None, importScheduler).Unwrap(); }, CancellationToken.None, TaskCreationOptions.HideScheduler, IMPORT_SCHEDULER).Unwrap();
/// <summary> /// <summary>
/// Perform an update of the specified item. /// Perform an update of the specified item.
@ -615,4 +611,10 @@ namespace osu.Game.Database
throw new InvalidFormatException($"{path} is not a valid archive"); throw new InvalidFormatException($"{path} is not a valid archive");
} }
} }
public abstract class ArchiveModelManager
{
// allow sharing static across all generic types
protected static readonly ThreadedTaskScheduler IMPORT_SCHEDULER = new ThreadedTaskScheduler(1);
}
} }