// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

#nullable enable

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Humanizer;
using osu.Framework.Extensions;
using osu.Framework.Extensions.IEnumerableExtensions;
using osu.Framework.Logging;
using osu.Framework.Platform;
using osu.Framework.Threading;
using osu.Game.Extensions;
using osu.Game.IO.Archives;
using osu.Game.Models;
using osu.Game.Overlays.Notifications;
using Realms;

namespace osu.Game.Database
{
    /// <summary>
    /// Encapsulates a model store class to give it import functionality.
    /// Adds cross-functionality with <see cref="RealmFileStore"/> to give access to the central file store for the provided model.
    /// </summary>
    /// <typeparam name="TModel">The model type.</typeparam>
    public abstract class RealmArchiveModelImporter<TModel> : IModelImporter<TModel>
        where TModel : RealmObject, IHasRealmFiles, IHasGuidPrimaryKey, ISoftDelete
    {
        /// <summary>
        /// The maximum number of concurrent imports to run per import scheduler.
        /// </summary>
        private const int import_queue_request_concurrency = 1;

        /// <summary>
        /// The minimum number of items in a single import call in order for the import to be processed as a batch.
        /// Batch imports will apply optimisations preferring speed over consistency when detecting changes in already-imported items.
        /// </summary>
        private const int minimum_items_considered_batch_import = 10;

        /// <summary>
        /// A singleton scheduler shared by all <see cref="RealmArchiveModelImporter{TModel}"/>.
        /// </summary>
        /// <remarks>
        /// This scheduler generally performs IO and CPU intensive work so concurrency is limited harshly.
        /// It is mainly being used as a queue mechanism for large imports.
        /// </remarks>
        private static readonly ThreadedTaskScheduler import_scheduler = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(RealmArchiveModelImporter<TModel>));

        /// <summary>
        /// A second scheduler for batch imports.
        /// For simplicity, these will just run in parallel with normal priority imports, but a future refactor would see this implemented via a custom scheduler/queue.
        /// See https://gist.github.com/peppy/f0e118a14751fc832ca30dd48ba3876b for an incomplete version of this.
        /// </summary>
        private static readonly ThreadedTaskScheduler import_scheduler_batch = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(RealmArchiveModelImporter<TModel>));

        /// <summary>
        /// The file extensions handled by this importer. The default implementation returns only <c>.zip</c>.
        /// </summary>
        public virtual IEnumerable<string> HandledExtensions => new[] { @".zip" };

        /// <summary>
        /// The file store, constructed from the realm and storage passed to the constructor.
        /// </summary>
        protected readonly RealmFileStore Files;

        /// <summary>
        /// The realm access through which all database work of this importer is run.
        /// </summary>
        protected readonly RealmAccess Realm;

        /// <summary>
        /// Fired when the user requests to view the resulting import.
        /// </summary>
        public Action<IEnumerable<Live<TModel>>>? PostImport { get; set; }

        /// <summary>
        /// Set an endpoint for notifications to be posted to.
        /// </summary>
        public Action<Notification>? PostNotification { protected get; set; }

        /// <summary>
        /// Create a new importer operating on the provided storage and realm.
        /// </summary>
        /// <param name="storage">The storage handed to the <see cref="RealmFileStore"/>.</param>
        /// <param name="realm">The realm access used for all database operations.</param>
        protected RealmArchiveModelImporter(Storage storage, RealmAccess realm)
        {
            Realm = realm;

            Files = new RealmFileStore(realm, storage);
        }

        /// <summary>
        /// Import one or more items from the given filesystem paths.
        /// Each path is wrapped in an <see cref="ImportTask"/> and forwarded to <see cref="Import(ImportTask[])"/>.
        /// </summary>
        /// <param name="paths">The paths to import from.</param>
        public Task Import(params string[] paths) => Import(paths.Select(p => new ImportTask(p)).ToArray());

        /// <summary>
        /// Import one or more items, creating a new <see cref="ProgressNotification"/> and posting it
        /// to <see cref="PostNotification"/> (if set) to report progress.
        /// </summary>
        /// <param name="tasks">The import tasks to process.</param>
        public Task Import(params ImportTask[] tasks)
        {
            var notification = new ProgressNotification { State = ProgressNotificationState.Active };

            PostNotification?.Invoke(notification);

            return Import(notification, tasks);
        }

        /// <summary>
        /// Import one or more items, reporting progress and the final outcome via the provided notification.
        /// </summary>
        /// <remarks>
        /// Failures of individual items are logged and do not abort the remaining items.
        /// Cancellation via the notification's token stops the import; any items that were already imported are still reported.
        /// </remarks>
        /// <param name="notification">The notification to update with progress, and whose cancellation token is observed.</param>
        /// <param name="tasks">The import tasks to process.</param>
        /// <returns>All successfully imported models.</returns>
        public async Task<IEnumerable<Live<TModel>>> Import(ProgressNotification notification, params ImportTask[] tasks)
        {
            if (tasks.Length == 0)
            {
                notification.CompletionText = $"No {HumanisedModelName}s were found to import!";
                notification.State = ProgressNotificationState.Completed;
                return Enumerable.Empty<Live<TModel>>();
            }

            notification.Progress = 0;
            notification.Text = $"{HumanisedModelName.Humanize(LetterCasing.Title)} import is initialising...";

            int current = 0;

            var imported = new List<Live<TModel>>();

            bool isBatchImport = tasks.Length >= minimum_items_considered_batch_import;

            try
            {
                await Task.WhenAll(tasks.Select(async task =>
                {
                    notification.CancellationToken.ThrowIfCancellationRequested();

                    try
                    {
                        var model = await Import(task, isBatchImport, notification.CancellationToken).ConfigureAwait(false);

                        // the list doubles as the lock guarding the shared progress state.
                        lock (imported)
                        {
                            if (model != null)
                                imported.Add(model);
                            current++;

                            notification.Text = $"Imported {current} of {tasks.Length} {HumanisedModelName}s";
                            notification.Progress = (float)current / tasks.Length;
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // ThrowIfCancellationRequested() throws OperationCanceledException (TaskCanceledException is only a subclass),
                        // so the base type must be matched here for cancellation to propagate rather than be logged as an import failure.
                        throw;
                    }
                    catch (Exception e)
                    {
                        Logger.Error(e, $@"Could not import ({task})", LoggingTarget.Database);
                    }
                })).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                if (imported.Count == 0)
                {
                    notification.State = ProgressNotificationState.Cancelled;
                    return imported;
                }

                // some items were imported before cancellation; fall through and report them.
            }

            if (imported.Count == 0)
            {
                notification.Text = $"{HumanisedModelName.Humanize(LetterCasing.Title)} import failed!";
                notification.State = ProgressNotificationState.Cancelled;
            }
            else
            {
                notification.CompletionText = imported.Count == 1
                    ? $"Imported {imported.First().GetDisplayString()}!"
                    : $"Imported {imported.Count} {HumanisedModelName}s!";

                if (PostImport != null)
                {
                    notification.CompletionText += " Click to view.";
                    notification.CompletionClickAction = () =>
                    {
                        PostImport?.Invoke(imported);
                        return true;
                    };
                }

                notification.State = ProgressNotificationState.Completed;
            }

            return imported;
        }

        /// <summary>
        /// Import one <typeparamref name="TModel"/> from the filesystem and delete the file on success.
        /// Note that this bypasses the UI flow and should only be used for special cases or testing.
        /// </summary>
        /// <param name="task">The <see cref="ImportTask"/> containing data about the <typeparamref name="TModel"/> to import.</param>
        /// <param name="batchImport">Whether this import is part of a larger batch.</param>
        /// <param name="cancellationToken">An optional cancellation token.</param>
        /// <returns>The imported model, if successful.</returns>
        public async Task<Live<TModel>?> Import(ImportTask task, bool batchImport = false, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            Live<TModel>? import;
            using (ArchiveReader reader = task.GetReader())
                import = await Import(reader, batchImport, cancellationToken).ConfigureAwait(false);

            // We may or may not want to delete the file depending on where it is stored.
            //  e.g. reconstructing/repairing database with items from default storage.
            // Also, not always a single file, i.e. for LegacyFilesystemReader
            // TODO: Add a check to prevent files from storage to be deleted.
            try
            {
                if (import != null && File.Exists(task.Path) && ShouldDeleteArchive(task.Path))
                    File.Delete(task.Path);
            }
            catch (Exception e)
            {
                // failing to clean up the source file is not fatal; the import itself has already succeeded.
                Logger.Error(e, $@"Could not delete original file after import ({task})");
            }

            return import;
        }

        /// <summary>
        /// Silently import an item from an <see cref="ArchiveReader"/>.
        /// </summary>
        /// <param name="archive">The archive to be imported.</param>
        /// <param name="batchImport">Whether this import is part of a larger batch.</param>
        /// <param name="cancellationToken">An optional cancellation token.</param>
        /// <returns>The imported model, or null if model creation returned null or failed.</returns>
        public async Task<Live<TModel>?> Import(ArchiveReader archive, bool batchImport = false, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            TModel? model = null;

            try
            {
                model = CreateModel(archive);

                if (model == null)
                    return null;
            }
            catch (OperationCanceledException)
            {
                // matches both OperationCanceledException and its subclass TaskCanceledException.
                throw;
            }
            catch (Exception e)
            {
                LogForModel(model, @$"Model creation of {archive.Name} failed.", e);
                return null;
            }

            // queue the actual import on the (batch or regular) import scheduler.
            var scheduledImport = Task.Factory.StartNew(() => Import(model, archive, batchImport, cancellationToken),
                cancellationToken,
                TaskCreationOptions.HideScheduler,
                batchImport ? import_scheduler_batch : import_scheduler);

            return await scheduledImport.ConfigureAwait(false);
        }

        /// <summary>
        /// Silently import an item from a <typeparamref name="TModel"/>.
        /// </summary>
        /// <param name="item">The model to be imported.</param>
        /// <param name="archive">An optional archive to use for model population.</param>
        /// <param name="batchImport">If <c>true</c>, imports will be skipped before they begin, given an existing model matches on hash and filenames. Should generally only be used for large batch imports, as it may defy user expectations when updating an existing model.</param>
        /// <param name="cancellationToken">An optional cancellation token.</param>
        /// <returns>The imported model, or the existing model if one was found and reused.</returns>
        public virtual Live<TModel>? Import(TModel item, ArchiveReader? archive = null, bool batchImport = false, CancellationToken cancellationToken = default) => Realm.Run(realm =>
        {
            cancellationToken.ThrowIfCancellationRequested();

            bool checkedExisting = false;
            TModel? existing = null;

            if (batchImport && archive != null)
            {
                // this is a fast bail condition to improve large import performance.
                item.Hash = computeHashFast(archive);

                checkedExisting = true;
                existing = CheckForExisting(item, realm);

                if (existing != null)
                {
                    // bare minimum comparisons
                    //
                    // note that this should really be checking filesizes on disk (of existing files) for some degree of sanity.
                    // or alternatively doing a faster hash check. either of these require database changes and reprocessing of existing files.
                    if (CanSkipImport(existing, item) &&
                        getFilenames(existing.Files).SequenceEqual(getShortenedFilenames(archive).Select(p => p.shortened).OrderBy(f => f)) &&
                        checkAllFilesExist(existing))
                    {
                        LogForModel(item, @$"Found existing (optimised) {HumanisedModelName} for {item} (ID {existing.ID}) – skipping import.");

                        using (var transaction = realm.BeginWrite())
                        {
                            UndeleteForReuse(existing);
                            transaction.Commit();
                        }

                        return existing.ToLive(Realm);
                    }

                    LogForModel(item, @"Found existing (optimised) but failed pre-check.");
                }
            }

            try
            {
                LogForModel(item, @"Beginning import...");

                // TODO: do we want to make the transaction this local? not 100% sure, will need further investigation.
                using (var transaction = realm.BeginWrite())
                {
                    if (archive != null)
                        // TODO: look into rollback of file additions (or delayed commit).
                        item.Files.AddRange(createFileInfos(archive, Files, realm));

                    item.Hash = ComputeHash(item);

                    // TODO: we may want to run this outside of the transaction.
                    Populate(item, archive, realm, cancellationToken);

                    if (!checkedExisting)
                        existing = CheckForExisting(item, realm);

                    if (existing != null)
                    {
                        if (CanReuseExisting(existing, item))
                        {
                            LogForModel(item, @$"Found existing {HumanisedModelName} for {item} (ID {existing.ID}) – skipping import.");

                            UndeleteForReuse(existing);
                            transaction.Commit();

                            return existing.ToLive(Realm);
                        }

                        LogForModel(item, @"Found existing but failed re-use check.");

                        existing.DeletePending = true;
                    }

                    PreImport(item, realm);

                    // import to store
                    realm.Add(item);

                    transaction.Commit();
                }

                LogForModel(item, @"Import successfully completed!");
            }
            catch (Exception e)
            {
                // cancellation is signalled via OperationCanceledException (of which TaskCanceledException is a subclass);
                // it is not a failure and should not be logged as one.
                if (!(e is OperationCanceledException))
                    LogForModel(item, @"Database import or population failed and has been rolled back.", e);

                throw;
            }

            return (Live<TModel>?)item.ToLive(Realm);
        });

        /// <summary>
        /// Any file extensions which should be included in hash creation.
        /// Generally should include all file types which determine the file's uniqueness.
        /// Large files should be avoided if possible.
        /// </summary>
        /// <remarks>
        /// This is only used by the default hash implementation. If <see cref="ComputeHash"/> is overridden, it will not be used.
        /// </remarks>
        protected abstract string[] HashableFileTypes { get; }

        /// <summary>
        /// Log a message to the database log target, prefixed with the first five characters of the model's hash
        /// (or <c>?????</c> if the model is null, invalid, or has no hash).
        /// </summary>
        /// <param name="model">The model the message relates to. May be null.</param>
        /// <param name="message">The message to log.</param>
        /// <param name="e">An optional exception. If provided, the message is logged as an error.</param>
        internal static void LogForModel(TModel? model, string message, Exception? e = null)
        {
            string trimmedHash;
            if (model == null || !model.IsValid || string.IsNullOrEmpty(model.Hash))
                trimmedHash = "?????";
            else
                trimmedHash = model.Hash.Substring(0, 5);

            string prefix = $"[{trimmedHash}]";

            if (e != null)
                Logger.Error(e, $"{prefix} {message}", LoggingTarget.Database);
            else
                Logger.Log($"{prefix} {message}", LoggingTarget.Database);
        }

        /// <summary>
        /// Create a SHA-2 hash from the provided model based on file content of all files matching <see cref="HashableFileTypes"/>.
        /// </summary>
        /// <remarks>
        /// In the case of no matching (or only empty) files, the model's current <see cref="IHasRealmFiles.Hash"/> is returned unchanged.
        /// </remarks>
        /// <param name="item">The model whose files should be hashed.</param>
        protected string ComputeHash(TModel item)
        {
            // for now, concatenate all hashable files in the set to create a unique hash.
            using var hashable = new MemoryStream();

            foreach (RealmNamedFileUsage file in item.Files.Where(f => HashableFileTypes.Any(ext => f.Filename.EndsWith(ext, StringComparison.OrdinalIgnoreCase))).OrderBy(f => f.Filename))
            {
                using (Stream s = Files.Store.GetStream(file.File.GetStoragePath()))
                    s.CopyTo(hashable);
            }

            if (hashable.Length > 0)
                return hashable.ComputeSHA2Hash();

            return item.Hash;
        }

        /// <summary>
        /// Create a SHA-2 hash directly from the archive's contents, without first importing files to the store.
        /// Falls back to hashing the archive's name if no file matching <see cref="HashableFileTypes"/> has any content.
        /// </summary>
        private string computeHashFast(ArchiveReader reader)
        {
            using var hashable = new MemoryStream();

            foreach (string? file in reader.Filenames.Where(f => HashableFileTypes.Any(ext => f.EndsWith(ext, StringComparison.OrdinalIgnoreCase))).OrderBy(f => f))
            {
                using (Stream s = reader.GetStream(file))
                    s.CopyTo(hashable);
            }

            if (hashable.Length > 0)
                return hashable.ComputeSHA2Hash();

            return reader.Name.ComputeSHA2Hash();
        }

        /// <summary>
        /// Create all required <see cref="RealmNamedFileUsage"/>s for the provided archive, adding them to the global file store.
        /// </summary>
        private List<RealmNamedFileUsage> createFileInfos(ArchiveReader reader, RealmFileStore files, Realm realm)
        {
            var fileInfos = new List<RealmNamedFileUsage>();

            // import files to manager
            foreach (var filenames in getShortenedFilenames(reader))
            {
                using (Stream s = reader.GetStream(filenames.original))
                {
                    var item = new RealmNamedFileUsage(files.Add(s, realm), filenames.shortened);
                    fileInfos.Add(item);
                }
            }

            return fileInfos;
        }

        /// <summary>
        /// Pair each filename in the archive with a shortened form that has the common directory prefix removed
        /// and the path standardised.
        /// </summary>
        private IEnumerable<(string original, string shortened)> getShortenedFilenames(ArchiveReader reader)
        {
            string prefix = reader.Filenames.GetCommonPrefix();

            // only strip the prefix when it ends on a directory separator, to avoid cutting filenames in half.
            if (!(prefix.EndsWith('/') || prefix.EndsWith('\\')))
                prefix = string.Empty;

            foreach (string file in reader.Filenames)
                yield return (file, file.Substring(prefix.Length).ToStandardisedPath());
        }

        /// <summary>
        /// Create a barebones model from the provided archive.
        /// Actual expensive population should be done in <see cref="Populate"/>; this should just prepare for duplicate checking.
        /// </summary>
        /// <param name="archive">The archive to create the model for.</param>
        /// <returns>A model populated with minimal information. Returning a null will abort importing silently.</returns>
        protected abstract TModel? CreateModel(ArchiveReader archive);

        /// <summary>
        /// Populate the provided model completely from the given archive.
        /// After this method, the model should be in a state ready to commit to a store.
        /// </summary>
        /// <param name="model">The model to populate.</param>
        /// <param name="archive">The archive to use as a reference for population. May be null.</param>
        /// <param name="realm">The current realm context.</param>
        /// <param name="cancellationToken">An optional cancellation token.</param>
        protected abstract void Populate(TModel model, ArchiveReader? archive, Realm realm, CancellationToken cancellationToken = default);

        /// <summary>
        /// Perform any final actions before the import to database executes.
        /// </summary>
        /// <param name="model">The model prepared for import.</param>
        /// <param name="realm">The current realm context.</param>
        protected virtual void PreImport(TModel model, Realm realm)
        {
        }

        /// <summary>
        /// Check whether an existing model already exists for a new import item.
        /// </summary>
        /// <param name="model">The new model proposed for import.</param>
        /// <param name="realm">The current realm context.</param>
        /// <returns>An existing model which matches the criteria to skip importing, else null.</returns>
        protected TModel? CheckForExisting(TModel model, Realm realm) => string.IsNullOrEmpty(model.Hash) ? null : realm.All<TModel>().FirstOrDefault(b => b.Hash == model.Hash);

        /// <summary>
        /// Whether import can be skipped after finding an existing import early in the process.
        /// Only valid when <see cref="ComputeHash"/> is not overridden.
        /// </summary>
        /// <param name="existing">The existing model.</param>
        /// <param name="import">The newly imported model.</param>
        /// <returns>Whether to skip this import completely.</returns>
        protected virtual bool CanSkipImport(TModel existing, TModel import) => true;

        /// <summary>
        /// After an existing <typeparamref name="TModel"/> is found during an import process, the default behaviour is to use/restore the existing
        /// item and skip the import. This method allows changing that behaviour.
        /// </summary>
        /// <param name="existing">The existing model.</param>
        /// <param name="import">The newly imported model.</param>
        /// <returns>Whether the existing model should be restored and used. Returning false will delete the existing and force a re-import.</returns>
        protected virtual bool CanReuseExisting(TModel existing, TModel import) =>
            // for the best or worst, we copy and import files of a new import before checking whether
            // it is a duplicate. so to check if anything has changed, we can just compare all File IDs.
            getIDs(existing.Files).SequenceEqual(getIDs(import.Files)) &&
            getFilenames(existing.Files).SequenceEqual(getFilenames(import.Files));

        /// <summary>
        /// Whether every file referenced by the model is present in the file store's storage.
        /// </summary>
        private bool checkAllFilesExist(TModel model) =>
            model.Files.All(f => Files.Storage.Exists(f.File.GetStoragePath()));

        /// <summary>
        /// Called when an existing model is in a soft deleted state but being recovered.
        /// </summary>
        /// <param name="existing">The existing model.</param>
        protected virtual void UndeleteForReuse(TModel existing)
        {
            if (!existing.DeletePending)
                return;

            LogForModel(existing, $@"Existing {HumanisedModelName}'s deletion flag has been removed to allow for reuse.");
            existing.DeletePending = false;
        }

        /// <summary>
        /// Whether this specified path should be removed after successful import.
        /// </summary>
        /// <param name="path">The path for consideration. May be a file or a directory.</param>
        /// <returns>Whether to perform deletion.</returns>
        protected virtual bool ShouldDeleteArchive(string path) => false;

        /// <summary>
        /// Enumerate the content hashes of the provided files, ordered by filename.
        /// </summary>
        private IEnumerable<string> getIDs(IEnumerable<RealmNamedFileUsage> files)
        {
            foreach (var f in files.OrderBy(f => f.Filename))
                yield return f.File.Hash;
        }

        /// <summary>
        /// Enumerate the filenames of the provided files, in sorted order.
        /// </summary>
        private IEnumerable<string> getFilenames(IEnumerable<RealmNamedFileUsage> files)
        {
            foreach (var f in files.OrderBy(f => f.Filename))
                yield return f.Filename;
        }

        /// <summary>
        /// A lower-case, user-facing name for the model type, derived from the type name with any "Info" removed.
        /// </summary>
        public virtual string HumanisedModelName => $"{typeof(TModel).Name.Replace(@"Info", "").ToLower()}";
    }
}