1
0

completed lab10

This commit is contained in:
2024-11-18 19:06:12 +03:00
parent 6c23a1c606
commit 23ac0ab6bb
10 changed files with 127 additions and 144 deletions

View File

@@ -25,8 +25,6 @@ public class Order {
private boolean closed = false;
private final List<OrderMessage> messages = new ArrayList<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public Order(UUID clientId, OrderType type, CurrencyPair pair, OrderData data) {
this.clientId = clientId;
this.type = type;
@@ -37,12 +35,7 @@ public class Order {
}
public OrderView getView() {
lock.readLock().lock();
try {
return new OrderView(id, clientId, type, pair, originalData, lastData, closed);
} finally {
lock.readLock().unlock();
}
return new OrderView(id, clientId, type, pair, originalData, lastData, closed);
}
public UUID getId() {
@@ -62,53 +55,37 @@ public class Order {
}
public boolean isClosed() {
lock.readLock().lock();
try {
return closed;
} finally {
lock.readLock().unlock();
}
return closed;
}
public OrderData getLastData() {
lock.readLock().lock();
try {
return lastData;
} finally {
lock.readLock().unlock();
}
return lastData;
}
public void pushMessage(OrderMessage message) {
if (closed)
throw new IllegalStateException("Order is closed");
lock.writeLock().lock();
try {
if (message.type() == OrderMessageType.CREATED) {
if (lastData != null)
throw new IllegalStateException("Order already has data");
if (message.type() == OrderMessageType.CREATED) {
if (lastData != null)
throw new IllegalStateException("Order already has data");
lastData = ((OrderCreatedMessage) message).data();
} else if (message.type() == OrderMessageType.FILLED) {
lastData = ((OrderFilledMessage) message).newData();
} else if (message.type() == OrderMessageType.CLOSED) {
closed = true;
}
lastData = ((OrderCreatedMessage) message).data();
} else if (message.type() == OrderMessageType.FILLED) {
var data = ((OrderFilledMessage) message).newData();
if (lastData.quantity() <= data.quantity())
throw new IllegalStateException("Quantity of filled order is less than quantity of last order");
messages.add(message);
} finally {
lock.writeLock().unlock();
lastData = data;
} else if (message.type() == OrderMessageType.CLOSED) {
closed = true;
}
messages.add(message);
}
public List<OrderMessage> getMessages() {
lock.readLock().lock();
try {
return ImmutableList.copyOf(messages);
} finally {
lock.readLock().unlock();
}
return ImmutableList.copyOf(messages);
}
@Override

View File

@@ -10,87 +10,77 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class OrderBook {
private final TreeMap<Double, List<Order>> buyOrders = new TreeMap<>(Collections.reverseOrder());
private final TreeMap<Double, List<Order>> sellOrders = new TreeMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public void addOrder(Order order) {
var orders = order.getType() == OrderType.BUY ? buyOrders : sellOrders;
lock.writeLock().lock();
try {
orders.computeIfAbsent(order.getLastData().rate(), (k) -> new ArrayList<>()).add(order);
} finally {
lock.writeLock().unlock();
}
orders.computeIfAbsent(order.getLastData().rate(), (k) -> new ArrayList<>()).add(order);
}
public void matchOrders() {
lock.writeLock().lock();
try {
while (!buyOrders.isEmpty() && !sellOrders.isEmpty()) {
var bestBuyEntry = buyOrders.firstEntry();
var bestSellEntry = sellOrders.firstEntry();
while (!buyOrders.isEmpty() && !sellOrders.isEmpty()) {
var bestBuyEntry = buyOrders.firstEntry();
var bestSellEntry = sellOrders.firstEntry();
var bestBuy = bestBuyEntry.getValue().getFirst();
var bestSell = bestSellEntry.getValue().getFirst();
var bestBuy = bestBuyEntry.getValue().getFirst();
var bestSell = bestSellEntry.getValue().getFirst();
if (bestBuy.isClosed()) {
bestBuyEntry.getValue().remove(bestBuy);
if (bestBuyEntry.getValue().isEmpty())
buyOrders.remove(bestBuyEntry.getKey());
if (bestBuy.isClosed()) {
bestBuyEntry.getValue().remove(bestBuy);
if (bestBuyEntry.getValue().isEmpty())
buyOrders.remove(bestBuyEntry.getKey());
continue;
}
if (bestSell.isClosed()) {
bestSellEntry.getValue().remove(bestSell);
if (bestSellEntry.getValue().isEmpty())
sellOrders.remove(bestSellEntry.getKey());
continue;
}
if (bestBuy.getLastData().rate() < bestSell.getLastData().rate())
break;
var matchQuantity = Math.min(bestBuy.getLastData().quantity(), bestSell.getLastData().quantity());
var newBuyOrderData = new OrderData(
bestBuy.getLastData().price() - matchQuantity * bestBuy.getLastData().rate(),
bestBuy.getLastData().quantity() - matchQuantity
);
var newSellOrderData = new OrderData(
bestSell.getLastData().price() - matchQuantity * bestSell.getLastData().rate(),
bestSell.getLastData().quantity() - matchQuantity
);
bestBuy.pushMessage(new OrderFilledMessage(bestSell.getId(), newBuyOrderData));
bestSell.pushMessage(new OrderFilledMessage(bestBuy.getId(), newSellOrderData));
if (bestBuy.getLastData().quantity() <= 0) {
bestBuyEntry.getValue().remove(bestBuy);
bestBuy.pushMessage(new OrderClosedMessage(OrderClosedMessage.Reason.FULFILLED));
if (bestBuyEntry.getValue().isEmpty())
buyOrders.remove(bestBuyEntry.getKey());
}
if (bestSell.getLastData().quantity() <= 0) {
bestSellEntry.getValue().remove(bestSell);
bestSell.pushMessage(new OrderClosedMessage(OrderClosedMessage.Reason.FULFILLED));
if (bestSellEntry.getValue().isEmpty())
sellOrders.remove(bestSellEntry.getKey());
}
continue;
}
if (bestSell.isClosed()) {
bestSellEntry.getValue().remove(bestSell);
if (bestSellEntry.getValue().isEmpty())
sellOrders.remove(bestSellEntry.getKey());
continue;
}
if (bestBuy.getLastData().rate() < bestSell.getLastData().rate())
break;
var matchQuantity = Math.min(bestBuy.getLastData().quantity(), bestSell.getLastData().quantity());
var newBuyOrderData = new OrderData(
bestBuy.getLastData().price() - matchQuantity * bestSell.getLastData().rate(),
bestBuy.getLastData().quantity() - matchQuantity
);
var newSellOrderData = new OrderData(
bestSell.getLastData().price() - matchQuantity * bestSell.getLastData().rate(),
bestSell.getLastData().quantity() - matchQuantity
);
bestBuy.pushMessage(new OrderFilledMessage(bestSell.getId(), newBuyOrderData));
bestSell.pushMessage(new OrderFilledMessage(bestBuy.getId(), newSellOrderData));
if (bestBuy.getLastData().quantity() <= 0) {
bestBuyEntry.getValue().remove(bestBuy);
bestBuy.pushMessage(new OrderClosedMessage(OrderClosedMessage.Reason.FULFILLED));
if (bestBuyEntry.getValue().isEmpty())
buyOrders.remove(bestBuyEntry.getKey());
}
if (bestSell.getLastData().quantity() <= 0) {
bestSellEntry.getValue().remove(bestSell);
bestSell.pushMessage(new OrderClosedMessage(OrderClosedMessage.Reason.FULFILLED));
if (bestSellEntry.getValue().isEmpty())
sellOrders.remove(bestSellEntry.getKey());
} else {
bestSellEntry.getValue().remove(bestSell);
addOrder(bestSell);
}
} finally {
lock.writeLock().unlock();
}
}
}

View File

@@ -18,10 +18,14 @@ import ru.lionarius.impl.OrderBook;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PlainCurrencyExchange implements CurrencyExchange {
private final ClientRepository clientRepository = new InMemoryClientRepository();
private final Map<CurrencyPair, OrderBook> orderBooks;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Set<CurrencyPair> allowedPairs;
@@ -63,18 +67,23 @@ public class PlainCurrencyExchange implements CurrencyExchange {
if (!allowedPairs.contains(pair))
throw new IllegalArgumentException("Pair is not allowed");
lock.writeLock().lock();
try {
var orders = clientRepository.getClientOrders(clientId).orElseThrow();
var orders = clientRepository.getClientOrders(clientId).orElseThrow();
var order = new Order(clientId, type, pair, new OrderData(price, quantity));
orders.add(order);
var order = new Order(clientId, type, pair, new OrderData(price, quantity));
orders.add(order);
var orderBook = orderBooks.get(pair);
var orderBook = orderBooks.get(pair);
orderBook.addOrder(order);
orderBook.matchOrders();
orderBook.addOrder(order);
orderBook.matchOrders();
return order.getId();
return order.getId();
} finally {
lock.writeLock().unlock();
}
}, Runnable::run);
}
@@ -87,11 +96,16 @@ public class PlainCurrencyExchange implements CurrencyExchange {
if (orderId == null)
throw new IllegalArgumentException("Order ID cannot be null");
var orders = clientRepository.getClientOrders(clientId).orElseThrow();
orders.get(orderId).ifPresent(order -> {
order.pushMessage(new OrderClosedMessage(OrderClosedMessage.Reason.CANCELLED));
});
lock.writeLock().lock();
try {
var orders = clientRepository.getClientOrders(clientId).orElseThrow();
orders.get(orderId).ifPresent(order -> {
order.pushMessage(new OrderClosedMessage(OrderClosedMessage.Reason.CANCELLED));
});
} finally {
lock.writeLock().unlock();
}
}, Runnable::run);
}
@@ -101,9 +115,14 @@ public class PlainCurrencyExchange implements CurrencyExchange {
if (clientId == null)
throw new IllegalArgumentException("Client ID cannot be null");
var orders = clientRepository.getClientOrders(clientId).orElseThrow();
return orders.get().stream().map(Order::getView).toList();
lock.readLock().lock();
try {
var orders = clientRepository.getClientOrders(clientId).orElseThrow();
return orders.get().stream().map(Order::getView).toList();
} finally {
lock.readLock().unlock();
}
}, Runnable::run);
}
@@ -116,9 +135,14 @@ public class PlainCurrencyExchange implements CurrencyExchange {
if (orderId == null)
throw new IllegalArgumentException("Order ID cannot be null");
var orders = clientRepository.getClientOrders(clientId).orElseThrow();
return orders.get(orderId).map(Order::getMessages).orElseThrow();
lock.readLock().lock();
try {
var orders = clientRepository.getClientOrders(clientId).orElseThrow();
return orders.get(orderId).map(Order::getMessages).orElseThrow();
} finally {
lock.readLock().unlock();
}
}, Runnable::run);
}
}

View File

@@ -15,21 +15,15 @@ import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CommandProcessor implements Runnable {
private final BlockingQueue<Command<?>> commandQueue;
private final ExecutorService executorService;
private final ClientRepository clientRepository = new InMemoryClientRepository();
private final Map<CurrencyPair, OrderBook> orderBooks;
private final Set<CurrencyPair> allowedPairs;
public CommandProcessor(Set<CurrencyPair> allowedPairs, BlockingQueue<Command<?>> commandQueue) {
this.commandQueue = commandQueue;
this.executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
this.allowedPairs = ImmutableSet.copyOf(allowedPairs);
this.orderBooks = allowedPairs.stream()
@@ -39,22 +33,17 @@ public class CommandProcessor implements Runnable {
@Override
public void run() {
while (true) {
try {
try {
while (true) {
var command = commandQueue.take();
executorService.submit(() -> processCommand(command));
} catch (InterruptedException e) {
break;
processCommand(command);
}
} catch (InterruptedException e) {
System.out.println("Command processor exited");
}
}
public void shutdown() throws InterruptedException {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
}
private void processCommand(Command<?> command) {
try {
if (command instanceof CreateClientCommand) {
@@ -68,7 +57,7 @@ public class CommandProcessor implements Runnable {
} else if (command instanceof GetOrderMessagesCommand) {
processGetOrderMessagesCommand((GetOrderMessagesCommand) command);
}
} catch (Exception e) {
} catch (RuntimeException e) {
command.result().completeExceptionally(e);
}
}

View File

@@ -69,6 +69,5 @@ public class QueueCurrencyExchange implements CurrencyExchange {
public void shutdown() throws InterruptedException {
this.commandProcessorThread.interrupt();
this.commandProcessorThread.join();
this.commandProcessor.shutdown();
}
}

View File

@@ -14,7 +14,7 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.*;
class ConcurrentCurrencyExchangeTest {
class ConcurrentPlainCurrencyExchangeTest {
private CurrencyExchange exchange;
private CurrencyPair RUB_CNY;
private ExecutorService executorService;
@@ -311,9 +311,11 @@ class ConcurrentCurrencyExchangeTest {
var orders = exchange.getOrders(buyer.id()).join();
totalCnyMoney -= orders.stream().mapToDouble(order -> order.originalData().quantity() - order.lastData().quantity()).sum();
totalRubMoney -= orders.stream().mapToDouble(order -> order.originalData().price() - order.lastData().price()).sum();
}
assertEquals(0.0, totalCnyMoney, 0.001);
assertEquals(0.0, totalRubMoney, 0.001);
}
@AfterEach

View File

@@ -313,9 +313,11 @@ class ConcurrentQueueCurrencyExchangeTest {
var orders = exchange.getOrders(buyer.id()).join();
totalCnyMoney -= orders.stream().mapToDouble(order -> order.originalData().quantity() - order.lastData().quantity()).sum();
totalRubMoney -= orders.stream().mapToDouble(order -> order.originalData().price() - order.lastData().price()).sum();
}
assertEquals(0.0, totalCnyMoney, 0.001);
assertEquals(0.0, totalRubMoney, 0.001);
}
@AfterEach

View File

@@ -16,7 +16,7 @@ import java.util.concurrent.*;
public class PerformanceTest {
private Set<CurrencyPair> pairs;
private final int numTraders = 50;
private final int numTraders = 500;
private final int numOrders = 1000;
private ExecutorService executorService;

View File

@@ -16,7 +16,7 @@ import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.*;
class CurrencyExchangeTest {
class PlainCurrencyExchangeTest {
private CurrencyExchange exchange;
private CurrencyPair RUB_CNY;

View File

@@ -16,7 +16,7 @@ import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.*;
class CurrencyQueueExchangeTest {
class QueueCurrencyExchangeTest {
private QueueCurrencyExchange exchange;
private CurrencyPair RUB_CNY;