diff --git a/osu.Game.Tests/Database/GeneralUsageTests.cs b/osu.Game.Tests/Database/GeneralUsageTests.cs index 65f805bafb..5b6f7a0a53 100644 --- a/osu.Game.Tests/Database/GeneralUsageTests.cs +++ b/osu.Game.Tests/Database/GeneralUsageTests.cs @@ -2,11 +2,14 @@ // See the LICENCE file in the repository root for full licence text. using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using NUnit.Framework; +using osu.Framework.Extensions; using osu.Game.Beatmaps; using osu.Game.Database; +using osu.Game.Tests.Resources; namespace osu.Game.Tests.Database { @@ -33,6 +36,85 @@ namespace osu.Game.Tests.Database }); } + [Test] + public void TestAsyncWriteAsync() + { + RunTestWithRealmAsync(async (realm, _) => + { + await realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())); + + realm.Run(r => r.Refresh()); + + Assert.That(realm.Run(r => r.All().Count()), Is.EqualTo(1)); + }); + } + + [Test] + public void TestAsyncWriteWhileBlocking() + { + RunTestWithRealm((realm, _) => + { + Task writeTask; + + using (realm.BlockAllOperations()) + { + writeTask = realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())); + Thread.Sleep(100); + Assert.That(writeTask.IsCompleted, Is.False); + } + + writeTask.WaitSafely(); + + realm.Run(r => r.Refresh()); + Assert.That(realm.Run(r => r.All().Count()), Is.EqualTo(1)); + }); + } + + [Test] + public void TestAsyncWrite() + { + RunTestWithRealm((realm, _) => + { + realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())).WaitSafely(); + + realm.Run(r => r.Refresh()); + + Assert.That(realm.Run(r => r.All().Count()), Is.EqualTo(1)); + }); + } + + [Test] + public void TestAsyncWriteAfterDisposal() + { + RunTestWithRealm((realm, _) => + { + realm.Dispose(); + Assert.ThrowsAsync(() => realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo()))); + }); + } + + [Test] + public void TestAsyncWriteBeforeDisposal() + { + ManualResetEventSlim resetEvent = new ManualResetEventSlim(); + + RunTestWithRealm((realm, _) => + { + var writeTask = realm.WriteAsync(r => + { + // ensure that disposal blocks for our execution + Assert.That(resetEvent.Wait(100), Is.False); + + r.Add(TestResources.CreateTestBeatmapSetInfo()); + }); + + realm.Dispose(); + resetEvent.Set(); + + writeTask.WaitSafely(); + }); + } + /// /// Test to ensure that a `CreateContext` call nested inside a subscription doesn't cause any deadlocks /// due to context fetching semaphores. diff --git a/osu.Game.Tests/Database/RealmSubscriptionRegistrationTests.cs b/osu.Game.Tests/Database/RealmSubscriptionRegistrationTests.cs index b8ce036da1..c74341b5c9 100644 --- a/osu.Game.Tests/Database/RealmSubscriptionRegistrationTests.cs +++ b/osu.Game.Tests/Database/RealmSubscriptionRegistrationTests.cs @@ -5,7 +5,6 @@ using System; using System.Collections.Generic; using System.Linq; using System.Threading; -using System.Threading.Tasks; using NUnit.Framework; using osu.Framework.Allocation; using osu.Framework.Extensions; @@ -84,11 +83,7 @@ namespace osu.Game.Tests.Database realm.Run(r => r.Refresh()); - // Without forcing the write onto its own thread, realm will internally run the operation synchronously, which can cause a deadlock with `WaitSafely`. - Task.Run(async () => - { - await realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())); - }).WaitSafely(); + realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())).WaitSafely(); realm.Run(r => r.Refresh()); diff --git a/osu.Game/Database/RealmAccess.cs b/osu.Game/Database/RealmAccess.cs index cb34a92702..8cf9bb4a47 100644 --- a/osu.Game/Database/RealmAccess.cs +++ b/osu.Game/Database/RealmAccess.cs @@ -98,8 +98,6 @@ namespace osu.Game.Database private static readonly GlobalStatistic total_writes_async = GlobalStatistics.Get(@"Realm", @"Writes (Async)"); - private readonly object realmLock = new object(); - private Realm? updateRealm; /// @@ -122,24 +120,21 @@ namespace osu.Game.Database if (!ThreadSafety.IsUpdateThread) throw new InvalidOperationException(@$"Use {nameof(getRealmInstance)} when performing realm operations from a non-update thread"); - lock (realmLock) + if (updateRealm == null) { - if (updateRealm == null) - { - updateRealm = getRealmInstance(); - hasInitialisedOnce = true; + updateRealm = getRealmInstance(); + hasInitialisedOnce = true; - Logger.Log(@$"Opened realm ""{updateRealm.Config.DatabasePath}"" at version {updateRealm.Config.SchemaVersion}"); + Logger.Log(@$"Opened realm ""{updateRealm.Config.DatabasePath}"" at version {updateRealm.Config.SchemaVersion}"); - // Resubscribe any subscriptions - foreach (var action in customSubscriptionsResetMap.Keys.ToArray()) - registerSubscription(action); - } - - Debug.Assert(updateRealm != null); - - return updateRealm; + // Resubscribe any subscriptions + foreach (var action in customSubscriptionsResetMap.Keys.ToArray()) + registerSubscription(action); } + + Debug.Assert(updateRealm != null); + + return updateRealm; } internal static bool CurrentThreadSubscriptionsAllowed => current_thread_subscriptions_allowed.Value; @@ -388,16 +383,29 @@ namespace osu.Game.Database } } + private readonly CountdownEvent pendingAsyncWrites = new CountdownEvent(0); + /// /// Write changes to realm asynchronously, guaranteeing order of execution. /// /// The work to run. public Task WriteAsync(Action action) { + if (isDisposed) + throw new ObjectDisposedException(nameof(RealmAccess)); + + // Required to ensure the write is tracked and accounted for before disposal. + // Can potentially be avoided if we have a need to do so in the future. + if (!ThreadSafety.IsUpdateThread) + throw new InvalidOperationException(@$"{nameof(WriteAsync)} must be called from the update thread."); + + // CountdownEvent will fail if already at zero. + if (!pendingAsyncWrites.TryAddCount()) + pendingAsyncWrites.Reset(1); + // Regardless of calling Realm.GetInstance or Realm.GetInstanceAsync, there is a blocking overhead on retrieval. // Adding a forced Task.Run resolves this. - - return Task.Run(async () => + var writeTask = Task.Run(async () => { total_writes_async.Value++; @@ -407,7 +415,11 @@ namespace osu.Game.Database using (var realm = getRealmInstance()) // ReSharper disable once AccessToDisposedClosure (WriteAsync should be marked as [InstantHandle]). await realm.WriteAsync(() => action(realm)); + + pendingAsyncWrites.Signal(); }); + + return writeTask; } /// @@ -432,14 +444,15 @@ namespace osu.Game.Database public IDisposable RegisterForNotifications(Func> query, NotificationCallbackDelegate callback) where T : RealmObjectBase { - lock (realmLock) - { - Func action = realm => query(realm).QueryAsyncWithNotifications(callback); + Func action = realm => query(realm).QueryAsyncWithNotifications(callback); + lock (notificationsResetMap) + { // Store an action which is used when blocking to ensure consumers don't use results of a stale changeset firing. notificationsResetMap.Add(action, () => callback(new EmptyRealmSet(), null, null)); - return RegisterCustomSubscription(action); } + + return RegisterCustomSubscription(action); } /// @@ -530,15 +543,17 @@ namespace osu.Game.Database void unsubscribe() { - lock (realmLock) + if (customSubscriptionsResetMap.TryGetValue(action, out var unsubscriptionAction)) { - if (customSubscriptionsResetMap.TryGetValue(action, out var unsubscriptionAction)) + unsubscriptionAction?.Dispose(); + customSubscriptionsResetMap.Remove(action); + + lock (notificationsResetMap) { - unsubscriptionAction?.Dispose(); - customSubscriptionsResetMap.Remove(action); notificationsResetMap.Remove(action); - total_subscriptions.Value--; } + + total_subscriptions.Value--; } } }); @@ -548,19 +563,16 @@ namespace osu.Game.Database { Debug.Assert(ThreadSafety.IsUpdateThread); - lock (realmLock) - { - // Retrieve realm instance outside of flag update to ensure that the instance is retrieved, - // as attempting to access it inside the subscription if it's not constructed would lead to - // cyclic invocations of the subscription callback. - var realm = Realm; + // Retrieve realm instance outside of flag update to ensure that the instance is retrieved, + // as attempting to access it inside the subscription if it's not constructed would lead to + // cyclic invocations of the subscription callback. + var realm = Realm; - Debug.Assert(!customSubscriptionsResetMap.TryGetValue(action, out var found) || found == null); + Debug.Assert(!customSubscriptionsResetMap.TryGetValue(action, out var found) || found == null); - current_thread_subscriptions_allowed.Value = true; - customSubscriptionsResetMap[action] = action(realm); - current_thread_subscriptions_allowed.Value = false; - } + current_thread_subscriptions_allowed.Value = true; + customSubscriptionsResetMap[action] = action(realm); + current_thread_subscriptions_allowed.Value = false; } private Realm getRealmInstance() @@ -802,6 +814,9 @@ namespace osu.Game.Database /// An which should be disposed to end the blocking section. public IDisposable BlockAllOperations() { + if (!ThreadSafety.IsUpdateThread) + throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread."); + if (isDisposed) throw new ObjectDisposedException(nameof(RealmAccess)); @@ -811,31 +826,25 @@ namespace osu.Game.Database { realmRetrievalLock.Wait(); - lock (realmLock) + if (hasInitialisedOnce) { - if (hasInitialisedOnce) + syncContext = SynchronizationContext.Current; + + // Before disposing the update context, clean up all subscriptions. + // Note that in the case of realm notification subscriptions, this is not really required (they will be cleaned up by disposal). + // In the case of custom subscriptions, we want them to fire before the update realm is disposed in case they do any follow-up work. + foreach (var action in customSubscriptionsResetMap.ToArray()) { - if (!ThreadSafety.IsUpdateThread) - throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread."); - - syncContext = SynchronizationContext.Current; - - // Before disposing the update context, clean up all subscriptions. - // Note that in the case of realm notification subscriptions, this is not really required (they will be cleaned up by disposal). - // In the case of custom subscriptions, we want them to fire before the update realm is disposed in case they do any follow-up work. - foreach (var action in customSubscriptionsResetMap.ToArray()) - { - action.Value?.Dispose(); - customSubscriptionsResetMap[action.Key] = null; - } - - updateRealm?.Dispose(); - updateRealm = null; + action.Value?.Dispose(); + customSubscriptionsResetMap[action.Key] = null; } - Logger.Log(@"Blocking realm operations.", LoggingTarget.Database); + updateRealm?.Dispose(); + updateRealm = null; } + Logger.Log(@"Blocking realm operations.", LoggingTarget.Database); + const int sleep_length = 200; int timeout = 5000; @@ -871,8 +880,11 @@ namespace osu.Game.Database try { - foreach (var action in notificationsResetMap.Values) - action(); + lock (notificationsResetMap) + { + foreach (var action in notificationsResetMap.Values) + action(); + } } finally { @@ -910,10 +922,10 @@ namespace osu.Game.Database public void Dispose() { - lock (realmLock) - { - updateRealm?.Dispose(); - } + if (!pendingAsyncWrites.Wait(10000)) + Logger.Log("Realm took too long waiting on pending async writes", level: LogLevel.Error); + + updateRealm?.Dispose(); if (!isDisposed) {