155 lines
5.8 KiB
Java
155 lines
5.8 KiB
Java
package ru.lionarius.impl.queue;
|
|
|
|
import com.google.common.collect.ImmutableMap;
|
|
import com.google.common.collect.ImmutableSet;
|
|
import ru.lionarius.api.client.ClientRepository;
|
|
import ru.lionarius.api.command.*;
|
|
import ru.lionarius.api.currency.CurrencyPair;
|
|
import ru.lionarius.api.order.Order;
|
|
import ru.lionarius.api.order.OrderData;
|
|
import ru.lionarius.api.order.message.OrderClosedMessage;
|
|
import ru.lionarius.impl.InMemoryClientRepository;
|
|
import ru.lionarius.impl.OrderBook;
|
|
|
|
import java.util.AbstractMap;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
public class CommandProcessor implements Runnable {
|
|
private final BlockingQueue<Command<?>> commandQueue;
|
|
private final ExecutorService executorService;
|
|
|
|
private final ClientRepository clientRepository = new InMemoryClientRepository();
|
|
private final Map<CurrencyPair, OrderBook> orderBooks;
|
|
private final Set<CurrencyPair> allowedPairs;
|
|
|
|
public CommandProcessor(Set<CurrencyPair> allowedPairs, BlockingQueue<Command<?>> commandQueue) {
|
|
this.commandQueue = commandQueue;
|
|
this.executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
|
|
|
|
this.allowedPairs = ImmutableSet.copyOf(allowedPairs);
|
|
this.orderBooks = allowedPairs.stream()
|
|
.map(pair -> new AbstractMap.SimpleEntry<>(pair, new OrderBook()))
|
|
.collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
}
|
|
|
|
@Override
|
|
public void run() {
|
|
while (true) {
|
|
try {
|
|
var command = commandQueue.take();
|
|
|
|
executorService.submit(() -> processCommand(command));
|
|
} catch (InterruptedException e) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
public void shutdown() throws InterruptedException {
|
|
executorService.shutdown();
|
|
executorService.awaitTermination(5, TimeUnit.SECONDS);
|
|
}
|
|
|
|
private void processCommand(Command<?> command) {
|
|
try {
|
|
if (command instanceof CreateClientCommand) {
|
|
processCreateClientCommand((CreateClientCommand) command);
|
|
} else if (command instanceof PlaceOrderCommand) {
|
|
processPlaceOrderCommand((PlaceOrderCommand) command);
|
|
} else if (command instanceof CancelOrderCommand) {
|
|
processCancelOrderCommand((CancelOrderCommand) command);
|
|
} else if (command instanceof GetOrdersCommand) {
|
|
processGetOrdersCommand((GetOrdersCommand) command);
|
|
} else if (command instanceof GetOrderMessagesCommand) {
|
|
processGetOrderMessagesCommand((GetOrderMessagesCommand) command);
|
|
}
|
|
} catch (Exception e) {
|
|
command.result().completeExceptionally(e);
|
|
}
|
|
}
|
|
|
|
private void processCreateClientCommand(CreateClientCommand command) {
|
|
if (command.name() == null)
|
|
throw new IllegalArgumentException("Name cannot be null");
|
|
|
|
command.result().complete(clientRepository.createClient(command.name()));
|
|
}
|
|
|
|
private void processPlaceOrderCommand(PlaceOrderCommand command) {
|
|
if (command.clientId() == null)
|
|
throw new IllegalArgumentException("Client ID cannot be null");
|
|
|
|
if (command.pair() == null)
|
|
throw new IllegalArgumentException("Currency pair cannot be null");
|
|
|
|
if (command.type() == null)
|
|
throw new IllegalArgumentException("Order type cannot be null");
|
|
|
|
if (command.price() <= 0.0)
|
|
throw new IllegalArgumentException("Price must be positive");
|
|
|
|
if (command.quantity() <= 0.0)
|
|
throw new IllegalArgumentException("Quantity must be positive");
|
|
|
|
if (!allowedPairs.contains(command.pair()))
|
|
throw new IllegalArgumentException("Pair is not allowed");
|
|
|
|
var orders = clientRepository.getClientOrders(command.clientId()).orElseThrow();
|
|
|
|
var order = new Order(command.clientId(), command.type(), command.pair(), new OrderData(command.price(), command.quantity()));
|
|
orders.add(order);
|
|
|
|
var orderBook = orderBooks.get(command.pair());
|
|
|
|
orderBook.addOrder(order);
|
|
orderBook.matchOrders();
|
|
|
|
command.result().complete(order.getId());
|
|
}
|
|
|
|
private void processCancelOrderCommand(CancelOrderCommand command) {
|
|
if (command.clientId() == null)
|
|
throw new IllegalArgumentException("Client ID cannot be null");
|
|
|
|
if (command.orderId() == null)
|
|
throw new IllegalArgumentException("Order ID cannot be null");
|
|
|
|
var orders = clientRepository.getClientOrders(command.clientId()).orElseThrow();
|
|
|
|
var order = orders.get(command.orderId()).orElseThrow();
|
|
order.pushMessage(new OrderClosedMessage(OrderClosedMessage.Reason.CANCELLED));
|
|
|
|
command.result().complete(null);
|
|
}
|
|
|
|
private void processGetOrdersCommand(GetOrdersCommand command) {
|
|
if (command.clientId() == null)
|
|
throw new IllegalArgumentException("Client ID cannot be null");
|
|
|
|
var orders = clientRepository.getClientOrders(command.clientId()).orElseThrow();
|
|
|
|
var result = orders.get().stream().map(Order::getView).toList();
|
|
|
|
command.result().complete(result);
|
|
}
|
|
|
|
private void processGetOrderMessagesCommand(GetOrderMessagesCommand command) {
|
|
if (command.clientId() == null)
|
|
throw new IllegalArgumentException("Client ID cannot be null");
|
|
|
|
if (command.orderId() == null)
|
|
throw new IllegalArgumentException("Order ID cannot be null");
|
|
|
|
var orders = clientRepository.getClientOrders(command.clientId()).orElseThrow();
|
|
|
|
var result = orders.get(command.orderId()).map(Order::getMessages).orElseThrow();
|
|
|
|
command.result().complete(result);
|
|
}
|
|
}
|