From 8cf0f7da5fc9382669d66ce65544eb41457aca52 Mon Sep 17 00:00:00 2001 From: Luck Date: Sun, 3 Jun 2018 19:36:55 +0100 Subject: [PATCH] Improve buffering code --- .../common/buffers/BufferedRequest.java | 169 ++++++++++-------- .../common/buffers/UpdateTaskBuffer.java | 4 +- .../messaging/LuckPermsMessagingService.java | 3 +- .../storage/dao/file/FileActionLogger.java | 3 +- .../service/persisted/PersistedSubject.java | 3 +- 5 files changed, 105 insertions(+), 77 deletions(-) diff --git a/common/src/main/java/me/lucko/luckperms/common/buffers/BufferedRequest.java b/common/src/main/java/me/lucko/luckperms/common/buffers/BufferedRequest.java index 1933b0ee..1dc4a5f9 100644 --- a/common/src/main/java/me/lucko/luckperms/common/buffers/BufferedRequest.java +++ b/common/src/main/java/me/lucko/luckperms/common/buffers/BufferedRequest.java @@ -25,125 +25,148 @@ package me.lucko.luckperms.common.buffers; -import java.lang.ref.WeakReference; +import me.lucko.luckperms.common.plugin.SchedulerTask; +import me.lucko.luckperms.common.plugin.scheduler.SchedulerAdapter; + import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; /** * Thread-safe request buffer. * - * Waits for the buffer time to pass before performing the operation. If the task is called again in that time, the - * buffer time is reset. + * Waits for the buffer time to pass before performing the operation. + * If the request is called again in that time, the buffer time is reset. * * @param the return type */ public abstract class BufferedRequest { - private final long bufferTimeMillis; - private final long sleepInterval; - private final Executor executor; - private WeakReference> processor = null; - private final ReentrantLock lock = new ReentrantLock(); + /** The buffer time */ + private final long bufferTime; + private final TimeUnit unit; + private final SchedulerAdapter schedulerAdapter; - public BufferedRequest(long bufferTimeMillis, long sleepInterval, Executor executor) { - this.bufferTimeMillis = bufferTimeMillis; - this.sleepInterval = sleepInterval; - this.executor = executor; + /** The active processor task, if present */ + private Processor processor = null; + + /** Mutex to guard processor */ + private final Object[] mutex = new Object[0]; + + /** + * Creates a new buffer with the given timeout millis + * + * @param bufferTime the timeout + * @param unit the unit of the timeout + */ + public BufferedRequest(long bufferTime, TimeUnit unit, SchedulerAdapter schedulerAdapter) { + this.bufferTime = bufferTime; + this.unit = unit; + this.schedulerAdapter = schedulerAdapter; } + /** + * Makes a request to the buffer + * + * @return the future + */ public CompletableFuture request() { - this.lock.lock(); - try { + synchronized (this.mutex) { if (this.processor != null) { - Processor p = this.processor.get(); - if (p != null && p.isUsable()) { - return p.getAndExtend(); + try { + return this.processor.extendAndGetFuture(); + } catch (IllegalStateException e) { + // ignore } } - Processor p = new Processor<>(this.bufferTimeMillis, this.sleepInterval, this::perform); - this.executor.execute(p); - this.processor = new WeakReference<>(p); - return p.get(); - - } finally { - this.lock.unlock(); + Processor p = this.processor = new Processor<>(this::perform, this.bufferTime, this.unit, this.schedulerAdapter); + return p.getFuture(); } } + /** + * Requests the value, bypassing the buffer + * + * @return the value + */ public T requestDirectly() { return perform(); } + /** + * Performs the buffered task + * + * @return the result + */ protected abstract T perform(); private static class Processor implements Runnable { - private final long delayMillis; - private final long sleepMillis; - private final Supplier supplier; - private final ReentrantLock lock = new ReentrantLock(); - private final CompletableFuture future = new CompletableFuture<>(); - private boolean usable = true; - private long executionTime; + private Supplier supplier; - public Processor(long delayMillis, long sleepMillis, Supplier supplier) { - this.delayMillis = delayMillis; - this.sleepMillis = sleepMillis; + private final long delay; + private final TimeUnit unit; + + private SchedulerAdapter schedulerAdapter; + private SchedulerTask task; + + private final Object[] mutex = new Object[0]; + private CompletableFuture future = new CompletableFuture<>(); + private boolean usable = true; + + Processor(Supplier supplier, long delay, TimeUnit unit, SchedulerAdapter schedulerAdapter) { this.supplier = supplier; + this.delay = delay; + this.unit = unit; + this.schedulerAdapter = schedulerAdapter; + + rescheduleTask(); + } + + private void rescheduleTask() { + synchronized (this.mutex) { + if (!this.usable) { + throw new IllegalStateException("Processor not usable"); + } + if (this.task != null) { + this.task.cancel(); + } + this.task = this.schedulerAdapter.asyncLater(this, this.delay, this.unit); + } } @Override public void run() { - this.lock.lock(); + synchronized (this.mutex) { + if (!this.usable) { + throw new IllegalStateException("Task has already ran"); + } + this.usable = false; + } + + // compute result try { - this.executionTime = System.currentTimeMillis() + this.delayMillis; - } finally { - this.lock.unlock(); + R result = this.supplier.get(); + this.future.complete(result); + } catch (Exception e) { + this.future.completeExceptionally(e); } - while (true) { - this.lock.lock(); - try { - if (System.currentTimeMillis() > this.executionTime) { - this.usable = false; - break; - } - - } finally { - this.lock.unlock(); - } - - try { - Thread.sleep(this.sleepMillis); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - R result = this.supplier.get(); - this.future.complete(result); + // allow supplier and future to be GCed + this.task = null; + this.supplier = null; + this.future = null; } - public CompletableFuture get() { + CompletableFuture getFuture() { return this.future; } - public CompletableFuture getAndExtend() { - this.lock.lock(); - try { - this.executionTime = System.currentTimeMillis() + this.delayMillis; - } finally { - this.lock.unlock(); - } - + CompletableFuture extendAndGetFuture() { + rescheduleTask(); return this.future; } - public boolean isUsable() { - return this.usable; - } } } diff --git a/common/src/main/java/me/lucko/luckperms/common/buffers/UpdateTaskBuffer.java b/common/src/main/java/me/lucko/luckperms/common/buffers/UpdateTaskBuffer.java index b9cf6723..2e43d486 100644 --- a/common/src/main/java/me/lucko/luckperms/common/buffers/UpdateTaskBuffer.java +++ b/common/src/main/java/me/lucko/luckperms/common/buffers/UpdateTaskBuffer.java @@ -28,11 +28,13 @@ package me.lucko.luckperms.common.buffers; import me.lucko.luckperms.common.plugin.LuckPermsPlugin; import me.lucko.luckperms.common.tasks.UpdateTask; +import java.util.concurrent.TimeUnit; + public class UpdateTaskBuffer extends BufferedRequest { private final LuckPermsPlugin plugin; public UpdateTaskBuffer(LuckPermsPlugin plugin) { - super(250L, 50L, plugin.getBootstrap().getScheduler().async()); + super(250L, TimeUnit.MILLISECONDS, plugin.getBootstrap().getScheduler()); this.plugin = plugin; } diff --git a/common/src/main/java/me/lucko/luckperms/common/messaging/LuckPermsMessagingService.java b/common/src/main/java/me/lucko/luckperms/common/messaging/LuckPermsMessagingService.java index 4e6a5142..e469ae07 100644 --- a/common/src/main/java/me/lucko/luckperms/common/messaging/LuckPermsMessagingService.java +++ b/common/src/main/java/me/lucko/luckperms/common/messaging/LuckPermsMessagingService.java @@ -47,6 +47,7 @@ import java.util.HashSet; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -207,7 +208,7 @@ public class LuckPermsMessagingService implements InternalMessagingService, Inco private final class PushUpdateBuffer extends BufferedRequest { public PushUpdateBuffer(LuckPermsPlugin plugin) { - super(2000L, 200L, plugin.getBootstrap().getScheduler().async()); + super(2, TimeUnit.SECONDS, plugin.getBootstrap().getScheduler()); } @Override diff --git a/common/src/main/java/me/lucko/luckperms/common/storage/dao/file/FileActionLogger.java b/common/src/main/java/me/lucko/luckperms/common/storage/dao/file/FileActionLogger.java index e4f70f4d..8071f675 100644 --- a/common/src/main/java/me/lucko/luckperms/common/storage/dao/file/FileActionLogger.java +++ b/common/src/main/java/me/lucko/luckperms/common/storage/dao/file/FileActionLogger.java @@ -47,6 +47,7 @@ import java.nio.file.Path; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class FileActionLogger { @@ -166,7 +167,7 @@ public class FileActionLogger { private final class SaveBuffer extends BufferedRequest { public SaveBuffer(LuckPermsPlugin plugin) { - super(2000L, 500L, plugin.getBootstrap().getScheduler().async()); + super(2, TimeUnit.SECONDS, plugin.getBootstrap().getScheduler()); } @Override diff --git a/sponge/src/main/java/me/lucko/luckperms/sponge/service/persisted/PersistedSubject.java b/sponge/src/main/java/me/lucko/luckperms/sponge/service/persisted/PersistedSubject.java index 4a1d4450..c50aab67 100644 --- a/sponge/src/main/java/me/lucko/luckperms/sponge/service/persisted/PersistedSubject.java +++ b/sponge/src/main/java/me/lucko/luckperms/sponge/service/persisted/PersistedSubject.java @@ -41,6 +41,7 @@ import org.spongepowered.api.service.permission.Subject; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * A simple persistable Subject implementation @@ -164,7 +165,7 @@ public class PersistedSubject extends CalculatedSubject implements LPSubject { private final class SaveBuffer extends BufferedRequest { public SaveBuffer(LuckPermsPlugin plugin) { - super(1000L, 500L, plugin.getBootstrap().getScheduler().async()); + super(1, TimeUnit.SECONDS, plugin.getBootstrap().getScheduler()); } @Override