Friday, June 17, 2011

SynchronousQueue


SynchronousQueue is an interesting creature, according to the Javadoc:
A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one.
Essentially, SynchronousQueue is another implementation of the aforementioned BlockingQueue. It gives us an extremely lightweight way to exchange single elements from one thread to another, using the blocking semantics used byArrayBlockingQueue. In Listing 2, I’ve rewritten the code from Listing 1 using SynchronousQueue instead of ArrayBlockingQueue:

Producer consumer problem with Synchronous queue

import java.util.*;
import java.util.concurrent.*;
 
class Producer implements Runnable {
    private BlockingQueue drop;
    List messages = Arrays.asList(
            "Mares eat oats",
            "Does eat oats",
            "Little lambs eat ivy",
            "Wouldn't you eat ivy too?");
 
    public Producer(BlockingQueue d) { this.drop = d; }
 
    public void run() {
        try {
            for (String s : messages)
                drop.put(s);
            drop.put("DONE");
        } catch (InterruptedException intEx) {
            System.out.println("Interrupted! " +
            "Last one out, turn out the lights!");
        }
    }
}
 
class Consumer implements Runnable {
    private BlockingQueue drop;
    public Consumer(BlockingQueue d) { this.drop = d; }
 
    public void run() {
        try {
            String msg = null;
            while (!((msg = drop.take()).equals("DONE")))
            System.out.println(msg);
        } catch (InterruptedException intEx) {
            System.out.println("Interrupted! " +
            "Last one out, turn out the lights!");
        }
    }
}
 
public class SynchronousQDemo{
    public static void main(String[] args) {
        BlockingQueue drop = new SynchronousQueue();
        (new Thread(new Producer(drop))).start();
        (new Thread(new Consumer(drop))).start();
    }
}

The implementation code looks almost identical to linked blocking queue one, but the application has an added benefit, in that SynchronousQueue will allow an insert into the queue only if there is a thread waiting to consume it.

In practice, SynchronousQueue is similar to the «rendezvous channels» available in languages like Ada or CSP. These are also sometimes known as «joins» in other environments, including .NET.

No comments:

Post a Comment

Chitika