package regionQueue;
import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.cache.util.*;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.bp.edu.emory.mathcs.backport.java.util.AbstractQueue;
import com.gemstone.bp.edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import com.gemstone.bp.edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.io.*;
import java.util.*;
/**
* The client side of a distributed queue. A ClientQueue
* implements the BlockingQueue interface and delegates
* its work to a {@link RegionQueue} that is hosted by a {@link
* ServerQueue}.
*
* @author GemStone Systems, Inc.
* @since 4.1
*/
public class ClientQueue extends AbstractQueue
implements BlockingQueue {
/** An argument to a Region get() that denotes that it is a peek
* operation. */
static Object PEEK = "PEEK";
/** An argument to a Region get() that denotes that it is a take
* operation. */
static Object TAKE = "TAKE";
/** An argument to a Region put() that denotes that it is a (queue)
* put operation. */
static Object PUT = "PUT";
/** An argument to a Region get() that denotes that it is an "is
* empty" operation. */
static Object IS_EMPTY = "IS_EMPTY";
/** An argument to a Region get() that denotes that it is a size
* operation. */
static Object SIZE = "SIZE";
/** An argument to a Region get() that denotes that it is a
* "remaining capacity" operation. */
static Object REMAINING_CAPACITY = "REMAINING_CAPACITY";
/** An argument to a Region get() that denotes that it is a
* "to array" operation. */
static Object TO_ARRAY = "TO_ARRAY";
/** The next request id to use when getting an element from the
* server queue. */
protected static long nextRequestId = 15;
//////////////////////// Instance Fields ///////////////////////
/** The name of this queue */
private final String name;
/** The region that is used to communicate with the server. */
private final Region commRegion;
/** The id of this VM */
protected String id;
/** BridgeLoader and BridgeWriter endpoints configuration */
private String bridgeEndpoints;
/**
* This is the key of the region element used for adding elements to
* the queue. Its uniqueness is required to prevent deadlock on the
* Cache server.
*/
static String INSERT_PREFIX = "IP_";
private ThreadLocal INSERT_KEY = new ThreadLocal() {
protected Object initialValue() {
long threadId;
synchronized(ClientQueue.class) {
threadId = nextRequestId++;
}
return INSERT_PREFIX + id + threadId;
}
};
////////////////////////// Constructors /////////////////////////
/**
* Creates a new ClientQueue that will access queues
* hosted by the given servers.
*
* @param name
* The name of the distributed queue
* @param hosts
* The host(s) on which the server queue(s) reside
* @param ports
* The port(s) through which the server queue(s) are
* accessed.
*
* @throws TimeoutException
* If a cache access times out which creating the client
* queue
* @throws IllegalStateException
* If the state of the cache prevents the client queue from
* being created
*/
public ClientQueue(String name, String[] hosts, int[] ports)
throws TimeoutException {
this.name = name;
if (hosts.length == 0) {
String s = "Must specify at least one queue server host";
throw new IllegalArgumentException(s);
} else if (ports.length == 0) {
String s = "Must specify at least one queue server port";
throw new IllegalArgumentException(s);
} else if (hosts.length != ports.length) {
String s = "Must specify the same number of queue server " +
"hosts and ports (" + hosts.length + " hosts, " + ports.length
+ " ports)";
throw new IllegalArgumentException(s);
}
// Since the client communicates with the server over a
// point-to-point bridge connection, we can create a "stand-alone"
// DistributedSystem.
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", "");
DistributedSystem system = DistributedSystem.connect(props);
this.id = String.valueOf(system.getDistributedMember());
Cache cache;
try {
cache = CacheFactory.create(system);
} catch (CacheExistsException ex) {
cache = ex.getCache();
} catch (CacheException ex) {
// Something unexpected happened
String s = "While creating the Cache";
IllegalStateException ex2 = new IllegalStateException(s);
ex2.initCause(ex);
throw ex2;
}
// Because some queue operations may wait for a long time, set the
// net search timeout to be very high.
cache.setSearchTimeout(Integer.MAX_VALUE);
Region root;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
root = cache.createRegion("DistributedQueues",
factory.create());
} catch (RegionExistsException ex) {
root = ex.getRegion();
}
// Configure a BridgeLoader and a BridgeWriter to communicate with
// the server.
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
Properties bridgeProps = new Properties();
bridgeProps.setProperty("LBPolicy", "Sticky");
// We can not allow timeouts (SO_TIMEOUT) because the QueueServer
// will block, with wait(), in its CacheWriter if the queue is
// full. Blocking in a CacheWriter is bad in general, because the
// updated Entry is syncrhonized, which can cause socket timeouts
// on the client and potentially repeating values. So, we can not
// timeout a blocking put() or timeout a timed offer() too early.
bridgeProps.setProperty("readTimeout", "0");
StringBuffer endpoints = new StringBuffer();
for (int i = 0; i < hosts.length; i++) {
endpoints.append("server");
endpoints.append(i);
endpoints.append("=");
endpoints.append(hosts[i]);
endpoints.append(":");
endpoints.append(ports[i]);
if (i < hosts.length - 1) {
endpoints.append(",");
}
}
this.bridgeEndpoints = endpoints.toString();
bridgeProps.setProperty("endpoints", this.bridgeEndpoints);
BridgeLoader loader = new BridgeLoader();
loader.init(bridgeProps);
factory.setCacheLoader(loader);
BridgeWriter writer = new BridgeWriter();
writer.init(bridgeProps);
factory.setCacheWriter(writer);
try {
this.commRegion =
root.createSubregion(name + "-" + ServerQueue.COMM_REGION_NAME,
factory.create());
} catch (RegionExistsException ex) {
String s = "Cannot create queue \"" + name + "\" because it " +
"already exists";
throw new IllegalStateException(s);
}
}
/////////////////////// Instance Methods ///////////////////////
/**
* Closes this ClientQueue and releases all resources
* associated with it including the communication Region
*/
public void close() {
Cache cache = this.commRegion.getCache();
this.commRegion.localDestroyRegion();
DistributedSystem system = cache.getDistributedSystem();
cache.close();
system.disconnect();
}
public void put(Object o) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
addElement(o, PUT);
}
public Object take() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
return getElement(TAKE);
}
public Object peek() {
return getElement(PEEK);
}
public boolean isEmpty() {
return ((Boolean) getElement(IS_EMPTY)).booleanValue();
}
/**
* Fetches an element from the remote queue. This method is used to
* implement poll, take, and
* peek.
*
* @param arg
* Argument passed to the remote loader
*/
private Object getElement(Object arg) {
// Because we communicate using regions, the keys that we use to
// request queue elements are "cached". Therefore, if we were to
// use the same key, we might get a queue element that had already
// been removed. So, we have to request an object that has a
// unique name across threads and members.
String key;
synchronized (ClientQueue.class) {
key = this.id + (nextRequestId++);
}
try {
Object element = this.commRegion.get(key, arg);
if (element != null) {
this.commRegion.localDestroy(key);
}
return element;
} catch (CacheException ex) {
String s = "While getting " + key;
throw new RegionQueueException(s, ex);
}
}
public Object poll() {
return getElement(null);
}
public int size() {
return ((Integer) getElement(SIZE)).intValue();
}
public Object[] toArray(Object[] array) {
return (Object[]) getElement(array);
}
public Object[] toArray() {
return (Object[]) getElement(TO_ARRAY);
}
public Object poll(long timeout, TimeUnit unit)
throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// Canonicalize on milliseconds
timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
return getElement(new Long(timeout));
}
/**
* Adds an element to the remote queue. This method is used to
* implement offer and put.
*
* @param obj
* The object to be added to the queue
* @param arg
* Argument passed to remote writer
*
* @return Whether or not element was added
*/
private boolean addElement(Object obj, Object arg) {
// We always use the same region key when we add an object to the
// queue even if there are multiple threads operating on the
// queue.
try {
Object insertKey = INSERT_KEY.get();
this.commRegion.put(insertKey, obj, arg);
this.commRegion.localDestroy(insertKey);
return true;
} catch (CacheWriterException ex) {
Throwable cause = ex.getCause();
while (cause instanceof CacheWriterException) {
// Propagated exception gets wrapped
cause = cause.getCause();
}
if (cause == null) {
return false;
} else if (cause instanceof RegionQueueException) {
throw (RegionQueueException) cause;
} else {
String s = "While adding " + obj;
throw new RegionQueueException(s, cause);
}
} catch (CacheException ex) {
String s = "While adding " + obj;
throw new RegionQueueException(s, ex);
}
}
public boolean offer(Object o) {
return addElement(o, null);
}
public boolean offer(Object o, long timeout, TimeUnit unit)
throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
// Canonicalize on milliseconds
timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
return addElement(o, new Long(timeout));
}
public int remainingCapacity() {
return ((Integer) getElement(REMAINING_CAPACITY)).intValue();
}
/** Throws an UnsupportedOperationException */
public Iterator iterator() {
throw new UnsupportedOperationException("Not implemented yet");
}
/** Throws an UnsupportedOperationException */
public int drainTo(Collection c) {
throw new UnsupportedOperationException("Not implemented yet");
}
/** Throws an UnsupportedOperationException */
public int drainTo(Collection c, int maxElements) {
throw new UnsupportedOperationException("Not implemented yet");
}
public String getName() {
return this.name;
}
public String toString() {
return "ClientQueue: " + getName()
+ " endpoints: " + this.bridgeEndpoints;
}
/////////////////////// Main Program ///////////////////////
private static PrintStream out = System.out;
private static PrintStream err = System.err;
/**
* Prints usage information about this program
*/
private static void usage(String s) {
err.println("\n** " + s + "\n");
err.println("usage: java regionQueue.ClientQueue name (host:port)+");
err.println(" name Name of distributed queue");
err.println(" host:port Host and port of server queue");
err.println("");
err.println("Creates a client queue and allows the user to " +
"interact with it");
err.println("");
System.exit(1);
}
/**
* Prints help information for the interactive mode
*/
private static void help() {
out.println("");
out.println("Available commands are:");
out.println(" offer string [timeout] Adds an element to the queue");
out.println(" timeout Seconds to wait for room in queue");
out.println(" put string Waits until an element can be added to the queue");
out.println("");
out.println(" poll [timeout] Removes an available element from the queue");
out.println(" timeout Seconds to wait for element");
out.println(" take Waits for an element queue to be available before");
out.println(" removing it.");
out.println(" peek Returns the element at the head of queue, but doesn't");
out.println(" remove it.");
out.println(" toArray Prints the entire contents of the queue");
out.println("");
out.println(" size Prints the size of the queue");
out.println(" isEmpty Prints whether or not the queue is empty");
out.println(" remaining Prints number of empty elements in the queu");
out.println("");
out.println(" exit Exits this program");
out.println("");
}
/**
* Main program that creates a ClientQueue and allows
* the user to interact with it via the console.
*/
public static void main(String[] args) throws Throwable {
String name = null;
List servers = new ArrayList();
for (int i = 0; i < args.length; i++) {
if (name == null) {
name = args[i];
} else {
servers.add(args[i]);
}
}
if (name == null) {
usage("Missing queue name");
} else if (servers.isEmpty()) {
usage("Missing server host:port");
}
String[] hosts = new String[servers.size()];
int[] ports = new int[servers.size()];
int i = 0;
for (Iterator iter = servers.iterator(); iter.hasNext(); i++) {
String server = (String) iter.next();
int index = server.indexOf(':');
if (index < 0) {
usage("Malformed server: " + server);
}
String host = server.substring(0, index);
if ("".equals(host)) {
usage("Malformed server: " + server);
} else {
hosts[i] = host;
}
try {
Integer port = new Integer(server.substring(index + 1));
ports[i] = port.intValue();
} catch (NumberFormatException ex) {
usage("Malformed server: " + server);
}
}
ClientQueue queue = new ClientQueue(name, hosts, ports);
BufferedReader in =
new BufferedReader(new InputStreamReader(System.in));
while (true) {
try {
out.print(name + "> ");
out.flush();
String line = in.readLine();
if (line == null /* EOF */) {
System.exit(0);
} else if ("".equals(line)) {
continue;
}
StringTokenizer st = new StringTokenizer(line, " ");
String[] command = new String[st.countTokens()];
for (i = 0; st.hasMoreTokens(); i++) {
command[i] = st.nextToken();
}
if (command[0].equalsIgnoreCase("offer")) {
if (command.length < 2) {
out.println("** Missing element to offer");
} else {
String element = command[1];
boolean offered;
if (command.length > 2) {
long timeout;
try {
timeout = (new Long(command[2])).longValue();
} catch (NumberFormatException ex) {
err.println("** Malformed timeout: " + command[2]);
continue;
}
offered = queue.offer(element, timeout, TimeUnit.SECONDS);
} else {
offered = queue.offer(element);
}
if (offered) {
out.println("Element " + element +
" was successfully added to the queue");
} else {
out.println("Element " + element +
" was not added to the queue");
}
}
} else if (command[0].equalsIgnoreCase("put")) {
if (command.length < 2) {
out.println("** Missing element to offer");
} else {
String element = command[1];
queue.put(element);
}
} else if (command[0].equalsIgnoreCase("poll")) {
Object o;
if (command.length > 1) {
long timeout;
try {
timeout = (new Long(command[1])).longValue();
} catch (NumberFormatException ex) {
err.println("** Malformed timeout: " + command[1]);
continue;
}
o = queue.poll(timeout, TimeUnit.SECONDS);
} else {
o = queue.poll();
}
out.println("poll() returned " + o);
} else if (command[0].equalsIgnoreCase("take")) {
Object o = queue.take();
out.println("take() returned " + o);
} else if (command[0].equalsIgnoreCase("peek")) {
Object o = queue.peek();
out.println("peek() returned " + o);
} else if (command[0].equalsIgnoreCase("toArray")) {
Object[] array = queue.toArray();
StringBuffer sb = new StringBuffer();
sb.append("[");
for (i = 0; i < array.length; i++) {
sb.append(array[i]);
if (i < array.length - 1) {
sb.append(", ");
}
}
sb.append("]");
out.println("Queue contains " + array.length + " elements: " + sb);
} else if (command[0].equalsIgnoreCase("isEmpty")) {
boolean isEmpty = queue.isEmpty();
out.println("Queue is " + (isEmpty ? "" : "not ") +
"empty");
} else if (command[0].equalsIgnoreCase("size")) {
int size = queue.size();
out.println("Queue contains " + size + " elements");
} else if (command[0].equalsIgnoreCase("remaining")) {
int remaining = queue.remainingCapacity();
out.println("Queue has " + remaining + " unused elements");
} else if (command[0].equalsIgnoreCase("exit") ||
command[0].equalsIgnoreCase("quit")) {
System.exit(0);
} else if (command[0].equalsIgnoreCase("help")) {
help();
} else {
out.println("** Unrecognized command: " + command[0]);
help();
}
} catch (Exception ex) {
ex.printStackTrace(err);
}
}
}
}