int count = 0;void increment() {count = count + 1;}
ExecutorService executor = Executors.newFixedThreadPool(2);IntStream.range(0, 1000).forEach(i -> executor.submit(this::increment));executor.shutdown();executor.awaitTermination(5, TimeUnit.SECONDS);System.out.println(count); // 995
In above code three steps need to perform
1. Read the current value.
2. Increment the current value by one.
3. Write the new value to the variable.
If two thread perform these steps in parallel it is possible both thread reading the same value in the step 1 in this case the write step lost the increment value.
To fixed the we can used the Synchronization when rare condition occur.
synchronized void increment() {count = count + 1;}
increment()
concurrently we get the desired result count of 1000. No race conditions occur any longer and the result is stable with every execution of the code:ExecutorService executor = Executors.newFixedThreadPool(2);IntStream.range(0, 10000).forEach(i -> executor.submit(this::increment));executor.shutdown();executor.awaitTermination(5, TimeUnit.SECONDS);System.out.println(count); // 10000
synchronized
keyword is also available as a block level statement.void incrementSync() {synchronized (this) {count = count + 1;}}
synchronized
keyword the Concurrency API supports various explicit locks specified by the Lock
interface. Locks support various methods for finer grained lock control thus are more expressive than implicit monitors.Multiple lock implementations are available in the standard JDK which will be demonstrated in the following sections.
ReentrantLock
is a mutual exclusion lock with the same basic behavior as the implicit monitors accessed via the synchronized
keyword but with extended capabilities. As the name suggests this lock implements reentrant characteristics just as implicit monitors.ReentrantLock lock = new ReentrantLock();int count = 0;void increment() {lock.lock();try {count++;} finally {lock.unlock();}}
ExecutorService executor = Executors.newFixedThreadPool(3);ReentrantLock lock = new ReentrantLock();executor.submit(() -> {lock.lock();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}finally {lock.unlock();}});executor.submit(() -> {System.out.println("Is Locked: " + lock.isLocked());System.out.println("Locked by : " + lock.isHeldByCurrentThread());boolean locked = lock.tryLock();System.out.println("Lock acquired: " + locked);});try {executor.awaitTermination(5, TimeUnit.SECONDS);executor.shutdown();} catch (InterruptedException e) {e.printStackTrace();}
Is Locked: trueLocked by : falseLock acquired: false
tryLock()
as an alternative to lock()
tries to acquire the lock without pausing the current thread. The boolean result must be used to check if the lock has actually been acquired before accessing any shared mutable variables.The interface ReadWriteLock
specifies another type of lock maintaining a pair of locks for read and write access. The idea behind read-write locks is that it’s usually safe to read mutable variables concurrently as long as nobody is writing to this variable. So the read-lock can be held simultaneously by multiple threads as long as no threads hold the write-lock. This can improve performance and throughput in case that reads are more frequent than writes.
ExecutorService executor = Executors.newFixedThreadPool(2);Map<String, String> map = new HashMap<>();ReadWriteLock lock = new ReentrantReadWriteLock();executor.submit(() -> {lock.writeLock().lock();try {TimeUnit.SECONDS.sleep(1);map.put("key", "ram");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.writeLock().unlock();}});executor.shutdown();executor.awaitTermination(5, TimeUnit.SECONDS);
ExecutorService executor = Executors.newFixedThreadPool(2);Map<String, String> map = new HashMap<>();ReadWriteLock lock = new ReentrantReadWriteLock();Runnable readTask = () -> {lock.readLock().lock();try {System.out.println(map.get("key"));TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.readLock().unlock();}};executor.submit(readTask);executor.submit(readTask);executor.shutdown();executor.awaitTermination(5, TimeUnit.SECONDS);
StampedLock
Java 8 ships with a new kind of lock called StampedLock
which also support read and write locks. AStampedLock
return a stamp represented by a long
value. You can use these stamps to either release a lock or to check if the lock is still valid. Additionally stamped locks support another lock mode called optimistic locking.
ExecutorService executor = Executors.newFixedThreadPool(2);Map<String, String> map = new HashMap<>();StampedLock lock = new StampedLock();executor.submit(() -> {long stamp = lock.writeLock();try {TimeUnit.SECONDS.sleep(1);map.put("key", "ram");} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlockWrite(stamp);}});Runnable readTask = () -> {long stamp = lock.readLock();try {System.out.println(map.get("key"));TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlockRead(stamp);}};executor.submit(readTask);executor.submit(readTask);executor.shutdown();executor.awaitTermination(5, TimeUnit.SECONDS);
readLock()
or writeLock()
returns a stamp which is later used for unlocking within the finally block. Keep in mind that stamped locks don’t implement reentrant characteristics. Each call to lock returns a new stamp and blocks if no lock is available even if the same thread already holds a lock. So you have to pay particular attention not to run into deadlocks.ExecutorService executor = Executors.newFixedThreadPool(2);StampedLock lock = new StampedLock();executor.submit(() -> {long stamp = lock.tryOptimisticRead();try {System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));TimeUnit.SECONDS.sleep(2);System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));TimeUnit.SECONDS.sleep(2);System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock(stamp);}});executor.submit(() -> {long stamp = lock.writeLock();try {System.out.println("Write Lock acquired");TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock(stamp);System.out.println("Write done");}});executor.shutdown();executor.awaitTermination(5, TimeUnit.SECONDS);
tryOptimisticRead()
which always returns a stamp without blocking the current thread, no matter if the lock is actually available. If there’s already a write lock active the returned stamp equals zero. You can always check if a stamp is valid by calling lock.validate(stamp)
.Optimistic Lock Valid: trueWrite Lock acquiredOptimistic Lock Valid: falseWrite doneOptimistic Lock Valid: false
So when working with optimistic locks you have to validate the lock every time after accessing any shared mutable variable to make sure the read was still valid.
If you want to convert a read lock into a write lock without unlocking and locking again. StampedLock
provides the method tryConvertToWriteLock()
for that purpose as seen in the next sample:
ExecutorService executor = Executors.newFixedThreadPool(2);StampedLock lock = new StampedLock();executor.submit(() -> {long stamp = lock.readLock();try {stamp = lock.tryConvertToWriteLock(stamp);if (stamp == 0L) {System.out.println("Could not convert to write lock");stamp = lock.writeLock();}{System.out.println("Read lock converted into write");}} finally {lock.unlock(stamp);}});executor.shutdown();executor.awaitTermination(5, TimeUnit.SECONDS);
tryConvertToWriteLock()
doesn’t block but may return a zero stamp indicating that no write lock is currently available. In that case we call writeLock()
to block the current thread until a write lock is available.ExecutorService executor = Executors.newFixedThreadPool(10);Semaphore semaphore = new Semaphore(5);Runnable longRunningTask = () -> {boolean permit = false;try {permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);if (permit) {System.out.println("Semaphore acquired");TimeUnit.SECONDS.sleep(1);} else {System.out.println("Could not acquire semaphore");}} catch (InterruptedException e) {throw new IllegalStateException(e);} finally {if (permit) {semaphore.release();}}};IntStream.range(0, 10).forEach(i -> executor.submit(longRunningTask));executor.shutdown();executor.awaitTermination(5, TimeUnit.SECONDS);
try/finally
block to properly release the semaphore even in case of exceptions.Semaphore acquiredSemaphore acquiredSemaphore acquiredSemaphore acquiredSemaphore acquiredSemaphore acquiredSemaphore acquiredSemaphore acquiredCould not acquire semaphoreCould not acquire semaphore
sleep
up to a maximum of 5. Every subsequent call to tryAcquire()
elapses the maximum wait timeout of one second, resulting in the appropriate console output that no semaphore could be acquired.
No comments:
Post a Comment