// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Humanizer;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using osu.Framework;
using osu.Framework.Bindables;
using osu.Framework.Extensions;
using osu.Framework.Extensions.IEnumerableExtensions;
using osu.Framework.Logging;
using osu.Framework.Platform;
using osu.Framework.Threading;
using osu.Game.IO;
using osu.Game.IO.Archives;
using osu.Game.IPC;
using osu.Game.Overlays.Notifications;
using SharpCompress.Archives.Zip;

namespace osu.Game.Database
{
    /// <summary>
    /// Encapsulates a model store class to give it import functionality.
    /// Adds cross-functionality with <see cref="FileStore"/> to give access to the central file store for the provided model.
    /// </summary>
    /// <typeparam name="TModel">The model type.</typeparam>
    /// <typeparam name="TFileModel">The associated file join type.</typeparam>
    public abstract class ArchiveModelManager<TModel, TFileModel> : ICanAcceptFiles, IModelManager<TModel>
        where TModel : class, IHasFiles<TFileModel>, IHasPrimaryKey, ISoftDelete
        where TFileModel : class, INamedFileInfo, new()
    {
        private const int import_queue_request_concurrency = 1;

        /// <summary>
        /// The size of a batch import operation before considering it a lower priority operation.
        /// </summary>
        private const int low_priority_import_batch_size = 1;

        /// <summary>
        /// A singleton scheduler shared by all <see cref="ArchiveModelManager{TModel,TFileModel}"/>.
        /// </summary>
        /// <remarks>
        /// This scheduler generally performs IO and CPU intensive work so concurrency is limited harshly.
        /// It is mainly being used as a queue mechanism for large imports.
        /// </remarks>
        private static readonly ThreadedTaskScheduler import_scheduler = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(ArchiveModelManager<TModel, TFileModel>));

        /// <summary>
        /// A second scheduler for lower priority imports.
        /// For simplicity, these will just run in parallel with normal priority imports, but a future refactor would see this implemented via a custom scheduler/queue.
        /// See https://gist.github.com/peppy/f0e118a14751fc832ca30dd48ba3876b for an incomplete version of this.
        /// </summary>
        private static readonly ThreadedTaskScheduler import_scheduler_low_priority = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(ArchiveModelManager<TModel, TFileModel>));

        /// <summary>
        /// Set an endpoint for notifications to be posted to.
        /// </summary>
        public Action<Notification> PostNotification { protected get; set; }

        /// <summary>
        /// Fired when a new or updated <typeparamref name="TModel"/> becomes available in the database.
        /// This is not guaranteed to run on the update thread.
        /// </summary>
        public IBindable<WeakReference<TModel>> ItemUpdated => itemUpdated;

        private readonly Bindable<WeakReference<TModel>> itemUpdated = new Bindable<WeakReference<TModel>>();

        /// <summary>
        /// Fired when a <typeparamref name="TModel"/> is removed from the database.
        /// This is not guaranteed to run on the update thread.
        /// </summary>
        public IBindable<WeakReference<TModel>> ItemRemoved => itemRemoved;

        private readonly Bindable<WeakReference<TModel>> itemRemoved = new Bindable<WeakReference<TModel>>();

        /// <summary>
        /// The file extensions handled by this manager. The first entry is also used as the extension for exports.
        /// </summary>
        public virtual IEnumerable<string> HandledExtensions => new[] { ".zip" };

        protected readonly FileStore Files;

        protected readonly IDatabaseContextFactory ContextFactory;

        protected readonly MutableDatabaseBackedStore<TModel> ModelStore;

        // ReSharper disable once NotAccessedField.Local (we should keep a reference to this so it is not finalised)
        private ArchiveImportIPCChannel ipc;

        private readonly Storage exportStorage;

        /// <summary>
        /// Construct a new manager, wiring up store events, the export directory and the file store.
        /// </summary>
        /// <param name="storage">The storage used for the file store and for the "exports" sub-directory.</param>
        /// <param name="contextFactory">The database context factory.</param>
        /// <param name="modelStore">The backing model store.</param>
        /// <param name="importHost">An optional IPC host; when provided, an import IPC channel is created.</param>
        protected ArchiveModelManager(Storage storage, IDatabaseContextFactory contextFactory, MutableDatabaseBackedStoreWithFileIncludes<TModel, TFileModel> modelStore, IIpcHost importHost = null)
        {
            ContextFactory = contextFactory;

            ModelStore = modelStore;
            ModelStore.ItemUpdated += item => handleEvent(() => itemUpdated.Value = new WeakReference<TModel>(item));
            ModelStore.ItemRemoved += item => handleEvent(() => itemRemoved.Value = new WeakReference<TModel>(item));

            exportStorage = storage.GetStorageForDirectory("exports");

            Files = new FileStore(contextFactory, storage);

            if (importHost != null)
                ipc = new ArchiveImportIPCChannel(importHost, this);

            ModelStore.Cleanup();
        }

        /// <summary>
        /// Import one or more <typeparamref name="TModel"/> items from filesystem <paramref name="paths"/>.
        /// </summary>
        /// <remarks>
        /// This will be treated as a low priority import if more than one path is specified.
        /// Use <see cref="Import(ImportTask, bool, CancellationToken)"/> to choose the priority explicitly for a single item.
        /// This will post notifications tracking progress.
        /// </remarks>
        /// <param name="paths">One or more archive locations on disk.</param>
        public Task Import(params string[] paths)
        {
            var notification = new ProgressNotification { State = ProgressNotificationState.Active };

            PostNotification?.Invoke(notification);

            return Import(notification, paths.Select(p => new ImportTask(p)).ToArray());
        }

        /// <summary>
        /// Import one or more <typeparamref name="TModel"/> items from the provided <paramref name="tasks"/>.
        /// This will post notifications tracking progress.
        /// </summary>
        /// <param name="tasks">The import tasks to process.</param>
        public Task Import(params ImportTask[] tasks)
        {
            var notification = new ProgressNotification { State = ProgressNotificationState.Active };

            PostNotification?.Invoke(notification);

            return Import(notification, tasks);
        }

        /// <summary>
        /// Import all provided <paramref name="tasks"/>, reporting progress and the final result via <paramref name="notification"/>.
        /// </summary>
        /// <remarks>
        /// Failures of individual tasks are logged and do not abort the remaining tasks.
        /// Batches larger than <see cref="low_priority_import_batch_size"/> are run at low priority.
        /// </remarks>
        /// <param name="notification">The notification used for progress, completion text and cancellation.</param>
        /// <param name="tasks">The import tasks to process.</param>
        /// <returns>The models which were successfully imported.</returns>
        protected async Task<IEnumerable<TModel>> Import(ProgressNotification notification, params ImportTask[] tasks)
        {
            if (tasks.Length == 0)
            {
                notification.CompletionText = $"No {HumanisedModelName}s were found to import!";
                notification.State = ProgressNotificationState.Completed;
                return Enumerable.Empty<TModel>();
            }

            notification.Progress = 0;
            notification.Text = $"{HumanisedModelName.Humanize(LetterCasing.Title)} import is initialising...";

            int current = 0;

            var imported = new List<TModel>();

            bool isLowPriorityImport = tasks.Length > low_priority_import_batch_size;

            try
            {
                await Task.WhenAll(tasks.Select(async task =>
                {
                    notification.CancellationToken.ThrowIfCancellationRequested();

                    try
                    {
                        var model = await Import(task, isLowPriorityImport, notification.CancellationToken).ConfigureAwait(false);

                        lock (imported)
                        {
                            if (model != null)
                                imported.Add(model);
                            current++;

                            notification.Text = $"Imported {current} of {tasks.Length} {HumanisedModelName}s";
                            notification.Progress = (float)current / tasks.Length;
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // ThrowIfCancellationRequested raises OperationCanceledException (TaskCanceledException derives from it).
                        // Propagate so a cancellation is not logged as an import failure.
                        throw;
                    }
                    catch (Exception e)
                    {
                        Logger.Error(e, $@"Could not import ({task})", LoggingTarget.Database);
                    }
                })).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                if (imported.Count == 0)
                {
                    notification.State = ProgressNotificationState.Cancelled;
                    return imported;
                }

                // some items were imported before cancellation; fall through and report them.
            }

            if (imported.Count == 0)
            {
                notification.Text = $"{HumanisedModelName.Humanize(LetterCasing.Title)} import failed!";
                notification.State = ProgressNotificationState.Cancelled;
            }
            else
            {
                notification.CompletionText = imported.Count == 1
                    ? $"Imported {imported.First()}!"
                    : $"Imported {imported.Count} {HumanisedModelName}s!";

                if (PresentImport != null)
                {
                    notification.CompletionText += " Click to view.";
                    notification.CompletionClickAction = () =>
                    {
                        PresentImport?.Invoke(imported);
                        return true;
                    };
                }

                notification.State = ProgressNotificationState.Completed;
            }

            return imported;
        }

        /// <summary>
        /// Import one <typeparamref name="TModel"/> from the filesystem and delete the file on success
        /// (only when <see cref="ShouldDeleteArchive"/> allows it).
        /// Note that this bypasses the UI flow and should only be used for special cases or testing.
        /// </summary>
        /// <param name="task">The <see cref="ImportTask"/> containing data about the <typeparamref name="TModel"/> to import.</param>
        /// <param name="lowPriority">Whether this is a low priority import.</param>
        /// <param name="cancellationToken">An optional cancellation token.</param>
        /// <returns>The imported model, if successful.</returns>
        internal async Task<TModel> Import(ImportTask task, bool lowPriority = false, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            TModel import;
            using (ArchiveReader reader = task.GetReader())
                import = await Import(reader, lowPriority, cancellationToken).ConfigureAwait(false);

            // We may or may not want to delete the file depending on where it is stored.
            //  e.g. reconstructing/repairing database with items from default storage.
            // Also, not always a single file, i.e. for LegacyFilesystemReader
            // TODO: Add a check to prevent files from storage to be deleted.
            try
            {
                if (import != null && File.Exists(task.Path) && ShouldDeleteArchive(task.Path))
                    File.Delete(task.Path);
            }
            catch (Exception e)
            {
                LogForModel(import, $@"Could not delete original file after import ({task})", e);
            }

            return import;
        }

        /// <summary>
        /// Fired when the user requests to view the resulting import.
        /// </summary>
        public Action<IEnumerable<TModel>> PresentImport;

        /// <summary>
        /// Silently import an item from an <see cref="ArchiveReader"/>.
        /// </summary>
        /// <param name="archive">The archive to be imported.</param>
        /// <param name="lowPriority">Whether this is a low priority import.</param>
        /// <param name="cancellationToken">An optional cancellation token.</param>
        /// <returns>A task yielding the imported model, or null if model creation failed or was aborted.</returns>
        public Task<TModel> Import(ArchiveReader archive, bool lowPriority = false, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            TModel model = null;

            try
            {
                model = CreateModel(archive);

                if (model == null)
                    return Task.FromResult<TModel>(null);
            }
            catch (OperationCanceledException)
            {
                throw;
            }
            catch (Exception e)
            {
                LogForModel(model, $"Model creation of {archive.Name} failed.", e);

                // must return a completed task rather than a null Task, as callers await the result.
                return Task.FromResult<TModel>(null);
            }

            return Import(model, archive, lowPriority, cancellationToken);
        }

        /// <summary>
        /// Any file extensions which should be included in hash creation.
        /// Generally should include all file types which determine the file's uniqueness.
        /// Large files should be avoided if possible.
        /// </summary>
        /// <remarks>
        /// This is only used by the default hash implementation. If <see cref="ComputeHash"/> is overridden, it will not be used.
        /// </remarks>
        protected abstract string[] HashableFileTypes { get; }

        /// <summary>
        /// Log a message to the database log, prefixed with up to the first five characters of the model's hash.
        /// </summary>
        /// <param name="model">The model the message relates to. May be null, in which case a placeholder prefix is used.</param>
        /// <param name="message">The message to log.</param>
        /// <param name="e">An optional exception; when provided the message is logged as an error.</param>
        internal static void LogForModel(TModel model, string message, Exception e = null)
        {
            string hash = model?.Hash ?? "?????";

            // clamp to the available length so an unusually short hash cannot throw while logging.
            string prefix = $"[{hash.Substring(0, Math.Min(5, hash.Length))}]";

            if (e != null)
                Logger.Error(e, $"{prefix} {message}", LoggingTarget.Database);
            else
                Logger.Log($"{prefix} {message}", LoggingTarget.Database);
        }

        /// <summary>
        /// Create a SHA-2 hash from the provided archive based on file content of all files matching <see cref="HashableFileTypes"/>.
        /// </summary>
        /// <remarks>
        /// In the case of no matching files, a hash will be generated from the passed archive's <see cref="ArchiveReader.Name"/>.
        /// If no archive is provided either, the item's existing hash is returned unchanged.
        /// </remarks>
        /// <param name="item">The model whose files should be hashed.</param>
        /// <param name="reader">An optional archive used as a fallback hash source.</param>
        protected virtual string ComputeHash(TModel item, ArchiveReader reader = null)
        {
            // concatenate all hashable files (ordered by filename for determinism) to create a unique hash.
            using (var hashable = new MemoryStream())
            {
                foreach (TFileModel file in item.Files.Where(f => HashableFileTypes.Any(ext => f.Filename.EndsWith(ext, StringComparison.OrdinalIgnoreCase))).OrderBy(f => f.Filename))
                {
                    using (Stream s = Files.Store.GetStream(file.FileInfo.StoragePath))
                        s.CopyTo(hashable);
                }

                if (hashable.Length > 0)
                    return hashable.ComputeSHA2Hash();
            }

            if (reader != null)
                return reader.Name.ComputeSHA2Hash();

            return item.Hash;
        }

        /// <summary>
        /// Silently import an item from a <typeparamref name="TModel"/>.
        /// </summary>
        /// <remarks>
        /// The work is queued on one of the import schedulers. Outward events are delayed until the database operation
        /// has been confirmed, and the import is rolled back if any step throws.
        /// </remarks>
        /// <param name="item">The model to be imported.</param>
        /// <param name="archive">An optional archive to use for model population.</param>
        /// <param name="lowPriority">Whether this is a low priority import.</param>
        /// <param name="cancellationToken">An optional cancellation token.</param>
        /// <returns>The imported model, or an existing model which was reused in its place.</returns>
        public virtual async Task<TModel> Import(TModel item, ArchiveReader archive = null, bool lowPriority = false, CancellationToken cancellationToken = default) => await Task.Factory.StartNew(async () =>
        {
            cancellationToken.ThrowIfCancellationRequested();

            delayEvents();

            void rollback()
            {
                if (!Delete(item))
                {
                    // We may have not yet added the model to the underlying table, but should still clean up files.
                    LogForModel(item, "Dereferencing files for incomplete import.");
                    Files.Dereference(item.Files.Select(f => f.FileInfo).ToArray());
                }
            }

            try
            {
                LogForModel(item, "Beginning import...");

                item.Files = archive != null ? createFileInfos(archive, Files) : new List<TFileModel>();
                item.Hash = ComputeHash(item, archive);

                await Populate(item, archive, cancellationToken).ConfigureAwait(false);

                using (var write = ContextFactory.GetForWrite()) // used to share a context for full import. keep in mind this will block all writes.
                {
                    try
                    {
                        if (!write.IsTransactionLeader) throw new InvalidOperationException($"Ensure there is no parent transaction so errors can correctly be handled by {this}");

                        var existing = CheckForExisting(item);

                        if (existing != null)
                        {
                            if (CanReuseExisting(existing, item))
                            {
                                Undelete(existing);
                                LogForModel(item, $"Found existing {HumanisedModelName} for {item} (ID {existing.ID}) – skipping import.");
                                // existing item will be used; rollback new import and exit early.
                                rollback();
                                flushEvents(true);
                                return existing;
                            }

                            Delete(existing);
                            ModelStore.PurgeDeletable(s => s.ID == existing.ID);
                        }

                        PreImport(item);

                        // import to store
                        ModelStore.Add(item);
                    }
                    catch (Exception e)
                    {
                        write.Errors.Add(e);
                        throw;
                    }
                }

                LogForModel(item, "Import successfully completed!");
            }
            catch (Exception e)
            {
                // cancellation surfaces as OperationCanceledException (or its TaskCanceledException subclass); it is not a failure worth logging.
                if (!(e is OperationCanceledException))
                    LogForModel(item, "Database import or population failed and has been rolled back.", e);

                rollback();
                flushEvents(false);
                throw;
            }

            flushEvents(true);
            return item;
        }, cancellationToken, TaskCreationOptions.HideScheduler, lowPriority ? import_scheduler_low_priority : import_scheduler).Unwrap().ConfigureAwait(false);

        /// <summary>
        /// Exports an item to a legacy (.zip based) package.
        /// </summary>
        /// <param name="item">The item to export.</param>
        /// <exception cref="ArgumentException">Thrown when the item cannot be found in the model store.</exception>
        public void Export(TModel item)
        {
            var retrievedItem = ModelStore.ConsumableItems.FirstOrDefault(s => s.ID == item.ID);

            if (retrievedItem == null)
                throw new ArgumentException("Specified model could not be found", nameof(item));

            using (var outputStream = exportStorage.GetStream($"{getValidFilename(item.ToString())}{HandledExtensions.First()}", FileAccess.Write, FileMode.Create))
                ExportModelTo(retrievedItem, outputStream);

            exportStorage.OpenInNativeExplorer();
        }

        /// <summary>
        /// Exports an item to the given output stream.
        /// </summary>
        /// <param name="model">The item to export.</param>
        /// <param name="outputStream">The output stream to export to.</param>
        protected virtual void ExportModelTo(TModel model, Stream outputStream)
        {
            // the source streams must stay open until the archive has been written, so track them and dispose afterwards.
            var openedStreams = new List<Stream>();

            try
            {
                using (var archive = ZipArchive.Create())
                {
                    foreach (var file in model.Files)
                    {
                        Stream content = Files.Storage.GetStream(file.FileInfo.StoragePath);
                        openedStreams.Add(content);

                        archive.AddEntry(file.Filename, content);
                    }

                    archive.SaveTo(outputStream);
                }
            }
            finally
            {
                foreach (var opened in openedStreams)
                    opened?.Dispose();
            }
        }

        /// <summary>
        /// Replace an existing file with a new version.
        /// </summary>
        /// <param name="model">The item to operate on.</param>
        /// <param name="file">The existing file to be replaced.</param>
        /// <param name="contents">The new file contents.</param>
        /// <param name="filename">An optional filename for the new file. Will use the previous filename if not specified.</param>
        public void ReplaceFile(TModel model, TFileModel file, Stream contents, string filename = null)
        {
            using (ContextFactory.GetForWrite())
            {
                DeleteFile(model, file);
                AddFile(model, contents, filename ?? file.Filename);
            }
        }

        /// <summary>
        /// Delete an existing file.
        /// </summary>
        /// <param name="model">The item to operate on.</param>
        /// <param name="file">The existing file to be deleted.</param>
        public void DeleteFile(TModel model, TFileModel file)
        {
            using (var usage = ContextFactory.GetForWrite())
            {
                // Dereference the existing file info, since the file model will be removed.
                if (file.FileInfo != null)
                {
                    Files.Dereference(file.FileInfo);

                    // This shouldn't be required, but here for safety in case the provided TModel is not being change tracked
                    // Definitely can be removed once we rework the database backend.
                    usage.Context.Set<TFileModel>().Remove(file);
                }

                model.Files.Remove(file);
            }
        }

        /// <summary>
        /// Add a new file.
        /// </summary>
        /// <param name="model">The item to operate on.</param>
        /// <param name="contents">The new file contents.</param>
        /// <param name="filename">The filename for the new file.</param>
        public void AddFile(TModel model, Stream contents, string filename)
        {
            using (ContextFactory.GetForWrite())
            {
                model.Files.Add(new TFileModel
                {
                    Filename = filename,
                    FileInfo = Files.Add(contents)
                });

                Update(model);
            }
        }

        /// <summary>
        /// Perform an update of the specified item, recomputing its hash beforehand.
        /// TODO: Support file additions/removals.
        /// </summary>
        /// <param name="item">The item to update.</param>
        public void Update(TModel item)
        {
            using (ContextFactory.GetForWrite())
            {
                item.Hash = ComputeHash(item);

                ModelStore.Update(item);
            }
        }

        /// <summary>
        /// Delete an item from the manager.
        /// Is a no-op for already deleted items.
        /// </summary>
        /// <param name="item">The item to delete.</param>
        /// <returns>false if no operation was performed</returns>
        public bool Delete(TModel item)
        {
            using (ContextFactory.GetForWrite())
            {
                // re-fetch the model on the import context.
                var foundModel = queryModel().Include(s => s.Files).ThenInclude(f => f.FileInfo).FirstOrDefault(s => s.ID == item.ID);

                if (foundModel == null || foundModel.DeletePending) return false;

                if (ModelStore.Delete(foundModel))
                    Files.Dereference(foundModel.Files.Select(f => f.FileInfo).ToArray());
                return true;
            }
        }

        /// <summary>
        /// Delete multiple items.
        /// This will post notifications tracking progress.
        /// </summary>
        /// <param name="items">The items to delete.</param>
        /// <param name="silent">Whether to skip posting the progress notification.</param>
        public void Delete(List<TModel> items, bool silent = false)
        {
            if (items.Count == 0) return;

            var notification = new ProgressNotification
            {
                Progress = 0,
                Text = $"Preparing to delete all {HumanisedModelName}s...",
                CompletionText = $"Deleted all {HumanisedModelName}s!",
                State = ProgressNotificationState.Active,
            };

            if (!silent)
                PostNotification?.Invoke(notification);

            int i = 0;

            foreach (var b in items)
            {
                if (notification.State == ProgressNotificationState.Cancelled)
                    // user requested abort
                    return;

                notification.Text = $"Deleting {HumanisedModelName}s ({++i} of {items.Count})";

                Delete(b);

                notification.Progress = (float)i / items.Count;
            }

            notification.State = ProgressNotificationState.Completed;
        }

        /// <summary>
        /// Restore multiple items that were previously deleted.
        /// This will post notifications tracking progress.
        /// </summary>
        /// <param name="items">The items to restore.</param>
        /// <param name="silent">Whether to skip posting the progress notification.</param>
        public void Undelete(List<TModel> items, bool silent = false)
        {
            if (!items.Any()) return;

            var notification = new ProgressNotification
            {
                CompletionText = "Restored all deleted items!",
                Progress = 0,
                State = ProgressNotificationState.Active,
            };

            if (!silent)
                PostNotification?.Invoke(notification);

            int i = 0;

            foreach (var item in items)
            {
                if (notification.State == ProgressNotificationState.Cancelled)
                    // user requested abort
                    return;

                notification.Text = $"Restoring ({++i} of {items.Count})";

                Undelete(item);

                notification.Progress = (float)i / items.Count;
            }

            notification.State = ProgressNotificationState.Completed;
        }

        /// <summary>
        /// Restore an item that was previously deleted. Is a no-op if the item is not in a deleted state, or has its protected flag set.
        /// </summary>
        /// <param name="item">The item to restore</param>
        public void Undelete(TModel item)
        {
            using (var usage = ContextFactory.GetForWrite())
            {
                usage.Context.ChangeTracker.AutoDetectChangesEnabled = false;

                try
                {
                    if (!ModelStore.Undelete(item)) return;

                    Files.Reference(item.Files.Select(f => f.FileInfo).ToArray());
                }
                finally
                {
                    // always restore change detection, including on the no-op early return and on failure.
                    usage.Context.ChangeTracker.AutoDetectChangesEnabled = true;
                }
            }
        }

        /// <summary>
        /// Create all required <typeparamref name="TFileModel"/>s for the provided archive, adding them to the global file store.
        /// </summary>
        /// <param name="reader">The archive to read files from.</param>
        /// <param name="files">The file store to add file contents to.</param>
        private List<TFileModel> createFileInfos(ArchiveReader reader, FileStore files)
        {
            var fileInfos = new List<TFileModel>();

            // only strip a common prefix when it is a whole directory (ends with a path separator).
            string prefix = reader.Filenames.GetCommonPrefix();
            if (!(prefix.EndsWith('/') || prefix.EndsWith('\\')))
                prefix = string.Empty;

            // import files to manager
            foreach (string file in reader.Filenames)
            {
                using (Stream s = reader.GetStream(file))
                {
                    fileInfos.Add(new TFileModel
                    {
                        Filename = file.Substring(prefix.Length).ToStandardisedPath(),
                        FileInfo = files.Add(s)
                    });
                }
            }

            return fileInfos;
        }

        #region osu-stable import

        /// <summary>
        /// Set a storage with access to an osu-stable install for import purposes.
        /// </summary>
        public Func<StableStorage> GetStableStorage { private get; set; }

        /// <summary>
        /// Denotes whether an osu-stable installation is present to perform automated imports from.
        /// </summary>
        public bool StableInstallationAvailable => GetStableStorage?.Invoke() != null;

        /// <summary>
        /// The relative path from osu-stable's data directory to import items from.
        /// </summary>
        protected virtual string ImportFromStablePath => null;

        /// <summary>
        /// Select paths to import from stable where all paths should be absolute. Default implementation iterates all directories in <see cref="ImportFromStablePath"/>.
        /// </summary>
        protected virtual IEnumerable<string> GetStableImportPaths(Storage storage) => storage.GetDirectories(ImportFromStablePath)
                                                                                             .Select(path => storage.GetFullPath(path));

        /// <summary>
        /// Whether this specified path should be removed after successful import.
        /// </summary>
        /// <param name="path">The path for consideration. May be a file or a directory.</param>
        /// <returns>Whether to perform deletion.</returns>
        protected virtual bool ShouldDeleteArchive(string path) => false;

        /// <summary>
        /// This is a temporary method and will likely be replaced by a full-fledged (and more correctly placed) migration process in the future.
        /// </summary>
        /// <param name="stableStorage">The osu-stable storage to import from. When null, an error is logged and nothing is imported.</param>
        public Task ImportFromStableAsync(StableStorage stableStorage)
        {
            if (stableStorage == null)
            {
                Logger.Log("No osu!stable installation available!", LoggingTarget.Information, LogLevel.Error);
                return Task.CompletedTask;
            }

            var storage = PrepareStableStorage(stableStorage);

            if (!storage.ExistsDirectory(ImportFromStablePath))
            {
                // This handles situations like when the user does not have a Skins folder
                Logger.Log($"No {ImportFromStablePath} folder available in osu!stable installation", LoggingTarget.Information, LogLevel.Error);
                return Task.CompletedTask;
            }

            return Task.Run(async () => await Import(GetStableImportPaths(storage).ToArray()).ConfigureAwait(false));
        }

        /// <summary>
        /// Run any required traversal operations on the stable storage location before performing operations.
        /// </summary>
        /// <param name="stableStorage">The stable storage.</param>
        /// <returns>The usable storage. Return the unchanged <paramref name="stableStorage"/> if no traversal is required.</returns>
        protected virtual Storage PrepareStableStorage(StableStorage stableStorage) => stableStorage;

        #endregion

        /// <summary>
        /// Create a barebones model from the provided archive.
        /// Actual expensive population should be done in <see cref="Populate"/>; this should just prepare for duplicate checking.
        /// </summary>
        /// <param name="archive">The archive to create the model for.</param>
        /// <returns>A model populated with minimal information. Returning a null will abort importing silently.</returns>
        protected abstract TModel CreateModel(ArchiveReader archive);

        /// <summary>
        /// Populate the provided model completely from the given archive.
        /// After this method, the model should be in a state ready to commit to a store.
        /// </summary>
        /// <param name="model">The model to populate.</param>
        /// <param name="archive">The archive to use as a reference for population. May be null.</param>
        /// <param name="cancellationToken">An optional cancellation token.</param>
        protected virtual Task Populate(TModel model, [CanBeNull] ArchiveReader archive, CancellationToken cancellationToken = default) => Task.CompletedTask;

        /// <summary>
        /// Perform any final actions before the import to database executes.
        /// </summary>
        /// <param name="model">The model prepared for import.</param>
        protected virtual void PreImport(TModel model)
        {
        }

        /// <summary>
        /// Check whether an existing model already exists for a new import item.
        /// </summary>
        /// <param name="model">The new model proposed for import.</param>
        /// <returns>An existing model which matches the criteria to skip importing, else null.</returns>
        protected TModel CheckForExisting(TModel model) => model.Hash == null ? null : ModelStore.ConsumableItems.FirstOrDefault(b => b.Hash == model.Hash);

        /// <summary>
        /// After an existing <typeparamref name="TModel"/> is found during an import process, the default behaviour is to use/restore the existing
        /// item and skip the import. This method allows changing that behaviour.
        /// </summary>
        /// <param name="existing">The existing model.</param>
        /// <param name="import">The newly imported model.</param>
        /// <returns>Whether the existing model should be restored and used. Returning false will delete the existing and force a re-import.</returns>
        protected virtual bool CanReuseExisting(TModel existing, TModel import) =>
            // for the best or worst, we copy and import files of a new import before checking whether
            // it is a duplicate. so to check if anything has changed, we can just compare all FileInfo IDs.
            getIDs(existing.Files).SequenceEqual(getIDs(import.Files)) &&
            getFilenames(existing.Files).SequenceEqual(getFilenames(import.Files));

        /// <summary>
        /// Enumerate the file info IDs of the provided files, ordered by filename.
        /// </summary>
        private IEnumerable<long> getIDs(List<TFileModel> files)
        {
            foreach (var f in files.OrderBy(f => f.Filename))
                yield return f.FileInfo.ID;
        }

        /// <summary>
        /// Enumerate the filenames of the provided files, ordered by filename.
        /// </summary>
        private IEnumerable<string> getFilenames(List<TFileModel> files)
        {
            foreach (var f in files.OrderBy(f => f.Filename))
                yield return f.Filename;
        }

        private DbSet<TModel> queryModel() => ContextFactory.Get().Set<TModel>();

        /// <summary>
        /// A lower-case name for the model type, derived from the type name with "Info" removed. Used in user-facing messages.
        /// </summary>
        protected virtual string HumanisedModelName => $"{typeof(TModel).Name.Replace("Info", "").ToLower()}";

        #region Event handling / delaying

        private readonly List<Action> queuedEvents = new List<Action>();

        /// <summary>
        /// Allows delaying of outwards events until an operation is confirmed (at a database level).
        /// </summary>
        private bool delayingEvents;

        /// <summary>
        /// Begin delaying outwards events.
        /// </summary>
        private void delayEvents() => delayingEvents = true;

        /// <summary>
        /// Flush delayed events and disable delaying.
        /// </summary>
        /// <param name="perform">Whether the flushed events should be performed.</param>
        private void flushEvents(bool perform)
        {
            Action[] events;

            lock (queuedEvents)
            {
                events = queuedEvents.ToArray();
                queuedEvents.Clear();
            }

            if (perform)
            {
                foreach (var a in events)
                    a.Invoke();
            }

            delayingEvents = false;
        }

        /// <summary>
        /// Run the provided event immediately, or queue it if events are currently being delayed.
        /// </summary>
        private void handleEvent(Action a)
        {
            if (delayingEvents)
            {
                lock (queuedEvents)
                    queuedEvents.Add(a);
            }
            else
                a.Invoke();
        }

        #endregion

        /// <summary>
        /// Replace every character which is invalid in a filename with an underscore.
        /// </summary>
        private string getValidFilename(string filename)
        {
            foreach (char c in Path.GetInvalidFileNameChars())
                filename = filename.Replace(c, '_');
            return filename;
        }
    }
}