Expose a means to implement the plugin's MessagingService via the API

This commit is contained in:
Luck
2018-01-19 23:35:41 +00:00
Unverified
parent 612712f015
commit 821dc4ef56
38 changed files with 1469 additions and 442 deletions
@@ -26,9 +26,6 @@
package me.lucko.luckperms.common.actionlog;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import me.lucko.luckperms.api.Contexts;
import me.lucko.luckperms.api.LogEntry;
@@ -380,37 +377,4 @@ public class ExtendedLogEntry implements LogEntry {
"action=" + this.action + ")";
}
}
public static JsonObject serializeWithId(String id, LogEntry entry) {
JsonObject data = new JsonObject();
data.add("id", new JsonPrimitive(id));
data.add("actor", new JsonPrimitive(entry.getActor().toString()));
data.add("actorName", new JsonPrimitive(entry.getActorName()));
data.add("type", new JsonPrimitive(entry.getType().name()));
if (entry.getActed().isPresent()) {
data.add("acted", new JsonPrimitive(entry.getActed().get().toString()));
}
data.add("actedName", new JsonPrimitive(entry.getActedName()));
data.add("action", new JsonPrimitive(entry.getAction()));
return data;
}
public static Map.Entry<String, ExtendedLogEntry> deserialize(JsonObject object) {
ExtendedLogEntryBuilder builder = build();
String id = object.get("id").getAsString();
builder.actor(UUID.fromString(object.get("actor").getAsString()));
builder.actorName(object.get("actorName").getAsString());
builder.type(Type.valueOf(object.get("type").getAsString()));
if (object.has("acted")) {
builder.actor(UUID.fromString(object.get("acted").getAsString()));
}
builder.actedName(object.get("actedName").getAsString());
builder.action(object.get("action").getAsString());
return Maps.immutableEntry(id, builder.build());
}
}
@@ -31,7 +31,7 @@ import me.lucko.luckperms.common.commands.impl.log.LogNotify;
import me.lucko.luckperms.common.commands.sender.Sender;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.locale.Message;
import me.lucko.luckperms.common.messaging.ExtendedMessagingService;
import me.lucko.luckperms.common.messaging.InternalMessagingService;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import java.util.Optional;
@@ -69,7 +69,7 @@ public class LogDispatcher {
return;
}
Optional<ExtendedMessagingService> messagingService = this.plugin.getMessagingService();
Optional<InternalMessagingService> messagingService = this.plugin.getMessagingService();
if (!sender.isImport() && messagingService.isPresent()) {
messagingService.get().pushLog(entry);
}
@@ -37,6 +37,7 @@ import me.lucko.luckperms.api.event.EventBus;
import me.lucko.luckperms.api.manager.GroupManager;
import me.lucko.luckperms.api.manager.TrackManager;
import me.lucko.luckperms.api.manager.UserManager;
import me.lucko.luckperms.api.messenger.MessengerProvider;
import me.lucko.luckperms.api.metastacking.MetaStackFactory;
import me.lucko.luckperms.api.platform.PlatformInfo;
import me.lucko.luckperms.common.api.delegates.manager.ApiContextManager;
@@ -44,14 +45,16 @@ import me.lucko.luckperms.common.api.delegates.manager.ApiGroupManager;
import me.lucko.luckperms.common.api.delegates.manager.ApiTrackManager;
import me.lucko.luckperms.common.api.delegates.manager.ApiUserManager;
import me.lucko.luckperms.common.api.delegates.misc.ApiActionLogger;
import me.lucko.luckperms.common.api.delegates.misc.ApiMessagingService;
import me.lucko.luckperms.common.api.delegates.misc.ApiMetaStackFactory;
import me.lucko.luckperms.common.api.delegates.misc.ApiNodeFactory;
import me.lucko.luckperms.common.api.delegates.misc.ApiPlatformInfo;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.messaging.LuckPermsMessagingService;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nonnull;
@@ -133,7 +136,14 @@ public class LuckPermsApiProvider implements LuckPermsApi {
@Nonnull
@Override
public Optional<MessagingService> getMessagingService() {
return this.plugin.getMessagingService().map(Function.identity());
return this.plugin.getMessagingService().map(ApiMessagingService::new);
}
@Override
public void registerMessengerProvider(@Nonnull MessengerProvider messengerProvider) {
if (this.plugin.getConfiguration().get(ConfigKeys.MESSAGING_SERVICE).equals("custom")) {
this.plugin.setMessagingService(new LuckPermsMessagingService(this.plugin, messengerProvider));
}
}
@Override
@@ -0,0 +1,59 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.api.delegates.misc;
import me.lucko.luckperms.api.MessagingService;
import me.lucko.luckperms.api.User;
import me.lucko.luckperms.common.api.delegates.model.ApiUser;
import me.lucko.luckperms.common.messaging.InternalMessagingService;
import java.util.Objects;
import javax.annotation.Nonnull;
public class ApiMessagingService implements MessagingService {
private final InternalMessagingService handle;
public ApiMessagingService(InternalMessagingService handle) {
this.handle = handle;
}
@Override
public String getName() {
return this.handle.getName();
}
@Override
public void pushUpdate() {
this.handle.pushUpdate();
}
@Override
public void pushUserUpdate(@Nonnull User user) {
Objects.requireNonNull(user, "user");
this.handle.pushUserUpdate(ApiUser.cast(user));
}
}
@@ -34,7 +34,7 @@ import me.lucko.luckperms.common.commands.utils.CommandUtils;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.locale.LocalizedSpec;
import me.lucko.luckperms.common.locale.Message;
import me.lucko.luckperms.common.messaging.ExtendedMessagingService;
import me.lucko.luckperms.common.messaging.InternalMessagingService;
import me.lucko.luckperms.common.model.Group;
import me.lucko.luckperms.common.model.Track;
import me.lucko.luckperms.common.model.User;
@@ -185,7 +185,7 @@ public abstract class SubCommand<T> extends Command<T, Void> {
}
if (!sender.isImport()) {
Optional<ExtendedMessagingService> messagingService = plugin.getMessagingService();
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().pushUserUpdate(user);
}
@@ -208,7 +208,7 @@ public abstract class SubCommand<T> extends Command<T, Void> {
}
if (!sender.isImport()) {
Optional<ExtendedMessagingService> messagingService = plugin.getMessagingService();
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().getUpdateBuffer().request();
}
@@ -231,7 +231,7 @@ public abstract class SubCommand<T> extends Command<T, Void> {
}
if (!sender.isImport()) {
Optional<ExtendedMessagingService> messagingService = plugin.getMessagingService();
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) {
messagingService.get().getUpdateBuffer().request();
}
@@ -34,7 +34,7 @@ import me.lucko.luckperms.common.config.LuckPermsConfiguration;
import me.lucko.luckperms.common.locale.CommandSpec;
import me.lucko.luckperms.common.locale.LocaleManager;
import me.lucko.luckperms.common.locale.Message;
import me.lucko.luckperms.common.messaging.ExtendedMessagingService;
import me.lucko.luckperms.common.messaging.InternalMessagingService;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.utils.DateUtil;
import me.lucko.luckperms.common.utils.Predicates;
@@ -68,7 +68,7 @@ public class InfoCommand extends SingleCommand {
}
Message.INFO_MIDDLE.send(sender,
plugin.getMessagingService().map(ExtendedMessagingService::getName).orElse("None"),
plugin.getMessagingService().map(InternalMessagingService::getName).orElse("None"),
plugin.getContextManager().getStaticContextString().orElse("None"),
plugin.getPlayerCount(),
plugin.getUniqueConnections().size(),
@@ -32,7 +32,7 @@ import me.lucko.luckperms.common.commands.sender.Sender;
import me.lucko.luckperms.common.locale.CommandSpec;
import me.lucko.luckperms.common.locale.LocaleManager;
import me.lucko.luckperms.common.locale.Message;
import me.lucko.luckperms.common.messaging.ExtendedMessagingService;
import me.lucko.luckperms.common.messaging.InternalMessagingService;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import me.lucko.luckperms.common.utils.Predicates;
@@ -50,7 +50,7 @@ public class NetworkSyncCommand extends SingleCommand {
plugin.getUpdateTaskBuffer().request().join();
Message.UPDATE_TASK_COMPLETE_NETWORK.send(sender);
Optional<ExtendedMessagingService> messagingService = plugin.getMessagingService();
Optional<InternalMessagingService> messagingService = plugin.getMessagingService();
if (!messagingService.isPresent()) {
Message.UPDATE_TASK_PUSH_FAILURE_NOT_SETUP.send(sender);
return CommandResult.FAILURE;
@@ -1,271 +0,0 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import me.lucko.luckperms.api.LogEntry;
import me.lucko.luckperms.common.actionlog.ExtendedLogEntry;
import me.lucko.luckperms.common.buffers.BufferedRequest;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.model.User;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
/**
* An abstract implementation of {@link me.lucko.luckperms.api.MessagingService}.
*/
public abstract class AbstractMessagingService implements ExtendedMessagingService {
protected static final String CHANNEL = "lpuc";
private static final String UPDATE_HEADER = "update:";
private static final String USER_UPDATE_HEADER = "userupdate:";
private static final String LOG_HEADER = "log";
private final LuckPermsPlugin plugin;
private final String name;
private final Set<UUID> receivedMessages;
private final Gson gson;
private final BufferedRequest<Void> updateBuffer;
public AbstractMessagingService(LuckPermsPlugin plugin, String name) {
this.plugin = plugin;
this.name = name;
this.receivedMessages = Collections.synchronizedSet(new HashSet<>());
this.gson = new GsonBuilder().disableHtmlEscaping().create();
this.updateBuffer = new PushUpdateBuffer(plugin);
}
public LuckPermsPlugin getPlugin() {
return this.plugin;
}
@Override
public String getName() {
return this.name;
}
@Override
public BufferedRequest<Void> getUpdateBuffer() {
return this.updateBuffer;
}
protected abstract void sendMessage(String message);
protected void onMessage(String msg, Consumer<String> callback) {
if (msg.startsWith(UPDATE_HEADER) && msg.length() > UPDATE_HEADER.length()) {
String content = msg.substring(UPDATE_HEADER.length());
UUID requestId = uuidFromString(content);
if (requestId == null) {
return;
}
if (!this.receivedMessages.add(requestId)) {
return;
}
this.plugin.getLog().info("[" + this.name + " Messaging] Received update ping with id: " + requestId.toString());
if (this.plugin.getEventFactory().handleNetworkPreSync(false, requestId)) {
return;
}
this.plugin.getUpdateTaskBuffer().request();
if (callback != null) {
callback.accept(msg);
}
} else if (msg.startsWith(USER_UPDATE_HEADER) && msg.length() > USER_UPDATE_HEADER.length()) {
String content = msg.substring(USER_UPDATE_HEADER.length());
Map.Entry<UUID, UUID> entry = uuidsFromString(content);
if (entry == null) {
return;
}
UUID requestId = entry.getKey();
UUID userUuid = entry.getValue();
if (!this.receivedMessages.add(requestId)) {
return;
}
User user = this.plugin.getUserManager().getIfLoaded(userUuid);
if (user == null) {
return;
}
this.plugin.getLog().info("[" + this.name + " Messaging] Received user update ping for '" + user.getFriendlyName() + "' with id: " + uuidToString(requestId));
if (this.plugin.getEventFactory().handleNetworkPreSync(false, requestId)) {
return;
}
this.plugin.getStorage().loadUser(user.getUuid(), null);
if (callback != null) {
callback.accept(msg);
}
} else if (msg.startsWith(LOG_HEADER) && msg.length() > LOG_HEADER.length()) {
String content = msg.substring(LOG_HEADER.length());
Map.Entry<String, ExtendedLogEntry> entry;
try {
entry = ExtendedLogEntry.deserialize(this.gson.fromJson(content, JsonObject.class));
} catch (Exception e) {
return;
}
if (entry.getKey() == null) {
return;
}
UUID requestId = uuidFromString(entry.getKey());
if (requestId == null) {
return;
}
if (!this.receivedMessages.add(requestId)) {
return;
}
this.plugin.getEventFactory().handleLogReceive(requestId, entry.getValue());
this.plugin.getLogDispatcher().dispatchFromRemote(entry.getValue());
if (callback != null) {
callback.accept(msg);
}
}
}
@Override
public void pushUpdate() {
this.plugin.getScheduler().doAsync(() -> {
UUID requestId = generatePingId();
String strId = uuidToString(requestId);
this.plugin.getLog().info("[" + this.name + " Messaging] Sending ping with id: " + strId);
sendMessage(UPDATE_HEADER + strId);
});
}
@Override
public void pushUserUpdate(User user) {
this.plugin.getScheduler().doAsync(() -> {
UUID requestId = generatePingId();
String strId = uuidToString(requestId);
this.plugin.getLog().info("[" + this.name + " Messaging] Sending user ping for '" + user.getFriendlyName() + "' with id: " + strId);
sendMessage(USER_UPDATE_HEADER + uuidsToString(requestId, user.getUuid()));
});
}
@Override
public void pushLog(LogEntry logEntry) {
this.plugin.getScheduler().doAsync(() -> {
UUID requestId = generatePingId();
String strId = uuidToString(requestId);
if (this.plugin.getEventFactory().handleLogNetworkPublish(!this.plugin.getConfiguration().get(ConfigKeys.PUSH_LOG_ENTRIES), requestId, logEntry)) {
return;
}
this.plugin.getLog().info("[" + this.name + " Messaging] Sending log with id: " + strId);
sendMessage(LOG_HEADER + this.gson.toJson(ExtendedLogEntry.serializeWithId(strId, logEntry)));
});
}
private UUID generatePingId() {
UUID uuid = UUID.randomUUID();
this.receivedMessages.add(uuid);
return uuid;
}
private static String uuidToString(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * 2);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return Base64.getEncoder().encodeToString(buf.array());
}
private static UUID uuidFromString(String s) {
try {
byte[] bytes = Base64.getDecoder().decode(s);
ByteBuffer buf = ByteBuffer.wrap(bytes);
return new UUID(buf.getLong(), buf.getLong());
} catch (IllegalArgumentException e) {
return null;
}
}
private static String uuidsToString(UUID uuid1, UUID uuid2) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * 4);
buf.putLong(uuid1.getMostSignificantBits());
buf.putLong(uuid1.getLeastSignificantBits());
buf.putLong(uuid2.getMostSignificantBits());
buf.putLong(uuid2.getLeastSignificantBits());
return Base64.getEncoder().encodeToString(buf.array());
}
private static Map.Entry<UUID, UUID> uuidsFromString(String s) {
try {
byte[] bytes = Base64.getDecoder().decode(s);
ByteBuffer buf = ByteBuffer.wrap(bytes);
UUID uuid1 = new UUID(buf.getLong(), buf.getLong());
UUID uuid2 = new UUID(buf.getLong(), buf.getLong());
return Maps.immutableEntry(uuid1, uuid2);
} catch (IllegalArgumentException e) {
return null;
}
}
private final class PushUpdateBuffer extends BufferedRequest<Void> {
public PushUpdateBuffer(LuckPermsPlugin plugin) {
super(2000L, 200L, plugin.getScheduler().async());
}
@Override
protected Void perform() {
pushUpdate();
return null;
}
}
}
@@ -26,11 +26,10 @@
package me.lucko.luckperms.common.messaging;
import me.lucko.luckperms.api.LogEntry;
import me.lucko.luckperms.api.MessagingService;
import me.lucko.luckperms.common.buffers.BufferedRequest;
import me.lucko.luckperms.common.model.User;
public interface ExtendedMessagingService extends MessagingService {
public interface InternalMessagingService {
/**
* Gets the name of this messaging service
@@ -51,6 +50,12 @@ public interface ExtendedMessagingService extends MessagingService {
*/
BufferedRequest<Void> getUpdateBuffer();
/**
* Uses the messaging service to inform other servers about a general
* change.
*/
void pushUpdate();
/**
* Pushes an update for a specific user.
*
@@ -0,0 +1,209 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging;
import me.lucko.luckperms.api.LogEntry;
import me.lucko.luckperms.api.messenger.IncomingMessageConsumer;
import me.lucko.luckperms.api.messenger.Messenger;
import me.lucko.luckperms.api.messenger.MessengerProvider;
import me.lucko.luckperms.api.messenger.message.Message;
import me.lucko.luckperms.api.messenger.message.type.LogMessage;
import me.lucko.luckperms.api.messenger.message.type.UpdateMessage;
import me.lucko.luckperms.api.messenger.message.type.UserUpdateMessage;
import me.lucko.luckperms.common.actionlog.ExtendedLogEntry;
import me.lucko.luckperms.common.buffers.BufferedRequest;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.messaging.message.LogMessageImpl;
import me.lucko.luckperms.common.messaging.message.UpdateMessageImpl;
import me.lucko.luckperms.common.messaging.message.UserUpdateMessageImpl;
import me.lucko.luckperms.common.model.User;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nonnull;
public class LuckPermsMessagingService implements InternalMessagingService, IncomingMessageConsumer {
private final LuckPermsPlugin plugin;
private final Set<UUID> receivedMessages;
private final BufferedRequest<Void> updateBuffer;
private final MessengerProvider messengerProvider;
private final Messenger messenger;
public LuckPermsMessagingService(LuckPermsPlugin plugin, MessengerProvider messengerProvider) {
this.plugin = plugin;
this.messengerProvider = messengerProvider;
this.messenger = messengerProvider.obtain(this);
Objects.requireNonNull(this.messenger, "messenger");
this.receivedMessages = Collections.synchronizedSet(new HashSet<>());
this.updateBuffer = new PushUpdateBuffer(plugin);
}
@Override
public String getName() {
return this.messengerProvider.getName();
}
@Override
public void close() {
this.messenger.close();
}
@Override
public BufferedRequest<Void> getUpdateBuffer() {
return this.updateBuffer;
}
private UUID generatePingId() {
UUID uuid = UUID.randomUUID();
this.receivedMessages.add(uuid);
return uuid;
}
@Override
public void pushUpdate() {
this.plugin.getScheduler().doAsync(() -> {
UUID requestId = generatePingId();
this.plugin.getLog().info("[" + getName() + " Messaging] Sending ping with id: " + requestId);
this.messenger.sendOutgoingMessage(new UpdateMessageImpl(requestId));
});
}
@Override
public void pushUserUpdate(User user) {
this.plugin.getScheduler().doAsync(() -> {
UUID requestId = generatePingId();
this.plugin.getLog().info("[" + getName() + " Messaging] Sending user ping for '" + user.getFriendlyName() + "' with id: " + requestId);
this.messenger.sendOutgoingMessage(new UserUpdateMessageImpl(requestId, user.getUuid()));
});
}
@Override
public void pushLog(LogEntry logEntry) {
this.plugin.getScheduler().doAsync(() -> {
UUID requestId = generatePingId();
if (this.plugin.getEventFactory().handleLogNetworkPublish(!this.plugin.getConfiguration().get(ConfigKeys.PUSH_LOG_ENTRIES), requestId, logEntry)) {
return;
}
this.plugin.getLog().info("[" + getName() + " Messaging] Sending log with id: " + requestId);
this.messenger.sendOutgoingMessage(new LogMessageImpl(requestId, logEntry));
});
}
@Override
public boolean consumeIncomingMessage(@Nonnull Message message) {
Objects.requireNonNull(message, "message");
if (message instanceof UpdateMessage) {
UpdateMessage msg = (UpdateMessage) message;
if (!this.receivedMessages.add(msg.getId())) {
return false;
}
this.plugin.getLog().info("[" + getName() + " Messaging] Received update ping with id: " + msg.getId());
if (this.plugin.getEventFactory().handleNetworkPreSync(false, msg.getId())) {
return true;
}
this.plugin.getUpdateTaskBuffer().request();
return true;
} else if (message instanceof UserUpdateMessage) {
UserUpdateMessage msg = (UserUpdateMessage) message;
if (!this.receivedMessages.add(msg.getId())) {
return false;
}
User user = this.plugin.getUserManager().getIfLoaded(msg.getUser());
if (user == null) {
return true;
}
this.plugin.getLog().info("[" + getName() + " Messaging] Received user update ping for '" + user.getFriendlyName() + "' with id: " + msg.getId());
if (this.plugin.getEventFactory().handleNetworkPreSync(false, msg.getId())) {
return true;
}
this.plugin.getStorage().loadUser(user.getUuid(), null);
return true;
} else if (message instanceof LogMessage) {
LogMessage msg = (LogMessage) message;
if (!this.receivedMessages.add(msg.getId())) {
return false;
}
this.plugin.getEventFactory().handleLogReceive(msg.getId(), msg.getLogEntry());
this.plugin.getLogDispatcher().dispatchFromRemote((ExtendedLogEntry) msg.getLogEntry());
return true;
} else {
this.plugin.getLog().warn("Unable to decode incoming message: " + message + " (" + message.getClass().getName() + ")");
return false;
}
}
@Override
public boolean consumeIncomingMessageAsString(@Nonnull String encodedString) {
Objects.requireNonNull(encodedString, "encodedString");
Message decoded = UpdateMessageImpl.decode(encodedString);
if (decoded != null) {
return consumeIncomingMessage(decoded);
}
decoded = UserUpdateMessageImpl.decode(encodedString);
if (decoded != null) {
return consumeIncomingMessage(decoded);
}
decoded = LogMessageImpl.decode(encodedString);
return decoded != null && consumeIncomingMessage(decoded);
}
private final class PushUpdateBuffer extends BufferedRequest<Void> {
public PushUpdateBuffer(LuckPermsPlugin plugin) {
super(2000L, 200L, plugin.getScheduler().async());
}
@Override
protected Void perform() {
pushUpdate();
return null;
}
}
}
@@ -25,9 +25,15 @@
package me.lucko.luckperms.common.messaging;
import me.lucko.luckperms.api.messenger.IncomingMessageConsumer;
import me.lucko.luckperms.api.messenger.Messenger;
import me.lucko.luckperms.api.messenger.MessengerProvider;
import me.lucko.luckperms.common.config.ConfigKeys;
import me.lucko.luckperms.common.messaging.redis.RedisMessenger;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import javax.annotation.Nonnull;
public class MessagingFactory<P extends LuckPermsPlugin> {
private final P plugin;
@@ -39,7 +45,7 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
return this.plugin;
}
public final ExtendedMessagingService getInstance() {
public final InternalMessagingService getInstance() {
String messagingType = this.plugin.getConfiguration().get(ConfigKeys.MESSAGING_SERVICE).toLowerCase();
if (messagingType.equals("none") && this.plugin.getConfiguration().get(ConfigKeys.REDIS_ENABLED)) {
messagingType = "redis";
@@ -51,7 +57,7 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
this.plugin.getLog().info("Loading messaging service... [" + messagingType.toUpperCase() + "]");
ExtendedMessagingService service = getServiceFor(messagingType);
InternalMessagingService service = getServiceFor(messagingType);
if (service != null) {
return service;
}
@@ -60,15 +66,12 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
return null;
}
protected ExtendedMessagingService getServiceFor(String messagingType) {
protected InternalMessagingService getServiceFor(String messagingType) {
if (messagingType.equals("redis")) {
if (this.plugin.getConfiguration().get(ConfigKeys.REDIS_ENABLED)) {
RedisMessagingService redis = new RedisMessagingService(this.plugin);
try {
redis.init(this.plugin.getConfiguration().get(ConfigKeys.REDIS_ADDRESS), this.plugin.getConfiguration().get(ConfigKeys.REDIS_PASSWORD));
return redis;
return new LuckPermsMessagingService(this.plugin, new RedisMessengerProvider());
} catch (Exception e) {
this.plugin.getLog().warn("Couldn't load redis...");
e.printStackTrace();
}
} else {
@@ -79,4 +82,21 @@ public class MessagingFactory<P extends LuckPermsPlugin> {
return null;
}
private class RedisMessengerProvider implements MessengerProvider {
@Nonnull
@Override
public String getName() {
return "Redis";
}
@Nonnull
@Override
public Messenger obtain(@Nonnull IncomingMessageConsumer incomingMessageConsumer) {
RedisMessenger redis = new RedisMessenger(getPlugin(), incomingMessageConsumer);
redis.init(getPlugin().getConfiguration().get(ConfigKeys.REDIS_ADDRESS), getPlugin().getConfiguration().get(ConfigKeys.REDIS_PASSWORD));
return redis;
}
}
}
@@ -0,0 +1,48 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging.message;
import me.lucko.luckperms.api.messenger.message.Message;
import me.lucko.luckperms.api.messenger.message.OutgoingMessage;
import java.util.UUID;
import javax.annotation.Nonnull;
public abstract class AbstractMessage implements Message, OutgoingMessage {
private final UUID id;
public AbstractMessage(UUID id) {
this.id = id;
}
@Nonnull
@Override
public UUID getId() {
return this.id;
}
}
@@ -0,0 +1,138 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging.message;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import me.lucko.luckperms.api.LogEntry;
import me.lucko.luckperms.api.messenger.message.type.LogMessage;
import me.lucko.luckperms.common.actionlog.ExtendedLogEntry;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;
import javax.annotation.Nonnull;
public class LogMessageImpl extends AbstractMessage implements LogMessage {
private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().create();
private static final String LOG_HEADER = "log";
public static LogMessageImpl decode(String msg) {
if (msg.startsWith(LOG_HEADER) && msg.length() > LOG_HEADER.length()) {
String content = msg.substring(LOG_HEADER.length());
try {
return decodeContent(GSON.fromJson(content, JsonObject.class));
} catch (Exception e) {
return null;
}
}
return null;
}
private final LogEntry logEntry;
public LogMessageImpl(UUID id, LogEntry logEntry) {
super(id);
this.logEntry = logEntry;
}
@Nonnull
@Override
public LogEntry getLogEntry() {
return this.logEntry;
}
@Nonnull
@Override
public String asEncodedString() {
return LOG_HEADER + GSON.toJson(encodeContent(uuidToString(getId()), this.logEntry));
}
private static String uuidToString(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * 2);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return Base64.getEncoder().encodeToString(buf.array());
}
private static UUID uuidFromString(String s) {
try {
byte[] bytes = Base64.getDecoder().decode(s);
ByteBuffer buf = ByteBuffer.wrap(bytes);
return new UUID(buf.getLong(), buf.getLong());
} catch (IllegalArgumentException e) {
return null;
}
}
private static JsonObject encodeContent(String id, LogEntry entry) {
JsonObject data = new JsonObject();
data.add("id", new JsonPrimitive(id));
data.add("actor", new JsonPrimitive(entry.getActor().toString()));
data.add("actorName", new JsonPrimitive(entry.getActorName()));
data.add("type", new JsonPrimitive(entry.getType().name()));
if (entry.getActed().isPresent()) {
data.add("acted", new JsonPrimitive(entry.getActed().get().toString()));
}
data.add("actedName", new JsonPrimitive(entry.getActedName()));
data.add("action", new JsonPrimitive(entry.getAction()));
return data;
}
private static LogMessageImpl decodeContent(JsonObject object) {
ExtendedLogEntry.ExtendedLogEntryBuilder builder = ExtendedLogEntry.build();
String id = object.get("id").getAsString();
if (id == null) {
return null;
}
UUID uuid = uuidFromString(id);
if (uuid == null) {
return null;
}
builder.actor(UUID.fromString(object.get("actor").getAsString()));
builder.actorName(object.get("actorName").getAsString());
builder.type(LogEntry.Type.valueOf(object.get("type").getAsString()));
if (object.has("acted")) {
builder.actor(UUID.fromString(object.get("acted").getAsString()));
}
builder.actedName(object.get("actedName").getAsString());
builder.action(object.get("action").getAsString());
return new LogMessageImpl(uuid, builder.build());
}
}
@@ -0,0 +1,74 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging.message;
import me.lucko.luckperms.api.messenger.message.type.UpdateMessage;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;
import javax.annotation.Nonnull;
public class UpdateMessageImpl extends AbstractMessage implements UpdateMessage {
private static final String UPDATE_HEADER = "update:";
public static UpdateMessageImpl decode(String msg) {
if (msg.startsWith(UPDATE_HEADER) && msg.length() > UPDATE_HEADER.length()) {
String content = msg.substring(UPDATE_HEADER.length());
return decodeContent(content);
}
return null;
}
public UpdateMessageImpl(UUID id) {
super(id);
}
@Nonnull
@Override
public String asEncodedString() {
return UPDATE_HEADER + encodeContent(getId());
}
private static String encodeContent(UUID uuid) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * 2);
buf.putLong(uuid.getMostSignificantBits());
buf.putLong(uuid.getLeastSignificantBits());
return Base64.getEncoder().encodeToString(buf.array());
}
private static UpdateMessageImpl decodeContent(String s) {
try {
byte[] bytes = Base64.getDecoder().decode(s);
ByteBuffer buf = ByteBuffer.wrap(bytes);
return new UpdateMessageImpl(new UUID(buf.getLong(), buf.getLong()));
} catch (IllegalArgumentException e) {
return null;
}
}
}
@@ -0,0 +1,87 @@
/*
* This file is part of LuckPerms, licensed under the MIT License.
*
* Copyright (c) lucko (Luck) <luck@lucko.me>
* Copyright (c) contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging.message;
import me.lucko.luckperms.api.messenger.message.type.UserUpdateMessage;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;
import javax.annotation.Nonnull;
public class UserUpdateMessageImpl extends AbstractMessage implements UserUpdateMessage {
private static final String USER_UPDATE_HEADER = "userupdate:";
public static UserUpdateMessageImpl decode(String msg) {
if (msg.startsWith(USER_UPDATE_HEADER) && msg.length() > USER_UPDATE_HEADER.length()) {
String content = msg.substring(USER_UPDATE_HEADER.length());
return decodeContent(content);
}
return null;
}
private final UUID userUuid;
public UserUpdateMessageImpl(UUID id, UUID userUuid) {
super(id);
this.userUuid = userUuid;
}
@Nonnull
@Override
public UUID getUser() {
return this.userUuid;
}
@Nonnull
@Override
public String asEncodedString() {
return USER_UPDATE_HEADER + encodeContent(getId(), this.userUuid);
}
private static String encodeContent(UUID id, UUID userUuid) {
ByteBuffer buf = ByteBuffer.allocate(Long.BYTES * 4);
buf.putLong(id.getMostSignificantBits());
buf.putLong(id.getLeastSignificantBits());
buf.putLong(userUuid.getMostSignificantBits());
buf.putLong(userUuid.getLeastSignificantBits());
return Base64.getEncoder().encodeToString(buf.array());
}
private static UserUpdateMessageImpl decodeContent(String s) {
try {
byte[] bytes = Base64.getDecoder().decode(s);
ByteBuffer buf = ByteBuffer.wrap(bytes);
UUID id = new UUID(buf.getLong(), buf.getLong());
UUID userUuid = new UUID(buf.getLong(), buf.getLong());
return new UserUpdateMessageImpl(id, userUuid);
} catch (IllegalArgumentException e) {
return null;
}
}
}
@@ -23,8 +23,11 @@
* SOFTWARE.
*/
package me.lucko.luckperms.common.messaging;
package me.lucko.luckperms.common.messaging.redis;
import me.lucko.luckperms.api.messenger.IncomingMessageConsumer;
import me.lucko.luckperms.api.messenger.Messenger;
import me.lucko.luckperms.api.messenger.message.OutgoingMessage;
import me.lucko.luckperms.common.plugin.LuckPermsPlugin;
import redis.clients.jedis.Jedis;
@@ -32,17 +35,23 @@ import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import javax.annotation.Nonnull;
/**
* An implementation of {@link me.lucko.luckperms.api.MessagingService} using Redis.
* An implementation of {@link Messenger} using Redis.
*/
public class RedisMessagingService extends AbstractMessagingService {
public class RedisMessenger implements Messenger {
private static final String CHANNEL = "lpuc";
private final LuckPermsPlugin plugin;
private final IncomingMessageConsumer consumer;
private JedisPool jedisPool;
private LPSub sub;
public RedisMessagingService(LuckPermsPlugin plugin) {
super(plugin, "Redis");
public RedisMessenger(LuckPermsPlugin plugin, IncomingMessageConsumer consumer) {
this.plugin = plugin;
this.consumer = consumer;
}
public void init(String address, String password) {
@@ -67,24 +76,24 @@ public class RedisMessagingService extends AbstractMessagingService {
}
@Override
public void close() {
this.sub.unsubscribe();
this.jedisPool.destroy();
}
@Override
protected void sendMessage(String message) {
public void sendOutgoingMessage(@Nonnull OutgoingMessage outgoingMessage) {
try (Jedis jedis = this.jedisPool.getResource()) {
jedis.publish(CHANNEL, message);
jedis.publish(CHANNEL, outgoingMessage.asEncodedString());
} catch (Exception e) {
e.printStackTrace();
}
}
private static class LPSub extends JedisPubSub {
private final RedisMessagingService parent;
@Override
public void close() {
this.sub.unsubscribe();
this.jedisPool.destroy();
}
public LPSub(RedisMessagingService parent) {
private static class LPSub extends JedisPubSub {
private final RedisMessenger parent;
public LPSub(RedisMessenger parent) {
this.parent = parent;
}
@@ -93,7 +102,7 @@ public class RedisMessagingService extends AbstractMessagingService {
if (!channel.equals(CHANNEL)) {
return;
}
this.parent.onMessage(msg, null);
this.parent.consumer.consumeIncomingMessageAsString(msg);
}
}
@@ -46,7 +46,7 @@ import me.lucko.luckperms.common.logging.Logger;
import me.lucko.luckperms.common.managers.group.GroupManager;
import me.lucko.luckperms.common.managers.track.TrackManager;
import me.lucko.luckperms.common.managers.user.UserManager;
import me.lucko.luckperms.common.messaging.ExtendedMessagingService;
import me.lucko.luckperms.common.messaging.InternalMessagingService;
import me.lucko.luckperms.common.model.User;
import me.lucko.luckperms.common.storage.Storage;
import me.lucko.luckperms.common.storage.dao.file.FileWatcher;
@@ -65,7 +65,8 @@ import java.util.UUID;
import java.util.stream.Stream;
/**
* Main internal interface for LuckPerms plugins, providing the base for abstraction throughout the project.
* Main internal interface for LuckPerms plugins, providing the base for
* abstraction throughout the project.
*
* All plugin platforms implement this interface.
*/
@@ -107,11 +108,18 @@ public interface LuckPermsPlugin {
Storage getStorage();
/**
* Gets the redis messaging instance.
* Gets the messaging service.
*
* @return the redis messaging service
* @return the messaging service
*/
Optional<ExtendedMessagingService> getMessagingService();
Optional<InternalMessagingService> getMessagingService();
/**
* Sets the messaging service.
*
* @param service the service
*/
void setMessagingService(InternalMessagingService service);
/**
* Gets a wrapped logger instance for the platform.