1
0
mirror of https://github.com/ppy/osu.git synced 2025-02-27 06:02:55 +08:00

Merge pull request #18689 from peppy/quick-import-only-on-batch

Always perform full consistency checks for single imports
This commit is contained in:
Dean Herbert 2022-06-15 17:06:06 +09:00 committed by GitHub
commit 080f2859ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 115 additions and 146 deletions

View File

@ -1,6 +1,8 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
@ -12,7 +14,6 @@ using NUnit.Framework;
using osu.Framework.Extensions;
using osu.Framework.Extensions.ObjectExtensions;
using osu.Framework.Logging;
using osu.Framework.Platform;
using osu.Game.Beatmaps;
using osu.Game.Database;
using osu.Game.Extensions;
@ -28,8 +29,6 @@ using SharpCompress.Archives.Zip;
using SharpCompress.Common;
using SharpCompress.Writers.Zip;
#nullable enable
namespace osu.Game.Tests.Database
{
[TestFixture]
@ -394,7 +393,6 @@ namespace osu.Game.Tests.Database
}
[Test]
[Ignore("intentionally broken by import optimisations")]
public void TestImportThenImportWithChangedFile()
{
RunTestWithRealmAsync(async (realm, storage) =>
@ -491,7 +489,6 @@ namespace osu.Game.Tests.Database
}
[Test]
[Ignore("intentionally broken by import optimisations")]
public void TestImportCorruptThenImport()
{
RunTestWithRealmAsync(async (realm, storage) =>
@ -503,16 +500,18 @@ namespace osu.Game.Tests.Database
var firstFile = imported.Files.First();
var fileStorage = storage.GetStorageForDirectory("files");
long originalLength;
using (var stream = storage.GetStream(firstFile.File.GetStoragePath()))
using (var stream = fileStorage.GetStream(firstFile.File.GetStoragePath()))
originalLength = stream.Length;
using (var stream = storage.CreateFileSafely(firstFile.File.GetStoragePath()))
using (var stream = fileStorage.CreateFileSafely(firstFile.File.GetStoragePath()))
stream.WriteByte(0);
var importedSecondTime = await LoadOszIntoStore(importer, realm.Realm);
using (var stream = storage.GetStream(firstFile.File.GetStoragePath()))
using (var stream = fileStorage.GetStream(firstFile.File.GetStoragePath()))
Assert.AreEqual(stream.Length, originalLength, "Corruption was not fixed on second import");
// check the newly "imported" beatmap is actually just the restored previous import. since it matches hash.
@ -622,7 +621,7 @@ namespace osu.Game.Tests.Database
using var importer = new BeatmapModelManager(realm, storage);
using var store = new RealmRulesetStore(realm, storage);
var imported = await LoadOszIntoStore(importer, realm.Realm);
var imported = await LoadOszIntoStore(importer, realm.Realm, batchImport: true);
deleteBeatmapSet(imported, realm.Realm);
@ -678,7 +677,7 @@ namespace osu.Game.Tests.Database
{
RunTestWithRealmAsync(async (realm, storage) =>
{
using var importer = new NonOptimisedBeatmapImporter(realm, storage);
using var importer = new BeatmapModelManager(realm, storage);
using var store = new RealmRulesetStore(realm, storage);
var imported = await LoadOszIntoStore(importer, realm.Realm);
@ -960,11 +959,11 @@ namespace osu.Game.Tests.Database
return realm.All<BeatmapSetInfo>().FirstOrDefault(beatmapSet => beatmapSet.ID == importedSet!.ID);
}
public static async Task<BeatmapSetInfo> LoadOszIntoStore(BeatmapImporter importer, Realm realm, string? path = null, bool virtualTrack = false)
public static async Task<BeatmapSetInfo> LoadOszIntoStore(BeatmapImporter importer, Realm realm, string? path = null, bool virtualTrack = false, bool batchImport = false)
{
string? temp = path ?? TestResources.GetTestBeatmapForImport(virtualTrack);
var importedSet = await importer.Import(new ImportTask(temp));
var importedSet = await importer.Import(new ImportTask(temp), batchImport);
Assert.NotNull(importedSet);
Debug.Assert(importedSet != null);
@ -1081,15 +1080,5 @@ namespace osu.Game.Tests.Database
Assert.Fail(failureMessage);
}
public class NonOptimisedBeatmapImporter : BeatmapImporter
{
public NonOptimisedBeatmapImporter(RealmAccess realm, Storage storage)
: base(realm, storage)
{
}
protected override bool HasCustomHashFunction => true;
}
}
}

View File

@ -225,10 +225,10 @@ namespace osu.Game.Tests.Online
this.testBeatmapManager = testBeatmapManager;
}
public override Live<BeatmapSetInfo> Import(BeatmapSetInfo item, ArchiveReader archive = null, CancellationToken cancellationToken = default)
public override Live<BeatmapSetInfo> Import(BeatmapSetInfo item, ArchiveReader archive = null, bool batchImport = false, CancellationToken cancellationToken = default)
{
testBeatmapManager.AllowImport.Task.WaitSafely();
return (testBeatmapManager.CurrentImport = base.Import(item, archive, cancellationToken));
return (testBeatmapManager.CurrentImport = base.Import(item, archive, batchImport, cancellationToken));
}
}
}

View File

@ -353,11 +353,11 @@ namespace osu.Game.Beatmaps
public Task<IEnumerable<Live<BeatmapSetInfo>>> Import(ProgressNotification notification, params ImportTask[] tasks) => beatmapModelManager.Import(notification, tasks);
public Task<Live<BeatmapSetInfo>?> Import(ImportTask task, bool lowPriority = false, CancellationToken cancellationToken = default) => beatmapModelManager.Import(task, lowPriority, cancellationToken);
public Task<Live<BeatmapSetInfo>?> Import(ImportTask task, bool batchImport = false, CancellationToken cancellationToken = default) => beatmapModelManager.Import(task, batchImport, cancellationToken);
public Task<Live<BeatmapSetInfo>?> Import(ArchiveReader archive, bool lowPriority = false, CancellationToken cancellationToken = default) => beatmapModelManager.Import(archive, lowPriority, cancellationToken);
public Task<Live<BeatmapSetInfo>?> Import(ArchiveReader archive, bool batchImport = false, CancellationToken cancellationToken = default) => beatmapModelManager.Import(archive, batchImport, cancellationToken);
public Live<BeatmapSetInfo>? Import(BeatmapSetInfo item, ArchiveReader? archive = null, CancellationToken cancellationToken = default) => beatmapModelManager.Import(item, archive, cancellationToken);
public Live<BeatmapSetInfo>? Import(BeatmapSetInfo item, ArchiveReader? archive = null, CancellationToken cancellationToken = default) => beatmapModelManager.Import(item, archive, false, cancellationToken);
public IEnumerable<string> HandledExtensions => beatmapModelManager.HandledExtensions;

View File

@ -280,7 +280,7 @@ namespace osu.Game.Scoring
public Task<IEnumerable<Live<ScoreInfo>>> Import(ProgressNotification notification, params ImportTask[] tasks) => scoreModelManager.Import(notification, tasks);
public Live<ScoreInfo> Import(ScoreInfo item, ArchiveReader archive = null, bool lowPriority = false, CancellationToken cancellationToken = default) => scoreModelManager.Import(item, archive, cancellationToken);
public Live<ScoreInfo> Import(ScoreInfo item, ArchiveReader archive = null, bool batchImport = false, CancellationToken cancellationToken = default) => scoreModelManager.Import(item, archive, batchImport, cancellationToken);
public bool IsAvailableLocally(ScoreInfo model) => scoreModelManager.IsAvailableLocally(model);

View File

@ -276,13 +276,10 @@ namespace osu.Game.Skinning
public Task<IEnumerable<Live<SkinInfo>>> Import(ProgressNotification notification, params ImportTask[] tasks) => skinModelManager.Import(notification, tasks);
public Task<Live<SkinInfo>> Import(ImportTask task, bool lowPriority = false, CancellationToken cancellationToken = default) => skinModelManager.Import(task, lowPriority, cancellationToken);
public Task<Live<SkinInfo>> Import(ImportTask task, bool batchImport = false, CancellationToken cancellationToken = default) => skinModelManager.Import(task, batchImport, cancellationToken);
public Task<Live<SkinInfo>> Import(ArchiveReader archive, bool lowPriority = false, CancellationToken cancellationToken = default) =>
skinModelManager.Import(archive, lowPriority, cancellationToken);
public Live<SkinInfo> Import(SkinInfo item, ArchiveReader archive = null, bool lowPriority = false, CancellationToken cancellationToken = default) =>
skinModelManager.Import(item, archive, cancellationToken);
public Task<Live<SkinInfo>> Import(ArchiveReader archive, bool batchImport = false, CancellationToken cancellationToken = default) =>
skinModelManager.Import(archive, batchImport, cancellationToken);
#endregion

View File

@ -46,8 +46,6 @@ namespace osu.Game.Skinning
private const string unknown_creator_string = @"Unknown";
protected override bool HasCustomHashFunction => true;
protected override void Populate(SkinInfo model, ArchiveReader? archive, Realm realm, CancellationToken cancellationToken = default)
{
var skinInfoFile = model.Files.SingleOrDefault(f => f.Filename == skin_info_file);

View File

@ -32,12 +32,16 @@ namespace osu.Game.Stores
public abstract class RealmArchiveModelImporter<TModel> : IModelImporter<TModel>
where TModel : RealmObject, IHasRealmFiles, IHasGuidPrimaryKey, ISoftDelete
{
/// <summary>
/// The maximum number of concurrent imports to run per import scheduler.
/// </summary>
private const int import_queue_request_concurrency = 1;
/// <summary>
/// The size of a batch import operation before considering it a lower priority operation.
/// The minimum number of items in a single import call in order for the import to be processed as a batch.
/// Batch imports will apply optimisations preferring speed over consistency when detecting changes in already-imported items.
/// </summary>
private const int low_priority_import_batch_size = 1;
private const int minimum_items_considered_batch_import = 10;
/// <summary>
/// A singleton scheduler shared by all <see cref="RealmArchiveModelImporter{TModel}"/>.
@ -49,11 +53,11 @@ namespace osu.Game.Stores
private static readonly ThreadedTaskScheduler import_scheduler = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(RealmArchiveModelImporter<TModel>));
/// <summary>
/// A second scheduler for lower priority imports.
/// A second scheduler for batch imports.
/// For simplicity, these will just run in parallel with normal priority imports, but a future refactor would see this implemented via a custom scheduler/queue.
/// See https://gist.github.com/peppy/f0e118a14751fc832ca30dd48ba3876b for an incomplete version of this.
/// </summary>
private static readonly ThreadedTaskScheduler import_scheduler_low_priority = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(RealmArchiveModelImporter<TModel>));
private static readonly ThreadedTaskScheduler import_scheduler_batch = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(RealmArchiveModelImporter<TModel>));
public virtual IEnumerable<string> HandledExtensions => new[] { @".zip" };
@ -105,7 +109,7 @@ namespace osu.Game.Stores
var imported = new List<Live<TModel>>();
bool isLowPriorityImport = tasks.Length > low_priority_import_batch_size;
bool isBatchImport = tasks.Length >= minimum_items_considered_batch_import;
try
{
@ -115,7 +119,7 @@ namespace osu.Game.Stores
try
{
var model = await Import(task, isLowPriorityImport, notification.CancellationToken).ConfigureAwait(false);
var model = await Import(task, isBatchImport, notification.CancellationToken).ConfigureAwait(false);
lock (imported)
{
@ -178,16 +182,16 @@ namespace osu.Game.Stores
/// Note that this bypasses the UI flow and should only be used for special cases or testing.
/// </summary>
/// <param name="task">The <see cref="ImportTask"/> containing data about the <typeparamref name="TModel"/> to import.</param>
/// <param name="lowPriority">Whether this is a low priority import.</param>
/// <param name="batchImport">Whether this import is part of a larger batch.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The imported model, if successful.</returns>
public async Task<Live<TModel>?> Import(ImportTask task, bool lowPriority = false, CancellationToken cancellationToken = default)
public async Task<Live<TModel>?> Import(ImportTask task, bool batchImport = false, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
Live<TModel>? import;
using (ArchiveReader reader = task.GetReader())
import = await Import(reader, lowPriority, cancellationToken).ConfigureAwait(false);
import = await Import(reader, batchImport, cancellationToken).ConfigureAwait(false);
// We may or may not want to delete the file depending on where it is stored.
// e.g. reconstructing/repairing database with items from default storage.
@ -210,9 +214,9 @@ namespace osu.Game.Stores
/// Silently import an item from an <see cref="ArchiveReader"/>.
/// </summary>
/// <param name="archive">The archive to be imported.</param>
/// <param name="lowPriority">Whether this is a low priority import.</param>
/// <param name="batchImport">Whether this import is part of a larger batch.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
public async Task<Live<TModel>?> Import(ArchiveReader archive, bool lowPriority = false, CancellationToken cancellationToken = default)
public async Task<Live<TModel>?> Import(ArchiveReader archive, bool batchImport = false, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
@ -235,10 +239,10 @@ namespace osu.Game.Stores
return null;
}
var scheduledImport = Task.Factory.StartNew(() => Import(model, archive, cancellationToken),
var scheduledImport = Task.Factory.StartNew(() => Import(model, archive, batchImport, cancellationToken),
cancellationToken,
TaskCreationOptions.HideScheduler,
lowPriority ? import_scheduler_low_priority : import_scheduler);
batchImport ? import_scheduler_batch : import_scheduler);
return await scheduledImport.ConfigureAwait(false);
}
@ -248,106 +252,104 @@ namespace osu.Game.Stores
/// </summary>
/// <param name="item">The model to be imported.</param>
/// <param name="archive">An optional archive to use for model population.</param>
/// <param name="batchImport">If <c>true</c>, imports will be skipped before they begin, given an existing model matches on hash and filenames. Should generally only be used for large batch imports, as it may defy user expectations when updating an existing model.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
public virtual Live<TModel>? Import(TModel item, ArchiveReader? archive = null, CancellationToken cancellationToken = default)
public virtual Live<TModel>? Import(TModel item, ArchiveReader? archive = null, bool batchImport = false, CancellationToken cancellationToken = default) => Realm.Run(realm =>
{
return Realm.Run(realm =>
cancellationToken.ThrowIfCancellationRequested();
bool checkedExisting = false;
TModel? existing = null;
if (batchImport && archive != null)
{
cancellationToken.ThrowIfCancellationRequested();
// this is a fast bail condition to improve large import performance.
item.Hash = computeHashFast(archive);
bool checkedExisting = false;
TModel? existing = null;
checkedExisting = true;
existing = CheckForExisting(item, realm);
if (archive != null && !HasCustomHashFunction)
if (existing != null)
{
// this is a fast bail condition to improve large import performance.
item.Hash = computeHashFast(archive);
// bare minimum comparisons
//
// note that this should really be checking filesizes on disk (of existing files) for some degree of sanity.
// or alternatively doing a faster hash check. either of these require database changes and reprocessing of existing files.
if (CanSkipImport(existing, item) &&
getFilenames(existing.Files).SequenceEqual(getShortenedFilenames(archive).Select(p => p.shortened).OrderBy(f => f)) &&
checkAllFilesExist(existing))
{
LogForModel(item, @$"Found existing (optimised) {HumanisedModelName} for {item} (ID {existing.ID}) skipping import.");
checkedExisting = true;
existing = CheckForExisting(item, realm);
using (var transaction = realm.BeginWrite())
{
UndeleteForReuse(existing);
transaction.Commit();
}
return existing.ToLive(Realm);
}
LogForModel(item, @"Found existing (optimised) but failed pre-check.");
}
}
try
{
LogForModel(item, @"Beginning import...");
// TODO: do we want to make the transaction this local? not 100% sure, will need further investigation.
using (var transaction = realm.BeginWrite())
{
if (archive != null)
// TODO: look into rollback of file additions (or delayed commit).
item.Files.AddRange(createFileInfos(archive, Files, realm));
item.Hash = ComputeHash(item);
// TODO: we may want to run this outside of the transaction.
Populate(item, archive, realm, cancellationToken);
if (!checkedExisting)
existing = CheckForExisting(item, realm);
if (existing != null)
{
// bare minimum comparisons
//
// note that this should really be checking filesizes on disk (of existing files) for some degree of sanity.
// or alternatively doing a faster hash check. either of these require database changes and reprocessing of existing files.
if (CanSkipImport(existing, item) &&
getFilenames(existing.Files).SequenceEqual(getShortenedFilenames(archive).Select(p => p.shortened).OrderBy(f => f)) &&
checkAllFilesExist(existing))
if (CanReuseExisting(existing, item))
{
LogForModel(item, @$"Found existing (optimised) {HumanisedModelName} for {item} (ID {existing.ID}) skipping import.");
LogForModel(item, @$"Found existing {HumanisedModelName} for {item} (ID {existing.ID}) skipping import.");
using (var transaction = realm.BeginWrite())
{
UndeleteForReuse(existing);
transaction.Commit();
}
UndeleteForReuse(existing);
transaction.Commit();
return existing.ToLive(Realm);
}
LogForModel(item, @"Found existing (optimised) but failed pre-check.");
}
}
LogForModel(item, @"Found existing but failed re-use check.");
try
{
LogForModel(item, @"Beginning import...");
// TODO: do we want to make the transaction this local? not 100% sure, will need further investigation.
using (var transaction = realm.BeginWrite())
{
if (archive != null)
// TODO: look into rollback of file additions (or delayed commit).
item.Files.AddRange(createFileInfos(archive, Files, realm));
item.Hash = ComputeHash(item);
// TODO: we may want to run this outside of the transaction.
Populate(item, archive, realm, cancellationToken);
if (!checkedExisting)
existing = CheckForExisting(item, realm);
if (existing != null)
{
if (CanReuseExisting(existing, item))
{
LogForModel(item, @$"Found existing {HumanisedModelName} for {item} (ID {existing.ID}) skipping import.");
UndeleteForReuse(existing);
transaction.Commit();
return existing.ToLive(Realm);
}
LogForModel(item, @"Found existing but failed re-use check.");
existing.DeletePending = true;
}
PreImport(item, realm);
// import to store
realm.Add(item);
transaction.Commit();
existing.DeletePending = true;
}
LogForModel(item, @"Import successfully completed!");
}
catch (Exception e)
{
if (!(e is TaskCanceledException))
LogForModel(item, @"Database import or population failed and has been rolled back.", e);
PreImport(item, realm);
throw;
// import to store
realm.Add(item);
transaction.Commit();
}
return (Live<TModel>?)item.ToLive(Realm);
});
}
LogForModel(item, @"Import successfully completed!");
}
catch (Exception e)
{
if (!(e is TaskCanceledException))
LogForModel(item, @"Database import or population failed and has been rolled back.", e);
throw;
}
return (Live<TModel>?)item.ToLive(Realm);
});
/// <summary>
/// Any file extensions which should be included in hash creation.
@ -375,19 +377,13 @@ namespace osu.Game.Stores
Logger.Log($"{prefix} {message}", LoggingTarget.Database);
}
/// <summary>
/// Whether the implementation overrides <see cref="ComputeHash"/> with a custom implementation.
/// Custom hash implementations must bypass the early exit in the import flow (see <see cref="computeHashFast"/> usage).
/// </summary>
protected virtual bool HasCustomHashFunction => false;
/// <summary>
/// Create a SHA-2 hash from the provided archive based on file content of all files matching <see cref="HashableFileTypes"/>.
/// </summary>
/// <remarks>
/// In the case of no matching files, a hash will be generated from the passed archive's <see cref="ArchiveReader.Name"/>.
/// </remarks>
protected virtual string ComputeHash(TModel item)
protected string ComputeHash(TModel item)
{
// for now, concatenate all hashable files in the set to create a unique hash.
MemoryStream hashable = new MemoryStream();

View File

@ -147,7 +147,7 @@ namespace osu.Game.Tests.Visual
protected override BeatmapModelManager CreateBeatmapModelManager(Storage storage, RealmAccess realm, RulesetStore rulesets, BeatmapOnlineLookupQueue onlineLookupQueue)
{
return new TestBeatmapModelManager(storage, realm, onlineLookupQueue);
return new BeatmapModelManager(realm, storage, onlineLookupQueue);
}
protected override WorkingBeatmapCache CreateWorkingBeatmapCache(AudioManager audioManager, IResourceStore<byte[]> resources, IResourceStore<byte[]> storage, WorkingBeatmap defaultBeatmap, GameHost host)
@ -181,17 +181,6 @@ namespace osu.Game.Tests.Visual
=> testBeatmapManager.TestBeatmap;
}
internal class TestBeatmapModelManager : BeatmapModelManager
{
public TestBeatmapModelManager(Storage storage, RealmAccess databaseAccess, BeatmapOnlineLookupQueue beatmapOnlineLookupQueue)
: base(databaseAccess, storage, beatmapOnlineLookupQueue)
{
}
protected override string ComputeHash(BeatmapSetInfo item)
=> string.Empty;
}
public override void Save(BeatmapInfo info, IBeatmap beatmapContent, ISkin beatmapSkin = null)
{
// don't actually care about saving for this context.