From 1fc4fa68204c5cb3e9b1a02509ab5afcebdc4638 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Mon, 27 Jun 2022 19:20:15 +0900 Subject: [PATCH 1/8] Remove unnecessary `Task.Run` workaround in tests --- .../Database/RealmSubscriptionRegistrationTests.cs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/osu.Game.Tests/Database/RealmSubscriptionRegistrationTests.cs b/osu.Game.Tests/Database/RealmSubscriptionRegistrationTests.cs index b8ce036da1..c74341b5c9 100644 --- a/osu.Game.Tests/Database/RealmSubscriptionRegistrationTests.cs +++ b/osu.Game.Tests/Database/RealmSubscriptionRegistrationTests.cs @@ -5,7 +5,6 @@ using System; using System.Collections.Generic; using System.Linq; using System.Threading; -using System.Threading.Tasks; using NUnit.Framework; using osu.Framework.Allocation; using osu.Framework.Extensions; @@ -84,11 +83,7 @@ namespace osu.Game.Tests.Database realm.Run(r => r.Refresh()); - // Without forcing the write onto its own thread, realm will internally run the operation synchronously, which can cause a deadlock with `WaitSafely`. - Task.Run(async () => - { - await realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())); - }).WaitSafely(); + realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())).WaitSafely(); realm.Run(r => r.Refresh()); From c39c99bd43ce32f1f58ac274703e9dd3f542bcf6 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Mon, 27 Jun 2022 19:20:28 +0900 Subject: [PATCH 2/8] Ensure all async writes are completed before realm is disposed --- osu.Game/Database/RealmAccess.cs | 42 +++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/osu.Game/Database/RealmAccess.cs b/osu.Game/Database/RealmAccess.cs index 089a783177..2618fe373e 100644 --- a/osu.Game/Database/RealmAccess.cs +++ b/osu.Game/Database/RealmAccess.cs @@ -388,26 +388,43 @@ namespace osu.Game.Database } } + private readonly CountdownEvent pendingAsyncWrites = new CountdownEvent(0); + /// /// Write changes to realm asynchronously, guaranteeing order of execution. /// /// The work to run. public Task WriteAsync(Action action) { - // Regardless of calling Realm.GetInstance or Realm.GetInstanceAsync, there is a blocking overhead on retrieval. - // Adding a forced Task.Run resolves this. + // Required to ensure the write is tracked and accounted for before disposal. + // Can potentially be avoided if we have a need to do so in the future. + if (!ThreadSafety.IsUpdateThread) + throw new InvalidOperationException(@$"{nameof(WriteAsync)} must be called from the update thread."); - return Task.Run(async () => + lock (realmLock) { - total_writes_async.Value++; + // CountdownEvent will fail if already at zero. + if (!pendingAsyncWrites.TryAddCount()) + pendingAsyncWrites.Reset(1); - // Not attempting to use Realm.GetInstanceAsync as there's seemingly no benefit to us (for now) and it adds complexity due to locking - // concerns in getRealmInstance(). On a quick check, it looks to be more suited to cases where realm is connecting to an online sync - // server, which we don't use. May want to report upstream or revisit in the future. - using (var realm = getRealmInstance()) - // ReSharper disable once AccessToDisposedClosure (WriteAsync should be marked as [InstantHandle]). - await realm.WriteAsync(() => action(realm)); - }); + // Regardless of calling Realm.GetInstance or Realm.GetInstanceAsync, there is a blocking overhead on retrieval. + // Adding a forced Task.Run resolves this. + var writeTask = Task.Run(async () => + { + total_writes_async.Value++; + + // Not attempting to use Realm.GetInstanceAsync as there's seemingly no benefit to us (for now) and it adds complexity due to locking + // concerns in getRealmInstance(). On a quick check, it looks to be more suited to cases where realm is connecting to an online sync + // server, which we don't use. May want to report upstream or revisit in the future. + using (var realm = getRealmInstance()) + // ReSharper disable once AccessToDisposedClosure (WriteAsync should be marked as [InstantHandle]). + await realm.WriteAsync(() => action(realm)); + + pendingAsyncWrites.Signal(); + }); + + return writeTask; + } } /// @@ -910,6 +927,9 @@ namespace osu.Game.Database public void Dispose() { + if (!pendingAsyncWrites.Wait(10000)) + Logger.Log("Realm took too long waiting on pending async writes", level: LogLevel.Error); + lock (realmLock) { updateRealm?.Dispose(); From 83982d258d96590722da57126219dfc1c920a7fb Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Mon, 27 Jun 2022 19:34:28 +0900 Subject: [PATCH 3/8] Throw immediately if attempting to `WriteAsync` after disposed --- osu.Game/Database/RealmAccess.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/osu.Game/Database/RealmAccess.cs b/osu.Game/Database/RealmAccess.cs index 2618fe373e..7a9561fc5e 100644 --- a/osu.Game/Database/RealmAccess.cs +++ b/osu.Game/Database/RealmAccess.cs @@ -396,6 +396,9 @@ namespace osu.Game.Database /// The work to run. public Task WriteAsync(Action action) { + if (isDisposed) + throw new ObjectDisposedException(nameof(RealmAccess)); + // Required to ensure the write is tracked and accounted for before disposal. // Can potentially be avoided if we have a need to do so in the future. if (!ThreadSafety.IsUpdateThread) From 62038850406fe5cccd1151cca141c1adb802ef1f Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Mon, 27 Jun 2022 19:34:42 +0900 Subject: [PATCH 4/8] Add test coverage of realm async writes --- osu.Game.Tests/Database/GeneralUsageTests.cs | 61 ++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/osu.Game.Tests/Database/GeneralUsageTests.cs b/osu.Game.Tests/Database/GeneralUsageTests.cs index 65f805bafb..db6e97dab2 100644 --- a/osu.Game.Tests/Database/GeneralUsageTests.cs +++ b/osu.Game.Tests/Database/GeneralUsageTests.cs @@ -2,11 +2,14 @@ // See the LICENCE file in the repository root for full licence text. using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using NUnit.Framework; +using osu.Framework.Extensions; using osu.Game.Beatmaps; using osu.Game.Database; +using osu.Game.Tests.Resources; namespace osu.Game.Tests.Database { @@ -33,6 +36,64 @@ namespace osu.Game.Tests.Database }); } + [Test] + public void TestAsyncWriteAsync() + { + RunTestWithRealmAsync(async (realm, _) => + { + await realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())); + + realm.Run(r => r.Refresh()); + + Assert.That(realm.Run(r => r.All().Count()), Is.EqualTo(1)); + }); + } + + [Test] + public void TestAsyncWrite() + { + RunTestWithRealm((realm, _) => + { + realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())).WaitSafely(); + + realm.Run(r => r.Refresh()); + + Assert.That(realm.Run(r => r.All().Count()), Is.EqualTo(1)); + }); + } + + [Test] + public void TestAsyncWriteAfterDisposal() + { + RunTestWithRealm((realm, _) => + { + realm.Dispose(); + Assert.ThrowsAsync(() => realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo()))); + }); + } + + [Test] + public void TestAsyncWriteBeforeDisposal() + { + ManualResetEventSlim resetEvent = new ManualResetEventSlim(); + + RunTestWithRealm((realm, _) => + { + var writeTask = realm.WriteAsync(r => + { + // ensure that disposal blocks for our execution + Assert.That(resetEvent.Wait(100), Is.False); + + r.Add(TestResources.CreateTestBeatmapSetInfo()); + }); + + realm.Dispose(); + resetEvent.Set(); + + writeTask.WaitSafely(); + }); + } + /// /// Test to ensure that a `CreateContext` call nested inside a subscription doesn't cause any deadlocks /// due to context fetching semaphores. From e10ac45fd73b7345898c700591edabfe7d84ee3a Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Tue, 28 Jun 2022 16:54:53 +0900 Subject: [PATCH 5/8] Remove probably redundant `realmLock` As far as I can tell all accesses are safe due to update thread guarantees. The only weird one may be async writes during a `BlockAllOperations`, but the `Compact` loop should handle this quite amicably. --- osu.Game/Database/RealmAccess.cs | 147 +++++++++++++------------------ 1 file changed, 62 insertions(+), 85 deletions(-) diff --git a/osu.Game/Database/RealmAccess.cs b/osu.Game/Database/RealmAccess.cs index 7a9561fc5e..715066f32d 100644 --- a/osu.Game/Database/RealmAccess.cs +++ b/osu.Game/Database/RealmAccess.cs @@ -98,8 +98,6 @@ namespace osu.Game.Database private static readonly GlobalStatistic total_writes_async = GlobalStatistics.Get(@"Realm", @"Writes (Async)"); - private readonly object realmLock = new object(); - private Realm? updateRealm; /// @@ -122,24 +120,21 @@ namespace osu.Game.Database if (!ThreadSafety.IsUpdateThread) throw new InvalidOperationException(@$"Use {nameof(getRealmInstance)} when performing realm operations from a non-update thread"); - lock (realmLock) + if (updateRealm == null) { - if (updateRealm == null) - { - updateRealm = getRealmInstance(); - hasInitialisedOnce = true; + updateRealm = getRealmInstance(); + hasInitialisedOnce = true; - Logger.Log(@$"Opened realm ""{updateRealm.Config.DatabasePath}"" at version {updateRealm.Config.SchemaVersion}"); + Logger.Log(@$"Opened realm ""{updateRealm.Config.DatabasePath}"" at version {updateRealm.Config.SchemaVersion}"); - // Resubscribe any subscriptions - foreach (var action in customSubscriptionsResetMap.Keys) - registerSubscription(action); - } - - Debug.Assert(updateRealm != null); - - return updateRealm; + // Resubscribe any subscriptions + foreach (var action in customSubscriptionsResetMap.Keys) + registerSubscription(action); } + + Debug.Assert(updateRealm != null); + + return updateRealm; } internal static bool CurrentThreadSubscriptionsAllowed => current_thread_subscriptions_allowed.Value; @@ -404,30 +399,27 @@ namespace osu.Game.Database if (!ThreadSafety.IsUpdateThread) throw new InvalidOperationException(@$"{nameof(WriteAsync)} must be called from the update thread."); - lock (realmLock) + // CountdownEvent will fail if already at zero. + if (!pendingAsyncWrites.TryAddCount()) + pendingAsyncWrites.Reset(1); + + // Regardless of calling Realm.GetInstance or Realm.GetInstanceAsync, there is a blocking overhead on retrieval. + // Adding a forced Task.Run resolves this. + var writeTask = Task.Run(async () => { - // CountdownEvent will fail if already at zero. - if (!pendingAsyncWrites.TryAddCount()) - pendingAsyncWrites.Reset(1); + total_writes_async.Value++; - // Regardless of calling Realm.GetInstance or Realm.GetInstanceAsync, there is a blocking overhead on retrieval. - // Adding a forced Task.Run resolves this. - var writeTask = Task.Run(async () => - { - total_writes_async.Value++; + // Not attempting to use Realm.GetInstanceAsync as there's seemingly no benefit to us (for now) and it adds complexity due to locking + // concerns in getRealmInstance(). On a quick check, it looks to be more suited to cases where realm is connecting to an online sync + // server, which we don't use. May want to report upstream or revisit in the future. + using (var realm = getRealmInstance()) + // ReSharper disable once AccessToDisposedClosure (WriteAsync should be marked as [InstantHandle]). + await realm.WriteAsync(() => action(realm)); - // Not attempting to use Realm.GetInstanceAsync as there's seemingly no benefit to us (for now) and it adds complexity due to locking - // concerns in getRealmInstance(). On a quick check, it looks to be more suited to cases where realm is connecting to an online sync - // server, which we don't use. May want to report upstream or revisit in the future. - using (var realm = getRealmInstance()) - // ReSharper disable once AccessToDisposedClosure (WriteAsync should be marked as [InstantHandle]). - await realm.WriteAsync(() => action(realm)); + pendingAsyncWrites.Signal(); + }); - pendingAsyncWrites.Signal(); - }); - - return writeTask; - } + return writeTask; } /// @@ -452,14 +444,11 @@ namespace osu.Game.Database public IDisposable RegisterForNotifications(Func> query, NotificationCallbackDelegate callback) where T : RealmObjectBase { - lock (realmLock) - { - Func action = realm => query(realm).QueryAsyncWithNotifications(callback); + Func action = realm => query(realm).QueryAsyncWithNotifications(callback); - // Store an action which is used when blocking to ensure consumers don't use results of a stale changeset firing. - notificationsResetMap.Add(action, () => callback(new EmptyRealmSet(), null, null)); - return RegisterCustomSubscription(action); - } + // Store an action which is used when blocking to ensure consumers don't use results of a stale changeset firing. + notificationsResetMap.Add(action, () => callback(new EmptyRealmSet(), null, null)); + return RegisterCustomSubscription(action); } /// @@ -550,15 +539,12 @@ namespace osu.Game.Database void unsubscribe() { - lock (realmLock) + if (customSubscriptionsResetMap.TryGetValue(action, out var unsubscriptionAction)) { - if (customSubscriptionsResetMap.TryGetValue(action, out var unsubscriptionAction)) - { - unsubscriptionAction?.Dispose(); - customSubscriptionsResetMap.Remove(action); - notificationsResetMap.Remove(action); - total_subscriptions.Value--; - } + unsubscriptionAction?.Dispose(); + customSubscriptionsResetMap.Remove(action); + notificationsResetMap.Remove(action); + total_subscriptions.Value--; } } }); @@ -568,19 +554,16 @@ namespace osu.Game.Database { Debug.Assert(ThreadSafety.IsUpdateThread); - lock (realmLock) - { - // Retrieve realm instance outside of flag update to ensure that the instance is retrieved, - // as attempting to access it inside the subscription if it's not constructed would lead to - // cyclic invocations of the subscription callback. - var realm = Realm; + // Retrieve realm instance outside of flag update to ensure that the instance is retrieved, + // as attempting to access it inside the subscription if it's not constructed would lead to + // cyclic invocations of the subscription callback. + var realm = Realm; - Debug.Assert(!customSubscriptionsResetMap.TryGetValue(action, out var found) || found == null); + Debug.Assert(!customSubscriptionsResetMap.TryGetValue(action, out var found) || found == null); - current_thread_subscriptions_allowed.Value = true; - customSubscriptionsResetMap[action] = action(realm); - current_thread_subscriptions_allowed.Value = false; - } + current_thread_subscriptions_allowed.Value = true; + customSubscriptionsResetMap[action] = action(realm); + current_thread_subscriptions_allowed.Value = false; } private Realm getRealmInstance() @@ -831,31 +814,28 @@ namespace osu.Game.Database { realmRetrievalLock.Wait(); - lock (realmLock) + if (hasInitialisedOnce) { - if (hasInitialisedOnce) + if (!ThreadSafety.IsUpdateThread) + throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread."); + + syncContext = SynchronizationContext.Current; + + // Before disposing the update context, clean up all subscriptions. + // Note that in the case of realm notification subscriptions, this is not really required (they will be cleaned up by disposal). + // In the case of custom subscriptions, we want them to fire before the update realm is disposed in case they do any follow-up work. + foreach (var action in customSubscriptionsResetMap.ToArray()) { - if (!ThreadSafety.IsUpdateThread) - throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread."); - - syncContext = SynchronizationContext.Current; - - // Before disposing the update context, clean up all subscriptions. - // Note that in the case of realm notification subscriptions, this is not really required (they will be cleaned up by disposal). - // In the case of custom subscriptions, we want them to fire before the update realm is disposed in case they do any follow-up work. - foreach (var action in customSubscriptionsResetMap.ToArray()) - { - action.Value?.Dispose(); - customSubscriptionsResetMap[action.Key] = null; - } - - updateRealm?.Dispose(); - updateRealm = null; + action.Value?.Dispose(); + customSubscriptionsResetMap[action.Key] = null; } - Logger.Log(@"Blocking realm operations.", LoggingTarget.Database); + updateRealm?.Dispose(); + updateRealm = null; } + Logger.Log(@"Blocking realm operations.", LoggingTarget.Database); + const int sleep_length = 200; int timeout = 5000; @@ -933,10 +913,7 @@ namespace osu.Game.Database if (!pendingAsyncWrites.Wait(10000)) Logger.Log("Realm took too long waiting on pending async writes", level: LogLevel.Error); - lock (realmLock) - { - updateRealm?.Dispose(); - } + updateRealm?.Dispose(); if (!isDisposed) { From d64959ad0c8a7dccf241741801f5854586229edb Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Tue, 28 Jun 2022 17:07:49 +0900 Subject: [PATCH 6/8] Add test coverage of async writes during a blocking operation --- osu.Game.Tests/Database/GeneralUsageTests.cs | 21 ++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/osu.Game.Tests/Database/GeneralUsageTests.cs b/osu.Game.Tests/Database/GeneralUsageTests.cs index db6e97dab2..5b6f7a0a53 100644 --- a/osu.Game.Tests/Database/GeneralUsageTests.cs +++ b/osu.Game.Tests/Database/GeneralUsageTests.cs @@ -49,6 +49,27 @@ namespace osu.Game.Tests.Database }); } + [Test] + public void TestAsyncWriteWhileBlocking() + { + RunTestWithRealm((realm, _) => + { + Task writeTask; + + using (realm.BlockAllOperations()) + { + writeTask = realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())); + Thread.Sleep(100); + Assert.That(writeTask.IsCompleted, Is.False); + } + + writeTask.WaitSafely(); + + realm.Run(r => r.Refresh()); + Assert.That(realm.Run(r => r.All().Count()), Is.EqualTo(1)); + }); + } + [Test] public void TestAsyncWrite() { From 32af4e41eaa3805ddc2ffb94909f289877aa1996 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 29 Jun 2022 20:56:01 +0900 Subject: [PATCH 7/8] Add back thread safety and locking as required --- osu.Game/Database/RealmAccess.cs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/osu.Game/Database/RealmAccess.cs b/osu.Game/Database/RealmAccess.cs index 715066f32d..728cd5c7c3 100644 --- a/osu.Game/Database/RealmAccess.cs +++ b/osu.Game/Database/RealmAccess.cs @@ -446,8 +446,12 @@ namespace osu.Game.Database { Func action = realm => query(realm).QueryAsyncWithNotifications(callback); - // Store an action which is used when blocking to ensure consumers don't use results of a stale changeset firing. - notificationsResetMap.Add(action, () => callback(new EmptyRealmSet(), null, null)); + lock (notificationsResetMap) + { + // Store an action which is used when blocking to ensure consumers don't use results of a stale changeset firing. + notificationsResetMap.Add(action, () => callback(new EmptyRealmSet(), null, null)); + } + return RegisterCustomSubscription(action); } @@ -543,7 +547,12 @@ namespace osu.Game.Database { unsubscriptionAction?.Dispose(); customSubscriptionsResetMap.Remove(action); - notificationsResetMap.Remove(action); + + lock (notificationsResetMap) + { + notificationsResetMap.Remove(action); + } + total_subscriptions.Value--; } } @@ -805,6 +814,9 @@ namespace osu.Game.Database /// An which should be disposed to end the blocking section. public IDisposable BlockAllOperations() { + if (!ThreadSafety.IsUpdateThread) + throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread."); + if (isDisposed) throw new ObjectDisposedException(nameof(RealmAccess)); @@ -816,9 +828,6 @@ namespace osu.Game.Database if (hasInitialisedOnce) { - if (!ThreadSafety.IsUpdateThread) - throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread."); - syncContext = SynchronizationContext.Current; // Before disposing the update context, clean up all subscriptions. From 7cb4e32c17ffb61c91c7b5d9e84c1f0729839e33 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Wed, 29 Jun 2022 22:45:19 +0900 Subject: [PATCH 8/8] Add one more lock to appease CI --- osu.Game/Database/RealmAccess.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/osu.Game/Database/RealmAccess.cs b/osu.Game/Database/RealmAccess.cs index 59a235dbc2..8cf9bb4a47 100644 --- a/osu.Game/Database/RealmAccess.cs +++ b/osu.Game/Database/RealmAccess.cs @@ -880,8 +880,11 @@ namespace osu.Game.Database try { - foreach (var action in notificationsResetMap.Values) - action(); + lock (notificationsResetMap) + { + foreach (var action in notificationsResetMap.Values) + action(); + } } finally {