The Java concurrency utilities, introduced in JDK 1.5, reside in the java.util.concurrent package.
This package is meant for concurrent (multi-threaded) applications. Below are its main contents.
- BlockingQueue
- BlockingDeque
- ConcurrentMap
- NavigableMap
- Executors
- FutureTask
- CountDownLatch
- CyclicBarrier
- Atomic variables
- Semaphore
- ReentrantLock
- CopyOnWriteArrayList
- CopyOnWriteArraySet
Items written in << >> notation are interfaces.
1. <<BlockingQueue>> :
Behaviour: If the queue is full, put() blocks until space becomes available; if the queue is empty, take() blocks until an element becomes available.
1.1 ArrayBlockingQueue:
Behaviour: Bounded; internally uses an array as its storage. It is a natural fit for the producer-consumer problem.
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    queue.put("1");               // blocks if the queue is full
    String value = queue.take();  // blocks if the queue is empty
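The producer-consumer use of a bounded queue can be sketched as follows. This is a minimal illustration rather than a definitive implementation; the class name ProducerConsumerSketch, the method transfer() and the capacity of 2 are assumptions made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; the names are not part of any library.
class ProducerConsumerSketch {

    // Moves the numbers 1..count from a producer thread to the calling
    // (consumer) thread through a small bounded queue.
    static List<Integer> transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        // One producer and a FIFO queue, so the order is preserved.
        System.out.println(transfer(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Because the capacity is only 2, the producer is repeatedly forced to wait for the consumer, which is exactly the blocking behaviour described for BlockingQueue.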
1.2 LinkedBlockingQueue:
Behaviour: Optionally bounded. Internally uses a linked list for the queue.
    BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
    BlockingQueue<String> bounded = new LinkedBlockingQueue<String>(10);
    bounded.put("Value");
    String value = bounded.take();
1.3 PriorityBlockingQueue:
Behaviour: Unbounded. It orders its elements by priority, so the elements must either implement Comparable or be ordered by a Comparator passed to the constructor. Because elements are compared with each other, null is not allowed.
    BlockingQueue<String> queue = new PriorityBlockingQueue<>();
    // String implements java.lang.Comparable
    queue.put("Value");
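To make the ordering concrete, here is a small sketch that orders strings with a Comparator instead of their natural order. The class name PriorityOrderSketch, the method drainByLength() and the choice of ordering by length are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class; the names are not part of any library.
class PriorityOrderSketch {

    // Inserts three strings in arbitrary order and drains them
    // shortest-first, as dictated by the comparator.
    static List<String> drainByLength() {
        Comparator<String> byLength = Comparator.comparingInt(String::length);
        // 11 is just an initial capacity; the queue itself is unbounded.
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(11, byLength);
        queue.put("ccc");
        queue.put("a");
        queue.put("bb");

        List<String> result = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) { // poll() returns null when empty
            result.add(next);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(drainByLength()); // prints [a, bb, ccc]
    }
}
```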
1.4 DelayQueue:
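Behaviour: It holds each element until that element's delay has expired; take() returns only elements whose delay has run out.
    DelayQueue<MyDelayed> queue = new DelayQueue<>();
    queue.put(new MyDelayed(1000));
    MyDelayed element = queue.take(); // blocks until the delay has expired

    // Somewhere, implement Delayed (it also requires compareTo):
    class MyDelayed implements Delayed {
        private final long expiry; // absolute time in milliseconds

        MyDelayed(long delayMillis) {
            expiry = System.currentTimeMillis() + delayMillis;
        }

        public long getDelay(TimeUnit unit) {
            return unit.convert(expiry - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }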
1.5 SynchronousQueue:
Behaviour: It has no internal capacity at all: each put() must wait for a matching take() by another thread, and vice versa. Calling it a “queue” is something of an overstatement.
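A minimal sketch of how a SynchronousQueue hands an element directly from one thread to another. The class name HandoffSketch and its two methods are hypothetical names introduced for this example.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; the names are not part of any library.
class HandoffSketch {

    // Passes one message from a producer thread to the calling thread.
    static String handOff(String message) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(message); // blocks until another thread calls take()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            return queue.take(); // blocks until the producer calls put()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // offer() does not block: with no consumer waiting it fails immediately,
    // because the queue cannot store the element.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("x");
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));       // prints hello
        System.out.println(offerWithoutConsumer()); // prints false
    }
}
```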
2. <<BlockingDeque>>:
Behaviour: A double-ended BlockingQueue in which elements can be inserted and removed at both ends.
2.1 LinkedBlockingDeque:
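LinkedBlockingDeque is the linked-node implementation of BlockingDeque. The sketch below only illustrates working at both ends; the class name DequeSketch and the method bothEnds() are hypothetical. It uses the non-blocking addFirst()/addLast()/pollFirst()/pollLast() methods to stay short; the blocking counterparts are putFirst(), putLast(), takeFirst() and takeLast().

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical demo class; the names are not part of any library.
class DequeSketch {

    // Adds elements at both ends, then removes one from each end.
    static String bothEnds() {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>();
        deque.addFirst("middle");
        deque.addFirst("head");   // now: head, middle
        deque.addLast("tail");    // now: head, middle, tail

        String first = deque.pollFirst(); // removes "head"
        String last = deque.pollLast();   // removes "tail"
        return first + "," + last + "," + deque.size();
    }

    public static void main(String[] args) {
        System.out.println(bothEnds()); // prints head,tail,1
    }
}
```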
3. <<ConcurrentMap>>:
Before looking at how ConcurrentHashMap works, it helps to ask why it was added to the JDK when Hashtable and synchronized maps already existed.
Hashtable makes access to the map thread-safe by locking the entire map for every operation (create, read, update, delete). In a web application under normal load, this overhead can be ignored. Under heavy load, however, contention on the single map-wide lock can badly hurt response times and overtax the server.
This is where ConcurrentHashMap comes to the rescue. In the implementation used up to Java 7, a ConcurrentHashMap is divided into 16 segments by default (Java 8 and later replaced the segments with finer-grained locking on individual buckets). In the ConcurrentHashMap source code, one can find the following constants:
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
Thus, instead of a map-wide lock, the segment-based ConcurrentHashMap maintains 16 locks by default, one per segment. Within a segment, only one thread at a time can update the data (by locking the segment), while multiple threads can read the data simultaneously without any locking.
3.1 ConcurrentHashMap:
Behaviour:
- We can do concurrent modification of the structure.
- In the segment-based implementation, the map has 16 segments by default, so up to 16 threads can update it simultaneously as long as they touch different segments.
- Unlike HashMap, it is thread-safe.
- Unlike HashMap, it accepts neither null keys nor null values.
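The points above can be tried out with a small sketch: several threads update the same key concurrently, and a null value is rejected. The class name ConcurrentCountSketch, its methods and the key "page" are assumptions made up for this example; merge() requires Java 8 or later.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class; the names are not part of any library.
class ConcurrentCountSketch {

    // Lets several threads increment the same counter entry concurrently.
    // Assumes threads > 0 and hitsPerThread > 0.
    static int countHits(int threads, int hitsPerThread) {
        ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < hitsPerThread; i++) {
                    hits.merge("page", 1, Integer::sum); // atomic read-modify-write
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return hits.get("page");
    }

    // Returns true if putting a null value throws NullPointerException.
    static boolean rejectsNullValue() {
        ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
        try {
            map.put("key", null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(countHits(4, 1000)); // prints 4000
        System.out.println(rejectsNullValue()); // prints true
    }
}
```

With a plain HashMap the same loop could lose updates or corrupt the map; here no external synchronization is needed.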
4. <<NavigableMap>>:
4.1 ConcurrentNavigableMap:
5. Executors:
5.1 <<ExecutorService>>
Using ExecutorService you can create a thread pool and submit tasks to it; the pool's threads then run those tasks asynchronously.
Below is a code snippet showing the usage of the ExecutorService interface and the Executors factory class.
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    class MyRunnable implements Runnable {
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Start ---");
            try {
                Thread.sleep(5000); // dummy work
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " End ***");
        }
    }

    public class Manager {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 5; i++) {
                Runnable r = new MyRunnable();
                executorService.execute(r);
            }
            executorService.shutdown(); // accept no new tasks; queued tasks still run
            // Wait (without busy-waiting) until all tasks have finished,
            // so that the last line of this method is not reached too early.
            executorService.awaitTermination(1, TimeUnit.MINUTES);
            System.out.println("Finished all threads");
        }
    }
6. FutureTask:
FutureTask can be used when we want to keep the value returned from the call() method of the Callable interface.
Below is an example:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;

    class MyCallable implements Callable<String> {
        public String call() {
            System.out.println(Thread.currentThread().getName() + " Start ---");
            try {
                Thread.sleep(5000); // dummy work
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " End ***");
            return "10"; // returning some value
        }
    }

    public class Manager {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executor = Executors.newFixedThreadPool(3);
            List<FutureTask<String>> tasks = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                FutureTask<String> task = new FutureTask<>(new MyCallable());
                tasks.add(task);
                executor.execute(task);
            }
            executor.shutdown();
            System.out.println("Printing the returned results from all threads:");
            for (FutureTask<String> task : tasks) {
                System.out.println(task.get()); // get() blocks until the task is done
            }
            System.out.println("Finished all threads");
        }
    }
7. CountDownLatch:
CountDownLatch is a synchronizer that allows one or more threads to wait until a set of operations being performed in other threads has completed.
CountDownLatch works on the latch principle: waiting threads are held back until the latch opens, which happens when the count reaches zero.
Any thread (usually the main thread) that calls CountDownLatch.await() waits until the count reaches zero or until it is interrupted by another thread.
Once a worker thread has completed its task, it counts down by calling CountDownLatch.countDown().
When the count reaches zero, all waiting threads are released and continue running.
The disadvantage of CountDownLatch is that it is not reusable: once the count reaches zero it cannot be reset. For that case the Java concurrency API provides CyclicBarrier.
A full example below:
    import java.util.concurrent.CountDownLatch;

    class MyRunnable implements Runnable {
        private String name;
        private int delay;
        private CountDownLatch latch;

        public MyRunnable(int delay, CountDownLatch latch, String name) {
            this.name = name;
            this.delay = delay;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay); // dummy work
                System.out.println(name + "---- finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                latch.countDown(); // always count down, even if interrupted
            }
        }
    }

    public class Manager {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(4);
            // Let us create four threads and start them.
            Thread t1 = new Thread(new MyRunnable(5000, latch, "THREAD-1"));
            Thread t2 = new Thread(new MyRunnable(5000, latch, "THREAD-2"));
            Thread t3 = new Thread(new MyRunnable(5000, latch, "THREAD-3"));
            Thread t4 = new Thread(new MyRunnable(5000, latch, "THREAD-4"));
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            System.out.println("Started all threads....");
            // The main thread waits for the four threads.
            latch.await();
            // All four workers are done; the main thread continues.
            System.out.println(Thread.currentThread().getName() + "**** has finished");
        }
    }
8. CyclicBarrier:
CyclicBarrier is also a synchronizer which allows every thread to wait for other threads before processing further. In other words, it is a barrier that all threads must wait at, until all threads reach it, before any of the threads can continue. Here is a diagram illustrating that:
The threads wait for each other by calling the await() method on the CyclicBarrier.
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;

    class MyRunnable implements Runnable {
        private String name;
        private int duration;
        private CyclicBarrier barrier;

        public MyRunnable(int duration, CyclicBarrier barrier, String name) {
            this.name = name;
            this.duration = duration;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " doing something");
                Thread.sleep(duration); // some dummy work
                barrier.await();        // wait for the other three threads
                System.out.println(name + " doing something 2");
                Thread.sleep(duration); // some dummy work
                barrier.await();        // the same barrier is reused
                System.out.println(name + " doing something 3");
                Thread.sleep(duration); // some dummy work
                barrier.await();
                System.out.println(name + " is FINALLY COMPLETED !!!");
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public class Manager {
        public static void main(String[] args) throws InterruptedException {
            CyclicBarrier barrier = new CyclicBarrier(4);
            Thread t1 = new Thread(new MyRunnable(5000, barrier, "THREAD-1"));
            Thread t2 = new Thread(new MyRunnable(5000, barrier, "THREAD-2"));
            Thread t3 = new Thread(new MyRunnable(5000, barrier, "THREAD-3"));
            Thread t4 = new Thread(new MyRunnable(5000, barrier, "THREAD-4"));
            t1.start();
            t2.start();
            t3.start();
            t4.start();
            System.out.println(Thread.currentThread().getName() + " has finished");
        }
    }
9. Atomic Variables:
Atomic variables are classes that support lock-free, thread-safe operations on a single variable. Suppose we have an integer that is incremented after some operation. The increment is really three steps (read, add, write), so if another thread updates the value in between, an update can be lost and the data becomes inconsistent. To avoid this, Java provides atomic variables in the java.util.concurrent.atomic package, available since Java 5.
Let's see an example. In the example below we have not used atomic variables:
    private int count;

    public void run() {
        for (int i = 1; i < 5; i++) {
            processSomething(i);
            count++; // not atomic: read, add, write
        }
    }
The above example rewritten using an atomic variable:
    import java.util.concurrent.atomic.AtomicInteger;

    // some code
    private AtomicInteger count = new AtomicInteger();

    public void run() {
        for (int i = 1; i < 5; i++) {
            processSomething(i);
            count.incrementAndGet(); // atomic increment
        }
    }
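As a runnable variant of the fragment above, the following sketch lets several threads increment one AtomicInteger and checks that no update is lost. The class name AtomicCounterSketch and the method countWithThreads() are hypothetical names chosen for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names are not part of any library.
class AtomicCounterSketch {

    // Starts the given number of threads, each incrementing a shared
    // AtomicInteger, and returns the final value.
    static int countWithThreads(int threads, int incrementsPerThread) {
        AtomicInteger count = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    count.incrementAndGet(); // atomic, no lock needed
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return count.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(8, 1000)); // prints 8000
    }
}
```

With a plain int and count++ the result would typically be less than 8000 on some runs, because concurrent increments can overwrite each other.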
10. Semaphore:
11. <<Lock>>:
11.1 ReentrantLock:
As the name suggests, a thread that already holds a ReentrantLock can acquire it again without blocking itself. Synchronized blocks are reentrant as well, but ReentrantLock gives you finer control over locking than a synchronized block does.
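Reentrancy itself can be observed with a short sketch. The class name ReentrancySketch and the methods outer() and inner() are hypothetical; getHoldCount() is a real ReentrantLock method that reports how many times the current thread holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are not part of any library.
class ReentrancySketch {
    private final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock, then calls a method that acquires it again.
    int outer() {
        lock.lock();
        try {
            return inner(); // same thread re-enters without blocking
        } finally {
            lock.unlock();
        }
    }

    private int inner() {
        lock.lock();
        try {
            return lock.getHoldCount(); // held twice by the current thread
        } finally {
            lock.unlock(); // each lock() needs its own unlock()
        }
    }

    public static void main(String[] args) {
        System.out.println(new ReentrancySketch().outer()); // prints 2
    }
}
```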
Below are the drawbacks of the synchronized block:
- There is no ordering of waiting threads (a ReentrantLock can optionally be created with a fairness policy).
- A thread waiting to enter the block cannot time out, so there is a possibility of starvation.
- The entire synchronized block has to be in a single method, unlike ReentrantLock, where lock() and unlock() can be called in separate methods.
The code below is written in the normal synchronized way:
    public void a() {
        synchronized (this) {
            // code to be synchronized
        }
    }
The same code rewritten using ReentrantLock:
    Lock lock = new ReentrantLock();

    public void a() {
        lock.lock();
        try {
            // code to be synchronized
        } finally {
            lock.unlock(); // always release the lock, even if an exception is thrown
        }
    }
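To avoid starvation you can use tryLock(), which gives up after a timeout instead of waiting forever. Its return value must be checked, because unlock() may only be called by the thread that holds the lock:
    if (lock.tryLock(10, TimeUnit.SECONDS)) { // may throw InterruptedException
        try {
            // code to be synchronized
        } finally {
            lock.unlock();
        }
    } else {
        // the lock was not acquired within 10 seconds
    }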
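The timeout behaviour of tryLock() can be demonstrated with a small self-contained sketch. The class name TryLockSketch, its methods and the 100 ms timeout are assumptions made up for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; the names are not part of any library.
class TryLockSketch {
    private static final ReentrantLock lock = new ReentrantLock();

    // Tries to acquire the lock from a separate thread and reports
    // whether that thread got it within the timeout.
    static boolean tryFromOtherThread(long timeoutMillis) {
        boolean[] acquired = new boolean[1];
        Thread other = new Thread(() -> {
            try {
                if (lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    try {
                        acquired[0] = true; // critical section
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        try {
            other.join(); // join() makes the write to acquired[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acquired[0];
    }

    // Holds the lock in the calling thread while another thread tries.
    static boolean tryWhileHeld() {
        lock.lock();
        try {
            return tryFromOtherThread(100);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryWhileHeld());          // prints false (timed out)
        System.out.println(tryFromOtherThread(100)); // prints true (lock was free)
    }
}
```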
12. CopyOnWriteArrayList:
As the name indicates, CopyOnWriteArrayList creates a fresh copy of its underlying array for every modification (add, set, remove, etc.). Once the modification is complete, the list switches to the new array atomically. Threads that are performing read operations are therefore not affected.
It is costly to use for frequent updates, because every update operation copies the whole array. Hence, CopyOnWriteArrayList is the best choice when reads far outnumber writes.
An important point about CopyOnWriteArrayList is that its Iterator cannot perform the remove operation; attempting it throws the runtime exception UnsupportedOperationException. The add() and set() methods of its list iterator also throw UnsupportedOperationException. Also, an Iterator of CopyOnWriteArrayList will never throw ConcurrentModificationException, because it works on a snapshot of the array.
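The iterator behaviour described above can be checked with a short sketch. The class name SnapshotIteratorSketch and its two methods are hypothetical names used only for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class; the names are not part of any library.
class SnapshotIteratorSketch {

    // Modifies the list while iterating over it. The iterator keeps
    // reading the array that existed when the loop started.
    static String iterateWhileAdding() {
        List<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        list.add("b");

        int seen = 0;
        for (String s : list) {
            list.add(s + "-copy"); // an ArrayList would throw ConcurrentModificationException
            seen++;
        }
        return seen + "/" + list.size(); // elements iterated / final size
    }

    // Returns true if Iterator.remove() is rejected.
    static boolean iteratorRemoveUnsupported() {
        List<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        Iterator<String> it = list.iterator();
        it.next();
        try {
            it.remove();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileAdding());        // prints 2/4
        System.out.println(iteratorRemoveUnsupported()); // prints true
    }
}
```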
13. CopyOnWriteArraySet:
It works the same way as CopyOnWriteArrayList, but for Set; internally it is backed by a CopyOnWriteArrayList.
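A brief sketch of the Set variant, showing that duplicates are ignored and that iteration is safe while the set is being modified. The class name CopyOnWriteSetSketch and the method addWithDuplicates() are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical demo class; the names are not part of any library.
class CopyOnWriteSetSketch {

    // Adds a duplicate, then modifies the set while iterating over it.
    static int addWithDuplicates() {
        Set<String> set = new CopyOnWriteArraySet<>();
        set.add("a");
        set.add("b");
        set.add("a"); // duplicate, ignored

        for (String s : set) {
            set.add(s + "-seen"); // safe: the loop iterates over a snapshot
        }
        return set.size(); // a, b, a-seen, b-seen
    }

    public static void main(String[] args) {
        System.out.println(addWithDuplicates()); // prints 4
    }
}
```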