From 01cb6cb08b7de56bc210e9f617f30bd788f2493c Mon Sep 17 00:00:00 2001 From: lionarius Date: Thu, 28 Nov 2024 05:49:23 +0300 Subject: [PATCH] lab11 --- build.gradle | 1 + .../impl/{queue => }/CommandProcessor.java | 28 ++----- .../impl/disruptor/CommandEvent.java | 19 +++++ .../impl/disruptor/CommandEventFactory.java | 10 +++ .../impl/disruptor/CommandEventHandler.java | 22 ++++++ .../disruptor/DisruptorCurrencyExchange.java | 76 +++++++++++++++++++ .../impl/queue/CommandProcessorTask.java | 41 ++++++++++ .../impl/queue/QueueCurrencyExchange.java | 10 ++- .../ConcurrentCurrencyExchangeTest.java | 2 + ...ncurrentDisruptorCurrencyExchangeTest.java | 19 +++++ .../ConcurrentPlainCurrencyExchangeTest.java | 2 + .../ConcurrentQueueCurrencyExchangeTest.java | 2 + .../{ => business}/CurrencyExchangeTest.java | 2 + .../DisruptorCurrencyExchangeTest.java | 19 +++++ .../PlainCurrencyExchangeTest.java | 2 + .../QueueCurrencyExchangeTest.java | 2 + .../{ => performance}/PerformanceTest.java | 73 +++++++++++++----- 17 files changed, 283 insertions(+), 47 deletions(-) rename src/main/java/ru/lionarius/impl/{queue => }/CommandProcessor.java (86%) create mode 100644 src/main/java/ru/lionarius/impl/disruptor/CommandEvent.java create mode 100644 src/main/java/ru/lionarius/impl/disruptor/CommandEventFactory.java create mode 100644 src/main/java/ru/lionarius/impl/disruptor/CommandEventHandler.java create mode 100644 src/main/java/ru/lionarius/impl/disruptor/DisruptorCurrencyExchange.java create mode 100644 src/main/java/ru/lionarius/impl/queue/CommandProcessorTask.java rename src/test/java/{ => business}/ConcurrentCurrencyExchangeTest.java (99%) create mode 100644 src/test/java/business/ConcurrentDisruptorCurrencyExchangeTest.java rename src/test/java/{ => business}/ConcurrentPlainCurrencyExchangeTest.java (96%) rename src/test/java/{ => business}/ConcurrentQueueCurrencyExchangeTest.java (96%) rename src/test/java/{ => business}/CurrencyExchangeTest.java (99%) create mode 100644 src/test/java/business/DisruptorCurrencyExchangeTest.java rename src/test/java/{ => business}/PlainCurrencyExchangeTest.java (96%) rename src/test/java/{ => business}/QueueCurrencyExchangeTest.java (96%) rename src/test/java/{ => performance}/PerformanceTest.java (65%) diff --git a/build.gradle b/build.gradle index 8a32d62..589ac65 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter' implementation 'com.google.guava:guava:33.2.1-jre' + implementation 'com.lmax:disruptor:4.0.0' } test { diff --git a/src/main/java/ru/lionarius/impl/queue/CommandProcessor.java b/src/main/java/ru/lionarius/impl/CommandProcessor.java similarity index 86% rename from src/main/java/ru/lionarius/impl/queue/CommandProcessor.java rename to src/main/java/ru/lionarius/impl/CommandProcessor.java index 12c4329..f2f4345 100644 --- a/src/main/java/ru/lionarius/impl/queue/CommandProcessor.java +++ b/src/main/java/ru/lionarius/impl/CommandProcessor.java @@ -1,4 +1,4 @@ -package ru.lionarius.impl.queue; +package ru.lionarius.impl; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -8,43 +8,24 @@ import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.api.order.Order; import ru.lionarius.api.order.OrderData; import ru.lionarius.api.order.message.OrderClosedMessage; -import ru.lionarius.impl.InMemoryClientRepository; -import ru.lionarius.impl.OrderBook; import java.util.AbstractMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; -public class CommandProcessor implements Runnable { - private final BlockingQueue> commandQueue; +public class CommandProcessor { private final ClientRepository clientRepository = new InMemoryClientRepository(); private final Map orderBooks; private final Set allowedPairs; - public CommandProcessor(Set allowedPairs, BlockingQueue> commandQueue) { - this.commandQueue = commandQueue; - + public CommandProcessor(Set allowedPairs) { this.allowedPairs = ImmutableSet.copyOf(allowedPairs); this.orderBooks = allowedPairs.stream() .map(pair -> new AbstractMap.SimpleEntry<>(pair, new OrderBook())) .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); } - @Override - public void run() { - try { - while (true) { - var command = commandQueue.take(); - - processCommand(command); - } - } catch (InterruptedException e) { - System.out.println("Command processor exited"); - } - } - - private void processCommand(Command command) { + public void processCommand(Command command) { try { if (command instanceof CreateClientCommand) { processCreateClientCommand((CreateClientCommand) command); @@ -141,3 +122,4 @@ public class CommandProcessor implements Runnable { command.result().complete(result); } } + diff --git a/src/main/java/ru/lionarius/impl/disruptor/CommandEvent.java b/src/main/java/ru/lionarius/impl/disruptor/CommandEvent.java new file mode 100644 index 0000000..820f530 --- /dev/null +++ b/src/main/java/ru/lionarius/impl/disruptor/CommandEvent.java @@ -0,0 +1,19 @@ +package ru.lionarius.impl.disruptor; + +import ru.lionarius.api.command.Command; + +public class CommandEvent { + private Command command; + + public Command getCommand() { + return command; + } + + public void setCommand(Command command) { + this.command = command; + } + + void clear() { + this.command = null; + } +} diff --git a/src/main/java/ru/lionarius/impl/disruptor/CommandEventFactory.java b/src/main/java/ru/lionarius/impl/disruptor/CommandEventFactory.java new file mode 100644 index 0000000..ea8e877 --- /dev/null +++ b/src/main/java/ru/lionarius/impl/disruptor/CommandEventFactory.java @@ -0,0 +1,10 @@ +package ru.lionarius.impl.disruptor; + +import com.lmax.disruptor.EventFactory; + +public class CommandEventFactory implements EventFactory { + @Override + public CommandEvent newInstance() { + return new CommandEvent(); + } +} diff --git a/src/main/java/ru/lionarius/impl/disruptor/CommandEventHandler.java b/src/main/java/ru/lionarius/impl/disruptor/CommandEventHandler.java new file mode 100644 index 0000000..6cb0540 --- /dev/null +++ b/src/main/java/ru/lionarius/impl/disruptor/CommandEventHandler.java @@ -0,0 +1,22 @@ +package ru.lionarius.impl.disruptor; + +import com.lmax.disruptor.EventHandler; +import ru.lionarius.impl.CommandProcessor; + +public class CommandEventHandler implements EventHandler { + + private final CommandProcessor commandProcessor; + + public CommandEventHandler(CommandProcessor commandProcessor) { + this.commandProcessor = commandProcessor; + } + + @Override + public void onEvent(CommandEvent event, long sequence, boolean endOfBatch) throws Exception { + try { + commandProcessor.processCommand(event.getCommand()); + } finally { + event.clear(); + } + } +} diff --git a/src/main/java/ru/lionarius/impl/disruptor/DisruptorCurrencyExchange.java b/src/main/java/ru/lionarius/impl/disruptor/DisruptorCurrencyExchange.java new file mode 100644 index 0000000..4efe7d6 --- /dev/null +++ b/src/main/java/ru/lionarius/impl/disruptor/DisruptorCurrencyExchange.java @@ -0,0 +1,76 @@ +package ru.lionarius.impl.disruptor; + +import com.lmax.disruptor.*; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; +import ru.lionarius.api.CurrencyExchange; +import ru.lionarius.api.client.Client; +import ru.lionarius.api.command.*; +import ru.lionarius.api.currency.CurrencyPair; +import ru.lionarius.api.order.OrderType; +import ru.lionarius.api.order.OrderView; +import ru.lionarius.api.order.message.OrderMessage; +import ru.lionarius.impl.CommandProcessor; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; + +public class DisruptorCurrencyExchange implements CurrencyExchange { + private final Disruptor disruptor; + + public DisruptorCurrencyExchange(Set allowedPairs, int capacity) { + var commandProcessor = new CommandProcessor(allowedPairs); + + this.disruptor = new Disruptor<>(new CommandEventFactory(), capacity, Executors.defaultThreadFactory(), ProducerType.MULTI, new BusySpinWaitStrategy()); + this.disruptor.handleEventsWith(new CommandEventHandler(commandProcessor)); + disruptor.start(); + } + + private void publishCommand(Command command) { + disruptor.publishEvent((event, sequence) -> { + event.setCommand(command); + }); + } + + @Override + public CompletableFuture createClient(String name) { + var command = new CreateClientCommand(name); + publishCommand(command); + return command.result(); + } + + @Override + public CompletableFuture placeOrder(UUID clientId, CurrencyPair pair, OrderType type, double price, double quantity) { + var command = new PlaceOrderCommand(clientId, pair, type, price, quantity); + publishCommand(command); + return command.result(); + } + + @Override + public CompletableFuture cancelOrder(UUID clientId, UUID orderId) { + var command = new CancelOrderCommand(clientId, orderId); + publishCommand(command); + return command.result(); + } + + @Override + public CompletableFuture> getOrders(UUID clientId) { + var command = new GetOrdersCommand(clientId); + publishCommand(command); + return command.result(); + } + + @Override + public CompletableFuture> getOrderMessages(UUID clientId, UUID orderId) { + var command = new GetOrderMessagesCommand(clientId, orderId); + publishCommand(command); + return command.result(); + } + + public void shutdown() { + disruptor.shutdown(); + } +} diff --git a/src/main/java/ru/lionarius/impl/queue/CommandProcessorTask.java b/src/main/java/ru/lionarius/impl/queue/CommandProcessorTask.java new file mode 100644 index 0000000..2f21121 --- /dev/null +++ b/src/main/java/ru/lionarius/impl/queue/CommandProcessorTask.java @@ -0,0 +1,41 @@ +package ru.lionarius.impl.queue; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import ru.lionarius.api.client.ClientRepository; +import ru.lionarius.api.command.*; +import ru.lionarius.api.currency.CurrencyPair; +import ru.lionarius.api.order.Order; +import ru.lionarius.api.order.OrderData; +import ru.lionarius.api.order.message.OrderClosedMessage; +import ru.lionarius.impl.CommandProcessor; +import ru.lionarius.impl.InMemoryClientRepository; +import ru.lionarius.impl.OrderBook; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; + +public class CommandProcessorTask implements Runnable { + private final CommandProcessor commandProcessor; + private final BlockingQueue> commandQueue; + + public CommandProcessorTask(CommandProcessor commandProcessor, BlockingQueue> commandQueue) { + this.commandProcessor = commandProcessor; + this.commandQueue = commandQueue; + } + + @Override + public void run() { + try { + while (true) { + var command = commandQueue.take(); + + this.commandProcessor.processCommand(command); + } + } catch (InterruptedException e) { + // TODO: log + } + } +} diff --git a/src/main/java/ru/lionarius/impl/queue/QueueCurrencyExchange.java b/src/main/java/ru/lionarius/impl/queue/QueueCurrencyExchange.java index 9a0a890..57ca3a1 100644 --- a/src/main/java/ru/lionarius/impl/queue/QueueCurrencyExchange.java +++ b/src/main/java/ru/lionarius/impl/queue/QueueCurrencyExchange.java @@ -7,22 +7,24 @@ import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.api.order.OrderType; import ru.lionarius.api.order.OrderView; import ru.lionarius.api.order.message.OrderMessage; +import ru.lionarius.impl.CommandProcessor; -import java.util.*; +import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; public class QueueCurrencyExchange implements CurrencyExchange { private final BlockingQueue> commandQueue; - private final CommandProcessor commandProcessor; private final Thread commandProcessorThread; public QueueCurrencyExchange(Set allowedPairs) { this.commandQueue = new LinkedBlockingQueue<>(); - this.commandProcessor = new CommandProcessor(allowedPairs, commandQueue); - this.commandProcessorThread = new Thread(commandProcessor); + var commandProcessorTask = new CommandProcessorTask(new CommandProcessor(allowedPairs), commandQueue); + this.commandProcessorThread = new Thread(commandProcessorTask); this.commandProcessorThread.start(); } diff --git a/src/test/java/ConcurrentCurrencyExchangeTest.java b/src/test/java/business/ConcurrentCurrencyExchangeTest.java similarity index 99% rename from src/test/java/ConcurrentCurrencyExchangeTest.java rename to src/test/java/business/ConcurrentCurrencyExchangeTest.java index 1d8f00f..ba4bc74 100644 --- a/src/test/java/ConcurrentCurrencyExchangeTest.java +++ b/src/test/java/business/ConcurrentCurrencyExchangeTest.java @@ -1,3 +1,5 @@ +package business; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/src/test/java/business/ConcurrentDisruptorCurrencyExchangeTest.java b/src/test/java/business/ConcurrentDisruptorCurrencyExchangeTest.java new file mode 100644 index 0000000..5722abd --- /dev/null +++ b/src/test/java/business/ConcurrentDisruptorCurrencyExchangeTest.java @@ -0,0 +1,19 @@ +package business; + +import ru.lionarius.api.CurrencyExchange; +import ru.lionarius.api.currency.CurrencyPair; +import ru.lionarius.impl.disruptor.DisruptorCurrencyExchange; + +import java.util.Set; + +class ConcurrentDisruptorCurrencyExchangeTest extends ConcurrentCurrencyExchangeTest { + @Override + protected CurrencyExchange createExchange(Set pairs) { + return new DisruptorCurrencyExchange(pairs, 1024 * 16); + } + + @Override + protected void shutdownExchange(CurrencyExchange exchange) { + ((DisruptorCurrencyExchange) exchange).shutdown(); + } +} \ No newline at end of file diff --git a/src/test/java/ConcurrentPlainCurrencyExchangeTest.java b/src/test/java/business/ConcurrentPlainCurrencyExchangeTest.java similarity index 96% rename from src/test/java/ConcurrentPlainCurrencyExchangeTest.java rename to src/test/java/business/ConcurrentPlainCurrencyExchangeTest.java index d2eca77..a86b954 100644 --- a/src/test/java/ConcurrentPlainCurrencyExchangeTest.java +++ b/src/test/java/business/ConcurrentPlainCurrencyExchangeTest.java @@ -1,3 +1,5 @@ +package business; + import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.impl.plain.PlainCurrencyExchange; diff --git a/src/test/java/ConcurrentQueueCurrencyExchangeTest.java b/src/test/java/business/ConcurrentQueueCurrencyExchangeTest.java similarity index 96% rename from src/test/java/ConcurrentQueueCurrencyExchangeTest.java rename to src/test/java/business/ConcurrentQueueCurrencyExchangeTest.java index dde8df2..2af49e4 100644 --- a/src/test/java/ConcurrentQueueCurrencyExchangeTest.java +++ b/src/test/java/business/ConcurrentQueueCurrencyExchangeTest.java @@ -1,3 +1,5 @@ +package business; + import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.impl.queue.QueueCurrencyExchange; diff --git a/src/test/java/CurrencyExchangeTest.java b/src/test/java/business/CurrencyExchangeTest.java similarity index 99% rename from src/test/java/CurrencyExchangeTest.java rename to src/test/java/business/CurrencyExchangeTest.java index c20354b..a53881c 100644 --- a/src/test/java/CurrencyExchangeTest.java +++ b/src/test/java/business/CurrencyExchangeTest.java @@ -1,3 +1,5 @@ +package business; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; diff --git a/src/test/java/business/DisruptorCurrencyExchangeTest.java b/src/test/java/business/DisruptorCurrencyExchangeTest.java new file mode 100644 index 0000000..c6ed12c --- /dev/null +++ b/src/test/java/business/DisruptorCurrencyExchangeTest.java @@ -0,0 +1,19 @@ +package business; + +import ru.lionarius.api.CurrencyExchange; +import ru.lionarius.api.currency.CurrencyPair; +import ru.lionarius.impl.disruptor.DisruptorCurrencyExchange; + +import java.util.Set; + +class DisruptorCurrencyExchangeTest extends CurrencyExchangeTest { + @Override + protected CurrencyExchange createExchange(Set pairs) { + return new DisruptorCurrencyExchange(pairs, 1024 * 16); + } + + @Override + protected void shutdownExchange(CurrencyExchange exchange) { + ((DisruptorCurrencyExchange) exchange).shutdown(); + } +} diff --git a/src/test/java/PlainCurrencyExchangeTest.java b/src/test/java/business/PlainCurrencyExchangeTest.java similarity index 96% rename from src/test/java/PlainCurrencyExchangeTest.java rename to src/test/java/business/PlainCurrencyExchangeTest.java index 3324fa8..8ddccb1 100644 --- a/src/test/java/PlainCurrencyExchangeTest.java +++ b/src/test/java/business/PlainCurrencyExchangeTest.java @@ -1,3 +1,5 @@ +package business; + import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.impl.plain.PlainCurrencyExchange; diff --git a/src/test/java/QueueCurrencyExchangeTest.java b/src/test/java/business/QueueCurrencyExchangeTest.java similarity index 96% rename from src/test/java/QueueCurrencyExchangeTest.java rename to src/test/java/business/QueueCurrencyExchangeTest.java index ae1c3dd..ca5e64c 100644 --- a/src/test/java/QueueCurrencyExchangeTest.java +++ b/src/test/java/business/QueueCurrencyExchangeTest.java @@ -1,3 +1,5 @@ +package business; + import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.impl.queue.QueueCurrencyExchange; diff --git a/src/test/java/PerformanceTest.java b/src/test/java/performance/PerformanceTest.java similarity index 65% rename from src/test/java/PerformanceTest.java rename to src/test/java/performance/PerformanceTest.java index 5f45b76..488584f 100644 --- a/src/test/java/PerformanceTest.java +++ b/src/test/java/performance/PerformanceTest.java @@ -1,3 +1,6 @@ +package performance; + +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import ru.lionarius.api.CurrencyExchange; @@ -5,6 +8,7 @@ import ru.lionarius.api.client.Client; import ru.lionarius.api.currency.Currency; import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.api.order.OrderType; +import ru.lionarius.impl.disruptor.DisruptorCurrencyExchange; import ru.lionarius.impl.plain.PlainCurrencyExchange; import ru.lionarius.impl.queue.QueueCurrencyExchange; @@ -12,14 +16,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; +import java.util.function.Function; public class PerformanceTest { private Set pairs; private final int numTraders = 500; private final int numOrders = 1000; private ExecutorService executorService; - + @BeforeEach public void setUp() { var BTC = new Currency("BTC"); @@ -39,31 +47,56 @@ public class PerformanceTest { new CurrencyPair(RUB, BTC), new CurrencyPair(RUB, ETH) ); - + executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); } - + @Test void plainPerformanceTest() throws Exception { - var exchange = new PlainCurrencyExchange(pairs); - - var time = benchmark(numTraders, numOrders, exchange, pairs); - - System.out.printf("Plain implementation: %.2f orders/sec%n", time); + var orders = benchmark(numTraders, numOrders, pairs, PlainCurrencyExchange::new, null, false); + + System.out.printf("Plain implementation: %.2f orders/sec%n", orders); } @Test void queuePerformanceTest() throws Exception { - var exchange = new QueueCurrencyExchange(pairs); + var orders = benchmark(numTraders, numOrders, pairs, QueueCurrencyExchange::new, exchange -> { + try { + ((QueueCurrencyExchange) exchange).shutdown(); + } catch (InterruptedException e) { + Assertions.fail(e); + } + }, true); - var time = benchmark(numTraders, numOrders, exchange, pairs); - - exchange.shutdown(); - - System.out.printf("Queue implementation: %.2f orders/sec%n", time); + System.out.printf("Queue implementation: %.2f orders/sec%n", orders); } - - double benchmark(int numTraders, int numOrders, CurrencyExchange exchange, Set pairs) throws InterruptedException { + + @Test + void disruptorPerformanceTest() throws Exception { + var orders = benchmark(numTraders, numOrders, pairs, currencyPairs -> new DisruptorCurrencyExchange(currencyPairs, 1024 * 1024 * 8), exchange -> ((DisruptorCurrencyExchange) exchange).shutdown(), true); + + System.out.printf("Disruptor implementation: %.2f orders/sec%n", orders); + } + + double benchmark(int numTraders, int numOrders, Set pairs, Function, CurrencyExchange> exchangeFactory, Consumer shutdownExchange, boolean doWarmup) throws InterruptedException { + if (doWarmup) { + for (int i = 0; i < 5; i++) { + var exchange = exchangeFactory.apply(pairs); + benchmarkImpl(numTraders, numOrders, exchange, pairs); + if (shutdownExchange != null) + shutdownExchange.accept(exchange); + } + } + + var exchange = exchangeFactory.apply(pairs); + var orders = benchmarkImpl(numTraders, numOrders, exchange, pairs); + if (shutdownExchange != null) + shutdownExchange.accept(exchange); + + return orders; + } + + double benchmarkImpl(int numTraders, int numOrders, CurrencyExchange exchange, Set pairs) throws InterruptedException { var buyers = new ArrayList(); var sellers = new ArrayList(); @@ -97,11 +130,11 @@ public class PerformanceTest { var randomPair = pairs.get(random.nextInt(pairs.size())); var buyAmount = random.nextInt(100) + 1; var sellAmount = random.nextInt(100) + 1; - + var future = CompletableFuture.runAsync(() -> { exchange.placeOrder(seller.id(), randomPair, OrderType.SELL, buyAmount, sellAmount); }, executorService); - + futures.add(future); orders += 1; } @@ -116,7 +149,7 @@ public class PerformanceTest { var future = CompletableFuture.runAsync(() -> { exchange.placeOrder(buyer.id(), randomPair, OrderType.BUY, sellAmount, buyAmount); }, executorService); - + futures.add(future); orders += 1; }