Showing posts with label blocking queue. Show all posts

Thursday, June 23, 2011

Blocking queues in java

The key facilities that BlockingQueue provides to such systems are, as its name implies, enqueuing and dequeuing methods that do not return until they have executed successfully. So, for example, a print server does not need to constantly poll the queue to discover whether any print jobs are waiting; it need only call the poll method, supplying a timeout, and the system will suspend it until either a queue element becomes available or the timeout expires. BlockingQueue defines seven new methods, in three groups:


Group 1 : Adding an Element

boolean offer(E e, long timeout, TimeUnit unit)
                // insert e, waiting up to the timeout
void put(E e)   // add e, waiting as long as necessary

The nonblocking overload of offer defined in Queue will return false if it cannot immediately insert the element. This new overload waits for a time specified using java.util.concurrent.TimeUnit, an enum which allows timeouts to be defined in units such as milliseconds or seconds.
Taking these methods together with those inherited from Queue, there are four ways in which the methods for adding elements to a BlockingQueue can behave: offer returns false if it does not succeed immediately, blocking offer returns false if it does not succeed within its timeout, add throws an exception if it does not succeed immediately, and put blocks until it succeeds.
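As a minimal sketch of these four behaviours, the following tries each insertion style on a queue that is already full. The class name InsertionModes and the 50 ms timeout are choices made for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class InsertionModes {

    /** Tries each insertion style on a full queue and records what happened. */
    static List<String> insertIntoFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        List<String> results = new ArrayList<>();
        try {
            queue.put("first"); // returns at once because there is room

            // Non-blocking offer: gives up immediately.
            results.add("offer=" + queue.offer("second"));

            // Timed offer: waits up to 50 ms for space, then gives up.
            results.add("timedOffer=" + queue.offer("second", 50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // add: throws IllegalStateException when the queue is full.
        try {
            queue.add("second");
            results.add("add=ok");
        } catch (IllegalStateException e) {
            results.add("add=IllegalStateException");
        }

        // queue.put("second") would block here until another thread removed "first".
        return results;
    }

    public static void main(String[] args) {
        System.out.println(insertIntoFullQueue());
        // prints [offer=false, timedOffer=false, add=IllegalStateException]
    }
}
```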

Group 2 : Removing an Element
E poll(long timeout, TimeUnit unit)
                // retrieve and remove the head, waiting up to the timeout
E take()        // retrieve and remove the head of this queue, waiting
                // as long as necessary

Again taking these methods together with those inherited from Queue, there are four ways in which the methods for removing elements from a BlockingQueue can behave: poll returns null if it does not succeed immediately, blocking poll returns null if it does not succeed within its timeout, remove throws an exception if it does not succeed immediately, and take blocks until it succeeds.
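The removal side can be sketched the same way, this time against an empty queue. The class name RemovalModes and the 50 ms timeout are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RemovalModes {

    /** Tries each removal style on an empty queue and records what happened. */
    static List<String> removeFromEmptyQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> results = new ArrayList<>();

        // Non-blocking poll: returns null immediately.
        results.add("poll=" + queue.poll());

        // Timed poll: waits up to 50 ms for an element, then returns null.
        try {
            results.add("timedPoll=" + queue.poll(50, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // remove: throws NoSuchElementException when the queue is empty.
        try {
            queue.remove();
            results.add("remove=ok");
        } catch (NoSuchElementException e) {
            results.add("remove=NoSuchElementException");
        }

        // queue.take() would block here until another thread added an element.
        return results;
    }

    public static void main(String[] args) {
        System.out.println(removeFromEmptyQueue());
        // prints [poll=null, timedPoll=null, remove=NoSuchElementException]
    }
}
```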

Group 3 : Retrieving or Querying the Contents of the Queue
int drainTo(Collection<? super E> c)
                // clear the queue into c
int drainTo(Collection<? super E> c, int maxElements)
                // clear at most the specified number of elements into c
int remainingCapacity()
                // return the number of elements that would be accepted
                // without blocking, or Integer.MAX_VALUE if unbounded


The drainTo methods perform atomically and efficiently, so the second overload is useful in situations in which you know that you have processing capability available immediately for a certain number of elements, and the first is useful, for example, when all producer threads have stopped working. Their return value is the number of elements transferred. remainingCapacity reports the spare capacity of the queue, although as with any such value in multi-threaded contexts, the result of a call should not be used as part of a test-then-act sequence; between the test (the call of remainingCapacity) and the action (adding an element to the queue) of one thread, another thread might have intervened to add or remove elements.
BlockingQueue guarantees that the queue operations of its implementations will be thread-safe and atomic.
But this guarantee doesn't extend to the bulk operations inherited from Collection (addAll, containsAll, retainAll and removeAll) unless the individual implementation provides it. So it is possible, for example, for addAll to fail, throwing an exception, after adding only some of the elements in a collection.
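A small sketch of the bounded drainTo overload, taking a batch of at most three elements for processing. The helper drainBatch and the class name DrainBatch are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DrainBatch {

    /** Moves at most maxElements from the queue into a new list, in queue order. */
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxElements) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxElements);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        for (int i = 1; i <= 5; i++) {
            queue.add(i);
        }
        System.out.println(drainBatch(queue, 3));      // prints [1, 2, 3]
        System.out.println(queue);                     // prints [4, 5]
        // Informational only: another thread could change this before we act on it.
        System.out.println(queue.remainingCapacity()); // prints 3
    }
}
```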

Blocking queue has the following characteristics:
  • methods to add an item to the queue, waiting for space to become available in the queue if necessary;
  • corresponding methods that take an item from the queue, waiting for an item to be put in the queue if it is empty;
  • optional time limits and interruptibility on these calls;
  • efficient thread-safety: blocking queues are specifically designed to have their put() method called from one thread and the take() method from another; in particular, items posted to the queue will be published correctly to any other thread taking the item from the queue again; significantly, the implementations generally achieve this without locking the entire queue, making them highly concurrent components;
  • integration with Java thread pools: a flavour of blocking queue can be passed into the constructor of ThreadPoolExecutor to customise the behaviour of the thread pool.
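To illustrate the last point, here is a minimal sketch that hands a bounded ArrayBlockingQueue to the ThreadPoolExecutor constructor as its work queue. The pool size of two, the class name PoolWithQueue and the counting tasks are assumptions chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolWithQueue {

    /** Runs taskCount trivial tasks on a two-thread pool and returns how many completed. */
    static int runTasks(int taskCount) {
        // The work queue holds tasks that cannot run yet. If it fills up while both
        // threads are busy, execute() throws RejectedExecutionException, so this
        // sketch sizes the queue to fit every task.
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(Math.max(1, taskCount));
        ThreadPoolExecutor pool =
                new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, workQueue);

        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(completed::incrementAndGet);
        }

        pool.shutdown(); // already queued tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(5)); // prints 5
    }
}
```

Swapping in a different queue (for instance an unbounded LinkedBlockingQueue or a SynchronousQueue) changes how the pool buffers or rejects work, which is the customisation the bullet above refers to.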
Implementations of blocking queue
ArrayBlockingQueue : A simple bounded BlockingQueue implementation backed by an array.

DelayQueue : An unbounded blocking queue of Delayed elements, in which an element can only be taken when its delay has expired. It uses elements that implement the java.util.concurrent.Delayed interface.

PriorityBlockingQueue : This unbounded queue orders its elements by their natural ordering or by a specified Comparator, and the element returned by any take( ) call is the smallest element based on this ordering.

LinkedBlockingQueue : A simple, optionally bounded BlockingQueue implementation backed by a linked list. If no capacity is given, it is limited only by Integer.MAX_VALUE.

SynchronousQueue : This queue has a size of zero (yes, you read that correctly). It blocks put( ) calls until another thread calls take( ), and blocks take( ) calls until another thread calls put( ). Essentially, elements can only go directly from a producer to a consumer, and nothing is stored in the queue itself (other than for transition purposes).
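Of these implementations, DelayQueue is the one that needs a custom element type, so a short sketch may help. DelayedLabel is a hypothetical element class written for this illustration, and the 50 ms and 200 ms delays are arbitrary.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueSketch {

    /** Hypothetical element: a label that may only be taken once its delay has expired. */
    static final class DelayedLabel implements Delayed {
        final String label;
        private final long readyAtNanos;

        DelayedLabel(String label, long delayMillis) {
            this.label = label;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining delay; zero or negative means the element has expired.
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                                other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Adds a slow element before a fast one and reports the order in which take() returns them. */
    static String takeOrder() {
        DelayQueue<DelayedLabel> queue = new DelayQueue<>();
        queue.put(new DelayedLabel("slow", 200));
        queue.put(new DelayedLabel("fast", 50));
        try {
            // take() blocks until the element with the shortest remaining delay expires.
            String first = queue.take().label;
            String second = queue.take().label;
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(takeOrder()); // prints fast,slow
    }
}
```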


Example - Producer consumer problem with Blocking queue
The queue takes care of all the details of synchronizing access to its contents and notifying other threads of the availability of data.

Producer.java
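import java.util.concurrent.BlockingQueue;

public class Producer extends Thread {
    private final BlockingQueue<Integer> cubbyhole;
    private final int number;

    public Producer(BlockingQueue<Integer> c, int num) {
        cubbyhole = c;
        number = num;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                cubbyhole.put(i); // blocks while the queue is full
                System.out.format("Producer #%d put: %d%n", number, i);
                sleep((int)(Math.random() * 100));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop producing.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}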

Consumer.java

import java.util.concurrent.BlockingQueue;

public class Consumer extends Thread {
    private final BlockingQueue<Integer> cubbyhole;
    private final int number;

    public Consumer(BlockingQueue<Integer> c, int num) {
        cubbyhole = c;
        number = num;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                int value = cubbyhole.take(); // blocks while the queue is empty
                System.out.format("Consumer #%d got: %d%n", number, value);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}


ProducerConsumerTest.java

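import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerTest {
    public static void main(String[] args) {

        // Capacity 1: the producer blocks until the consumer has taken the previous value.
        BlockingQueue<Integer> c = new ArrayBlockingQueue<Integer>(1);
        Producer p1 = new Producer(c, 1);
        Consumer c1 = new Consumer(c, 1);

        p1.start();
        c1.start();
    }
}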

Possible Use cases for BlockingQueue

These features make BlockingQueues useful for cases such as the following:
  • a server, where incoming connections are placed on a queue, and a pool of threads picks them up as those threads become free;
  • in a variety of parallel processes, where we want to manage or limit resource usage at different stages of the process.

Wednesday, June 22, 2011

ArrayBlockingQueue

An ArrayBlockingQueue is a bounded queue backed by an array. It is a "bounded buffer" in which elements are held in a fixed-size array. Once the capacity of the queue is set it cannot be changed: an attempt to put an element into a full queue blocks until space becomes available, and an attempt to take an element from an empty queue blocks until an element arrives. The queue orders elements FIFO (first-in, first-out). The head of the queue is the element that has been on the queue the longest time, and the tail is the element that has been on the queue the shortest time. New elements are inserted at the tail, and retrieval operations obtain elements from the head.
All of the optional methods of the Collection and Iterator interfaces are implemented by this class and its iterator.

Syntax

public class ArrayBlockingQueue<E>
Parameter description
E : It is the element's type that's held in this collection.
The constructors of the ArrayBlockingQueue class are:
  • ArrayBlockingQueue(int capacity) : creates an ArrayBlockingQueue with the given fixed capacity and the default access policy.
  • ArrayBlockingQueue(int capacity, boolean fair) : creates an ArrayBlockingQueue with the given fixed capacity and the specified access policy. If fair is true, threads blocked on insertion or removal are served in FIFO order; otherwise the order is unspecified.
  • ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) : creates an ArrayBlockingQueue with the given fixed capacity and access policy, initially containing the elements of the given collection, added in the traversal order of the collection's iterator.
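As a brief sketch of the three-argument constructor, the following pre-fills a fair queue from a list and shows what happens when the collection does not fit. The helper prefilled and the class name ConstructorSketch are hypothetical names for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class ConstructorSketch {

    /** Builds a fair queue of the given capacity, pre-filled from the list. */
    static ArrayBlockingQueue<String> prefilled(int capacity, List<String> initial) {
        return new ArrayBlockingQueue<>(capacity, true, initial);
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = prefilled(5, Arrays.asList("a", "b", "c"));
        System.out.println(queue);                     // prints [a, b, c]
        System.out.println(queue.remainingCapacity()); // prints 2

        // The constructor rejects a collection larger than the capacity.
        try {
            prefilled(2, Arrays.asList("a", "b", "c"));
        } catch (IllegalArgumentException e) {
            System.out.println("collection does not fit in capacity 2");
        }
    }
}
```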
Example :
Here is a simple example that shows how to use some of the methods of the ArrayBlockingQueue class.


import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueDemo {
  public static void main(String args[]) {
    // Both queues hold Object so that the integers can be drained into the queue of strings.
    ArrayBlockingQueue<Object> abq = new ArrayBlockingQueue<Object>(10);
    abq.add(1);
    abq.add(2);
    abq.add(3);
    abq.add(4);
    abq.add(5);
    System.out.println("Elements of queue1= " + abq);
    ArrayBlockingQueue<Object> abq1 = new ArrayBlockingQueue<Object>(10);
    abq1.offer("A");
    abq1.offer("B");
    abq1.offer("C");
    abq1.offer("D");
    abq1.offer("E");
    abq1.offer("F");
    System.out.println("Elements of queue2 = " + abq1);
    // Move at most 4 elements from queue1 to queue2; i is the number transferred.
    int i = abq.drainTo(abq1, 4);
    System.out.println("Now elements of queue2 = " + abq1);
    System.out.println("Rest element of queue1 = " + abq);
    Iterator<Object> it = abq1.iterator();
    System.out.println("Elements of queue2 using iterator = ");
    while (it.hasNext()) {
      System.out.println(it.next());
    }
    // peek() returns the head without removing it; poll() removes it.
    Object obj = abq1.peek();
    System.out.println("The head element of queue2 = " + obj);
    Object obj1 = abq1.poll();
    System.out.println("Elements of queue2 = " + abq1);
    System.out.println("The removed head element = " + obj1);
    int i1 = abq1.size();
    System.out.println("Size of queue2 = " + i1);
    int i2 = abq.size();
    System.out.println("Size of queue1 = " + i2);
  }
}

Output:
Elements of queue1= [1, 2, 3, 4, 5]
Elements of queue2 = [A, B, C, D, E, F]
Now elements of queue2 = [A, B, C, D, E, F, 1, 2, 3, 4]
Rest element of queue1 = [5]
Elements of queue2 using iterator = 
A
B
C
D
E
F
1
2
3
4
The head element of queue2 = A
Elements of queue2 = [B, C, D, E, F, 1, 2, 3, 4]
The removed head element = A
Size of queue2 = 9
Size of queue1 = 1



Example 2 - Producer Consumer Problem

Let's look at the example:
This example has three components:
1. Producer Thread – This thread adds data to the queue.
2. Consumer Thread – This thread gets data from the queue whenever the producer adds any.
3. Blocking Queue – This acts as an intermediary between the producer and the consumer threads. It receives the data or object from the producer thread and hands it over to the consumer thread.
ExecutorQueue.java
Let's create an ExecutorQueue class that holds the ArrayBlockingQueue object. Any data or object can be added to and retrieved from the ArrayBlockingQueue instance.
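import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ExecutorQueue
{
    public static final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);

    /**
     * Adds data to the queue. Throws IllegalStateException if the queue is full.
     */
    public static void addDataInQueue(Object obj)
    {
        queue.add(obj);
    }

    /**
     * Gets data from the queue, waiting until an element is available.
     */
    public static Object getDataFromQueue() throws InterruptedException
    {
        return queue.take();
    }
}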



ConsumerThread.java
The consumer thread below waits on the ArrayBlockingQueue and retrieves data as soon as an object is added to the queue.

Note: In the Data Processing section, you can write your custom processing logic as per your requirements
public class ConsumerThread extends Thread
{
    @Override
    public void run()
    {
        System.out.println("\nConsumerThread started...");
        boolean loop = true;
        while (loop)
        {
            try
            {
                System.out.println("\nConsumerThread: Waiting to fetch data from Queue...");
                String data = (String) ExecutorQueue.getDataFromQueue();
                System.out.println("ConsumerThread: Got the data from Queue; Object = " + data);
                System.out.println("ConsumerThread: Processing the data (" + data +")");

                /*
                 * Data Processing section:
                 *
                 * Note: Write your processing logic here based on the data retrieved
                 */
            }
            catch (InterruptedException e)
            {
                // Restore the interrupt flag and leave the loop so the thread can exit.
                Thread.currentThread().interrupt();
                loop = false;
            }
        }
    }
}

Main method – ProducerApplication.java
The Producer is the triggering point to test Array blocking queue. It does the following operations,
1. It takes care of starting the Consumer Thread so that it will wait for any data in the Queue.
2. Soon after starting the Consumer thread, it will start adding data in to the blocking queue. Once the data is added, the Consumer thread will start retrieving the data using the queue.take() method.


public class ProducerApplication
{
    public static void main(String[] args)
    {
        // Start the consumer thread so that it will wait for an object in the blocking queue.
        // As soon as an object is added to the queue, it will take and process the data.
        System.out.println("ProducerApplication: Starting the Processor Thread...\n");
        ConsumerThread thread = new ConsumerThread();
        thread.start();

        ExecutorQueue.addDataInQueue("Object-1");
        ExecutorQueue.addDataInQueue("Object-2");
    }
}

Output

ProducerApplication: Starting the Processor Thread...


ConsumerThread started...

ConsumerThread: Waiting to fetch data from Queue...
ConsumerThread: Got the data from Queue; Object = Object-1
ConsumerThread: Processing the data (Object-1)

ConsumerThread: Waiting to fetch data from Queue...
ConsumerThread: Got the data from Queue; Object = Object-2
ConsumerThread: Processing the data (Object-2)

ConsumerThread: Waiting to fetch data from Queue...




Friday, June 17, 2011

LinkedBlockingQueue


PriorityBlockingQueue

Sometimes we need to process objects in priority order. PriorityBlockingQueue is an unbounded blocking queue that stores objects according to their priority, which is determined either by their natural ordering or by a comparator provided at construction time, just as in the class PriorityQueue. Because the queue is unbounded, insertion never blocks, although adding an element may fail because of resource exhaustion (causing an OutOfMemoryError); retrieval operations block while the queue is empty. This class does not permit null elements. The iterator returned by the iterator() method is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order. If you need ordered traversal, consider using Arrays.sort(pq.toArray()). The class makes no guarantee about the ordering of elements with equal priority; if you need to distinguish them, you can define custom classes or comparators that use a secondary key to break ties between primary priority values.
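The tie-breaking idea can be sketched as follows: each element carries an arrival sequence number that the comparator consults only when priorities are equal. The Job class, its fields and the comparator name are hypothetical, invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class TieBreakDemo {

    /** Hypothetical element: a named job with a priority and an arrival sequence number. */
    static final class Job {
        private static final AtomicLong COUNTER = new AtomicLong();
        final String name;
        final int priority;
        final long sequence = COUNTER.getAndIncrement(); // secondary key for ties

        Job(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /** Lower priority value first; among equal priorities, earlier arrival first. */
    static final Comparator<Job> BY_PRIORITY_THEN_ARRIVAL =
            Comparator.comparingInt((Job j) -> j.priority)
                      .thenComparingLong(j -> j.sequence);

    /** Adds four jobs with two tied priorities and returns the names in removal order. */
    static List<String> drainInOrder() {
        PriorityBlockingQueue<Job> queue =
                new PriorityBlockingQueue<>(11, BY_PRIORITY_THEN_ARRIVAL);
        queue.add(new Job("b1", 2));
        queue.add(new Job("a1", 1));
        queue.add(new Job("b2", 2));
        queue.add(new Job("a2", 1));

        List<String> names = new ArrayList<>();
        Job job;
        while ((job = queue.poll()) != null) { // poll() always returns the smallest element
            names.add(job.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(drainInOrder()); // prints [a1, a2, b1, b2]
    }
}
```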

Syntax

public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable
Constructors of PriorityBlockingQueue :
The following constructors let us create a PriorityBlockingQueue according to our requirements:
  1. PriorityBlockingQueue() : creates a queue with the default initial capacity (11) that orders its elements according to their natural ordering.
  2. PriorityBlockingQueue(Collection<? extends E> c) : creates a queue containing the elements of the specified collection.
  3. PriorityBlockingQueue(int initialCapacity) : creates a queue with the specified initial capacity that orders its elements according to their natural ordering.
  4. PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) : creates a queue with the specified initial capacity that orders its elements according to the specified comparator.
Some of the commonly used methods of this class are as follows:
  1. add(E e) :
     Syntax: public boolean add(E e)
  2. iterator() :
     Syntax: public Iterator<E> iterator()
  3. poll() :
     Syntax: public E poll()
  4. remove(Object o) :
     Syntax: public boolean remove(Object o)
  5. size() :
     Syntax: public int size()
Example :

import java.util.Iterator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityBlockingQueueDemo {
  public static void main(String args[]) {
    PriorityBlockingQueue<Integer> pbq1 = new PriorityBlockingQueue<Integer>();
    PriorityBlockingQueue<Integer> pbq2 = new PriorityBlockingQueue<Integer>();

    // add, put and offer all insert without blocking because the queue is unbounded.
    pbq1.add(11);
    pbq1.put(14);
    pbq1.offer(13);
    pbq1.offer(12);
    System.out.println("Elements of PBQ1 = " + pbq1);
    int i = pbq1.size();
    System.out.println("Size of PBQ1 = " + i);

    pbq2.offer(1);
    pbq2.put(2);
    pbq2.add(5);
    pbq2.add(3);
    pbq2.add(4);
    pbq2.add(6);
    pbq2.add(7);
    // toString and the iterator show the internal heap layout, not sorted order.
    System.out.println("Elements of PBQ2 = " + pbq2);
    int i1 = pbq2.size();
    System.out.println("Size of PBQ2 = " + i1);

    // Move every element of pbq1 into pbq2.
    pbq1.drainTo(pbq2);
    System.out.println("pbq1 = " + pbq1);
    System.out.println("pbq2 = " + pbq2);

    Iterator<Integer> it = pbq2.iterator();
    System.out.println("Elements of PBQ2 using an iterator :");
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}

Output

Elements of PBQ1 = [11, 12, 13, 14]
Size of PBQ1 = 4
Elements of PBQ2 = [1, 2, 5, 3, 4, 6, 7]
Size of PBQ2 = 7
pbq1 = []
pbq2 = [1, 2, 5, 3, 4, 6, 7, 11, 12, 13, 14]
Elements of PBQ2 using an iterator :
1
2
5
3
4
6
7
11
12
13
14


DelayQueue


Monday, May 9, 2011

Synchronous queue

A SynchronousQueue is unlike the other implementations of BlockingQueue in that it isn't a queue of tasks to be carried out; it's a queue of consumer threads waiting to process work. With a SynchronousQueue, a producer attempts to put an item on the queue and blocks until there is a consumer ready to take the item from the queue. As in a regular queue, consumers block on an empty queue.

SynchronousQueue is useful when the producer wants or needs to know when a consumer starts processing the task (or at least accepts it). The producer can be sure that the task has been taken by a consumer as soon as it unblocks from the call to put.
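A minimal sketch of this hand-off: a non-blocking offer fails while no consumer is waiting, whereas put returns only once a consumer has taken the item. The class name HandOffSketch and the message strings are made up for the example.

```java
import java.util.concurrent.SynchronousQueue;

public class HandOffSketch {

    /** Returns "<offer result with no consumer>:<item the consumer received>". */
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        // No consumer is waiting yet, so the non-blocking offer cannot hand the item over.
        boolean acceptedWithoutConsumer = queue.offer("nobody is waiting");

        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // waits for a producer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.put("hello"); // returns only after the consumer has taken the item
            consumer.join();    // makes the consumer's write to received[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return acceptedWithoutConsumer + ":" + received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff()); // prints false:hello
    }
}
```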

For best performance with a SynchronousQueue, there should (nearly) always be a consumer waiting to take the task proffered by a producer. If there isn't generally a consumer waiting, then the production speed is throttled by the arrival rate of consumers. This situation can easily arise when the task production rate is intrinsically bursty. In such a situation the producer might be able to produce 100 tasks in a second and then none for an hour. If a SynchronousQueue is used, the producer will have to wait after every task production. With one of the other BlockingQueue implementations, however, you could throw the tasks into a buffer.

If you need a task hand-off design where one thread needs to sync with another, SynchronousQueue is the obvious choice. The code below exercises SynchronousQueue by simply handing messages around.

package com.vaani.concurrency;

import java.util.Random;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueExample {

    private final SynchronousQueue<WorkItem> queue = new SynchronousQueue<WorkItem>();
    private final int consumerThreads = 5;
    private final int producerThreads = 5;

    public SynchronousQueueExample() {
        System.out.println( "Starting..." );

        // Start the consumers.
        for( int i = 0; i < consumerThreads; i++ ) {
            Consumer c = new Consumer( queue );
            c.setName( "Consumer " + i );
            c.start();
        }

        // Start the producers.
        for( int i = 0; i < producerThreads; i++ ) {
            Producer p = new Producer( queue );
            p.setName( "Producer " + i );
            p.start();
        }
    }

    /**
     * @param args
     */
    public static void main( String[] args ) {
        new SynchronousQueueExample();
    }

    private static class Consumer extends Thread {

        private final SynchronousQueue<WorkItem> queue;

        public Consumer( SynchronousQueue<WorkItem> queue ) {
            this.queue = queue;
        }

        @Override
        public void run() {
            Random r = new Random();
            while( !Thread.currentThread().isInterrupted() ) {
                WorkItem item = null;
                try {
                    // Blocks until a producer hands over an item.
                    item = queue.take();
                    System.out.println( Thread.currentThread().getName() + " consuming: " + item );
                    Thread.sleep( r.nextInt( 1000 ) );
                } catch( InterruptedException inte ) {
                    // Restore the interrupt flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private static class Producer extends Thread {
        private static final String[] DATA = { "a", "b", "c", "d", "e", "f", "g", "h",
                "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w",
                "x", "y", "z" };

        private final SynchronousQueue<WorkItem> queue;

        public Producer( SynchronousQueue<WorkItem> queue ) {
            this.queue = queue;
        }

        @Override
        public void run() {
            Random r = new Random();
            while( !Thread.currentThread().isInterrupted() ) {
                String data = DATA[ r.nextInt( DATA.length ) ];
                WorkItem item = new WorkItem( data + "(" + Thread.currentThread().getName() + ")" );
                try {
                    Thread.sleep( r.nextInt( 5000 ) );
                    System.out.println( Thread.currentThread().getName() + " queuing: " + item );
                    // Blocks until a consumer is ready to take the item.
                    queue.put( item );
                } catch( InterruptedException inte ) {
                    // Restore the interrupt flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private static class WorkItem {

        private final String message;

        public WorkItem( String message ) {
            this.message = message;
        }

        @Override
        public String toString() {
            return message;
        }
    }

}
