Implement BungeeCord & LilyPad messaging services - closes #142
This commit is contained in:
@@ -41,7 +41,7 @@ import me.lucko.luckperms.common.locale.LocaleManager;
|
||||
import me.lucko.luckperms.common.managers.GroupManager;
|
||||
import me.lucko.luckperms.common.managers.TrackManager;
|
||||
import me.lucko.luckperms.common.managers.UserManager;
|
||||
import me.lucko.luckperms.common.messaging.RedisMessaging;
|
||||
import me.lucko.luckperms.common.messaging.AbstractMessagingService;
|
||||
import me.lucko.luckperms.common.storage.Storage;
|
||||
import me.lucko.luckperms.common.utils.BufferedRequest;
|
||||
import me.lucko.luckperms.common.utils.DebugHandler;
|
||||
@@ -103,7 +103,7 @@ public interface LuckPermsPlugin {
|
||||
*
|
||||
* @return the redis messaging service
|
||||
*/
|
||||
RedisMessaging getRedisMessaging();
|
||||
AbstractMessagingService getMessagingService();
|
||||
|
||||
/**
|
||||
* Gets a wrapped logger instance for the platform.
|
||||
|
||||
@@ -135,7 +135,7 @@ public class ApiProvider implements LuckPermsApi {
|
||||
|
||||
@Override
|
||||
public Optional<MessagingService> getMessagingService() {
|
||||
return Optional.ofNullable(plugin.getRedisMessaging());
|
||||
return Optional.ofNullable(plugin.getMessagingService());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
+2
-2
@@ -44,8 +44,8 @@ public class NetworkSyncCommand extends SingleCommand {
|
||||
plugin.getUpdateTaskBuffer().request().join();
|
||||
Message.UPDATE_TASK_COMPLETE_NETWORK.send(sender);
|
||||
|
||||
if (plugin.getRedisMessaging() != null) {
|
||||
plugin.getRedisMessaging().pushUpdate();
|
||||
if (plugin.getMessagingService() != null) {
|
||||
plugin.getMessagingService().pushUpdate();
|
||||
Message.UPDATE_TASK_PUSH_SUCCESS.send(sender);
|
||||
} else {
|
||||
Message.UPDATE_TASK_PUSH_FAILURE.send(sender);
|
||||
|
||||
@@ -139,6 +139,7 @@ public class ConfigKeys {
|
||||
.put("log", c.getString("split-storage.methods.log", "h2"))
|
||||
.build();
|
||||
}));
|
||||
public static final ConfigKey<String> MESSAGING_SERVICE = EnduringKey.wrap(StringKey.of("messaging-service", "none"));
|
||||
public static final ConfigKey<Boolean> REDIS_ENABLED = EnduringKey.wrap(BooleanKey.of("redis.enabled", false));
|
||||
public static final ConfigKey<String> REDIS_ADDRESS = EnduringKey.wrap(StringKey.of("redis.address", null));
|
||||
public static final ConfigKey<String> REDIS_PASSWORD = EnduringKey.wrap(StringKey.of("redis.password", ""));
|
||||
|
||||
+103
@@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Copyright (c) 2016 Lucko (Luck) <luck@lucko.me>
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to deal
|
||||
* in the Software without restriction, including without limitation the rights
|
||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*/
|
||||
|
||||
package me.lucko.luckperms.common.messaging;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
import me.lucko.luckperms.api.MessagingService;
|
||||
import me.lucko.luckperms.common.LuckPermsPlugin;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* An abstract implementation of {@link me.lucko.luckperms.api.MessagingService}.
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
public abstract class AbstractMessagingService implements MessagingService {
|
||||
public static final String CHANNEL = "lpuc";
|
||||
|
||||
private final LuckPermsPlugin plugin;
|
||||
private final String name;
|
||||
|
||||
private final Set<UUID> receivedMsgs = Collections.synchronizedSet(new HashSet<>());
|
||||
|
||||
public abstract void close();
|
||||
|
||||
protected abstract void sendMessage(String channel, String message);
|
||||
|
||||
protected void onMessage(String channel, String msg, Consumer<UUID> callback) {
|
||||
if (!channel.equals(CHANNEL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
UUID uuid = parseUpdateMessage(msg);
|
||||
if (uuid == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!receivedMsgs.add(uuid)) {
|
||||
return;
|
||||
}
|
||||
|
||||
plugin.getLog().info("[" + name + " Messaging] Received update ping with id: " + uuid.toString());
|
||||
plugin.getUpdateTaskBuffer().request();
|
||||
|
||||
if (callback != null) {
|
||||
callback.accept(uuid);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushUpdate() {
|
||||
plugin.doAsync(() -> {
|
||||
UUID id = generateId();
|
||||
plugin.getLog().info("[" + name + " Messaging] Sending ping with id: " + id.toString());
|
||||
|
||||
sendMessage(CHANNEL, "update:" + id.toString());
|
||||
});
|
||||
}
|
||||
|
||||
private UUID generateId() {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
receivedMsgs.add(uuid);
|
||||
return uuid;
|
||||
}
|
||||
|
||||
private static UUID parseUpdateMessage(String msg) {
|
||||
if (!msg.startsWith("update:")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String requestId = msg.substring("update:".length());
|
||||
try {
|
||||
return UUID.fromString(requestId);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -24,7 +24,6 @@ package me.lucko.luckperms.common.messaging;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
import me.lucko.luckperms.api.MessagingService;
|
||||
import me.lucko.luckperms.common.LuckPermsPlugin;
|
||||
|
||||
import redis.clients.jedis.Jedis;
|
||||
@@ -32,22 +31,19 @@ import redis.clients.jedis.JedisPool;
|
||||
import redis.clients.jedis.JedisPoolConfig;
|
||||
import redis.clients.jedis.JedisPubSub;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Uses Redis to push/receive changes to/from other servers
|
||||
* An implementation of {@link me.lucko.luckperms.api.MessagingService} using Redis.
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
public class RedisMessaging implements MessagingService {
|
||||
private static final String CHANNEL = "luckperms";
|
||||
|
||||
public class RedisMessaging extends AbstractMessagingService {
|
||||
private final LuckPermsPlugin plugin;
|
||||
private JedisPool jedisPool;
|
||||
private LPSub sub;
|
||||
|
||||
public RedisMessaging(LuckPermsPlugin plugin) {
|
||||
super(plugin, "Redis");
|
||||
this.plugin = plugin;
|
||||
}
|
||||
|
||||
public void init(String address, String password) {
|
||||
String[] addressSplit = address.split(":");
|
||||
String host = addressSplit[0];
|
||||
@@ -60,7 +56,7 @@ public class RedisMessaging implements MessagingService {
|
||||
}
|
||||
|
||||
plugin.doAsync(() -> {
|
||||
sub = new LPSub(plugin);
|
||||
sub = new LPSub(this);
|
||||
try (Jedis jedis = jedisPool.getResource()) {
|
||||
jedis.subscribe(sub, CHANNEL);
|
||||
} catch (Exception e) {
|
||||
@@ -69,60 +65,28 @@ public class RedisMessaging implements MessagingService {
|
||||
});
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
@Override
|
||||
public void close() {
|
||||
sub.unsubscribe();
|
||||
jedisPool.destroy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushUpdate() {
|
||||
plugin.doAsync(() -> {
|
||||
UUID id = sub.generateId();
|
||||
plugin.getLog().info("[Redis Messaging] Sending redis ping with id: " + id.toString());
|
||||
try (Jedis jedis = jedisPool.getResource()) {
|
||||
jedis.publish(CHANNEL, "update:" + id.toString());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
protected void sendMessage(String channel, String message) {
|
||||
try (Jedis jedis = jedisPool.getResource()) {
|
||||
jedis.publish(CHANNEL, message);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class LPSub extends JedisPubSub {
|
||||
private final LuckPermsPlugin plugin;
|
||||
private final Set<UUID> receivedMsgs = Collections.synchronizedSet(new HashSet<>());
|
||||
|
||||
private UUID generateId() {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
receivedMsgs.add(uuid);
|
||||
return uuid;
|
||||
}
|
||||
private final RedisMessaging parent;
|
||||
|
||||
@Override
|
||||
public void onMessage(String channel, String msg) {
|
||||
if (!channel.equals(CHANNEL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!msg.startsWith("update:")) {
|
||||
return;
|
||||
}
|
||||
|
||||
String requestId = msg.substring("update:".length());
|
||||
UUID uuid;
|
||||
try {
|
||||
uuid = UUID.fromString(requestId);
|
||||
} catch (IllegalArgumentException e) {
|
||||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!receivedMsgs.add(uuid)) {
|
||||
return;
|
||||
}
|
||||
|
||||
plugin.getLog().info("[Redis Messaging] Received update ping with id: " + uuid.toString());
|
||||
plugin.getUpdateTaskBuffer().request();
|
||||
parent.onMessage(channel, msg, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user