1
0
This commit is contained in:
2024-10-20 18:57:35 +03:00
parent b6de049094
commit 5bae7a279b
7 changed files with 461 additions and 4 deletions

View File

@@ -1,8 +1,11 @@
package ru.lionarius;
import ru.lionarius.sync.MyCountDownLatch;
import ru.lionarius.sync.MyReentrantLock;
import ru.lionarius.sync.MySemaphore;
import java.util.ArrayList;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -111,9 +114,9 @@ public class IntegralCalculator {
var sum = (function.apply(lowerBound) + function.apply(upperBound)) / 2;
final var futures = new ArrayList<Future<Double>>();
final var semaphore = new Semaphore(2);
final var lock = new ReentrantLock();
final var latch = new CountDownLatch(parallelism);
final var semaphore = new MySemaphore(2);
final var lock = new MyReentrantLock();
final var latch = new MyCountDownLatch(parallelism);
var times = new long[parallelism];
var totalProgress = new long[]{0L};

View File

@@ -0,0 +1,41 @@
package ru.lionarius.sync;
public class MyCountDownLatch {
private final Object lock = new Object();
private int count;
public MyCountDownLatch(int count) {
if (count < 0)
throw new IllegalArgumentException("Count cannot be negative");
this.count = count;
}
public void countDown() {
synchronized (lock) {
if (count <= 0)
return;
count--;
if (count == 0) {
lock.notifyAll();
}
}
}
public void await() throws InterruptedException {
synchronized (lock) {
while (count > 0) {
lock.wait();
}
}
}
public int getCount() {
synchronized (lock) {
return count;
}
}
}

View File

@@ -0,0 +1,55 @@
package ru.lionarius.sync;
public class MyReentrantLock {
private final Object lock = new Object();
private int counter = 0;
private Thread owner;
public void lock() throws InterruptedException {
synchronized (lock) {
Thread current = Thread.currentThread();
while (isLocked() && current != owner) {
lock.wait();
}
counter++;
owner = current;
}
}
public boolean tryLock() {
synchronized (lock) {
Thread current = Thread.currentThread();
if (isLocked() && current != owner) {
return false;
}
counter++;
owner = current;
return true;
}
}
public void unlock() {
synchronized (lock) {
if (Thread.currentThread() != owner) {
throw new IllegalMonitorStateException("Not owner");
}
counter--;
if (counter == 0) {
owner = null;
lock.notify();
}
}
}
private boolean isLocked() {
return counter > 0;
}
}

View File

@@ -0,0 +1,45 @@
package ru.lionarius.sync;
public class MySemaphore {
private final Object lock = new Object();
private int permits;
public MySemaphore(int permits) {
this.permits = permits;
}
public void acquire() throws InterruptedException {
synchronized (lock) {
while (permits <= 0) {
lock.wait();
}
permits--;
}
}
public boolean tryAcquire() {
synchronized (lock) {
if (permits <= 0) {
return false;
}
permits--;
return true;
}
}
public void release() {
synchronized (lock) {
permits++;
lock.notify();
}
}
public int availablePermits() {
synchronized (lock) {
return permits;
}
}
}

View File

@@ -0,0 +1,112 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import ru.lionarius.sync.MyCountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;
class MyCountDownLatchTest {
@Test
void testInitialization() {
assertThrows(IllegalArgumentException.class, () -> new MyCountDownLatch(-1));
assertDoesNotThrow(() -> new MyCountDownLatch(0));
var latch = new MyCountDownLatch(5);
assertEquals(5, latch.getCount());
}
@Test
void testCountDown() {
var latch = new MyCountDownLatch(3);
assertEquals(3, latch.getCount());
latch.countDown();
assertEquals(2, latch.getCount());
latch.countDown();
assertEquals(1, latch.getCount());
latch.countDown();
assertEquals(0, latch.getCount());
latch.countDown();
assertEquals(0, latch.getCount());
}
@Test
void testAwait() throws InterruptedException {
var latch = new MyCountDownLatch(2);
var threadFinished = new AtomicBoolean(false);
var thread = new Thread(() -> {
try {
latch.await();
threadFinished.set(true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.start();
Thread.sleep(100);
assertFalse(threadFinished.get());
latch.countDown();
Thread.sleep(100);
assertFalse(threadFinished.get());
latch.countDown();
thread.join(1000);
assertTrue(threadFinished.get());
}
@Test
void testMultipleThreads() throws InterruptedException {
final int THREAD_COUNT = 5;
var startLatch = new MyCountDownLatch(1);
var endLatch = new MyCountDownLatch(THREAD_COUNT);
AtomicBoolean[] threadsStarted = new AtomicBoolean[THREAD_COUNT];
AtomicBoolean[] threadsFinished = new AtomicBoolean[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
threadsStarted[i] = new AtomicBoolean(false);
threadsFinished[i] = new AtomicBoolean(false);
final int index = i;
new Thread(() -> {
try {
startLatch.await();
threadsStarted[index].set(true);
Thread.sleep(100);
threadsFinished[index].set(true);
endLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
Thread.sleep(100);
for (var started : threadsStarted) {
assertFalse(started.get());
}
startLatch.countDown();
Thread.sleep(50);
for (var started : threadsStarted) {
assertTrue(started.get());
}
endLatch.await();
for (var finished : threadsFinished) {
assertTrue(finished.get());
}
assertEquals(0, endLatch.getCount());
}
@Test
@Timeout(value = 1000, unit = TimeUnit.MILLISECONDS)
void testZeroCount() throws InterruptedException {
var latch = new MyCountDownLatch(0);
assertEquals(0, latch.getCount());
latch.await();
assertTrue(true);
}
}

View File

@@ -0,0 +1,89 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import ru.lionarius.sync.MyReentrantLock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;
class MyReentrantLockTest {
private MyReentrantLock lock;
@BeforeEach
void setUp() {
lock = new MyReentrantLock();
}
@Test
@Timeout(value = 1000, unit = TimeUnit.MILLISECONDS)
void testLockAndUnlock() throws InterruptedException {
lock.lock();
assertTrue(lock.tryLock());
lock.unlock();
lock.unlock();
}
@Test
@Timeout(value = 1000, unit = TimeUnit.MILLISECONDS)
void testTryLock() throws InterruptedException {
assertTrue(lock.tryLock());
var otherThreadLocked = new AtomicBoolean(false);
var otherThread = new Thread(() -> {
otherThreadLocked.set(lock.tryLock());
});
otherThread.start();
otherThread.join();
assertFalse(otherThreadLocked.get()); // Другой поток не должен получить блокировку
lock.unlock();
}
@Test
@Timeout(value = 1000, unit = TimeUnit.MILLISECONDS)
void testUnlockByDifferentThread() throws InterruptedException {
lock.lock();
var otherThread = new Thread(() -> {
assertThrows(IllegalMonitorStateException.class, lock::unlock);
});
otherThread.start();
otherThread.join();
lock.unlock();
}
@Test
@Timeout(value = 1000, unit = TimeUnit.MILLISECONDS)
void testMultipleThreads() throws InterruptedException {
int threadCount = 10;
Thread[] threads = new Thread[threadCount];
AtomicBoolean[] threadsCompleted = new AtomicBoolean[threadCount];
for (int i = 0; i < threadCount; i++) {
final int index = i;
threadsCompleted[i] = new AtomicBoolean(false);
threads[i] = new Thread(() -> {
try {
lock.lock();
Thread.sleep(10);
lock.unlock();
threadsCompleted[index].set(true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
threads[i].start();
}
for (var thread : threads) {
thread.join();
}
for (var completed : threadsCompleted) {
assertTrue(completed.get(), "Все потоки должны завершить свою работу");
}
}
}

View File

@@ -0,0 +1,112 @@
import org.junit.jupiter.api.Test;
import ru.lionarius.sync.MySemaphore;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
class MySemaphoreTest {
@Test
void testInitialization() {
assertDoesNotThrow(() -> new MySemaphore(-1));
assertDoesNotThrow(() -> new MySemaphore(0));
var semaphore = new MySemaphore(5);
assertEquals(5, semaphore.availablePermits());
}
@Test
void testAcquireAndRelease() throws InterruptedException {
var semaphore = new MySemaphore(2);
assertEquals(2, semaphore.availablePermits());
semaphore.acquire();
assertEquals(1, semaphore.availablePermits());
semaphore.acquire();
assertEquals(0, semaphore.availablePermits());
assertFalse(semaphore.tryAcquire());
semaphore.release();
assertEquals(1, semaphore.availablePermits());
assertTrue(semaphore.tryAcquire());
assertEquals(0, semaphore.availablePermits());
}
@Test
void testTryAcquire() {
var semaphore = new MySemaphore(2);
assertTrue(semaphore.tryAcquire());
assertEquals(1, semaphore.availablePermits());
assertTrue(semaphore.tryAcquire());
assertEquals(0, semaphore.availablePermits());
assertFalse(semaphore.tryAcquire());
assertEquals(0, semaphore.availablePermits());
}
@Test
void testMultipleThreads() throws InterruptedException {
final int THREAD_COUNT = 10;
final int PERMITS = 3;
var semaphore = new MySemaphore(PERMITS);
var startLatch = new CountDownLatch(1);
var finishLatch = new CountDownLatch(THREAD_COUNT);
var concurrentThreads = new AtomicInteger(0);
var maxConcurrentThreads = new AtomicInteger(0);
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
try {
startLatch.await();
semaphore.acquire();
int current = concurrentThreads.incrementAndGet();
maxConcurrentThreads.updateAndGet(max -> Math.max(max, current));
Thread.sleep(100); // Имитация работы
concurrentThreads.decrementAndGet();
semaphore.release();
finishLatch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
startLatch.countDown();
assertTrue(finishLatch.await(5, TimeUnit.SECONDS));
assertEquals(PERMITS, maxConcurrentThreads.get());
assertEquals(PERMITS, semaphore.availablePermits());
}
@Test
void testBlockingAcquire() throws InterruptedException {
var semaphore = new MySemaphore(1);
semaphore.acquire();
assertEquals(0, semaphore.availablePermits());
var thread = new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
thread.start();
Thread.sleep(100); // Даем время потоку начать ожидание
assertTrue(thread.isAlive());
assertEquals(0, semaphore.availablePermits());
semaphore.release();
thread.join(1000);
assertFalse(thread.isAlive());
assertEquals(0, semaphore.availablePermits());
}
@Test
void testAvailablePermits() {
var semaphore = new MySemaphore(5);
assertEquals(5, semaphore.availablePermits());
semaphore.tryAcquire();
assertEquals(4, semaphore.availablePermits());
semaphore.release();
assertEquals(5, semaphore.availablePermits());
}
}