Code Pumpkin

Producer Consumer Design Pattern using BlockingQueue

April 5, 2017
Posted by Abhi Andhariya

The Producer Consumer Problem is a classic concurrency problem. In fact, it is one of the concurrency design patterns.

This article is a continuation of my post Producer Consumer Design Pattern, in which I explained the basic idea, a real-life example, and the uses and benefits of the Producer Consumer Design Pattern.

The Producer Consumer Design Pattern can be implemented using the following two approaches.

  1. By using wait(), notify() and notifyAll() methods
  2. By using BlockingQueue 

In this post, I will explain the implementation using BlockingQueue. Before reading this article, I recommend going through my article Producer Consumer Design Pattern using wait() and notify(). It will help you understand why BlockingQueue is preferable when implementing the Producer Consumer Design Pattern.


In the first approach, we use the wait() and notify() methods to communicate between the Producer and Consumer threads, blocking each of them on its own condition: a full queue for the producer and an empty queue for the consumer.
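For contrast, here is a minimal sketch of what that hand-written coordination tends to look like. The class name ManualBuffer and its fields are hypothetical and chosen only for illustration; this is not the code from the earlier article, just one common way to write a bounded buffer with wait() and notifyAll().

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical hand-rolled bounded buffer, shown only for comparison.
class ManualBuffer<T> {

    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    ManualBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Producer side: wait while the buffer is full.
    synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait(); // releases the lock until another thread calls notifyAll()
        }
        items.add(item);
        notifyAll(); // wake up consumers waiting for an element
    }

    // Consumer side: wait while the buffer is empty.
    synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake up producers waiting for free space
        return item;
    }

    public static void main(String[] args) throws InterruptedException {
        ManualBuffer<Integer> buffer = new ManualBuffer<>(1);
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("Consumed value " + buffer.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "CONSUMER");
        consumer.start();
        buffer.put(42);
        consumer.join();
    }
}
```

Every size check, wait loop and notification here is the programmer's responsibility, which is exactly the bookkeeping a BlockingQueue takes over.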

With the introduction of the BlockingQueue interface in Java 5, it's now much simpler to implement the Producer Consumer Design Pattern, because BlockingQueue provides this control implicitly through its blocking methods put() and take().
 

  • void put(E e): Inserts the element into the queue. If the queue is full, it waits until space becomes available.
  • E take(): Retrieves and removes the element at the head of the queue. If the queue is empty, it waits until an element becomes available.
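The blocking behaviour of these two methods can be observed with a small sketch. The class name PutTakeDemo, the capacity of 1 and the 200 ms pause are assumptions made for this illustration only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PutTakeDemo {

    static String demo() throws InterruptedException {
        // Capacity 1 makes the "full" state easy to reach.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(1); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                queue.put(2); // cannot complete until a slot is freed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "PRODUCER");
        producer.start();

        // Wait up to 200 ms; the producer cannot finish while the queue is full.
        producer.join(200);
        boolean stillWaiting = producer.isAlive();

        int first = queue.take();  // frees the slot, so put(2) can complete
        int second = queue.take(); // waits until put(2) has inserted its element
        producer.join();

        return "stillWaiting=" + stillWaiting + " first=" + first + " second=" + second;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints "stillWaiting=true first=1 second=2"
    }
}
```

No wait(), notify() or synchronized block appears anywhere; the queue itself parks and resumes the threads.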


Now you don't need to use wait() and notify() to communicate between Producer and Consumer.

Java provides several BlockingQueue implementations, such as ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue and SynchronousQueue.
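As a rough guide to how these implementations differ, the following sketch probes each one using only non-blocking calls. The class name QueueFlavours and the sample values are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class QueueFlavours {

    static List<String> describe() {
        List<String> lines = new ArrayList<>();

        // ArrayBlockingQueue: the capacity is mandatory and fixed.
        BlockingQueue<Integer> array = new ArrayBlockingQueue<>(2);
        lines.add("ArrayBlockingQueue free slots: " + array.remainingCapacity());

        // LinkedBlockingQueue: the capacity is optional.
        BlockingQueue<Integer> linked = new LinkedBlockingQueue<>();
        lines.add("LinkedBlockingQueue default capacity is Integer.MAX_VALUE: "
                + (linked.remainingCapacity() == Integer.MAX_VALUE));

        // PriorityBlockingQueue: unbounded, hands out the smallest element first.
        BlockingQueue<Integer> priority = new PriorityBlockingQueue<>();
        priority.offer(5);
        priority.offer(1);
        priority.offer(3);
        lines.add("PriorityBlockingQueue head: " + priority.poll());

        // SynchronousQueue: no internal capacity, so offer() fails
        // unless a consumer is already waiting in take().
        BlockingQueue<Integer> sync = new SynchronousQueue<>();
        lines.add("SynchronousQueue offer without consumer: " + sync.offer(42));

        return lines;
    }

    public static void main(String[] args) {
        describe().forEach(System.out::println);
    }
}
```

The third line reads "PriorityBlockingQueue head: 1" even though 5 was inserted first, and the last line reports false because nobody is waiting to receive the element.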

To implement the producer consumer problem, we will use the LinkedBlockingQueue implementation.

Understanding Java Program

In this program, we have two threads named PRODUCER and CONSUMER, which run instances of the ProducerBQ and ConsumerBQ classes respectively. Both of these classes implement the Runnable interface.

The logic of what the producer and the consumer should do is written in their respective run() methods.

The main thread creates an object of the LinkedBlockingQueue class, i.e. sharedQ, to share as a queue between them, and then starts both the PRODUCER and CONSUMER threads.


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Simple Java Program to test Producer Consumer Design Pattern 
 * using BlockingQueue
 */
public class ProducerConsumerBQTest {

	public static void main(String[] args) {
		
		// Queue shared by both threads. No capacity is given here,
		// so it is bounded only by Integer.MAX_VALUE.
		BlockingQueue<Integer> sharedQ = new LinkedBlockingQueue<>();
		
		Thread consumerThread = new Thread(new ConsumerBQ(sharedQ), "CONSUMER");
		Thread producerThread = new Thread(new ProducerBQ(sharedQ), "PRODUCER");
		
		producerThread.start();
		consumerThread.start();
		
	}

}


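    Producer runs in an infinite loop and keeps inserting random integer values from 0 to 99 into sharedQ. If the queue is full, put() blocks until space becomes available. Note that a LinkedBlockingQueue created without a capacity argument has a capacity of Integer.MAX_VALUE, so in practice the producer only waits if you pass a smaller capacity to the constructor.

    Unlike the wait() and notify() implementation, here we don't need to explicitly check the size of sharedQ.

    We also don't need to write any synchronized block: all BlockingQueue implementations are thread-safe, and their queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.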

    
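    import java.util.Random;
    import java.util.concurrent.BlockingQueue;

    /**
     * Producer Thread will keep producing values for Consumer.
     * 
     * It will use put() method of BlockingQueue to add values
     * in sharedQ
     */
    class ProducerBQ implements Runnable
    {
    	private final BlockingQueue<Integer> sharedQ;
    	
    	// One generator for the lifetime of the producer
    	private final Random random = new Random();
    	
    	public ProducerBQ(BlockingQueue<Integer> sharedQ)
    	{
    		this.sharedQ = sharedQ;
    	}
    	
    	@Override
    	public void run(){
    		
    		while(true)
    		{
    			try
    			{
    				int number = random.nextInt(100); // 0 to 99 inclusive
    				System.out.println("Producing value " + number);
    				sharedQ.put(number); // blocks while the queue is full
    			}
    			catch(InterruptedException ie)
    			{
    				System.err.println("Error :: " + ie);
    				// Restore the interrupt flag and stop producing
    				Thread.currentThread().interrupt();
    				return;
    			}
    		}
    	}
    }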


    Similarly, Consumer runs in an infinite loop and keeps consuming integers from sharedQ. If the queue is empty, take() blocks until an element becomes available.

    
    import java.util.concurrent.BlockingQueue;

    /**
     * Consumer Thread will consume values from the shared queue. 
     * 
     * It uses take() method of BlockingQueue 
     */
    class ConsumerBQ implements Runnable
    {
    	private final BlockingQueue<Integer> sharedQ;
    	
    	public ConsumerBQ(BlockingQueue<Integer> sharedQ)
    	{
    		this.sharedQ = sharedQ;
    	}
    	
    	@Override
    	public void run(){
    		
    		while(true)
    		{
    			try
    			{
    				// take() blocks while the queue is empty
    				System.out.println("Consumed value " + sharedQ.take());
    			}
    			catch(InterruptedException ie)
    			{
    				System.err.println("Error :: " + ie);
    				// Restore the interrupt flag and stop consuming
    				Thread.currentThread().interrupt();
    				return;
    			}
    		}
    	}
    }
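    The program above runs forever, which makes it hard to see the queue doing its job. Below is a small finite variant, offered as a sketch rather than as part of the original program: the class name BoundedProducerConsumer, the method runOnce and the capacity and item-count parameters are all assumptions made for this illustration. It passes a capacity to LinkedBlockingQueue so that put() really does wait when the consumer falls behind.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedProducerConsumer {

    // Produces 1..itemCount, consumes them all, and returns the sum consumed.
    static int runOnce(int capacity, int itemCount) throws InterruptedException {
        BlockingQueue<Integer> sharedQ = new LinkedBlockingQueue<>(capacity);
        AtomicInteger sum = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    sharedQ.put(i); // blocks while the queue already holds `capacity` items
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "PRODUCER");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    sum.addAndGet(sharedQ.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "CONSUMER");

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return sum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 1 + 2 + ... + 20 = 210
        System.out.println("Sum of consumed values: " + runOnce(5, 20));
    }
}
```

    Because the queue holds at most 5 items while 20 are produced, the producer is forced to wait for the consumer several times, yet every value still arrives exactly once.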


    That's all for this topic. If you have any suggestions or queries, feel free to drop a comment. We would be happy to add them to the post.

    Happy Learning 🙂






