Reduce buffer times, refactor MessagingService init

This commit is contained in:
Luck
2017-09-04 16:49:43 +01:00
Unverified
parent e1b51dd6af
commit d60d0ac9c5
28 changed files with 585 additions and 307 deletions
@@ -79,57 +79,57 @@ public class StorageDelegate implements Storage {
@Override
public CompletableFuture<Boolean> logAction(@NonNull LogEntry entry) {
return handle.force().logAction(entry);
return handle.noBuffer().logAction(entry);
}
@Override
public CompletableFuture<Log> getLog() {
return handle.force().getLog().thenApply(log -> log == null ? null : new LogDelegate(log));
return handle.noBuffer().getLog().thenApply(log -> log == null ? null : new LogDelegate(log));
}
@Override
public CompletableFuture<Boolean> loadUser(@NonNull UUID uuid, String username) {
return handle.force().loadUser(uuid, username == null ? null : checkUsername(username));
return handle.noBuffer().loadUser(uuid, username == null ? null : checkUsername(username));
}
@Override
public CompletableFuture<Boolean> saveUser(@NonNull User user) {
return handle.force().saveUser(UserDelegate.cast(user));
return handle.noBuffer().saveUser(UserDelegate.cast(user));
}
@Override
public CompletableFuture<Boolean> cleanupUsers() {
return handle.force().cleanupUsers();
return handle.noBuffer().cleanupUsers();
}
@Override
public CompletableFuture<Set<UUID>> getUniqueUsers() {
return handle.force().getUniqueUsers();
return handle.noBuffer().getUniqueUsers();
}
@Override
public CompletableFuture<List<HeldPermission<UUID>>> getUsersWithPermission(@NonNull String permission) {
return handle.force().getUsersWithPermission(permission);
return handle.noBuffer().getUsersWithPermission(permission);
}
@Override
public CompletableFuture<Boolean> createAndLoadGroup(@NonNull String name) {
return handle.force().createAndLoadGroup(checkName(name), CreationCause.API);
return handle.noBuffer().createAndLoadGroup(checkName(name), CreationCause.API);
}
@Override
public CompletableFuture<Boolean> loadGroup(@NonNull String name) {
return handle.force().loadGroup(checkName(name));
return handle.noBuffer().loadGroup(checkName(name));
}
@Override
public CompletableFuture<Boolean> loadAllGroups() {
return handle.force().loadAllGroups();
return handle.noBuffer().loadAllGroups();
}
@Override
public CompletableFuture<Boolean> saveGroup(@NonNull Group group) {
return handle.force().saveGroup(GroupDelegate.cast(group));
return handle.noBuffer().saveGroup(GroupDelegate.cast(group));
}
@Override
@@ -137,51 +137,51 @@ public class StorageDelegate implements Storage {
if (group.getName().equalsIgnoreCase(plugin.getConfiguration().get(ConfigKeys.DEFAULT_GROUP_NAME))) {
throw new IllegalArgumentException("Cannot delete the default group.");
}
return handle.force().deleteGroup(GroupDelegate.cast(group), DeletionCause.API);
return handle.noBuffer().deleteGroup(GroupDelegate.cast(group), DeletionCause.API);
}
@Override
public CompletableFuture<List<HeldPermission<String>>> getGroupsWithPermission(@NonNull String permission) {
return handle.force().getGroupsWithPermission(permission);
return handle.noBuffer().getGroupsWithPermission(permission);
}
@Override
public CompletableFuture<Boolean> createAndLoadTrack(@NonNull String name) {
return handle.force().createAndLoadTrack(checkName(name), CreationCause.API);
return handle.noBuffer().createAndLoadTrack(checkName(name), CreationCause.API);
}
@Override
public CompletableFuture<Boolean> loadTrack(@NonNull String name) {
return handle.force().loadTrack(checkName(name));
return handle.noBuffer().loadTrack(checkName(name));
}
@Override
public CompletableFuture<Boolean> loadAllTracks() {
return handle.force().loadAllTracks();
return handle.noBuffer().loadAllTracks();
}
@Override
public CompletableFuture<Boolean> saveTrack(@NonNull Track track) {
return handle.force().saveTrack(TrackDelegate.cast(track));
return handle.noBuffer().saveTrack(TrackDelegate.cast(track));
}
@Override
public CompletableFuture<Boolean> deleteTrack(@NonNull Track track) {
return handle.force().deleteTrack(TrackDelegate.cast(track), DeletionCause.API);
return handle.noBuffer().deleteTrack(TrackDelegate.cast(track), DeletionCause.API);
}
@Override
public CompletableFuture<Boolean> saveUUIDData(@NonNull String username, @NonNull UUID uuid) {
return handle.force().saveUUIDData(checkUsername(username), uuid);
return handle.noBuffer().saveUUIDData(checkUsername(username), uuid);
}
@Override
public CompletableFuture<UUID> getUUID(@NonNull String username) {
return handle.force().getUUID(checkUsername(username));
return handle.noBuffer().getUUID(checkUsername(username));
}
@Override
public CompletableFuture<String> getName(@NonNull UUID uuid) {
return handle.force().getName(uuid);
return handle.noBuffer().getName(uuid);
}
}
@@ -45,6 +45,7 @@ import java.util.function.Supplier;
@RequiredArgsConstructor
public abstract class BufferedRequest<T> {
private final long bufferTimeMillis;
private final long sleepInterval;
private final Executor executor;
private WeakReference<Processor<T>> processor = null;
@@ -60,7 +61,7 @@ public abstract class BufferedRequest<T> {
}
}
Processor<T> p = new Processor<>(bufferTimeMillis, this::perform);
Processor<T> p = new Processor<>(bufferTimeMillis, sleepInterval, this::perform);
executor.execute(p);
processor = new WeakReference<>(p);
return p.get();
@@ -79,6 +80,7 @@ public abstract class BufferedRequest<T> {
@RequiredArgsConstructor
private static class Processor<R> implements Runnable {
private final long delayMillis;
private final long sleepMillis;
private final Supplier<R> supplier;
private final ReentrantLock lock = new ReentrantLock();
private final CompletableFuture<R> future = new CompletableFuture<>();
@@ -108,7 +110,7 @@ public abstract class BufferedRequest<T> {
}
try {
Thread.sleep(500);
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -0,0 +1,44 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.buffers;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.tasks.UpdateTask;
public class UpdateTaskBuffer extends BufferedRequest<Void> {
private final LuckPermsPlugin plugin;
public UpdateTaskBuffer(LuckPermsPlugin plugin) {
super(250L, 50L, plugin::doAsync);
this.plugin = plugin;
}
@Override
protected Void perform() {
new UpdateTask(plugin).run();
return null;
}
}
@@ -173,7 +173,7 @@ public abstract class SubCommand<T> extends Command<T, Void> {
}
public static void save(User user, Sender sender, LuckPermsPlugin plugin) {
boolean success = plugin.getStorage().force().saveUser(user).join();
boolean success = plugin.getStorage().noBuffer().saveUser(user).join();
if (sender.isImport()) {
user.getRefreshBuffer().request();
@@ -192,7 +192,7 @@ public abstract class SubCommand<T> extends Command<T, Void> {
}
public static void save(Group group, Sender sender, LuckPermsPlugin plugin) {
boolean success = plugin.getStorage().force().saveGroup(group).join();
boolean success = plugin.getStorage().noBuffer().saveGroup(group).join();
if (sender.isImport()) {
plugin.getUpdateTaskBuffer().request();
@@ -211,7 +211,7 @@ public abstract class SubCommand<T> extends Command<T, Void> {
}
public static void save(Track track, Sender sender, LuckPermsPlugin plugin) {
boolean success = plugin.getStorage().force().saveTrack(track).join();
boolean success = plugin.getStorage().noBuffer().saveTrack(track).join();
if (sender.isImport()) {
plugin.getUpdateTaskBuffer().request();
@@ -78,7 +78,7 @@ public class LogNotify extends SubCommand<Log> {
user.removeIf(n -> n.getPermission().equalsIgnoreCase("luckperms.log.notify.ignoring"));
}
plugin.getStorage().force().saveUser(user).join();
plugin.getStorage().noBuffer().saveUser(user).join();
}
@Override
@@ -25,6 +25,8 @@
package me.lucko.luckperms.common.locale;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import java.io.File;
/**
@@ -32,8 +34,27 @@ import java.io.File;
*/
public interface LocaleManager {
/**
* Tries to load from a locale file, and logs via the plugin if successful.
*
* @param plugin the plugin to log to
* @param file the file to load from
*/
void tryLoad(LuckPermsPlugin plugin, File file);
/**
* Loads a locale file
*
* @param file the file to load from
* @throws Exception if the process fails
*/
void loadFromFile(File file) throws Exception;
/**
* Gets the size of loaded translations
*
* @return the size of the loaded translations
*/
int getSize();
/**
@@ -25,10 +25,17 @@
package me.lucko.luckperms.common.locale;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import java.io.File;
public class NoopLocaleManager implements LocaleManager {
@Override
public void tryLoad(LuckPermsPlugin plugin, File file) {
}
@Override
public void loadFromFile(File file) throws Exception {
@@ -27,6 +27,8 @@ package me.lucko.luckperms.common.locale;
import com.google.common.collect.ImmutableMap;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import org.yaml.snakeyaml.Yaml;
import java.io.BufferedReader;
@@ -40,6 +42,17 @@ public class SimpleLocaleManager implements LocaleManager {
private Map<Message, String> messages = ImmutableMap.of();
private Map<CommandSpec, CommandSpec.CommandSpecData> commands = ImmutableMap.of();
public void tryLoad(LuckPermsPlugin plugin, File file) {
if (file.exists()) {
plugin.getLog().info("Found lang.yml - loading messages...");
try {
loadFromFile(file);
} catch (Exception e) {
e.printStackTrace();
}
}
}
@SuppressWarnings("unchecked")
public void loadFromFile(File file) throws Exception {
try (BufferedReader reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8)) {
@@ -26,7 +26,6 @@
package me.lucko.luckperms.common.messaging;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -47,9 +46,8 @@ import java.util.function.Consumer;
/**
* An abstract implementation of {@link me.lucko.luckperms.api.MessagingService}.
*/
@RequiredArgsConstructor
public abstract class AbstractMessagingService implements InternalMessagingService {
public static final String CHANNEL = "lpuc";
protected static final String CHANNEL = "lpuc";
@Getter
private final LuckPermsPlugin plugin;
@@ -57,17 +55,19 @@ public abstract class AbstractMessagingService implements InternalMessagingServi
@Getter
private final String name;
private final Set<UUID> receivedMessages = Collections.synchronizedSet(new HashSet<>());
private final Gson gson = new Gson();
private final Set<UUID> receivedMessages;
private final Gson gson;
@Getter
private final BufferedRequest<Void> updateBuffer = new BufferedRequest<Void>(3000L, r -> getPlugin().doAsync(r)) {
@Override
protected Void perform() {
pushUpdate();
return null;
}
};
private final BufferedRequest<Void> updateBuffer;
public AbstractMessagingService(LuckPermsPlugin plugin, String name) {
this.plugin = plugin;
this.name = name;
this.receivedMessages = Collections.synchronizedSet(new HashSet<>());
this.gson = new Gson();
this.updateBuffer = new PushUpdateBuffer(plugin);
}
protected abstract void sendMessage(String channel, String message);
@@ -164,4 +164,16 @@ public abstract class AbstractMessagingService implements InternalMessagingServi
}
}
private final class PushUpdateBuffer extends BufferedRequest<Void> {
public PushUpdateBuffer(LuckPermsPlugin plugin) {
super(3000L, 200L, plugin::doAsync);
}
@Override
protected Void perform() {
pushUpdate();
return null;
}
}
}
@@ -0,0 +1,81 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
@RequiredArgsConstructor
public class MessagingFactory<P extends LuckPermsPlugin> {
@Getter(AccessLevel.PROTECTED)
private final P plugin;
public final InternalMessagingService getInstance() {
String messagingType = plugin.getConfiguration().get(ConfigKeys.MESSAGING_SERVICE).toLowerCase();
if (messagingType.equals("none") && plugin.getConfiguration().get(ConfigKeys.REDIS_ENABLED)) {
messagingType = "redis";
}
if (messagingType.equals("none")) {
return new NoopMessagingService();
}
plugin.getLog().info("Loading messaging service... [" + messagingType.toUpperCase() + "]");
InternalMessagingService service = getServiceFor(messagingType);
if (service != null) {
return service;
}
plugin.getLog().warn("Messaging service '" + messagingType + "' not recognised.");
return new NoopMessagingService();
}
protected InternalMessagingService getServiceFor(String messagingType) {
if (messagingType.equals("redis")) {
if (plugin.getConfiguration().get(ConfigKeys.REDIS_ENABLED)) {
RedisMessagingService redis = new RedisMessagingService(plugin);
try {
redis.init(plugin.getConfiguration().get(ConfigKeys.REDIS_ADDRESS), plugin.getConfiguration().get(ConfigKeys.REDIS_PASSWORD));
return redis;
} catch (Exception e) {
plugin.getLog().warn("Couldn't load redis...");
e.printStackTrace();
}
} else {
plugin.getLog().warn("Messaging Service was set to redis, but redis is not enabled!");
}
}
return null;
}
}
@@ -70,7 +70,7 @@ public class User extends PermissionHolder implements Identifiable<UserIdentifie
private final UserCache userData = new UserCache(this);
@Getter
private BufferedRequest<Void> refreshBuffer = new BufferedRequest<Void>(1000L, r -> getPlugin().doAsync(r)) {
private BufferedRequest<Void> refreshBuffer = new BufferedRequest<Void>(250L, 50L, r -> getPlugin().doAsync(r)) {
@Override
protected Void perform() {
refreshPermissions();
@@ -58,8 +58,8 @@ import java.util.function.Supplier;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class AbstractStorage implements Storage {
public static Storage wrap(LuckPermsPlugin plugin, AbstractBacking backing) {
BufferedOutputStorage bufferedDs = BufferedOutputStorage.wrap(PhasedStorage.wrap(new AbstractStorage(plugin, backing)), 1000L);
plugin.getScheduler().asyncRepeating(bufferedDs, 5L);
BufferedOutputStorage bufferedDs = BufferedOutputStorage.wrap(PhasedStorage.wrap(new AbstractStorage(plugin, backing)), 250L);
plugin.getScheduler().asyncRepeating(bufferedDs, 2L);
return bufferedDs;
}
@@ -82,7 +82,7 @@ public class AbstractStorage implements Storage {
}
@Override
public Storage force() {
public Storage noBuffer() {
return this;
}
@@ -55,7 +55,7 @@ public interface Storage {
void setAcceptingLogins(boolean acceptingLogins);
Storage force();
Storage noBuffer();
void init();
@@ -413,7 +413,7 @@ public class MongoDBBacking extends AbstractBacking {
@Override
public boolean cleanupUsers() {
return true; // TODO
return true;
}
@Override
@@ -496,7 +496,7 @@ public class SQLBacking extends AbstractBacking {
@Override
public boolean cleanupUsers() {
return true; // TODO
return true;
}
@Override
@@ -34,10 +34,8 @@ import me.lucko.luckperms.common.buffers.Buffer;
import me.lucko.luckperms.common.model.Group;
import me.lucko.luckperms.common.model.Track;
import me.lucko.luckperms.common.model.User;
import me.lucko.luckperms.common.references.UserIdentifier;
import me.lucko.luckperms.common.storage.Storage;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
@@ -55,7 +53,6 @@ public class BufferedOutputStorage implements Storage, Runnable {
private final Buffer<User, Boolean> userOutputBuffer = Buffer.of(user -> BufferedOutputStorage.this.backing.saveUser(user).join());
private final Buffer<Group, Boolean> groupOutputBuffer = Buffer.of(group -> BufferedOutputStorage.this.backing.saveGroup(group).join());
private final Buffer<Track, Boolean> trackOutputBuffer = Buffer.of(track -> BufferedOutputStorage.this.backing.saveTrack(track).join());
private final Buffer<UserIdentifier, Boolean> uuidDataOutputBuffer = Buffer.of(userIdentifier -> BufferedOutputStorage.this.backing.saveUUIDData(userIdentifier.getUsername().get(), userIdentifier.getUuid()).join());
@Override
public void run() {
@@ -70,11 +67,10 @@ public class BufferedOutputStorage implements Storage, Runnable {
userOutputBuffer.flush(flushTime);
groupOutputBuffer.flush(flushTime);
trackOutputBuffer.flush(flushTime);
userOutputBuffer.flush(flushTime);
}
@Override
public Storage force() {
public Storage noBuffer() {
return backing;
}
@@ -99,17 +95,11 @@ public class BufferedOutputStorage implements Storage, Runnable {
return trackOutputBuffer.enqueue(track);
}
@Override
public CompletableFuture<Boolean> saveUUIDData(String username, UUID uuid) {
return uuidDataOutputBuffer.enqueue(UserIdentifier.of(uuid, username));
}
private interface Exclude {
Storage force();
Storage noBuffer();
CompletableFuture<Void> shutdown();
CompletableFuture<Boolean> saveUser(User user);
CompletableFuture<Boolean> saveGroup(Group group);
CompletableFuture<Boolean> saveTrack(Track track);
CompletableFuture<Boolean> saveUUIDData(String username, UUID uuid);
}
}
@@ -65,7 +65,7 @@ public class PhasedStorage implements Storage {
private final Phaser phaser = new Phaser();
@Override
public Storage force() {
public Storage noBuffer() {
return this;
}
@@ -73,7 +73,7 @@ public class PhasedStorage implements Storage {
public void shutdown() {
// Wait for other threads to finish.
try {
phaser.awaitAdvanceInterruptibly(phaser.getPhase(), 5, TimeUnit.SECONDS);
phaser.awaitAdvanceInterruptibly(phaser.getPhase(), 10, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
}
@@ -46,32 +46,32 @@ public class LoginHelper {
final UuidCache cache = plugin.getUuidCache();
if (!plugin.getConfiguration().get(ConfigKeys.USE_SERVER_UUIDS)) {
UUID uuid = plugin.getStorage().force().getUUID(username).join();
UUID uuid = plugin.getStorage().noBuffer().getUUID(username).join();
if (uuid != null) {
cache.addToCache(u, uuid);
} else {
// No previous data for this player
plugin.getApiProvider().getEventFactory().handleUserFirstLogin(u, username);
cache.addToCache(u, u);
CompletableFuture<Boolean> future = plugin.getStorage().force().saveUUIDData(username, u);
CompletableFuture<Boolean> future = plugin.getStorage().noBuffer().saveUUIDData(username, u);
if (joinUuidSave) {
future.join();
}
}
} else {
String name = plugin.getStorage().force().getName(u).join();
String name = plugin.getStorage().noBuffer().getName(u).join();
if (name == null) {
plugin.getApiProvider().getEventFactory().handleUserFirstLogin(u, username);
}
// Online mode, no cache needed. This is just for name -> uuid lookup.
CompletableFuture<Boolean> future = plugin.getStorage().force().saveUUIDData(username, u);
CompletableFuture<Boolean> future = plugin.getStorage().noBuffer().saveUUIDData(username, u);
if (joinUuidSave) {
future.join();
}
}
plugin.getStorage().force().loadUser(cache.getUUID(u), username).join();
plugin.getStorage().noBuffer().loadUser(cache.getUUID(u), username).join();
User user = plugin.getUserManager().getIfLoaded(cache.getUUID(u));
if (user == null) {
plugin.getLog().warn("Failed to load user: " + username);
@@ -87,7 +87,7 @@ public class LoginHelper {
// If they were given a default, persist the new assignments back to the storage.
if (save) {
plugin.getStorage().force().saveUser(user).join();
plugin.getStorage().noBuffer().saveUser(user).join();
}
user.preCalculateData(false); // Pretty nasty calculation call. Sets up the caching system so data is ready when the user joins.