Java 8 Concurrency: Threads and Executors

The Concurrency API was first introduced with the release of Java 5 and has been progressively enhanced with every new Java release. This post focuses on Java 8 and makes heavy use of lambda expressions and other new features. If you are not yet familiar with lambda expressions, I recommend reading my Java 8 Part 1 and Java 8 Part 2 posts before proceeding.

Threads and Runnables: 

All modern operating systems support concurrency both via processes and threads. A process is the instance of a program that is being executed by one or many threads. A thread of execution is the smallest sequence of programmed instructions that can be managed independently by a scheduler.

Java has supported threads since JDK 1.0. We can create a thread either by extending the Thread class or by implementing the Runnable interface.

Creating a thread by extending the Thread class:

class MyThread extends Thread{
    @Override
    public void run() {
     System.out.println("Thread is running..");
    }
}

Start the thread with the code below, without a lambda:

MyThread thread=new MyThread();
thread.start();

We can shorten the above code with an anonymous class, so there is no need to create a separate thread class.

Thread t = new Thread() {
    @Override
    public void run() {
        System.out.println("Thread java 8");
    }
};
t.start();

With a Java 8 lambda expression the code becomes simpler still:

Thread t=new Thread(()->System.out.println("Thread java 8"));
t.start();

Creating a thread by implementing the Runnable interface:

class  MyRunnableThread implements  Runnable{

    @Override
    public void run() {
        System.out.println("Runnable thread is running..");
    }
}

Start the thread with the code below, without a lambda:

MyRunnableThread runnableTh=new MyRunnableThread();
Thread t=new Thread(runnableTh);
t.start();
System.out.println("Runnable thread Done");
Since Runnable is a functional interface, we can use a Java 8 lambda expression to define the task that prints to the console. The order of the output might differ from one execution to the next.

Runnable task = () -> {
    System.out.println("Runnable java 8");
};

Thread t = new Thread(task);
t.start();
System.out.println("Runnable thread Done");
   
The same task written as an anonymous class, without a lambda:

Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println("Runnable java 8");
    }
});

t.start();
System.out.println("Runnable thread Done");

Due to concurrent execution we cannot predict if the runnable will be invoked before or after printing ‘done’. The order is non-deterministic, thus making concurrent programming a complex task in larger applications.
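If the order does matter, one option is to let the starting thread wait for the runnable with Thread.join(). The following is only a minimal sketch of that idea; the class name JoinDemo and the helper runInOrder() are made up for illustration, and the messages are collected in a list so the order can be inspected.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class JoinDemo {

    // Starts a thread, waits for it with join(), then records the "done" message.
    static List<String> runInOrder() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());

        Thread t = new Thread(() -> events.add("Runnable java 8"));
        t.start();
        try {
            t.join(); // block until the runnable has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }

        events.add("Runnable thread Done");
        return events;
    }

    public static void main(String[] args) {
        runInOrder().forEach(System.out::println);
    }
}
```

Because join() returns only after the thread has terminated, the runnable's message is always recorded before the final one in this sketch.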

Threads can be put to sleep for a certain duration. This is quite handy to simulate long running tasks in the subsequent code samples of this article:

Runnable runnable = () -> {
    try {
        String name = Thread.currentThread().getName();
        System.out.println("Thread Name "+name);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Thread Name "+name);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
};

Thread threadS = new Thread(runnable);
threadS.start();
When you run the above code you’ll notice the one second delay between the first and the second print statement. TimeUnit is a useful enum for working with units of time. Alternatively you can achieve the same by calling Thread.sleep(1000).
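To make the two ways of sleeping easier to compare, here is a small sketch that pauses once with Thread.sleep() and once with TimeUnit and reports the elapsed time. The class and method names (SleepDemo, pauseWithThreadSleep, pauseWithTimeUnit) are assumptions chosen for this example only.

```java
import java.util.concurrent.TimeUnit;

class SleepDemo {

    // Pauses the current thread with Thread.sleep and returns the elapsed milliseconds.
    static long pauseWithThreadSleep(long millis) {
        long start = System.nanoTime();
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // The same pause expressed with the TimeUnit enum.
    static long pauseWithTimeUnit(long millis) {
        long start = System.nanoTime();
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("Thread.sleep paused for about " + pauseWithThreadSleep(1000) + " ms");
        System.out.println("TimeUnit paused for about " + pauseWithTimeUnit(1000) + " ms");
    }
}
```

Both calls behave the same way; TimeUnit mainly makes the unit explicit in the code.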

Working with the Thread class directly can be tedious and error-prone. For that reason the Concurrency API was introduced with Java 5. The API is located in the package java.util.concurrent and contains many useful classes for concurrent programming. Since then the Concurrency API has been enhanced with every new Java release, and Java 8 also provides new classes and methods for dealing with concurrency.

Executors: 

The Concurrency API introduces the concept of an ExecutorService as a higher level replacement for working with threads directly. Executors are capable of running asynchronous tasks and typically manage a pool of threads, so we don’t have to create new threads manually. All threads of the internal pool will be reused under the hood for subsequent tasks, so we can run as many concurrent tasks as we want throughout the life-cycle of our application with a single executor service.
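The reuse of pool threads can be observed with a small experiment. The sketch below submits more tasks than there are threads and collects the names of the threads that ran them. The class PoolReuseDemo and the helper threadNamesUsed() are hypothetical names for this illustration, and the pool size of two is an arbitrary choice.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolReuseDemo {

    // Runs taskCount tasks on a fixed pool and returns the names of the threads that executed them.
    static Set<String> threadNamesUsed(int poolSize, int taskCount) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        Set<String> names = ConcurrentHashMap.newKeySet();

        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> {
                names.add(Thread.currentThread().getName());
            });
        }

        executor.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return names;
    }

    public static void main(String[] args) {
        // Ten tasks, but never more than two distinct thread names.
        System.out.println(threadNamesUsed(2, 10));
    }
}
```

However many tasks are submitted, the set never contains more names than the pool has threads, because the same threads pick up one task after another.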

Below is a simple example that submits a task to an executor:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorServiceThread1 {
    public static void main(String[] args) {
        ExecutorService executorService= Executors.newSingleThreadExecutor();
        executorService.submit(()->{
            String threadName=Thread.currentThread().getName();
            System.out.println("Thread name "+threadName);
        });
    }
}    
Thread name pool-1-thread-1

The class Executors provides convenient factory methods for creating different kinds of executor services. In the above code we use an executor with a thread pool of size one.

When you run this code, you will notice that the Java process never stops. Executors have to be stopped explicitly, otherwise they keep listening for new tasks.

An ExecutorService provides two methods for that purpose: shutdown() waits for currently running tasks to finish, while shutdownNow() interrupts all running tasks and shuts the executor down immediately.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceThread1 {
    public static void main(String[] args) {
        ExecutorService executorService= Executors.newSingleThreadExecutor();
        executorService.submit(()->{
            String threadName=Thread.currentThread().getName();
            System.out.println("Thread name "+threadName);
        });

        try {
            System.out.println("attempt to shutdown executor");
            executorService.shutdown();
            executorService.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            System.err.println("tasks interrupted");
        }
        finally {
            if (!executorService.isTerminated()) {
                System.err.println("cancel non-finished tasks");
            }
            executorService.shutdownNow();
            System.out.println("shutdown finished");
        }

    }
}
The executor shuts down softly by waiting a certain amount of time for termination of currently running tasks. After a maximum of five seconds the executor finally shuts down explicitly by interrupting all running tasks.
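To see the interruption actually happen, the task has to run longer than the waiting time. The following sketch submits a task that sleeps for ten seconds, waits only briefly, and then calls shutdownNow(). The names ShutdownNowDemo and longTaskWasInterrupted() as well as the chosen durations are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {

    // Returns true if the long-running task observed an interrupt after shutdownNow().
    static boolean longTaskWasInterrupted() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        executor.submit(() -> {
            started.countDown(); // signal that the task is running
            try {
                TimeUnit.SECONDS.sleep(10); // simulate a long-running task
            } catch (InterruptedException e) {
                interrupted.set(true); // shutdownNow() interrupted the sleep
            }
        });

        try {
            started.await(); // make sure the task has begun
            executor.shutdown(); // soft shutdown: running task may continue
            if (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow(); // hard shutdown: interrupt the running task
                executor.awaitTermination(1, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("task interrupted: " + longTaskWasInterrupted());
    }
}
```

Note that shutdownNow() can only interrupt the task. A task that ignores interrupts and never blocks would keep running regardless.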

Callables and Futures:

In addition to Runnable, executors support another kind of task named Callable. Callable is a functional interface just like Runnable, but instead of being void its call() method returns a value.

The code below defines a callable that returns a status string after sleeping for one second:
Callable<String> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(1);
        return "COMPLETED";
    } catch (InterruptedException e) {
        return "FAIL";
    }
};
Callables can be submitted to executor services. Since submit() doesn’t wait until the task completes, the executor service cannot return the result of the callable directly. Instead the executor returns a special result of type Future which can be used to retrieve the actual result at a later point in time.

ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<String> future = executorService.submit(task);
System.out.println("Is task done " + future.isDone());
String status;
try {
    status = future.get();
} catch (InterruptedException e) {
    throw new RuntimeException(e);
} catch (ExecutionException e) {
    throw new RuntimeException(e);
}

System.out.println("Is task done " + future.isDone());
System.out.println(status);

executorService.shutdownNow();
executorService.awaitTermination(5, TimeUnit.SECONDS);

After submitting the callable to the executor we first check via isDone() whether the future has already finished execution.

Calling the method get() blocks the current thread and waits until the callable completes before returning the actual result status. Once the future has completed, we see the following result on the console:

Is task done false
Is task done true
COMPLETED

Timeouts:

Any call to future.get() will block and wait until the underlying callable has terminated. In the worst case a callable runs forever, thus making your application unresponsive. You can counteract those scenarios by passing a timeout.

In the code below the task sleeps for 10 seconds while we wait at most 5 seconds for the result, so get() throws an exception rather than continuing to block:

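Callable<String> task = () -> {
    try {
        TimeUnit.SECONDS.sleep(10);
        return "COMPLETED";
    } catch (InterruptedException e) {
        return "FAIL";
    }
};

// Submit the task first so that there is a future to wait on
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<String> future = executorService.submit(task);

// Wait at most 5 seconds for the result
future.get(5, TimeUnit.SECONDS);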

Executing the above code results in a TimeoutException:
    Exception in thread "main" java.util.concurrent.TimeoutException
	at java.util.concurrent.FutureTask.get(FutureTask.java:205)
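Instead of letting the exception escape from main, the caller will usually want to catch it and decide what to do with the task that is still running. Here is one possible sketch that cancels the task and returns a fallback value. The class TimeoutDemo, the helper getOrFallback() and the fallback strings are invented for this illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutDemo {

    // Waits at most timeoutMillis for a task that takes taskMillis to complete.
    static String getOrFallback(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(taskMillis);
            return "COMPLETED";
        });

        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task that is still running
            return "TIMED OUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            return "FAILED"; // the callable itself threw an exception
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(getOrFallback(2000, 500)); // task is slower than the timeout
        System.out.println(getOrFallback(100, 2000)); // task finishes in time
    }
}
```

Calling cancel(true) matters here: a timeout on get() only stops the waiting, it does not stop the task itself.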

InvokeAll:

Executors support batch submitting of multiple callables at once via invokeAll(). This method accepts a collection of callables and returns a list of futures.


ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        () -> "runtask1",
        () -> "runtask2",
        () -> "runtask3",
        () -> "runtaskn");

executor.invokeAll(callables)
       .stream()
       .map(future -> {
            try {
                return future.get();
            }
            catch (Exception e) {
                throw new IllegalStateException(e);
            }
        })
  .forEach(System.out::println);

runtask1
runtask2
runtask3
runtaskn

InvokeAny:

Another way of batch-submitting callables is the method invokeAny(), which works slightly differently from invokeAll(). Instead of returning future objects, this method blocks until the first callable terminates and returns the result of that callable.

In order to test this behavior we use this helper method to simulate callables with different durations. The method returns a callable that sleeps for a certain amount of time until returning the given result:


Callable<String> callable(String result, long sleepSeconds) {
    return () -> {
        TimeUnit.SECONDS.sleep(sleepSeconds);
        return result;
    };
}

We use this method to create a bunch of callables with durations of five, two and three seconds. Submitting those callables to an executor via invokeAny() returns the string result of the fastest callable, in this case task2:

ExecutorService executor = Executors.newWorkStealingPool();

List<Callable<String>> callables = Arrays.asList(
        callable("task1", 5),
        callable("task2", 2),
        callable("task3", 3));

String result = executor.invokeAny(callables);
System.out.println(result);

The above example uses yet another type of executor created via newWorkStealingPool(). This factory method was added in Java 8 and returns an executor of type ForkJoinPool, which works slightly differently from normal executors.

Scheduled Executors:

We’ve already learned how to submit and run tasks once on an executor. If we want to run common tasks periodically, multiple times, we can utilize scheduled thread pools.

ScheduledExecutorService is capable of scheduling tasks to run either periodically or once after a certain amount of time has elapsed.

In the code below we schedule a task to run after an initial delay of two seconds:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println(" Scheduling: " + System.nanoTime());
ScheduledFuture<?> future = executor.schedule(task, 2, TimeUnit.SECONDS);

TimeUnit.MILLISECONDS.sleep(500);

long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS);
System.out.printf("Remaining Delay: %sms", remainingDelay);

Scheduling a task produces a specialized future of type ScheduledFuture which - in addition to Future - provides the method getDelay() to retrieve the remaining delay. After this delay has elapsed the task will be executed concurrently.

In order to schedule tasks to be executed periodically, executors provide the two methods scheduleAtFixedRate() and scheduleWithFixedDelay(). The first method is capable of executing tasks with a fixed time rate, e.g. once every second as demonstrated in this example:

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime());

int initialDelay = 0;
int period = 1;
executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);


Scheduling: 296809628219600
Scheduling: 296810629717500
Scheduling: 296811629000700
Scheduling: 296812639617100
....
..
Additionally this method accepts an initial delay which describes the leading wait time before the task is executed for the first time.

Please keep in mind that scheduleAtFixedRate() doesn’t take into account the actual duration of the task. So if you specify a period of one second but the task needs 2 seconds to execute, the thread pool will be working at capacity very soon.

In that case you should consider using scheduleWithFixedDelay() instead. This method works just like the counterpart described above. The difference is that the wait time applies between the end of one execution and the start of the next. For example:


ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

Runnable task = () -> {
    try {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("Scheduling: " + System.nanoTime());
    }
    catch (InterruptedException e) {
        System.err.println("task interrupted");
    }
};

executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
This example schedules a task with a fixed delay of one second between the end of an execution and the start of the next execution. The initial delay is zero and the task’s duration is two seconds. So we end up with executions starting at roughly 0s, 3s, 6s, 9s and so on. As you can see, scheduleWithFixedDelay() is handy if you cannot predict the duration of the scheduled tasks.
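The difference between the two scheduling methods becomes visible when we measure the gaps between start times. The sketch below runs the same slow task with both methods on a single-threaded scheduler. To keep it quick, the durations are scaled down to a 200 ms task and a 100 ms period or delay. These values, the class name SchedulingComparison and the helper startGaps() are assumptions for this example, not part of the API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SchedulingComparison {

    static final long TASK_MILLIS = 200;   // how long each execution takes
    static final long PERIOD_MILLIS = 100; // period (fixed rate) or delay (fixed delay)

    // Lets the task run `runs` times and returns the gaps in ms between consecutive starts.
    static List<Long> startGaps(boolean fixedRate, int runs) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);

        Runnable task = () -> {
            starts.add(System.nanoTime()); // record when this execution started
            try {
                TimeUnit.MILLISECONDS.sleep(TASK_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            done.countDown();
        };

        if (fixedRate) {
            executor.scheduleAtFixedRate(task, 0, PERIOD_MILLIS, TimeUnit.MILLISECONDS);
        } else {
            executor.scheduleWithFixedDelay(task, 0, PERIOD_MILLIS, TimeUnit.MILLISECONDS);
        }

        try {
            done.await(); // wait until the requested number of runs has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow(); // stop the periodic task
        }

        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < runs; i++) {
            gaps.add(TimeUnit.NANOSECONDS.toMillis(starts.get(i) - starts.get(i - 1)));
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println("fixed rate gaps (ms):  " + startGaps(true, 4));
        System.out.println("fixed delay gaps (ms): " + startGaps(false, 4));
    }
}
```

With these values the fixed-rate gaps should come out at roughly the task duration, since each execution starts late as soon as the previous one ends, while the fixed-delay gaps should be roughly the task duration plus the delay. Exact numbers vary from run to run.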

