Producer Consumer using Java BlockingQueue

BlockingQueue

A BlockingQueue is a queue that additionally supports operations that:
1. wait for an element to become available in the queue when retrieving an element.
2. wait for space to become available in the queue when storing an element.

We will implement a simple Producer-Consumer using a BlockingQueue. We will use an ArrayBlockingQueue with a fixed capacity of 10.
If we want a queue that can grow without a practical bound, we can use a LinkedBlockingQueue created without a capacity argument. Its capacity then defaults to Integer.MAX_VALUE.
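To make the difference between the two queue types concrete, here is a minimal sketch that reports how many free slots each one starts with. The class name QueueCapacitySketch and its helper methods are made up for illustration and are not part of the Producer-Consumer code in this post.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the name is an assumption made for this sketch.
class QueueCapacitySketch {
    // Free slots in a new ArrayBlockingQueue with the given fixed capacity.
    static int boundedRemaining(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        return queue.remainingCapacity();
    }

    // Free slots in a LinkedBlockingQueue created without an explicit capacity.
    static int defaultLinkedRemaining() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(boundedRemaining(10)); // prints 10
        System.out.println(defaultLinkedRemaining() == Integer.MAX_VALUE); // prints true
    }
}
```

A LinkedBlockingQueue can also be given an explicit capacity in its constructor, in which case it behaves as a bounded queue.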

Producer

Our Producer accepts a BlockingQueue in its constructor and uses the put(e) call to write Integer values to the queue. put(e) inserts the element into the queue, and waits for space to become available if the queue is full.

import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        int counter = 1;
        try {
            while (true) {
                // Inserts the specified element into this queue,
                // waiting if necessary for space to become available.
                queue.put(counter);
                System.out.println("Produced - " + counter++);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop producing.
            Thread.currentThread().interrupt();
        }
    }
}
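put(e) is not the only way to insert into a BlockingQueue. As a rough sketch of the alternatives, the snippet below tries offer(e), which returns false immediately when the queue is full, and offer(e, timeout, unit), which waits up to the given timeout before giving up. The class name OfferSketch, the helper method names, the capacity of 3 and the 50 ms timeout are all assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the Producer above.
class OfferSketch {
    // Builds a queue of the given capacity and fills every slot.
    private static BlockingQueue<Integer> fullQueue(int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 1; i <= capacity; i++) {
            queue.offer(i);
        }
        return queue;
    }

    // Non-blocking insert into a full queue: returns false without waiting.
    static boolean offerWhenFull(int capacity) {
        return fullQueue(capacity).offer(capacity + 1);
    }

    // Timed insert into a full queue: waits up to timeoutMillis, then returns false.
    static boolean timedOfferWhenFull(int capacity, long timeoutMillis) {
        try {
            return fullQueue(capacity).offer(capacity + 1, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWhenFull(3));          // prints false
        System.out.println(timedOfferWhenFull(3, 50)); // prints false after about 50 ms
    }
}
```

The Producer in this post uses put(e) because it should simply wait for the Consumer. The offer variants may suit a producer that would rather drop or retry an element than wait indefinitely.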

Consumer

Our Consumer accepts a BlockingQueue in its constructor and uses the take() call to read Integer values from the queue. take() retrieves and removes the head element of the queue, and waits for an element to become available if the queue is empty.

import java.util.concurrent.BlockingQueue;

class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(500); // Adding a delay while consuming data
                // Retrieves and removes the head of this queue,
                // waiting if necessary until an element becomes available.
                int counter = queue.take();
                System.out.println("Consumed - " + counter);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop consuming.
            Thread.currentThread().interrupt();
        }
    }
}
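On the retrieval side, take() has a timed counterpart, poll(timeout, unit), which returns null if no element arrives within the timeout. The sketch below illustrates that, along with the first-in-first-out order of take(). The class name PollSketch, the helper method names, the 50 ms timeout and the -1 error value are assumptions made for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the Consumer above.
class PollSketch {
    // Timed retrieval from an empty queue: returns null once the timeout elapses.
    static Integer timedPollOnEmpty(long timeoutMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Inserts 1 and then 2, and returns whatever take() hands back first.
    static int headAfterTwoPuts() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        try {
            queue.put(1);
            queue.put(2);
            return queue.take(); // the head is the oldest element
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1; // hypothetical error value for this sketch
        }
    }

    public static void main(String[] args) {
        System.out.println(timedPollOnEmpty(50)); // prints null after about 50 ms
        System.out.println(headAfterTwoPuts());   // prints 1
    }
}
```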

Client to Run the Producer Consumer

The following client runs our Producer and Consumer:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumer {
    public static void main(String[] args) {
        // Define an ArrayBlockingQueue of fixed capacity 10
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);

        Producer producer = new Producer(queue); // Pass the queue to the Producer
        Consumer consumer = new Consumer(queue); // Pass the queue to the Consumer

        new Thread(producer).start(); // Start the Producer Thread
        new Thread(consumer).start(); // Start the Consumer Thread
    }
}

Output

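Running ProducerConsumer produces output similar to the following. The program keeps running until it is stopped, and because the two threads print independently, the exact interleaving of the Produced and Consumed lines may vary between runs: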

Produced - 1
Produced - 2
Produced - 3
Produced - 4
Produced - 5
Produced - 6
Produced - 7
Produced - 8
Produced - 9
Produced - 10
Consumed - 1
Produced - 11
Consumed - 2
Produced - 12
Consumed - 3
Produced - 13
Consumed - 4
Produced - 14
Consumed - 5
Produced - 15

The Producer keeps adding elements to the BlockingQueue until the queue is full (i.e. 10 elements have been inserted). When it tries to insert the 11th element, the Producer waits for space to become available in the queue. Our Consumer wakes up after 500 milliseconds and consumes one element (1) from the queue. This frees space for one element, so the Producer inserts the 11th element and then waits again for space to insert the 12th. Once the BlockingQueue is full, the speed of the Producer is therefore limited by the Consumer.
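The Producer and Consumer above run forever, which makes the behaviour hard to check in a test. As a minimal sketch, the variant below produces a fixed number of elements and then sends a sentinel value (often called a "poison pill") so that the consumer knows when to stop. This shutdown technique is an addition for illustration and not part of the code above. The class name BoundedRunSketch, the POISON_PILL value of -1 and the run method are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class showing a finite producer-consumer run.
class BoundedRunSketch {
    // Assumed sentinel telling the consumer to stop; real values here are all positive.
    static final int POISON_PILL = -1;

    // Sends 1..itemCount through a queue of the given capacity and returns
    // the values in the order the consumer received them.
    static List<Integer> run(int itemCount, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i); // waits whenever the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take(); // waits whenever the queue is empty
                    if (value == POISON_PILL) {
                        return;
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes to the list are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2)); // prints [1, 2, 3, 4, 5]
    }
}
```

Even with a capacity of 2 and five elements to send, no element is lost or reordered: the producer simply waits on put until the consumer has made room.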
