This commit is contained in:
Luck
2017-05-14 22:15:25 +01:00
Unverified
parent d4ac261e85
commit 5121fc6b1f
26 changed files with 215 additions and 174 deletions
@@ -69,12 +69,12 @@ public class StorageDelegate implements Storage {
@Override
public Executor getSyncExecutor() {
return plugin.getScheduler().getSyncExecutor();
return plugin.getScheduler().sync();
}
@Override
public Executor getAsyncExecutor() {
return plugin.getScheduler().getAsyncExecutor();
return plugin.getScheduler().async();
}
@Override
@@ -246,7 +246,7 @@ public class Exporter implements Runnable {
e.printStackTrace();
}
}
}, plugin.getScheduler().getAsyncExecutor()));
}, plugin.getScheduler().async()));
}
// all of the threads have been scheduled now and are running. we just need to wait for them all to complete
@@ -141,12 +141,12 @@ public class GenericUserManager extends AbstractManager<UserIdentifier, User> im
@Override
public void scheduleUnload(UUID uuid) {
plugin.getScheduler().doAsyncLater(() -> {
plugin.getScheduler().asyncLater(() -> {
// check once to see if the user can be unloaded.
if (getIfLoaded(plugin.getUuidCache().getUUID(uuid)) != null && !plugin.isPlayerOnline(uuid)) {
// check again in 40 ticks, we want to be sure the player won't have re-logged before we unload them.
plugin.getScheduler().doAsyncLater(() -> {
plugin.getScheduler().asyncLater(() -> {
User user = getIfLoaded(plugin.getUuidCache().getUUID(uuid));
if (user != null && !plugin.isPlayerOnline(uuid)) {
user.unregisterData();
@@ -187,12 +187,12 @@ public interface LuckPermsPlugin {
*/
LuckPermsScheduler getScheduler();
default void doAsync(Runnable r) {
getScheduler().doAsync(r);
default void doAsync(Runnable runnable) {
getScheduler().doAsync(runnable);
}
default void doSync(Runnable r) {
getScheduler().doSync(r);
default void doSync(Runnable runnable) {
getScheduler().doSync(runnable);
}
/**
@@ -27,20 +27,70 @@ package me.lucko.luckperms.common.plugin;
import java.util.concurrent.Executor;
/**
* A scheduler for running tasks using the systems provided by the platform
*/
public interface LuckPermsScheduler {
Executor getAsyncExecutor();
Executor getSyncExecutor();
/**
* Gets an async executor instance
*
* @return an async executor instance
*/
Executor async();
void doAsync(Runnable r);
void doSync(Runnable r);
/**
* Gets a sync executor instance
*
* @return a sync executor instance
*/
Executor sync();
void doAsyncRepeating(Runnable r, long interval);
void doSyncRepeating(Runnable r, long interval);
/**
* Executes a runnable async
*
* @param runnable the runnable
*/
void doAsync(Runnable runnable);
void doAsyncLater(Runnable r, long delay);
void doSyncLater(Runnable r, long delay);
/**
* Executes a runnable sync
*
* @param runnable the runnable
*/
void doSync(Runnable runnable);
/**
* Runs a runnable repeatedly until the plugin disables. Will wait for the interval before the first iteration of the task is ran.
* @param runnable the runnable
* @param intervalTicks the interval in ticks.
*/
void asyncRepeating(Runnable runnable, long intervalTicks);
/**
* Runs a runnable repeatedly until the plugin disables. Will wait for the interval before the first iteration of the task is ran.
* @param runnable the runnable
* @param intervalTicks the interval in ticks.
*/
void syncRepeating(Runnable runnable, long intervalTicks);
/**
* Runs a runnable with a delay
* @param runnable the runnable
* @param delayTicks the delay in ticks
*/
void asyncLater(Runnable runnable, long delayTicks);
/**
* Runs a runnable with a delay
* @param runnable the runnable
* @param delayTicks the delay in ticks
*/
void syncLater(Runnable runnable, long delayTicks);
/**
* Shuts down this executor instance
*/
void shutdown();
}
@@ -43,7 +43,7 @@ import me.lucko.luckperms.common.data.Log;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.storage.backing.AbstractBacking;
import me.lucko.luckperms.common.storage.wrappings.BufferedOutputStorage;
import me.lucko.luckperms.common.storage.wrappings.TolerantStorage;
import me.lucko.luckperms.common.storage.wrappings.PhasedStorage;
import java.util.List;
import java.util.Set;
@@ -57,8 +57,8 @@ import java.util.function.Supplier;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class AbstractStorage implements Storage {
public static Storage wrap(LuckPermsPlugin plugin, AbstractBacking backing) {
BufferedOutputStorage bufferedDs = BufferedOutputStorage.wrap(TolerantStorage.wrap(new AbstractStorage(plugin, backing)), 1000L);
plugin.getScheduler().doAsyncRepeating(bufferedDs, 10L);
BufferedOutputStorage bufferedDs = BufferedOutputStorage.wrap(PhasedStorage.wrap(new AbstractStorage(plugin, backing)), 1000L);
plugin.getScheduler().asyncRepeating(bufferedDs, 10L);
return bufferedDs;
}
@@ -77,7 +77,7 @@ public class AbstractStorage implements Storage {
}
private <T> CompletableFuture<T> makeFuture(Supplier<T> supplier) {
return CompletableFuture.supplyAsync(supplier, backing.getPlugin().getScheduler().getAsyncExecutor());
return CompletableFuture.supplyAsync(supplier, backing.getPlugin().getScheduler().async());
}
@Override
@@ -605,7 +605,7 @@ public class YAMLBacking extends FlatfileBacking {
int size = vals.size();
if (size == 1) {
context.put(e.getKey(), vals.get(0));;
context.put(e.getKey(), vals.get(0));
} else if (size > 1) {
context.put(e.getKey(), vals);
}
@@ -52,33 +52,10 @@ public class BufferedOutputStorage implements Storage, Runnable {
private final long flushTime;
private final Buffer<User, Boolean> userOutputBuffer = new Buffer<User, Boolean>() {
@Override
public Boolean dequeue(User user) {
return backing.saveUser(user).join();
}
};
private final Buffer<Group, Boolean> groupOutputBuffer = new Buffer<Group, Boolean>() {
@Override
public Boolean dequeue(Group group) {
return backing.saveGroup(group).join();
}
};
private final Buffer<Track, Boolean> trackOutputBuffer = new Buffer<Track, Boolean>() {
@Override
public Boolean dequeue(Track track) {
return backing.saveTrack(track).join();
}
};
private final Buffer<UserIdentifier, Boolean> uuidDataOutputBuffer = new Buffer<UserIdentifier, Boolean>() {
@Override
protected Boolean dequeue(UserIdentifier userIdentifier) {
return backing.saveUUIDData(userIdentifier.getUsername().get(), userIdentifier.getUuid()).join();
}
};
private final Buffer<User, Boolean> userOutputBuffer = Buffer.of(user -> backing.saveUser(user).join());
private final Buffer<Group, Boolean> groupOutputBuffer = Buffer.of(group -> backing.saveGroup(group).join());
private final Buffer<Track, Boolean> trackOutputBuffer = Buffer.of(track -> backing.saveTrack(track).join());
private final Buffer<UserIdentifier, Boolean> uuidDataOutputBuffer = Buffer.of(userIdentifier -> backing.saveUUIDData(userIdentifier.getUsername().get(), userIdentifier.getUuid()).join());
@Override
public void run() {
@@ -53,9 +53,9 @@ import java.util.concurrent.TimeoutException;
* A Datastore wrapping that ensures all tasks are completed before {@link Storage#shutdown()} is called.
*/
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class TolerantStorage implements Storage {
public static TolerantStorage wrap(Storage storage) {
return new TolerantStorage(storage);
public class PhasedStorage implements Storage {
public static PhasedStorage wrap(Storage storage) {
return new PhasedStorage(storage);
}
@Delegate(types = Delegated.class)
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
* Thread-safe buffer utility. Holds a buffer of objects to be processed after they've been waiting in the buffer
@@ -44,11 +45,20 @@ import java.util.concurrent.locks.ReentrantLock;
* @param <T> the type of objects in the buffer
* @param <R> the type of result produced by the final process
*/
public abstract class Buffer<T, R> implements Runnable {
public class Buffer<T, R> implements Runnable {
private static final long DEFAULT_FLUSH_TIME = 1000; // 1 second
public static <T, R> Buffer<T, R> of(Function<T, R> dequeueFunc) {
return new Buffer<>(dequeueFunc);
}
private final ReentrantLock lock = new ReentrantLock();
private final List<BufferedObject<T, R>> buffer = new LinkedList<>();
private final Function<T, R> dequeueFunc;
private Buffer(Function<T, R> dequeueFunc) {
this.dequeueFunc = dequeueFunc;
}
public CompletableFuture<R> enqueue(@NonNull T t) {
lock.lock();
@@ -80,7 +90,9 @@ public abstract class Buffer<T, R> implements Runnable {
}
}
protected abstract R dequeue(T t);
protected R dequeue(T t) {
return dequeueFunc.apply(t);
}
public void flush(long flushTime) {
long time = System.currentTimeMillis();
@@ -68,7 +68,7 @@ public class FileWatcher implements Runnable {
}
// Register with a delay to ignore changes made at startup
plugin.getScheduler().doAsyncLater(() -> {
plugin.getScheduler().asyncLater(() -> {
try {
// doesn't need to be atomic
if (keyMap.containsKey(id)) {
@@ -37,30 +37,37 @@ import java.util.stream.Collector;
@UtilityClass
public class ImmutableCollectors {
private static final Collector<Object, ImmutableList.Builder<Object>, ImmutableList<Object>> LIST = Collector.of(
ImmutableList.Builder::new,
ImmutableList.Builder::add,
(l, r) -> l.addAll(r.build()),
ImmutableList.Builder::build
);
private static final Collector<Object, ImmutableSet.Builder<Object>, ImmutableSet<Object>> SET = Collector.of(
ImmutableSet.Builder::new,
ImmutableSet.Builder::add,
(l, r) -> l.addAll(r.build()),
ImmutableSet.Builder::build
);
public static <T> Collector<T, ImmutableList.Builder<T>, ImmutableList<T>> toImmutableList() {
//noinspection unchecked
return (Collector) LIST;
}
public static <T> Collector<T, ImmutableSet.Builder<T>, ImmutableSet<T>> toImmutableSet() {
//noinspection unchecked
return (Collector) SET;
}
public static <T, K, V> Collector<T, ImmutableMap.Builder<K, V>, ImmutableMap<K, V>> toImmutableMap(Function<? super T, ? extends K> keyMapper, Function<? super T, ? extends V> valueMapper) {
return Collector.of(
ImmutableMap.Builder<K, V>::new,
(r, t) -> r.put(keyMapper.apply(t), valueMapper.apply(t)),
(l, r) -> l.putAll(r.build()),
ImmutableMap.Builder::build,
Collector.Characteristics.UNORDERED);
}
public static <T> Collector<T, ImmutableSet.Builder<T>, ImmutableSet<T>> toImmutableSet() {
return Collector.of(
ImmutableSet.Builder<T>::new,
ImmutableSet.Builder<T>::add,
(l, r) -> l.addAll(r.build()),
ImmutableSet.Builder<T>::build,
Collector.Characteristics.UNORDERED);
}
public static <T> Collector<T, ImmutableList.Builder<T>, ImmutableList<T>> toImmutableList() {
return Collector.of(
ImmutableList.Builder<T>::new,
ImmutableList.Builder<T>::add,
(l, r) -> l.addAll(r.build()),
ImmutableList.Builder<T>::build);
ImmutableMap.Builder::build
);
}
}
@@ -32,7 +32,7 @@ import me.lucko.luckperms.common.commands.sender.Sender;
import me.lucko.luckperms.common.constants.Message;
@AllArgsConstructor
public class LoggerImpl implements Logger {
public class SenderLogger implements Logger {
private final Sender console;
@Override