Monday, May 9, 2011

Synchronous queue

A SynchronousQueue is unlike the other implementations of BlockingQueue: it isn't a queue of tasks waiting to be carried out but, in effect, a queue of consumer threads waiting to process work. It has no internal capacity, so a producer that puts an item on the queue blocks until a consumer is ready to take it. As with a regular blocking queue, consumers block while no item is available.
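The hand-off behaviour can be seen in a few lines. This is only a minimal sketch: the class name HandOffDemo and its two helper methods are made up for illustration, and it uses Java 8 lambdas, which the original example does not.

```java
import java.util.concurrent.SynchronousQueue;

class HandOffDemo {

    // A non-blocking offer() succeeds only if a consumer is already waiting in take().
    public static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        return queue.offer("task");
    }

    // Hands one item to a consumer thread and returns what that thread received.
    public static String handOff(String item) {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(item);  // blocks until the consumer's take() receives the item
            consumer.join();  // wait for the consumer so received[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println("offer with no consumer: " + offerWithoutConsumer());
        System.out.println("handed off: " + handOff("task"));
    }
}
```

With no consumer waiting, offer() returns false straight away because there is nowhere to store the item; put() would block in the same situation.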

SynchronousQueue is useful when the producer wants or needs to know when a consumer starts processing the task (or at least accepts it). The producer can be sure that the task has been taken by a consumer as soon as it unblocks from the call to put.
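As a rough illustration of that guarantee, the sketch below measures how long put() stays blocked when the only consumer arrives late, and also shows the timed offer() for a producer that does not want to wait forever. The class AcceptanceDemo and its method names are hypothetical, and the timings are only approximate.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class AcceptanceDemo {

    // Milliseconds that put() stays blocked when the consumer arrives after consumerDelayMs.
    public static long putBlockedMillis(long consumerDelayMs) {
        SynchronousQueue<String> queue = new SynchronousQueue<String>();
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(consumerDelayMs); // consumer is busy elsewhere at first
                queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        long start = System.nanoTime();
        consumer.start();
        try {
            queue.put("task"); // returns only once the consumer has taken the item
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            consumer.join();
            return elapsedMs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Timed offer: true only if some consumer accepted the item within timeoutMs.
    public static boolean offerWithin(SynchronousQueue<String> queue, long timeoutMs) {
        try {
            return queue.offer("task", timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("put() blocked for about " + putBlockedMillis(200) + " ms");
        System.out.println("accepted within 50 ms: "
                + offerWithin(new SynchronousQueue<String>(), 50));
    }
}
```

The boolean returned by the timed offer() gives the producer the same "accepted or not" information as put(), but with an upper bound on the wait.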

For best performance with a SynchronousQueue there should (nearly) always be a consumer waiting to take the task proffered by a producer. If there isn't generally a consumer waiting, then production speed is throttled by the arrival rate of consumers. This can easily happen when the task production rate is intrinsically bursty: the producer might be able to produce 100 tasks in a second and then none for an hour. With a SynchronousQueue, the producer has to wait for a consumer after every task it produces. With one of the other BlockingQueue implementations, however, you could throw the tasks into a buffer.
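The difference is easy to measure. The following sketch, with a hypothetical class BurstDemo and made-up numbers (5 tasks, a consumer that needs 50 ms per task), times how long a producer is tied up by one burst. LinkedBlockingQueue stands in here for "one of the other BlockingQueue implementations"; it is my choice for the comparison, not something the example in this post uses.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

class BurstDemo {

    // Milliseconds the producer needs to hand a burst of `tasks` items to one slow consumer.
    public static long producerMillis(BlockingQueue<Integer> queue, int tasks, long consumerDelayMs) {
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < tasks; i++) {
                    queue.take();
                    Thread.sleep(consumerDelayMs); // simulate processing the task
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        long start = System.nanoTime();
        try {
            for (int i = 0; i < tasks; i++) {
                queue.put(i); // SynchronousQueue: waits for a take(); unbounded buffer: returns at once
            }
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            consumer.join(); // let the consumer drain the queue before returning
            return elapsedMs;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long sync = producerMillis(new SynchronousQueue<Integer>(), 5, 50);
        long buffered = producerMillis(new LinkedBlockingQueue<Integer>(), 5, 50);
        System.out.println("SynchronousQueue producer busy for about " + sync + " ms");
        System.out.println("LinkedBlockingQueue producer busy for about " + buffered + " ms");
    }
}
```

With the SynchronousQueue the producer's time grows with the consumer's processing time, while the buffered queue lets it finish the burst almost immediately. The price of the buffer is that the producer no longer knows when a task has actually been picked up.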

If you need a task hand-off design where one thread needs to sync with another, SynchronousQueue is the obvious choice. The code below exercises SynchronousQueue by simply handing messages around.

package com.vaani.concurrency;

import java.util.Random;
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueExample {

    private final SynchronousQueue<WorkItem> queue = new SynchronousQueue<WorkItem>();
    private final int consumerThreads = 5;
    private final int producerThreads = 5;

    public SynchronousQueueExample() {
        System.out.println( "Starting..." );

        // Start the consumers.
        for( int i = 0; i < consumerThreads; i++ ) {
            Consumer c = new Consumer( queue );
            c.setName( "Consumer " + i );
            c.start();
        }

        // Start the producers.
        for( int i = 0; i < producerThreads; i++ ) {
            Producer p = new Producer( queue );
            p.setName( "Producer " + i );
            p.start();
        }
    }

    /**
     * Starts the example. The threads loop until they are interrupted,
     * so the program runs until it is killed.
     *
     * @param args not used
     */
    public static void main( String[] args ) {
        new SynchronousQueueExample();
    }

    private static class Consumer extends Thread {

        private final SynchronousQueue<WorkItem> queue;

        public Consumer( SynchronousQueue<WorkItem> queue ) {
            this.queue = queue;
        }

        @Override
        public void run() {
            Random r = new Random();
            while( !Thread.currentThread().isInterrupted() ) {
                WorkItem item = null;
                try {
                    // Blocks until a producer hands over an item.
                    item = queue.take();
                    System.out.println( Thread.currentThread().getName() + " consuming: " + item );
                    // Simulate processing time.
                    Thread.sleep( r.nextInt( 1000 ) );
                } catch( InterruptedException inte ) {
                    // Restore the interrupt flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private static class Producer extends Thread {
        private static final String[] DATA = { "a", "b", "c", "d", "e", "f", "g", "h",
                "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w",
                "x", "y", "z" };

        private final SynchronousQueue<WorkItem> queue;

        public Producer( SynchronousQueue<WorkItem> queue ) {
            this.queue = queue;
        }

        @Override
        public void run() {
            Random r = new Random();
            while( !Thread.currentThread().isInterrupted() ) {
                String data = DATA[ r.nextInt( DATA.length ) ];
                WorkItem item = new WorkItem( data + "(" + Thread.currentThread().getName() + ")" );
                try {
                    // Simulate the time needed to produce the item.
                    Thread.sleep( r.nextInt( 5000 ) );
                    System.out.println( Thread.currentThread().getName() + " queuing: " + item );
                    // Blocks until a consumer takes the item.
                    queue.put( item );
                } catch( InterruptedException inte ) {
                    // Restore the interrupt flag so the loop condition ends the thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private static class WorkItem {

        private final String message;

        public WorkItem( String message ) {
            this.message = message;
        }

        @Override
        public String toString() {
            return message;
        }
    }

}
