From da797f154d9358ab56cd27a8e359943a96320167 Mon Sep 17 00:00:00 2001 From: Luck Date: Sun, 18 Mar 2018 16:02:04 +0000 Subject: [PATCH] Implement Messaging Service using SQL (#534) --- bukkit/src/main/resources/config.yml | 3 + bungee/src/main/resources/config.yml | 4 + .../common/messaging/MessagingFactory.java | 41 ++++++- .../messaging/sql/AbstractSqlMessenger.java | 112 ++++++++++++++++++ .../common/messaging/sql/SqlMessenger.java | 85 +++++++++++++ .../common/storage/dao/sql/SqlDao.java | 4 + nukkit/src/main/resources/config.yml | 3 + sponge/src/main/resources/luckperms.conf | 3 + 8 files changed, 254 insertions(+), 1 deletion(-) create mode 100644 common/src/main/java/me/lucko/luckperms/common/messaging/sql/AbstractSqlMessenger.java create mode 100644 common/src/main/java/me/lucko/luckperms/common/messaging/sql/SqlMessenger.java diff --git a/bukkit/src/main/resources/config.yml b/bukkit/src/main/resources/config.yml index 648674d6..1f4cc1a3 100644 --- a/bukkit/src/main/resources/config.yml +++ b/bukkit/src/main/resources/config.yml @@ -197,6 +197,9 @@ watch-files: true # for LuckPerms to poll the database for changes. # # - Possible options: +# => sql Uses the SQL database to form a queue system for communication. Will only work when +# 'storage-method' is set to MySQL or MariaDB. This is chosen by default if the +# option is set to 'none' and SQL storage is in use. Set to 'notsql' to disable this. # => bungee Uses the plugin messaging channels to communicate with the proxy. # LuckPerms must be installed on your proxy & all connected servers backend servers. # Won't work if you have more than one BungeeCord proxy. diff --git a/bungee/src/main/resources/config.yml b/bungee/src/main/resources/config.yml index 391f174b..911ddfef 100644 --- a/bungee/src/main/resources/config.yml +++ b/bungee/src/main/resources/config.yml @@ -194,6 +194,10 @@ watch-files: true # for LuckPerms to poll the database for changes. # # - Possible options: +# => sql Uses the SQL database to form a queue system for communication. Will only work +# when 'storage-method' is set to MySQL or MariaDB. This is chosen by default if +# the option is set to 'none' and SQL storage is in use. Set to 'notsql' to +# disable this. # => bungee Uses the plugin messaging channels to communicate with the proxy. # LuckPerms must be installed on your proxy & all connected servers backend # servers. Won't work if you have more than one BungeeCord proxy. diff --git a/common/src/main/java/me/lucko/luckperms/common/messaging/MessagingFactory.java b/common/src/main/java/me/lucko/luckperms/common/messaging/MessagingFactory.java index 1d8fde96..76939527 100644 --- a/common/src/main/java/me/lucko/luckperms/common/messaging/MessagingFactory.java +++ b/common/src/main/java/me/lucko/luckperms/common/messaging/MessagingFactory.java @@ -25,12 +25,18 @@ package me.lucko.luckperms.common.messaging; +import com.google.common.base.Preconditions; + import me.lucko.luckperms.api.messenger.IncomingMessageConsumer; import me.lucko.luckperms.api.messenger.Messenger; import me.lucko.luckperms.api.messenger.MessengerProvider; import me.lucko.luckperms.common.config.ConfigKeys; import me.lucko.luckperms.common.messaging.redis.RedisMessenger; +import me.lucko.luckperms.common.messaging.sql.SqlMessenger; import me.lucko.luckperms.common.plugin.LuckPermsPlugin; +import me.lucko.luckperms.common.storage.dao.sql.SqlDao; +import me.lucko.luckperms.common.storage.dao.sql.connection.hikari.MariaDbConnectionFactory; +import me.lucko.luckperms.common.storage.dao.sql.connection.hikari.MySqlConnectionFactory; import javax.annotation.Nonnull; @@ -51,7 +57,14 @@ public class MessagingFactory

{ messagingType = "redis"; } - if (messagingType.equals("none")) { + if (messagingType.equals("none") && this.plugin.getStorage().getDao() instanceof SqlDao) { + SqlDao dao = (SqlDao) this.plugin.getStorage().getDao(); + if (dao.getProvider() instanceof MySqlConnectionFactory || dao.getProvider() instanceof MariaDbConnectionFactory) { + messagingType = "sql"; + } + } + + if (messagingType.equals("none") || messagingType.equals("notsql")) { return null; } @@ -77,6 +90,12 @@ public class MessagingFactory

{ } else { this.plugin.getLogger().warn("Messaging Service was set to redis, but redis is not enabled!"); } + } else if (messagingType.equals("sql")) { + try { + return new LuckPermsMessagingService(this.plugin, new SqlMessengerProvider()); + } catch (Exception e) { + e.printStackTrace(); + } } return null; @@ -99,4 +118,24 @@ public class MessagingFactory

{ } } + private class SqlMessengerProvider implements MessengerProvider { + + @Nonnull + @Override + public String getName() { + return "Sql"; + } + + @Nonnull + @Override + public Messenger obtain(@Nonnull IncomingMessageConsumer incomingMessageConsumer) { + SqlDao dao = (SqlDao) getPlugin().getStorage().getDao(); + Preconditions.checkState(dao.getProvider() instanceof MySqlConnectionFactory || dao.getProvider() instanceof MariaDbConnectionFactory, "not a supported sql type"); + + SqlMessenger sql = new SqlMessenger(getPlugin(), dao, incomingMessageConsumer); + sql.init(); + return sql; + } + } + } diff --git a/common/src/main/java/me/lucko/luckperms/common/messaging/sql/AbstractSqlMessenger.java b/common/src/main/java/me/lucko/luckperms/common/messaging/sql/AbstractSqlMessenger.java new file mode 100644 index 00000000..0911c137 --- /dev/null +++ b/common/src/main/java/me/lucko/luckperms/common/messaging/sql/AbstractSqlMessenger.java @@ -0,0 +1,112 @@ +/* + * This file is part of LuckPerms, licensed under the MIT License. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package me.lucko.luckperms.common.messaging.sql; + +import me.lucko.luckperms.api.messenger.IncomingMessageConsumer; +import me.lucko.luckperms.api.messenger.Messenger; +import me.lucko.luckperms.api.messenger.message.OutgoingMessage; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import javax.annotation.Nonnull; + +/** + * An implementation of {@link Messenger} using SQL. + */ +public abstract class AbstractSqlMessenger implements Messenger { + + private final IncomingMessageConsumer consumer; + private long lastId = -1; + + protected AbstractSqlMessenger(IncomingMessageConsumer consumer) { + this.consumer = consumer; + } + + protected abstract Connection getConnection() throws SQLException; + + public void init() throws SQLException { + try (Connection c = getConnection()) { + // init table + try (PreparedStatement ps = c.prepareStatement("CREATE TABLE IF NOT EXISTS `luckperms_messages` (`id` INT AUTO_INCREMENT NOT NULL, `time` TIMESTAMP NOT NULL, `msg` TEXT NOT NULL, PRIMARY KEY (`id`))")) { + ps.execute(); + } + // pull last id + try (PreparedStatement ps = c.prepareStatement("SELECT MAX(`id`) as `latest` FROM `luckperms_messages`")) { + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + this.lastId = rs.getLong("latest"); + } + } + } + } + } + + @Override + public void sendOutgoingMessage(@Nonnull OutgoingMessage outgoingMessage) { + try (Connection c = getConnection()) { + try (PreparedStatement ps = c.prepareStatement("INSERT INTO luckperms_messages(`time`, `msg`) VALUES(NOW(), ?)")) { + ps.setString(1, outgoingMessage.asEncodedString()); + ps.execute(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + } + + public void pollMessages() { + try (Connection c = getConnection()) { + try (PreparedStatement ps = c.prepareStatement("SELECT `id`, `msg` FROM luckperms_messages WHERE `id` > ? AND (NOW() - `time` > 60)")) { + ps.setLong(1, this.lastId); + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + long id = rs.getLong("id"); + this.lastId = Math.max(this.lastId, id); + + String message = rs.getString("msg"); + this.consumer.consumeIncomingMessageAsString(message); + } + } + } + } catch (SQLException e) { + e.printStackTrace(); + } + } + + public void runHousekeeping() { + try (Connection c = getConnection()) { + try (PreparedStatement ps = c.prepareStatement("DELETE FROM luckperms_messages WHERE (NOW() - `time` > 60)")) { + ps.execute(); + } + } catch (SQLException e) { + e.printStackTrace(); + } + } + + +} diff --git a/common/src/main/java/me/lucko/luckperms/common/messaging/sql/SqlMessenger.java b/common/src/main/java/me/lucko/luckperms/common/messaging/sql/SqlMessenger.java new file mode 100644 index 00000000..eaec1b1a --- /dev/null +++ b/common/src/main/java/me/lucko/luckperms/common/messaging/sql/SqlMessenger.java @@ -0,0 +1,85 @@ +/* + * This file is part of LuckPerms, licensed under the MIT License. + * + * Copyright (c) lucko (Luck) + * Copyright (c) contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package me.lucko.luckperms.common.messaging.sql; + +import me.lucko.luckperms.api.messenger.IncomingMessageConsumer; +import me.lucko.luckperms.common.plugin.LuckPermsPlugin; +import me.lucko.luckperms.common.plugin.SchedulerAdapter; +import me.lucko.luckperms.common.plugin.SchedulerTask; +import me.lucko.luckperms.common.storage.dao.sql.SqlDao; + +import java.sql.Connection; +import java.sql.SQLException; + +public class SqlMessenger extends AbstractSqlMessenger { + private final LuckPermsPlugin plugin; + private final SqlDao sqlDao; + + private SchedulerTask pollTask; + private SchedulerTask housekeepingTask; + + public SqlMessenger(LuckPermsPlugin plugin, SqlDao sqlDao, IncomingMessageConsumer consumer) { + super(consumer); + this.plugin = plugin; + this.sqlDao = sqlDao; + } + + @Override + public void init() { + try { + super.init(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + // schedule poll tasks + SchedulerAdapter scheduler = this.plugin.getBootstrap().getScheduler(); + this.pollTask = scheduler.asyncRepeating(this::pollMessages, 20L); + this.housekeepingTask = scheduler.asyncRepeating(this::runHousekeeping, 20L * 30); + } + + @Override + public void close() { + SchedulerTask task = this.pollTask; + if (task != null) { + task.cancel(); + } + task = this.housekeepingTask; + if (task != null) { + task.cancel(); + } + + this.pollTask = null; + this.housekeepingTask = null; + + super.close(); + } + + @Override + protected Connection getConnection() throws SQLException { + return this.sqlDao.getProvider().getConnection(); + } +} diff --git a/common/src/main/java/me/lucko/luckperms/common/storage/dao/sql/SqlDao.java b/common/src/main/java/me/lucko/luckperms/common/storage/dao/sql/SqlDao.java index 4b302671..b5f25bca 100644 --- a/common/src/main/java/me/lucko/luckperms/common/storage/dao/sql/SqlDao.java +++ b/common/src/main/java/me/lucko/luckperms/common/storage/dao/sql/SqlDao.java @@ -129,6 +129,10 @@ public class SqlDao extends AbstractDao { return this.gson; } + public AbstractConnectionFactory getProvider() { + return this.provider; + } + public Function getPrefix() { return this.prefix; } diff --git a/nukkit/src/main/resources/config.yml b/nukkit/src/main/resources/config.yml index 33a9bf73..be2de641 100644 --- a/nukkit/src/main/resources/config.yml +++ b/nukkit/src/main/resources/config.yml @@ -197,6 +197,9 @@ watch-files: true # for LuckPerms to poll the database for changes. # # - Possible options: +# => sql Uses the SQL database to form a queue system for communication. Will only work when +# 'storage-method' is set to MySQL or MariaDB. This is chosen by default if the +# option is set to 'none' and SQL storage is in use. Set to 'notsql' to disable this. # => redis Uses Redis pub-sub to push changes. Your server connection info must be configured # below. # => none Disables the service. diff --git a/sponge/src/main/resources/luckperms.conf b/sponge/src/main/resources/luckperms.conf index 6192d73e..d4870d86 100644 --- a/sponge/src/main/resources/luckperms.conf +++ b/sponge/src/main/resources/luckperms.conf @@ -201,6 +201,9 @@ watch-files = true # for LuckPerms to poll the database for changes. # # - Possible options: +# => sql Uses the SQL database to form a queue system for communication. Will only work when +# 'storage-method' is set to MySQL or MariaDB. This is chosen by default if the +# option is set to 'none' and SQL storage is in use. Set to 'notsql' to disable this. # => bungee Uses the plugin messaging channels to communicate with the proxy. # LuckPerms must be installed on your proxy & all connected servers backend servers. # Won't work if you have more than one BungeeCord proxy.