Java 8 Concurrency Synchronization and Locks

Synchronized: In the previous we’ve learned how to execute code in parallel via executor services. When writing such multi-threaded code you have to pay particular attention when accessing shared mutable variables concurrently from multiple threads. Let's assume we want to increment the count  an integer which is accessible simultaneously from multiple threads.
We define a field  count with a method increment() to increase count by one:
int count = 0;
void increment() {
    count = count + 1;
}

When calling this method concurrently from multiple threads we’re in serious trouble and data inconstancy issue:

ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 1000)
    .forEach(i -> executor.submit(this::increment));

executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);


System.out.println(count);  // 995
Instead of seeing a constant result count of 1000 the actual result varies with every execution. The reason is that we share a mutable variable upon different threads without synchronizing the access to this variable which results.

In above code three steps need to perform

1. Read the current value.

2. Increment the current value by one.

3. Write the new value to the variable.

If two thread perform these steps in parallel it is possible both thread reading  the same value in the step 1 in this case the write step lost the increment value.

To fixed the we can used the Synchronization when rare condition occur.

synchronized void increment() {
    count = count + 1;
}

When using increment() concurrently we get the desired result count of 1000. No race conditions occur any longer and the result is stable with every execution of the code:


ExecutorService executor = Executors.newFixedThreadPool(2);

IntStream.range(0, 10000)
    .forEach(i -> executor.submit(this::increment));

executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);

System.out.println(count);  // 10000
The synchronized keyword is also available as a block level statement.

void incrementSync() {
    synchronized (this) {
        count = count + 1;
    }
}


Locks

Instead of using implicit locking via the synchronized keyword the Concurrency API supports various explicit locks specified by the Lock interface. Locks support various methods for finer grained lock control thus are more expressive than implicit monitors.
Multiple lock implementations are available in the standard JDK which will be demonstrated in the following sections.
ReentrantLock
The class ReentrantLock is a mutual exclusion lock with the same basic behavior as the implicit monitors accessed via the synchronized keyword but with extended capabilities. As the name suggests this lock implements reentrant characteristics just as implicit monitors.
ReentrantLock lock = new ReentrantLock();
int count = 0;

void increment() {
    lock.lock();
    try {
        count++;
    } finally {
        lock.unlock();
    }
}

ReentrantLock lock has two method lock() and unlock(). The lock() method lock and unlock() method release the lock.  It’s important to wrap your code into a try/finally block to ensure unlocking in case of exceptions. This method is thread-safe just like the synchronized counterpart. If another thread has already acquired the lock subsequent calls to lock() pause the current thread until the lock has been unlocked. Only one thread can hold the lock at any given time.


  ExecutorService executor = Executors.newFixedThreadPool(3);
  ReentrantLock lock = new ReentrantLock();
  executor.submit(() -> {
  lock.lock();
    try {
          TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
      }
   finally {
          lock.unlock();
      }
    });

executor.submit(() -> {
System.out.println("Is Locked: " + lock.isLocked());
System.out.println("Locked by : " + lock.isHeldByCurrentThread());
boolean locked = lock.tryLock();
System.out.println("Lock acquired: " + locked);
  });

  try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
      executor.shutdown();
  } catch (InterruptedException e) {
      e.printStackTrace();
    }
 


Is Locked: true
Locked by : false
Lock acquired: false

The method tryLock() as an alternative to lock() tries to acquire the lock without pausing the current thread. The boolean result must be used to check if the lock has actually been acquired before accessing any shared mutable variables.
ReadWriteLock

The interface ReadWriteLock specifies another type of lock maintaining a pair of locks for read and write access. The idea behind read-write locks is that it’s usually safe to read mutable variables concurrently as long as nobody is writing to this variable. So the read-lock can be held simultaneously by multiple threads as long as no threads hold the write-lock. This can improve performance and throughput in case that reads are more frequent than writes.

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

executor.submit(() -> {
    lock.writeLock().lock();
    try {
        TimeUnit.SECONDS.sleep(1);
        map.put("key", "ram");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.writeLock().unlock();
    }
});
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);


The above example first acquires a write-lock in order to put a new value to the map after sleeping for one second. Before this task has finished two other tasks are being submitted trying to read the entry from the map and sleep for one second:

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
ReadWriteLock lock = new ReentrantReadWriteLock();

Runnable readTask = () -> {
    lock.readLock().lock();
    try {
        System.out.println(map.get("key"));
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.readLock().unlock();
    }
};

executor.submit(readTask);
executor.submit(readTask);

executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);


When you execute this code sample you’ll notice that both read tasks have to wait the whole second until the write task has finished. After the write lock has been released both read tasks are executed in parallel and print the result simultaneously to the console. They don’t have to wait for each other to finish because read-locks can safely be acquired concurrently as long as no write-lock is held by another thread.

StampedLock

Java 8 ships with a new kind of lock called StampedLock which also support read and write locks. AStampedLock return a stamp represented by a long value. You can use these stamps to either release a lock or to check if the lock is still valid. Additionally stamped locks support another lock mode called optimistic locking.

ExecutorService executor = Executors.newFixedThreadPool(2);
Map<String, String> map = new HashMap<>();
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        TimeUnit.SECONDS.sleep(1);
        map.put("key", "ram");
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlockWrite(stamp);
    }
});

Runnable readTask = () -> {
    long stamp = lock.readLock();
    try {
        System.out.println(map.get("key"));
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlockRead(stamp);
    }
};

executor.submit(readTask);
executor.submit(readTask);

executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);

Obtaining a read or write lock via readLock() or writeLock() returns a stamp which is later used for unlocking within the finally block. Keep in mind that stamped locks don’t implement reentrant characteristics. Each call to lock returns a new stamp and blocks if no lock is available even if the same thread already holds a lock. So you have to pay particular attention not to run into deadlocks.

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.tryOptimisticRead();
    try {
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock(stamp);
    }
});

executor.submit(() -> {
    long stamp = lock.writeLock();
    try {
        System.out.println("Write Lock acquired");
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock(stamp);
        System.out.println("Write done");
    }
});

executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);

An optimistic read lock is acquired by calling tryOptimisticRead() which always returns a stamp without blocking the current thread, no matter if the lock is actually available. If there’s already a write lock active the returned stamp equals zero. You can always check if a stamp is valid by calling lock.validate(stamp).

Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false

The optimistic lock is valid right after acquiring the lock. In contrast to normal read locks an optimistic lock doesn’t prevent other threads to obtain a write lock instantaneously. After sending the first thread to sleep for one second the second thread obtains a write lock without waiting for the optimistic read lock to be released. From this point the optimistic read lock is no longer valid. Even when the write lock is released the optimistic read locks stays invalid.

So when working with optimistic locks you have to validate the lock every time after accessing any shared mutable variable to make sure the read was still valid.

If you want to convert a read lock into a write lock without unlocking and locking again. StampedLock provides the method tryConvertToWriteLock() for that purpose as seen in the next sample:

ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();

executor.submit(() -> {
    long stamp = lock.readLock();
    try {
        stamp = lock.tryConvertToWriteLock(stamp);
        if (stamp == 0L) {
            System.out.println("Could not convert to write lock");
            stamp = lock.writeLock();
        }{
            System.out.println("Read lock converted into write");
        }
    } finally {
        lock.unlock(stamp);
    }
});

executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);

Calling tryConvertToWriteLock() doesn’t block but may return a zero stamp indicating that no write lock is currently available. In that case we call writeLock() to block the current thread until a write lock is available.
Semaphores
In addition to locks the Concurrency API also supports counting semaphores. Whereas locks usually grant exclusive access to variables or resources, a semaphore is capable of maintaining whole sets of permits. This is useful in different scenarios where you have to limit the amount concurrent access to certain parts of your application.

ExecutorService executor = Executors.newFixedThreadPool(10);
Semaphore semaphore = new Semaphore(5);
Runnable longRunningTask = () -> {
    boolean permit = false;
    try {
        permit = semaphore.tryAcquire(1, TimeUnit.SECONDS);
        if (permit) {
            System.out.println("Semaphore acquired");
            TimeUnit.SECONDS.sleep(1);
        } else {
            System.out.println("Could not acquire semaphore");
        }
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    } finally {
        if (permit) {
            semaphore.release();
        }
    }
    };
IntStream.range(0, 10).forEach(i -> executor.submit(longRunningTask));
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);

The executor can potentially run 10 tasks concurrently but we use a semaphore of size 5, thus limiting concurrent access to 5. It’s important to use a try/finally block to properly release the semaphore even in case of exceptions.

Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore

The semaphores permits access to the actual long running operation simulated by sleep up to a maximum of 5. Every subsequent call to tryAcquire() elapses the maximum wait timeout of one second, resulting in the appropriate console output that no semaphore could be acquired.


No comments:

Post a Comment