From 47739344816560d482f88cc00ff3dce4b68d295f Mon Sep 17 00:00:00 2001 From: Luck Date: Wed, 14 Mar 2018 22:01:52 +0000 Subject: [PATCH] Fix race condition in the import process (#833) --- .../luckperms/common/backup/Importer.java | 65 +++++----- .../command/utils/StorageAssistant.java | 113 +++++++++++++----- .../commands/generic/parent/ParentAdd.java | 3 +- .../generic/parent/ParentAddTemp.java | 3 +- .../generic/parent/ParentClearTrack.java | 3 +- .../commands/generic/parent/ParentSet.java | 3 +- .../generic/parent/ParentSetTrack.java | 6 +- .../commands/group/GroupMainCommand.java | 17 +-- .../common/commands/track/TrackAppend.java | 3 +- .../common/commands/track/TrackInsert.java | 3 +- .../commands/track/TrackMainCommand.java | 10 +- .../common/commands/user/UserDemote.java | 3 +- .../common/commands/user/UserPromote.java | 3 +- 13 files changed, 134 insertions(+), 101 deletions(-) diff --git a/common/src/main/java/me/lucko/luckperms/common/backup/Importer.java b/common/src/main/java/me/lucko/luckperms/common/backup/Importer.java index b44b3e30..3ee360dc 100644 --- a/common/src/main/java/me/lucko/luckperms/common/backup/Importer.java +++ b/common/src/main/java/me/lucko/luckperms/common/backup/Importer.java @@ -26,21 +26,26 @@ package me.lucko.luckperms.common.backup; import com.google.common.base.Splitter; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.MultimapBuilder; import me.lucko.luckperms.common.command.CommandManager; import me.lucko.luckperms.common.command.CommandResult; import me.lucko.luckperms.common.locale.message.Message; import me.lucko.luckperms.common.sender.DummySender; import me.lucko.luckperms.common.sender.Sender; -import me.lucko.luckperms.common.utils.Cycle; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -54,8 +59,8 @@ public class Importer implements Runnable { private final CommandManager commandManager; private final Set notify; - private final List commands; - private final List toExecute; + private final List commandList; + private final List commands; public Importer(CommandManager commandManager, Sender executor, List commands) { this.commandManager = commandManager; @@ -65,7 +70,7 @@ public class Importer implements Runnable { } else { this.notify = ImmutableSet.of(executor, commandManager.getPlugin().getConsoleSender()); } - this.commands = commands.stream() + this.commandList = commands.stream() .map(String::trim) .filter(s -> !s.isEmpty()) .filter(s -> !s.startsWith("#")) @@ -73,7 +78,7 @@ public class Importer implements Runnable { .map(s -> s.startsWith("/luckperms ") ? s.substring("/luckperms ".length()) : s) .map(s -> s.startsWith("/lp ") ? s.substring("/lp ".length()) : s) .collect(Collectors.toList()); - this.toExecute = new ArrayList<>(); + this.commands = new ArrayList<>(); } @Override @@ -81,11 +86,14 @@ public class Importer implements Runnable { long startTime = System.currentTimeMillis(); this.notify.forEach(s -> Message.IMPORT_START.send(s)); + // start an update task in the background - we'll #join this later + CompletableFuture updateTask = CompletableFuture.runAsync(() -> this.commandManager.getPlugin().getUpdateTaskBuffer().requestDirectly()); + // form instances for all commands, and register them int index = 1; - for (String command : this.commands) { + for (String command : this.commandList) { ImportCommand cmd = new ImportCommand(this.commandManager, index, command); - this.toExecute.add(cmd); + this.commands.add(cmd); if (cmd.getCommand().startsWith("creategroup ") || cmd.getCommand().startsWith("createtrack ")) { cmd.process(); // process immediately @@ -94,37 +102,37 @@ public class Importer implements Runnable { index++; } - // divide commands up into pools - Cycle> commandPools = new Cycle<>(nInstances(128, ArrayList::new)); - - String lastTarget = null; - for (ImportCommand cmd : this.toExecute) { - // if the last target isn't the same, skip to a new pool - if (lastTarget == null || !lastTarget.equals(cmd.getTarget())) { - commandPools.next(); - } - - commandPools.current().add(cmd); - lastTarget = cmd.getTarget(); + // split data up into sections for each holder + // holder id --> commands + ListMultimap sections = MultimapBuilder.linkedHashKeys().arrayListValues().build(); + for (ImportCommand cmd : this.commands) { + String target = Strings.nullToEmpty(cmd.getTarget()); + sections.put(target, cmd); } + // join the update task future before scheduling command executions + updateTask.join(); + + // build a list of commands to be executed by each thread + ExecutorService executor = Executors.newFixedThreadPool(128); + // A set of futures, which are really just the threads we need to wait for. Set> futures = new HashSet<>(); AtomicInteger processedCount = new AtomicInteger(0); // iterate through each user sublist. - for (List subList : commandPools.getBacking()) { + for (Collection subList : sections.asMap().values()) { // register and start a new thread to process the sublist - futures.add(CompletableFuture.runAsync(() -> { + futures.add(CompletableFuture.completedFuture(subList).thenAcceptAsync(sl -> { // iterate through each user in the sublist, and grab their data. - for (ImportCommand cmd : subList) { + for (ImportCommand cmd : sl) { cmd.process(); processedCount.incrementAndGet(); } - }, this.commandManager.getPlugin().getBootstrap().getScheduler().async())); + }, executor)); } // all of the threads have been scheduled now and are running. we just need to wait for them all to complete @@ -132,7 +140,7 @@ public class Importer implements Runnable { while (true) { try { - overallFuture.get(10, TimeUnit.SECONDS); + overallFuture.get(2, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException e) { // abnormal error - just break e.printStackTrace(); @@ -147,11 +155,12 @@ public class Importer implements Runnable { break; } + executor.shutdown(); long endTime = System.currentTimeMillis(); double seconds = (endTime - startTime) / 1000; - int errors = (int) this.toExecute.stream().filter(v -> v.getResult().wasFailure()).count(); + int errors = (int) this.commands.stream().filter(v -> v.getResult().wasFailure()).count(); switch (errors) { case 0: @@ -166,7 +175,7 @@ public class Importer implements Runnable { } AtomicInteger errIndex = new AtomicInteger(1); - for (ImportCommand e : this.toExecute) { + for (ImportCommand e : this.commands) { if (e.getResult() != null && e.getResult().wasFailure()) { this.notify.forEach(s -> { Message.IMPORT_END_ERROR_HEADER.send(s, errIndex.get(), e.getId(), e.getCommand(), e.getResult().toString()); @@ -182,8 +191,8 @@ public class Importer implements Runnable { } private void sendProgress(int processedCount) { - int percent = (processedCount * 100) / this.commands.size(); - int errors = (int) this.toExecute.stream().filter(v -> v.isCompleted() && v.getResult().wasFailure()).count(); + int percent = (processedCount * 100) / this.commandList.size(); + int errors = (int) this.commands.stream().filter(v -> v.isCompleted() && v.getResult().wasFailure()).count(); if (errors == 1) { this.notify.forEach(s -> Message.IMPORT_PROGRESS_SIN.send(s, percent, processedCount, this.commands.size(), errors)); diff --git a/common/src/main/java/me/lucko/luckperms/common/command/utils/StorageAssistant.java b/common/src/main/java/me/lucko/luckperms/common/command/utils/StorageAssistant.java index c61795f9..a5ab7ce2 100644 --- a/common/src/main/java/me/lucko/luckperms/common/command/utils/StorageAssistant.java +++ b/common/src/main/java/me/lucko/luckperms/common/command/utils/StorageAssistant.java @@ -42,7 +42,64 @@ import java.util.Optional; */ public final class StorageAssistant { + public static Group loadGroup(String target, Sender sender, LuckPermsPlugin plugin, boolean auditTemporary) { + // special handling for the importer + if (sender.isImport()) { + Group group = plugin.getGroupManager().getIfLoaded(target); + if (group == null) { + Message.GROUP_NOT_FOUND.send(sender, target); + } + return group; + } + + Group group = plugin.getStorage().loadGroup(target).join().orElse(null); + if (group == null) { + // failed to load, but it might be a display name. + group = plugin.getGroupManager().getByDisplayName(target); + + // nope, not a display name + if (group == null) { + Message.GROUP_NOT_FOUND.send(sender, target); + return null; + } + + // it was a display name, we need to reload + plugin.getStorage().loadGroup(group.getName()).join(); + } + + if (auditTemporary) { + group.auditTemporaryPermissions(); + } + + return group; + } + + public static Track loadTrack(String target, Sender sender, LuckPermsPlugin plugin) { + Track track; + + // special handling for the importer + if (sender.isImport()) { + track = plugin.getTrackManager().getIfLoaded(target); + } else { + track = plugin.getStorage().loadTrack(target).join().orElse(null); + } + + if (track == null) { + Message.TRACK_NOT_FOUND.send(sender, target); + return null; + } + + return track; + } + public static void save(User user, Sender sender, LuckPermsPlugin plugin) { + // special handling for the importer + if (sender.isImport()) { + // join calls to save users - as we always load them + plugin.getStorage().saveUser(user).join(); + return; + } + try { plugin.getStorage().noBuffer().saveUser(user).get(); } catch (Exception e) { @@ -51,21 +108,22 @@ public final class StorageAssistant { return; } - if (sender.isImport()) { - user.getRefreshBuffer().request(); - } else { - user.getRefreshBuffer().requestDirectly(); - } + user.getRefreshBuffer().requestDirectly(); - if (!sender.isImport()) { - Optional messagingService = plugin.getMessagingService(); - if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) { - messagingService.get().pushUserUpdate(user); - } + Optional messagingService = plugin.getMessagingService(); + if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) { + messagingService.get().pushUserUpdate(user); } } public static void save(Group group, Sender sender, LuckPermsPlugin plugin) { + // special handling for the importer + if (sender.isImport()) { + // allow the buffer to handle things + plugin.getStorage().saveGroup(group); + return; + } + try { plugin.getStorage().noBuffer().saveGroup(group).get(); } catch (Exception e) { @@ -74,21 +132,22 @@ public final class StorageAssistant { return; } - if (sender.isImport()) { - plugin.getUpdateTaskBuffer().request(); - } else { - plugin.getUpdateTaskBuffer().requestDirectly(); - } + plugin.getUpdateTaskBuffer().requestDirectly(); - if (!sender.isImport()) { - Optional messagingService = plugin.getMessagingService(); - if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) { - messagingService.get().getUpdateBuffer().request(); - } + Optional messagingService = plugin.getMessagingService(); + if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) { + messagingService.get().getUpdateBuffer().request(); } } public static void save(Track track, Sender sender, LuckPermsPlugin plugin) { + // special handling for the importer + if (sender.isImport()) { + // allow the buffer to handle things + plugin.getStorage().saveTrack(track); + return; + } + try { plugin.getStorage().noBuffer().saveTrack(track).get(); } catch (Exception e) { @@ -97,17 +156,11 @@ public final class StorageAssistant { return; } - if (sender.isImport()) { - plugin.getUpdateTaskBuffer().request(); - } else { - plugin.getUpdateTaskBuffer().requestDirectly(); - } + plugin.getUpdateTaskBuffer().requestDirectly(); - if (!sender.isImport()) { - Optional messagingService = plugin.getMessagingService(); - if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) { - messagingService.get().getUpdateBuffer().request(); - } + Optional messagingService = plugin.getMessagingService(); + if (messagingService.isPresent() && plugin.getConfiguration().get(ConfigKeys.AUTO_PUSH_UPDATES)) { + messagingService.get().getUpdateBuffer().request(); } } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentAdd.java b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentAdd.java index 53e0cc5b..2ad96fea 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentAdd.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentAdd.java @@ -65,9 +65,8 @@ public class ParentAdd extends SharedSubCommand { String groupName = ArgumentParser.parseName(0, args); MutableContextSet context = ArgumentParser.parseContext(1, args, plugin); - Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null); + Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false); if (group == null) { - Message.DOES_NOT_EXIST.send(sender, groupName); return CommandResult.INVALID_ARGS; } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentAddTemp.java b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentAddTemp.java index 0614bf11..22056e50 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentAddTemp.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentAddTemp.java @@ -72,9 +72,8 @@ public class ParentAddTemp extends SharedSubCommand { TemporaryModifier modifier = ArgumentParser.parseTemporaryModifier(2, args).orElseGet(() -> plugin.getConfiguration().get(ConfigKeys.TEMPORARY_ADD_BEHAVIOUR)); MutableContextSet context = ArgumentParser.parseContext(2, args, plugin); - Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null); + Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false); if (group == null) { - Message.DOES_NOT_EXIST.send(sender, groupName); return CommandResult.INVALID_ARGS; } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentClearTrack.java b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentClearTrack.java index 6a02941a..571c2d86 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentClearTrack.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentClearTrack.java @@ -68,9 +68,8 @@ public class ParentClearTrack extends SharedSubCommand { return CommandResult.INVALID_ARGS; } - Track track = plugin.getStorage().loadTrack(trackName).join().orElse(null); + Track track = StorageAssistant.loadTrack(trackName, sender, plugin); if (track == null) { - Message.DOES_NOT_EXIST.send(sender, trackName); return CommandResult.LOADING_ERROR; } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentSet.java b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentSet.java index 8819a056..677f0392 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentSet.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentSet.java @@ -65,9 +65,8 @@ public class ParentSet extends SharedSubCommand { String groupName = ArgumentParser.parseName(0, args); MutableContextSet context = ArgumentParser.parseContext(1, args, plugin); - Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null); + Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false); if (group == null) { - Message.DOES_NOT_EXIST.send(sender, groupName); return CommandResult.LOADING_ERROR; } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentSetTrack.java b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentSetTrack.java index c55cf2ad..5fc605e4 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentSetTrack.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/generic/parent/ParentSetTrack.java @@ -70,9 +70,8 @@ public class ParentSetTrack extends SharedSubCommand { return CommandResult.INVALID_ARGS; } - Track track = plugin.getStorage().loadTrack(trackName).join().orElse(null); + Track track = StorageAssistant.loadTrack(trackName, sender, plugin); if (track == null) { - Message.DOES_NOT_EXIST.send(sender, trackName); return CommandResult.LOADING_ERROR; } @@ -100,9 +99,8 @@ public class ParentSetTrack extends SharedSubCommand { MutableContextSet context = ArgumentParser.parseContext(2, args, plugin); - Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null); + Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false); if (group == null) { - Message.DOES_NOT_EXIST.send(sender, groupName); return CommandResult.LOADING_ERROR; } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/group/GroupMainCommand.java b/common/src/main/java/me/lucko/luckperms/common/commands/group/GroupMainCommand.java index fec05236..d858ee7a 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/group/GroupMainCommand.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/group/GroupMainCommand.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList; import me.lucko.luckperms.common.command.abstraction.Command; import me.lucko.luckperms.common.command.abstraction.MainCommand; +import me.lucko.luckperms.common.command.utils.StorageAssistant; import me.lucko.luckperms.common.commands.generic.meta.CommandMeta; import me.lucko.luckperms.common.commands.generic.other.HolderClear; import me.lucko.luckperms.common.commands.generic.other.HolderEditor; @@ -39,7 +40,6 @@ import me.lucko.luckperms.common.commands.generic.parent.CommandParent; import me.lucko.luckperms.common.commands.generic.permission.CommandPermission; import me.lucko.luckperms.common.locale.LocaleManager; import me.lucko.luckperms.common.locale.command.CommandSpec; -import me.lucko.luckperms.common.locale.message.Message; import me.lucko.luckperms.common.model.Group; import me.lucko.luckperms.common.plugin.LuckPermsPlugin; import me.lucko.luckperms.common.sender.Sender; @@ -84,20 +84,7 @@ public class GroupMainCommand extends MainCommand { @Override protected Group getTarget(String target, LuckPermsPlugin plugin, Sender sender) { - Group group = plugin.getStorage().loadGroup(target).join().orElse(null); - if (group == null) { - // failed to load, but it might be a display name. - group = plugin.getGroupManager().getByDisplayName(target); - - // nope, not a display name - if (group == null) { - Message.GROUP_NOT_FOUND.send(sender, target); - return null; - } - } - - group.auditTemporaryPermissions(); - return group; + return StorageAssistant.loadGroup(target, sender, plugin, true); } @Override diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackAppend.java b/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackAppend.java index 44f5390f..fbf0c8bf 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackAppend.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackAppend.java @@ -58,9 +58,8 @@ public class TrackAppend extends SubCommand { return CommandResult.INVALID_ARGS; } - Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null); + Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false); if (group == null) { - Message.DOES_NOT_EXIST.send(sender, groupName); return CommandResult.LOADING_ERROR; } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackInsert.java b/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackInsert.java index 72197df4..08fd21dc 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackInsert.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackInsert.java @@ -66,9 +66,8 @@ public class TrackInsert extends SubCommand { return CommandResult.INVALID_ARGS; } - Group group = plugin.getStorage().loadGroup(groupName).join().orElse(null); + Group group = StorageAssistant.loadGroup(groupName, sender, plugin, false); if (group == null) { - Message.DOES_NOT_EXIST.send(sender, groupName); return CommandResult.LOADING_ERROR; } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackMainCommand.java b/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackMainCommand.java index fb7e3d12..d98c9418 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackMainCommand.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/track/TrackMainCommand.java @@ -31,9 +31,9 @@ import com.google.common.collect.ImmutableList; import me.lucko.luckperms.common.command.abstraction.Command; import me.lucko.luckperms.common.command.abstraction.MainCommand; +import me.lucko.luckperms.common.command.utils.StorageAssistant; import me.lucko.luckperms.common.locale.LocaleManager; import me.lucko.luckperms.common.locale.command.CommandSpec; -import me.lucko.luckperms.common.locale.message.Message; import me.lucko.luckperms.common.model.Track; import me.lucko.luckperms.common.plugin.LuckPermsPlugin; import me.lucko.luckperms.common.sender.Sender; @@ -73,13 +73,7 @@ public class TrackMainCommand extends MainCommand { @Override protected Track getTarget(String target, LuckPermsPlugin plugin, Sender sender) { - Track track = plugin.getStorage().loadTrack(target).join().orElse(null); - if (track == null) { - Message.TRACK_NOT_FOUND.send(sender, target); - return null; - } - - return track; + return StorageAssistant.loadTrack(target, sender, plugin); } @Override diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/user/UserDemote.java b/common/src/main/java/me/lucko/luckperms/common/commands/user/UserDemote.java index 4b133333..f4c00b31 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/user/UserDemote.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/user/UserDemote.java @@ -73,9 +73,8 @@ public class UserDemote extends SubCommand { return CommandResult.INVALID_ARGS; } - Track track = plugin.getStorage().loadTrack(trackName).join().orElse(null); + Track track = StorageAssistant.loadTrack(trackName, sender, plugin); if (track == null) { - Message.DOES_NOT_EXIST.send(sender, trackName); return CommandResult.LOADING_ERROR; } diff --git a/common/src/main/java/me/lucko/luckperms/common/commands/user/UserPromote.java b/common/src/main/java/me/lucko/luckperms/common/commands/user/UserPromote.java index a0281042..541c9e77 100644 --- a/common/src/main/java/me/lucko/luckperms/common/commands/user/UserPromote.java +++ b/common/src/main/java/me/lucko/luckperms/common/commands/user/UserPromote.java @@ -71,9 +71,8 @@ public class UserPromote extends SubCommand { return CommandResult.INVALID_ARGS; } - Track track = plugin.getStorage().loadTrack(trackName).join().orElse(null); + Track track = StorageAssistant.loadTrack(trackName, sender, plugin); if (track == null) { - Message.DOES_NOT_EXIST.send(sender, trackName); return CommandResult.LOADING_ERROR; }