Producer Consumer Design Pattern using BlockingQueue
Producer Consumer Problem is a classical concurrency problem. In fact it is one of the concurrency design pattern.
This article is continuation of my post Producer Consumer Design Pattern in which I have explained basic idea, real life example, uses and benefits of Producer Consumer Design Pattern.
Producer Consumer Design Patter can be implemented using following two approaches.
In this post, I will explain implementation using Blocking Queue. Before reading this article, I recommend you to go throgh my article Producer Consumer Design Pattern using wait() and notify(). It will help you to understand why BlockingQueue is more preferable while implementing Producer Consumer Design Pattern.
In first approach we use wait() and notify() method to communicate between Producer and Consumer thread and blocking each of them on individual condition like full queue and empty queue.
With introduction of BlockingQueue Data Structure in Java 5, its now much simpler to implement Producer Consumer Design Pattern as BlockingQueue provides this control implicitly by introducing blocking methods put() and take().
put(E e)
: This method is used to insert elements to the queue. If the queue is full, it waits for the space to be available.E take()
: This method retrieves and remove the element from the head of the queue. If queue is empty it waits for the element to be available.
Now you don’t require to use wait and notify to communicate between Producer and Consumer.
Java provides several BlockingQueue implementations such as ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue etc.
While implementing producer consumer problem, we will use LinkedBlockingQueue implementation.
Understanding Java Program
In this program, we have two threads named PRODUCER and CONSUMER, which are instance of Producer and Consumer class respectively. Both of these class implements Runnable interface.
The logic of what producer and the consumer should do is written in their respective run() method.
Main thread starts both PRODUCER and CONSUMER threads and also create an object of LinkedBlockingQueue class i.e. sharedQ to share as Queue between them.
/** * Simple Java Program to test Producer Consumer Design Pattern * using BlockingQueue * */ public class ProducerConsumerBQTest { public static void main(String[] args) throws InterruptedException { BlockingQueue sharedQ = new LinkedBlockingQueue(); Thread consumerThread = new Thread(new ConsumerBQ(sharedQ), "CONSUMER"); Thread producerThread = new Thread(new ProducerBQ(sharedQ), "PRODUCER"); producerThread.start(); consumerThread.start(); } }
Producer runs in an infinite loop and keeps inserting random integer value from 1 to 100 into sharedQ until the queue is full.
Unlike wait() and notify() implentation, here we don’t need to explicity check the size of the sharedQ.
Also we don’t need to write any synchronized block as all BlockingQueue implementations are thread-safe, all queuing methods are atomic in nature and use internal locks or other forms of concurrency control.
/** * Producer Thread will keep producing values for Consumer. * * It will use put() method of BlockingQueue to add values * in sharedQ * */ class ProducerBQ implements Runnable { private final BlockingQueue sharedQ; public ProducerBQ(BlockingQueue sharedQ) { this.sharedQ = sharedQ; } @Override public void run(){ while(true) { try { Random random = new Random(); int number = random.nextInt(100); System.out.println("Producing value " + number); sharedQ.put(number); } catch(InterruptedException ie) { System.err.println("Error :: " + ie); } } } }
Similarly Consumer runs in an infinite loop and keeps consuming integers from sharedQ until the queue is empty.
/** * Consumer Thread will consumer values form shared queue. * * It uses take method of BlockingQueue * * */ class ConsumerBQ implements Runnable { private final BlockingQueue sharedQ; public ConsumerBQ(BlockingQueue sharedQ) { this.sharedQ = sharedQ; } @Override public void run(){ while(true) { try { System.out.println("Consumed value " + sharedQ.take()); } catch(InterruptedException ie) { System.err.println("Error :: " + ie); } } } }
That's all for this topic. If you guys have any suggestions or queries, feel free to drop a comment. We would be happy to add that in our post. You can also contribute your articles by creating contributor account here.
Happy Learning 🙂
If you like the content on CodePumpkin and if you wish to do something for the community and the planet Earth, you can donate to our campaign for planting more trees at CodePumpkin Cauvery Calling Campaign.
We may not get time to plant a tree, but we can definitely donate ₹42 per Tree.
About the Author
Tags: BlockingQueue, Concurrency, Design Patterns, Java, LinkedBlockingQueue, Multithreading, Producer Consumer Design Pattern, Synchronization, Thread
Comments and Queries
If you want someone to read your code, please put the code inside <pre><code> and </code></pre> tags. For example:<pre><code class="java"> String foo = "bar"; </code></pre>For more information on supported HTML tags in disqus comment, click here.