1
0
This commit is contained in:
2024-11-28 05:49:23 +03:00
parent bb3dc3cb1e
commit 01cb6cb08b
17 changed files with 283 additions and 47 deletions

View File

@@ -14,6 +14,7 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter' testImplementation 'org.junit.jupiter:junit-jupiter'
implementation 'com.google.guava:guava:33.2.1-jre' implementation 'com.google.guava:guava:33.2.1-jre'
implementation 'com.lmax:disruptor:4.0.0'
} }
test { test {

View File

@@ -1,4 +1,4 @@
package ru.lionarius.impl.queue; package ru.lionarius.impl;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
@@ -8,43 +8,24 @@ import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.api.order.Order; import ru.lionarius.api.order.Order;
import ru.lionarius.api.order.OrderData; import ru.lionarius.api.order.OrderData;
import ru.lionarius.api.order.message.OrderClosedMessage; import ru.lionarius.api.order.message.OrderClosedMessage;
import ru.lionarius.impl.InMemoryClientRepository;
import ru.lionarius.impl.OrderBook;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue;
public class CommandProcessor implements Runnable { public class CommandProcessor {
private final BlockingQueue<Command<?>> commandQueue;
private final ClientRepository clientRepository = new InMemoryClientRepository(); private final ClientRepository clientRepository = new InMemoryClientRepository();
private final Map<CurrencyPair, OrderBook> orderBooks; private final Map<CurrencyPair, OrderBook> orderBooks;
private final Set<CurrencyPair> allowedPairs; private final Set<CurrencyPair> allowedPairs;
public CommandProcessor(Set<CurrencyPair> allowedPairs, BlockingQueue<Command<?>> commandQueue) { public CommandProcessor(Set<CurrencyPair> allowedPairs) {
this.commandQueue = commandQueue;
this.allowedPairs = ImmutableSet.copyOf(allowedPairs); this.allowedPairs = ImmutableSet.copyOf(allowedPairs);
this.orderBooks = allowedPairs.stream() this.orderBooks = allowedPairs.stream()
.map(pair -> new AbstractMap.SimpleEntry<>(pair, new OrderBook())) .map(pair -> new AbstractMap.SimpleEntry<>(pair, new OrderBook()))
.collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
} }
@Override public void processCommand(Command<?> command) {
public void run() {
try {
while (true) {
var command = commandQueue.take();
processCommand(command);
}
} catch (InterruptedException e) {
System.out.println("Command processor exited");
}
}
private void processCommand(Command<?> command) {
try { try {
if (command instanceof CreateClientCommand) { if (command instanceof CreateClientCommand) {
processCreateClientCommand((CreateClientCommand) command); processCreateClientCommand((CreateClientCommand) command);
@@ -141,3 +122,4 @@ public class CommandProcessor implements Runnable {
command.result().complete(result); command.result().complete(result);
} }
} }

View File

@@ -0,0 +1,19 @@
package ru.lionarius.impl.disruptor;
import ru.lionarius.api.command.Command;
public class CommandEvent {
private Command<?> command;
public Command<?> getCommand() {
return command;
}
public void setCommand(Command<?> command) {
this.command = command;
}
void clear() {
this.command = null;
}
}

View File

@@ -0,0 +1,10 @@
package ru.lionarius.impl.disruptor;
import com.lmax.disruptor.EventFactory;
public class CommandEventFactory implements EventFactory<CommandEvent> {
@Override
public CommandEvent newInstance() {
return new CommandEvent();
}
}

View File

@@ -0,0 +1,22 @@
package ru.lionarius.impl.disruptor;
import com.lmax.disruptor.EventHandler;
import ru.lionarius.impl.CommandProcessor;
public class CommandEventHandler implements EventHandler<CommandEvent> {
private final CommandProcessor commandProcessor;
public CommandEventHandler(CommandProcessor commandProcessor) {
this.commandProcessor = commandProcessor;
}
@Override
public void onEvent(CommandEvent event, long sequence, boolean endOfBatch) throws Exception {
try {
commandProcessor.processCommand(event.getCommand());
} finally {
event.clear();
}
}
}

View File

@@ -0,0 +1,76 @@
package ru.lionarius.impl.disruptor;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import ru.lionarius.api.CurrencyExchange;
import ru.lionarius.api.client.Client;
import ru.lionarius.api.command.*;
import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.api.order.OrderType;
import ru.lionarius.api.order.OrderView;
import ru.lionarius.api.order.message.OrderMessage;
import ru.lionarius.impl.CommandProcessor;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
public class DisruptorCurrencyExchange implements CurrencyExchange {
private final Disruptor<CommandEvent> disruptor;
public DisruptorCurrencyExchange(Set<CurrencyPair> allowedPairs, int capacity) {
var commandProcessor = new CommandProcessor(allowedPairs);
this.disruptor = new Disruptor<>(new CommandEventFactory(), capacity, Executors.defaultThreadFactory(), ProducerType.MULTI, new BusySpinWaitStrategy());
this.disruptor.handleEventsWith(new CommandEventHandler(commandProcessor));
disruptor.start();
}
private void publishCommand(Command<?> command) {
disruptor.publishEvent((event, sequence) -> {
event.setCommand(command);
});
}
@Override
public CompletableFuture<Client> createClient(String name) {
var command = new CreateClientCommand(name);
publishCommand(command);
return command.result();
}
@Override
public CompletableFuture<UUID> placeOrder(UUID clientId, CurrencyPair pair, OrderType type, double price, double quantity) {
var command = new PlaceOrderCommand(clientId, pair, type, price, quantity);
publishCommand(command);
return command.result();
}
@Override
public CompletableFuture<Void> cancelOrder(UUID clientId, UUID orderId) {
var command = new CancelOrderCommand(clientId, orderId);
publishCommand(command);
return command.result();
}
@Override
public CompletableFuture<List<OrderView>> getOrders(UUID clientId) {
var command = new GetOrdersCommand(clientId);
publishCommand(command);
return command.result();
}
@Override
public CompletableFuture<List<OrderMessage>> getOrderMessages(UUID clientId, UUID orderId) {
var command = new GetOrderMessagesCommand(clientId, orderId);
publishCommand(command);
return command.result();
}
public void shutdown() {
disruptor.shutdown();
}
}

View File

@@ -0,0 +1,41 @@
package ru.lionarius.impl.queue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import ru.lionarius.api.client.ClientRepository;
import ru.lionarius.api.command.*;
import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.api.order.Order;
import ru.lionarius.api.order.OrderData;
import ru.lionarius.api.order.message.OrderClosedMessage;
import ru.lionarius.impl.CommandProcessor;
import ru.lionarius.impl.InMemoryClientRepository;
import ru.lionarius.impl.OrderBook;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
public class CommandProcessorTask implements Runnable {
private final CommandProcessor commandProcessor;
private final BlockingQueue<Command<?>> commandQueue;
public CommandProcessorTask(CommandProcessor commandProcessor, BlockingQueue<Command<?>> commandQueue) {
this.commandProcessor = commandProcessor;
this.commandQueue = commandQueue;
}
@Override
public void run() {
try {
while (true) {
var command = commandQueue.take();
this.commandProcessor.processCommand(command);
}
} catch (InterruptedException e) {
// TODO: log
}
}
}

View File

@@ -7,22 +7,24 @@ import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.api.order.OrderType; import ru.lionarius.api.order.OrderType;
import ru.lionarius.api.order.OrderView; import ru.lionarius.api.order.OrderView;
import ru.lionarius.api.order.message.OrderMessage; import ru.lionarius.api.order.message.OrderMessage;
import ru.lionarius.impl.CommandProcessor;
import java.util.*; import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
public class QueueCurrencyExchange implements CurrencyExchange { public class QueueCurrencyExchange implements CurrencyExchange {
private final BlockingQueue<Command<?>> commandQueue; private final BlockingQueue<Command<?>> commandQueue;
private final CommandProcessor commandProcessor;
private final Thread commandProcessorThread; private final Thread commandProcessorThread;
public QueueCurrencyExchange(Set<CurrencyPair> allowedPairs) { public QueueCurrencyExchange(Set<CurrencyPair> allowedPairs) {
this.commandQueue = new LinkedBlockingQueue<>(); this.commandQueue = new LinkedBlockingQueue<>();
this.commandProcessor = new CommandProcessor(allowedPairs, commandQueue);
this.commandProcessorThread = new Thread(commandProcessor); var commandProcessorTask = new CommandProcessorTask(new CommandProcessor(allowedPairs), commandQueue);
this.commandProcessorThread = new Thread(commandProcessorTask);
this.commandProcessorThread.start(); this.commandProcessorThread.start();
} }

View File

@@ -1,3 +1,5 @@
package business;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;

View File

@@ -0,0 +1,19 @@
package business;
import ru.lionarius.api.CurrencyExchange;
import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.impl.disruptor.DisruptorCurrencyExchange;
import java.util.Set;
class ConcurrentDisruptorCurrencyExchangeTest extends ConcurrentCurrencyExchangeTest {
@Override
protected CurrencyExchange createExchange(Set<CurrencyPair> pairs) {
return new DisruptorCurrencyExchange(pairs, 1024 * 16);
}
@Override
protected void shutdownExchange(CurrencyExchange exchange) {
((DisruptorCurrencyExchange) exchange).shutdown();
}
}

View File

@@ -1,3 +1,5 @@
package business;
import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.CurrencyExchange;
import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.impl.plain.PlainCurrencyExchange; import ru.lionarius.impl.plain.PlainCurrencyExchange;

View File

@@ -1,3 +1,5 @@
package business;
import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.CurrencyExchange;
import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.impl.queue.QueueCurrencyExchange; import ru.lionarius.impl.queue.QueueCurrencyExchange;

View File

@@ -1,3 +1,5 @@
package business;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;

View File

@@ -0,0 +1,19 @@
package business;
import ru.lionarius.api.CurrencyExchange;
import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.impl.disruptor.DisruptorCurrencyExchange;
import java.util.Set;
class DisruptorCurrencyExchangeTest extends CurrencyExchangeTest {
@Override
protected CurrencyExchange createExchange(Set<CurrencyPair> pairs) {
return new DisruptorCurrencyExchange(pairs, 1024 * 16);
}
@Override
protected void shutdownExchange(CurrencyExchange exchange) {
((DisruptorCurrencyExchange) exchange).shutdown();
}
}

View File

@@ -1,3 +1,5 @@
package business;
import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.CurrencyExchange;
import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.impl.plain.PlainCurrencyExchange; import ru.lionarius.impl.plain.PlainCurrencyExchange;

View File

@@ -1,3 +1,5 @@
package business;
import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.CurrencyExchange;
import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.impl.queue.QueueCurrencyExchange; import ru.lionarius.impl.queue.QueueCurrencyExchange;

View File

@@ -1,3 +1,6 @@
package performance;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import ru.lionarius.api.CurrencyExchange; import ru.lionarius.api.CurrencyExchange;
@@ -5,6 +8,7 @@ import ru.lionarius.api.client.Client;
import ru.lionarius.api.currency.Currency; import ru.lionarius.api.currency.Currency;
import ru.lionarius.api.currency.CurrencyPair; import ru.lionarius.api.currency.CurrencyPair;
import ru.lionarius.api.order.OrderType; import ru.lionarius.api.order.OrderType;
import ru.lionarius.impl.disruptor.DisruptorCurrencyExchange;
import ru.lionarius.impl.plain.PlainCurrencyExchange; import ru.lionarius.impl.plain.PlainCurrencyExchange;
import ru.lionarius.impl.queue.QueueCurrencyExchange; import ru.lionarius.impl.queue.QueueCurrencyExchange;
@@ -12,7 +16,11 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
public class PerformanceTest { public class PerformanceTest {
private Set<CurrencyPair> pairs; private Set<CurrencyPair> pairs;
@@ -45,25 +53,50 @@ public class PerformanceTest {
@Test @Test
void plainPerformanceTest() throws Exception { void plainPerformanceTest() throws Exception {
var exchange = new PlainCurrencyExchange(pairs); var orders = benchmark(numTraders, numOrders, pairs, PlainCurrencyExchange::new, null, false);
var time = benchmark(numTraders, numOrders, exchange, pairs); System.out.printf("Plain implementation: %.2f orders/sec%n", orders);
System.out.printf("Plain implementation: %.2f orders/sec%n", time);
} }
@Test @Test
void queuePerformanceTest() throws Exception { void queuePerformanceTest() throws Exception {
var exchange = new QueueCurrencyExchange(pairs); var orders = benchmark(numTraders, numOrders, pairs, QueueCurrencyExchange::new, exchange -> {
try {
((QueueCurrencyExchange) exchange).shutdown();
} catch (InterruptedException e) {
Assertions.fail(e);
}
}, true);
var time = benchmark(numTraders, numOrders, exchange, pairs); System.out.printf("Queue implementation: %.2f orders/sec%n", orders);
exchange.shutdown();
System.out.printf("Queue implementation: %.2f orders/sec%n", time);
} }
double benchmark(int numTraders, int numOrders, CurrencyExchange exchange, Set<CurrencyPair> pairs) throws InterruptedException { @Test
void disruptorPerformanceTest() throws Exception {
var orders = benchmark(numTraders, numOrders, pairs, currencyPairs -> new DisruptorCurrencyExchange(currencyPairs, 1024 * 1024 * 8), exchange -> ((DisruptorCurrencyExchange) exchange).shutdown(), true);
System.out.printf("Disruptor implementation: %.2f orders/sec%n", orders);
}
double benchmark(int numTraders, int numOrders, Set<CurrencyPair> pairs, Function<Set<CurrencyPair>, CurrencyExchange> exchangeFactory, Consumer<CurrencyExchange> shutdownExchange, boolean doWarmup) throws InterruptedException {
if (doWarmup) {
for (int i = 0; i < 5; i++) {
var exchange = exchangeFactory.apply(pairs);
benchmarkImpl(numTraders, numOrders, exchange, pairs);
if (shutdownExchange != null)
shutdownExchange.accept(exchange);
}
}
var exchange = exchangeFactory.apply(pairs);
var orders = benchmarkImpl(numTraders, numOrders, exchange, pairs);
if (shutdownExchange != null)
shutdownExchange.accept(exchange);
return orders;
}
double benchmarkImpl(int numTraders, int numOrders, CurrencyExchange exchange, Set<CurrencyPair> pairs) throws InterruptedException {
var buyers = new ArrayList<Client>(); var buyers = new ArrayList<Client>();
var sellers = new ArrayList<Client>(); var sellers = new ArrayList<Client>();