diff --git a/src/main/java/ru/lionarius/IntegralCalculator.java b/src/main/java/ru/lionarius/IntegralCalculator.java index b1f1243..6841359 100644 --- a/src/main/java/ru/lionarius/IntegralCalculator.java +++ b/src/main/java/ru/lionarius/IntegralCalculator.java @@ -1,8 +1,11 @@ package ru.lionarius; +import ru.lionarius.sync.MyCountDownLatch; +import ru.lionarius.sync.MyReentrantLock; +import ru.lionarius.sync.MySemaphore; + import java.util.ArrayList; import java.util.concurrent.*; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Function; @@ -111,9 +114,9 @@ public class IntegralCalculator { var sum = (function.apply(lowerBound) + function.apply(upperBound)) / 2; final var futures = new ArrayList>(); - final var semaphore = new Semaphore(2); - final var lock = new ReentrantLock(); - final var latch = new CountDownLatch(parallelism); + final var semaphore = new MySemaphore(2); + final var lock = new MyReentrantLock(); + final var latch = new MyCountDownLatch(parallelism); var times = new long[parallelism]; var totalProgress = new long[]{0L}; diff --git a/src/main/java/ru/lionarius/sync/MyCountDownLatch.java b/src/main/java/ru/lionarius/sync/MyCountDownLatch.java new file mode 100644 index 0000000..e8bc71a --- /dev/null +++ b/src/main/java/ru/lionarius/sync/MyCountDownLatch.java @@ -0,0 +1,41 @@ +package ru.lionarius.sync; + +public class MyCountDownLatch { + + private final Object lock = new Object(); + private int count; + + public MyCountDownLatch(int count) { + if (count < 0) + throw new IllegalArgumentException("Count cannot be negative"); + + this.count = count; + } + + public void countDown() { + synchronized (lock) { + if (count <= 0) + return; + + count--; + + if (count == 0) { + lock.notifyAll(); + } + } + } + + public void await() throws InterruptedException { + synchronized (lock) { + while (count > 0) { + lock.wait(); + } + } + } + + public int getCount() { + synchronized (lock) { + return count; + } + } +} diff --git a/src/main/java/ru/lionarius/sync/MyReentrantLock.java b/src/main/java/ru/lionarius/sync/MyReentrantLock.java new file mode 100644 index 0000000..513cd47 --- /dev/null +++ b/src/main/java/ru/lionarius/sync/MyReentrantLock.java @@ -0,0 +1,55 @@ +package ru.lionarius.sync; + +public class MyReentrantLock { + + private final Object lock = new Object(); + + private int counter = 0; + private Thread owner; + + public void lock() throws InterruptedException { + synchronized (lock) { + Thread current = Thread.currentThread(); + + while (isLocked() && current != owner) { + lock.wait(); + } + + counter++; + owner = current; + } + } + + public boolean tryLock() { + synchronized (lock) { + Thread current = Thread.currentThread(); + + if (isLocked() && current != owner) { + return false; + } + + counter++; + owner = current; + return true; + } + } + + public void unlock() { + synchronized (lock) { + if (Thread.currentThread() != owner) { + throw new IllegalMonitorStateException("Not owner"); + } + + counter--; + + if (counter == 0) { + owner = null; + lock.notify(); + } + } + } + + private boolean isLocked() { + return counter > 0; + } +} diff --git a/src/main/java/ru/lionarius/sync/MySemaphore.java b/src/main/java/ru/lionarius/sync/MySemaphore.java new file mode 100644 index 0000000..1eac0ed --- /dev/null +++ b/src/main/java/ru/lionarius/sync/MySemaphore.java @@ -0,0 +1,45 @@ +package ru.lionarius.sync; + +public class MySemaphore { + + private final Object lock = new Object(); + private int permits; + + public MySemaphore(int permits) { + this.permits = permits; + } + + public void acquire() throws InterruptedException { + synchronized (lock) { + while (permits <= 0) { + lock.wait(); + } + + permits--; + } + } + + public boolean tryAcquire() { + synchronized (lock) { + if (permits <= 0) { + return false; + } + + permits--; + return true; + } + } + + public void release() { + synchronized (lock) { + permits++; + lock.notify(); + } + } + + public int availablePermits() { + synchronized (lock) { + return permits; + } + } +} diff --git a/src/test/java/MyCountDownLatchTest.java b/src/test/java/MyCountDownLatchTest.java new file mode 100644 index 0000000..6694999 --- /dev/null +++ b/src/test/java/MyCountDownLatchTest.java @@ -0,0 +1,112 @@ +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import ru.lionarius.sync.MyCountDownLatch; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.*; + +class MyCountDownLatchTest { + @Test + void testInitialization() { + assertThrows(IllegalArgumentException.class, () -> new MyCountDownLatch(-1)); + assertDoesNotThrow(() -> new MyCountDownLatch(0)); + var latch = new MyCountDownLatch(5); + assertEquals(5, latch.getCount()); + } + + @Test + void testCountDown() { + var latch = new MyCountDownLatch(3); + assertEquals(3, latch.getCount()); + latch.countDown(); + assertEquals(2, latch.getCount()); + latch.countDown(); + assertEquals(1, latch.getCount()); + latch.countDown(); + assertEquals(0, latch.getCount()); + latch.countDown(); + assertEquals(0, latch.getCount()); + } + + @Test + void testAwait() throws InterruptedException { + var latch = new MyCountDownLatch(2); + var threadFinished = new AtomicBoolean(false); + + var thread = new Thread(() -> { + try { + latch.await(); + threadFinished.set(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + thread.start(); + Thread.sleep(100); + assertFalse(threadFinished.get()); + + latch.countDown(); + Thread.sleep(100); + assertFalse(threadFinished.get()); + + latch.countDown(); + thread.join(1000); + assertTrue(threadFinished.get()); + } + + @Test + void testMultipleThreads() throws InterruptedException { + final int THREAD_COUNT = 5; + var startLatch = new MyCountDownLatch(1); + var endLatch = new MyCountDownLatch(THREAD_COUNT); + AtomicBoolean[] threadsStarted = new AtomicBoolean[THREAD_COUNT]; + AtomicBoolean[] threadsFinished = new AtomicBoolean[THREAD_COUNT]; + + for (int i = 0; i < THREAD_COUNT; i++) { + threadsStarted[i] = new AtomicBoolean(false); + threadsFinished[i] = new AtomicBoolean(false); + final int index = i; + new Thread(() -> { + try { + startLatch.await(); + threadsStarted[index].set(true); + Thread.sleep(100); + threadsFinished[index].set(true); + endLatch.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).start(); + } + + Thread.sleep(100); + for (var started : threadsStarted) { + assertFalse(started.get()); + } + + startLatch.countDown(); + Thread.sleep(50); + + for (var started : threadsStarted) { + assertTrue(started.get()); + } + + endLatch.await(); + for (var finished : threadsFinished) { + assertTrue(finished.get()); + } + assertEquals(0, endLatch.getCount()); + } + + @Test + @Timeout(value = 1000, unit = TimeUnit.MILLISECONDS) + void testZeroCount() throws InterruptedException { + var latch = new MyCountDownLatch(0); + assertEquals(0, latch.getCount()); + latch.await(); + assertTrue(true); + } +} diff --git a/src/test/java/MyReentrantLockTest.java b/src/test/java/MyReentrantLockTest.java new file mode 100644 index 0000000..d0356d8 --- /dev/null +++ b/src/test/java/MyReentrantLockTest.java @@ -0,0 +1,89 @@ +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import ru.lionarius.sync.MyReentrantLock; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.*; + +class MyReentrantLockTest { + private MyReentrantLock lock; + + @BeforeEach + void setUp() { + lock = new MyReentrantLock(); + } + + @Test + @Timeout(value = 1000, unit = TimeUnit.MILLISECONDS) + void testLockAndUnlock() throws InterruptedException { + lock.lock(); + assertTrue(lock.tryLock()); + lock.unlock(); + lock.unlock(); + } + + @Test + @Timeout(value = 1000, unit = TimeUnit.MILLISECONDS) + void testTryLock() throws InterruptedException { + assertTrue(lock.tryLock()); + + var otherThreadLocked = new AtomicBoolean(false); + var otherThread = new Thread(() -> { + otherThreadLocked.set(lock.tryLock()); + }); + otherThread.start(); + otherThread.join(); + + assertFalse(otherThreadLocked.get()); // Другой поток не должен получить блокировку + lock.unlock(); + } + + @Test + @Timeout(value = 1000, unit = TimeUnit.MILLISECONDS) + void testUnlockByDifferentThread() throws InterruptedException { + lock.lock(); + + var otherThread = new Thread(() -> { + assertThrows(IllegalMonitorStateException.class, lock::unlock); + }); + otherThread.start(); + otherThread.join(); + + lock.unlock(); + } + + @Test + @Timeout(value = 1000, unit = TimeUnit.MILLISECONDS) + void testMultipleThreads() throws InterruptedException { + int threadCount = 10; + Thread[] threads = new Thread[threadCount]; + AtomicBoolean[] threadsCompleted = new AtomicBoolean[threadCount]; + + for (int i = 0; i < threadCount; i++) { + final int index = i; + threadsCompleted[i] = new AtomicBoolean(false); + threads[i] = new Thread(() -> { + try { + lock.lock(); + Thread.sleep(10); + lock.unlock(); + threadsCompleted[index].set(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + threads[i].start(); + } + + for (var thread : threads) { + thread.join(); + } + + for (var completed : threadsCompleted) { + assertTrue(completed.get(), "Все потоки должны завершить свою работу"); + } + } +} diff --git a/src/test/java/MySemaphoreTest.java b/src/test/java/MySemaphoreTest.java new file mode 100644 index 0000000..0eb4efe --- /dev/null +++ b/src/test/java/MySemaphoreTest.java @@ -0,0 +1,112 @@ +import org.junit.jupiter.api.Test; +import ru.lionarius.sync.MySemaphore; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +class MySemaphoreTest { + @Test + void testInitialization() { + assertDoesNotThrow(() -> new MySemaphore(-1)); + assertDoesNotThrow(() -> new MySemaphore(0)); + var semaphore = new MySemaphore(5); + assertEquals(5, semaphore.availablePermits()); + } + + @Test + void testAcquireAndRelease() throws InterruptedException { + var semaphore = new MySemaphore(2); + assertEquals(2, semaphore.availablePermits()); + semaphore.acquire(); + assertEquals(1, semaphore.availablePermits()); + semaphore.acquire(); + assertEquals(0, semaphore.availablePermits()); + assertFalse(semaphore.tryAcquire()); + semaphore.release(); + assertEquals(1, semaphore.availablePermits()); + assertTrue(semaphore.tryAcquire()); + assertEquals(0, semaphore.availablePermits()); + } + + @Test + void testTryAcquire() { + var semaphore = new MySemaphore(2); + assertTrue(semaphore.tryAcquire()); + assertEquals(1, semaphore.availablePermits()); + assertTrue(semaphore.tryAcquire()); + assertEquals(0, semaphore.availablePermits()); + assertFalse(semaphore.tryAcquire()); + assertEquals(0, semaphore.availablePermits()); + } + + @Test + void testMultipleThreads() throws InterruptedException { + final int THREAD_COUNT = 10; + final int PERMITS = 3; + var semaphore = new MySemaphore(PERMITS); + var startLatch = new CountDownLatch(1); + var finishLatch = new CountDownLatch(THREAD_COUNT); + var concurrentThreads = new AtomicInteger(0); + var maxConcurrentThreads = new AtomicInteger(0); + + for (int i = 0; i < THREAD_COUNT; i++) { + new Thread(() -> { + try { + startLatch.await(); + semaphore.acquire(); + int current = concurrentThreads.incrementAndGet(); + maxConcurrentThreads.updateAndGet(max -> Math.max(max, current)); + Thread.sleep(100); // Имитация работы + concurrentThreads.decrementAndGet(); + semaphore.release(); + finishLatch.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).start(); + } + + startLatch.countDown(); + assertTrue(finishLatch.await(5, TimeUnit.SECONDS)); + assertEquals(PERMITS, maxConcurrentThreads.get()); + assertEquals(PERMITS, semaphore.availablePermits()); + } + + @Test + void testBlockingAcquire() throws InterruptedException { + var semaphore = new MySemaphore(1); + semaphore.acquire(); + assertEquals(0, semaphore.availablePermits()); + + var thread = new Thread(() -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + thread.start(); + Thread.sleep(100); // Даем время потоку начать ожидание + assertTrue(thread.isAlive()); + assertEquals(0, semaphore.availablePermits()); + + semaphore.release(); + thread.join(1000); + assertFalse(thread.isAlive()); + assertEquals(0, semaphore.availablePermits()); + } + + @Test + void testAvailablePermits() { + var semaphore = new MySemaphore(5); + assertEquals(5, semaphore.availablePermits()); + semaphore.tryAcquire(); + assertEquals(4, semaphore.availablePermits()); + semaphore.release(); + assertEquals(5, semaphore.availablePermits()); + } +}