Wednesday, June 22, 2011

ArrayBlockingQueue

An ArrayBlockingQueue is a queue returned by an array that have a limitation. It is a "bounded buffer" in which elements are helds in a constant size array. Once the capacity of this queue is defined it can not be grew up, if you will try to put element into the full queue will result in a blocking wait and similarly, obtain the element from vacate queue will block. In this queue the elements are ordered in FIFO (First-In-First-Out). In this queue the element that has been the highest time on the queue is the head element, and the element that has been the minimum time on the queue is the tail element of the queue. Insertion of an element in this queue is happened at tail and the element is retrieved from the head position.
All of the optional methods of the Collection and Iterator interfaces are implemented by this class and its iterator.

Syntax

public class ArrayBlockingQueue<E>
Parameter description
E : It is the element's type that's held in this collection.
Constructor of ArrayBlockingQueue class are :
  • ArrayBlockingQueue(int capacity) : With the constant capability and default way of accessing this constructor makes an ArrayBlockingQueue.
  • ArrayBlockingQueue(int capacity, boolean fair) : With the constant capability and the specified way of accessing this constructor makes an ArrayBlockingQueue.
  • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) : With the constant capability, the specified way of accessing and the elements of the given collection this constructor makes an ArrayBlockingQueue, that are appended in collection's iterator traversing order.
Example :
Here we give a simple example which will illustrate you to how can you use the methods of ArrayBlockingQueue class.


public class ArrayBlockingQueueDemo {
  public static void main(String args[]) {
    ArrayBlockingQueue abq = new ArrayBlockingQueue(10);
    abq.add(1);
    abq.add(2);
    abq.add(3);
    abq.add(4);
    abq.add(5);
    System.out.println("Elements of queue1= " + abq);
    ArrayBlockingQueue abq1 = new ArrayBlockingQueue(10);
    abq1.offer("A");
    abq1.offer("B");
    abq1.offer("C");
    abq1.offer("D");
    abq1.offer("E");
    abq1.offer("F");
    System.out.println("Elements of queue2 = " + abq1);
    int i = abq.drainTo(abq1, 4);
    System.out.println("Now elements of queue2 = " + abq1);
    System.out.println("Rest element of queue1 = " + abq);
    Iterator it = abq1.iterator();
    System.out.println("Elements of queue2 using iterator = ");
    while (it.hasNext()) {
      System.out.println(it.next());
    }
    Object obj = abq1.peek();
    System.out.println("The head element of queue2 = " + obj);
    Object obj1 = abq1.poll();
    System.out.println("Elements of queue2 = " + abq1);
    System.out.println("The removed head element = " + obj1);
    int i1 = abq1.size();
   System.out.println("Size of queue2 = " + i1);
    int i2 = abq.size();
    System.out.println("Size of queue1 = " + i2);
  }
}

Output:
Elements of queue1= [1, 2, 3, 4, 5]

Elements of queue2 = [A, B, C, D, E, F]

Now elements of queue2 = [A, B, C, D, E, F, 1, 2, 3, 4]

Rest element of queue1 = [5]

Elements of queue2 using iterator =

A

B

C

D

E

F

1

2

3

4

The head element of queue2 = A

Elements of queue2 = [B, C, D, E, F, 1, 2, 3, 4]

The removed head element = A

Size of queue2 = 9

Size of queue1 = 1



Example 2 - Producer Consumer Problem

Lets look at the example:
This example has three components,
1. Producer Thread – This thread starts adding the data in to the Queue
2. Consumer Thread – This thread gets the data from the Queue whenever any data is added by the Producer.
3. Blocking Queue – This acts as an intermediate between the Producer and the Consumer thread. It gets the data or object from the producer thread and hands over to the consumer thread. 
ExecutorQueue.java
Let’s create an ExecutorQueue class which has the ArrayBlockingQueue object. Any data or object can be added and retrieved from the Array blocking Queue instance.
public class ExecutorQueue
{
public static BlockingQueue     queue   = new ArrayBlockingQueue(100);

/**
         * Method to add Data in to the Queue
         */
public static void addDataInQueue(Object obj)
{
queue.add(obj);
}

/**
         * Get the Data from the Queue
         */
public static Object getDataFromQueue() throws InterruptedException
{
return queue.take();
}
}



ConsumerThread.java
The below consumer Thread will wait in the Array blocking queue and retrieves the data as and when any object is added in the queue.

Note: In the Data Processing section, you can write your custom processing logic as per your requirements
public class ConsumerThread extends Thread
{
public void run()
{
System.out.println("\nConsumerThread started...");
boolean loop = true;
while (loop)
{
try
{
System.out.println("\nConsumerThread: Waiting to fetch data from Queue...");
String data = (String) ExecutorQueue.getDataFromQueue();
System.out.println("ConsumerThread: Got the data from Queue; Object = " + data);
System.out.println("ConsumerThread: Processing the data (" + data +")");

/*
                                 * Data Processing section:
                                 *
                                 * Note: Write your processing logic here based on the data retrieved
                                 */
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}

Main method – ProducerApplication.java
The Producer is the triggering point to test Array blocking queue. It does the following operations,
1. It takes care of starting the Consumer Thread so that it will wait for any data in the Queue.
2. Soon after starting the Consumer thread, it will start adding data in to the blocking queue. Once the data is added, the Consumer thread will start retrieving the data using the queue.take() method.


public class ProducerApplication
{
public static void main(String[] args)
{

// Start the Processor thread so that it will wait for an Object in the Blocking Queue. As soon as an Object is
// added in the Queue, it will take and process the data
System.out.println("ProducerApplication: Starting the Processor Thread...\n");
ConsumerThread thread = new ConsumerThread();
thread.start();

ExecutorQueue.addDataInQueue("Object-1");
ExecutorQueue.addDataInQueue("Object-2");
}
}

Output

ProducerApplication: Starting the Processor Thread...


ConsumerThread started...

ConsumerThread: Waiting to fetch data from Queue...
ConsumerThread: Got the data from Queue; Object = Object-1
ConsumerThread: Processing the data (Object-1)

ConsumerThread: Waiting to fetch data from Queue...
ConsumerThread: Got the data from Queue; Object = Object-2
ConsumerThread: Processing the data (Object-2)

ConsumerThread: Waiting to fetch data from Queue...

Download the source

Source code can be downloaded from here.



No comments:

Post a Comment

Chitika