package regionQueue; import com.gemstone.gemfire.cache.*; import com.gemstone.gemfire.cache.util.*; import com.gemstone.gemfire.distributed.DistributedSystem; import com.gemstone.bp.edu.emory.mathcs.backport.java.util.AbstractQueue; import com.gemstone.bp.edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; import com.gemstone.bp.edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import java.io.*; import java.util.*; /** * The client side of a distributed queue. A ClientQueue * implements the BlockingQueue interface and delegates * its work to a {@link RegionQueue} that is hosted by a {@link * ServerQueue}. * * @author GemStone Systems, Inc. * @since 4.1 */ public class ClientQueue extends AbstractQueue implements BlockingQueue { /** An argument to a Region get() that denotes that it is a peek * operation. */ static Object PEEK = "PEEK"; /** An argument to a Region get() that denotes that it is a take * operation. */ static Object TAKE = "TAKE"; /** An argument to a Region put() that denotes that it is a (queue) * put operation. */ static Object PUT = "PUT"; /** An argument to a Region get() that denotes that it is an "is * empty" operation. */ static Object IS_EMPTY = "IS_EMPTY"; /** An argument to a Region get() that denotes that it is a size * operation. */ static Object SIZE = "SIZE"; /** An argument to a Region get() that denotes that it is a * "remaining capacity" operation. */ static Object REMAINING_CAPACITY = "REMAINING_CAPACITY"; /** An argument to a Region get() that denotes that it is a * "to array" operation. */ static Object TO_ARRAY = "TO_ARRAY"; /** The next request id to use when getting an element from the * server queue. */ protected static long nextRequestId = 15; //////////////////////// Instance Fields /////////////////////// /** The name of this queue */ private final String name; /** The region that is used to communicate with the server. */ private final Region commRegion; /** The id of this VM */ protected String id; /** BridgeLoader and BridgeWriter endpoints configuration */ private String bridgeEndpoints; /** * This is the key of the region element used for adding elements to * the queue. Its uniqueness is required to prevent deadlock on the * Cache server. */ static String INSERT_PREFIX = "IP_"; private ThreadLocal INSERT_KEY = new ThreadLocal() { protected Object initialValue() { long threadId; synchronized(ClientQueue.class) { threadId = nextRequestId++; } return INSERT_PREFIX + id + threadId; } }; ////////////////////////// Constructors ///////////////////////// /** * Creates a new ClientQueue that will access queues * hosted by the given servers. * * @param name * The name of the distributed queue * @param hosts * The host(s) on which the server queue(s) reside * @param ports * The port(s) through which the server queue(s) are * accessed. * * @throws TimeoutException * If a cache access times out which creating the client * queue * @throws IllegalStateException * If the state of the cache prevents the client queue from * being created */ public ClientQueue(String name, String[] hosts, int[] ports) throws TimeoutException { this.name = name; if (hosts.length == 0) { String s = "Must specify at least one queue server host"; throw new IllegalArgumentException(s); } else if (ports.length == 0) { String s = "Must specify at least one queue server port"; throw new IllegalArgumentException(s); } else if (hosts.length != ports.length) { String s = "Must specify the same number of queue server " + "hosts and ports (" + hosts.length + " hosts, " + ports.length + " ports)"; throw new IllegalArgumentException(s); } // Since the client communicates with the server over a // point-to-point bridge connection, we can create a "stand-alone" // DistributedSystem. Properties props = new Properties(); props.setProperty("mcast-port", "0"); props.setProperty("locators", ""); DistributedSystem system = DistributedSystem.connect(props); this.id = String.valueOf(system.getDistributedMember()); Cache cache; try { cache = CacheFactory.create(system); } catch (CacheExistsException ex) { cache = ex.getCache(); } catch (CacheException ex) { // Something unexpected happened String s = "While creating the Cache"; IllegalStateException ex2 = new IllegalStateException(s); ex2.initCause(ex); throw ex2; } // Because some queue operations may wait for a long time, set the // net search timeout to be very high. cache.setSearchTimeout(Integer.MAX_VALUE); Region root; try { AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.LOCAL); root = cache.createRegion("DistributedQueues", factory.create()); } catch (RegionExistsException ex) { root = ex.getRegion(); } // Configure a BridgeLoader and a BridgeWriter to communicate with // the server. AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.LOCAL); Properties bridgeProps = new Properties(); bridgeProps.setProperty("LBPolicy", "Sticky"); // We can not allow timeouts (SO_TIMEOUT) because the QueueServer // will block, with wait(), in its CacheWriter if the queue is // full. Blocking in a CacheWriter is bad in general, because the // updated Entry is syncrhonized, which can cause socket timeouts // on the client and potentially repeating values. So, we can not // timeout a blocking put() or timeout a timed offer() too early. bridgeProps.setProperty("readTimeout", "0"); StringBuffer endpoints = new StringBuffer(); for (int i = 0; i < hosts.length; i++) { endpoints.append("server"); endpoints.append(i); endpoints.append("="); endpoints.append(hosts[i]); endpoints.append(":"); endpoints.append(ports[i]); if (i < hosts.length - 1) { endpoints.append(","); } } this.bridgeEndpoints = endpoints.toString(); bridgeProps.setProperty("endpoints", this.bridgeEndpoints); BridgeLoader loader = new BridgeLoader(); loader.init(bridgeProps); factory.setCacheLoader(loader); BridgeWriter writer = new BridgeWriter(); writer.init(bridgeProps); factory.setCacheWriter(writer); try { this.commRegion = root.createSubregion(name + "-" + ServerQueue.COMM_REGION_NAME, factory.create()); } catch (RegionExistsException ex) { String s = "Cannot create queue \"" + name + "\" because it " + "already exists"; throw new IllegalStateException(s); } } /////////////////////// Instance Methods /////////////////////// /** * Closes this ClientQueue and releases all resources * associated with it including the communication Region */ public void close() { Cache cache = this.commRegion.getCache(); this.commRegion.localDestroyRegion(); DistributedSystem system = cache.getDistributedSystem(); cache.close(); system.disconnect(); } public void put(Object o) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); addElement(o, PUT); } public Object take() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return getElement(TAKE); } public Object peek() { return getElement(PEEK); } public boolean isEmpty() { return ((Boolean) getElement(IS_EMPTY)).booleanValue(); } /** * Fetches an element from the remote queue. This method is used to * implement poll, take, and * peek. * * @param arg * Argument passed to the remote loader */ private Object getElement(Object arg) { // Because we communicate using regions, the keys that we use to // request queue elements are "cached". Therefore, if we were to // use the same key, we might get a queue element that had already // been removed. So, we have to request an object that has a // unique name across threads and members. String key; synchronized (ClientQueue.class) { key = this.id + (nextRequestId++); } try { Object element = this.commRegion.get(key, arg); if (element != null) { this.commRegion.localDestroy(key); } return element; } catch (CacheException ex) { String s = "While getting " + key; throw new RegionQueueException(s, ex); } } public Object poll() { return getElement(null); } public int size() { return ((Integer) getElement(SIZE)).intValue(); } public Object[] toArray(Object[] array) { return (Object[]) getElement(array); } public Object[] toArray() { return (Object[]) getElement(TO_ARRAY); } public Object poll(long timeout, TimeUnit unit) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // Canonicalize on milliseconds timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); return getElement(new Long(timeout)); } /** * Adds an element to the remote queue. This method is used to * implement offer and put. * * @param obj * The object to be added to the queue * @param arg * Argument passed to remote writer * * @return Whether or not element was added */ private boolean addElement(Object obj, Object arg) { // We always use the same region key when we add an object to the // queue even if there are multiple threads operating on the // queue. try { Object insertKey = INSERT_KEY.get(); this.commRegion.put(insertKey, obj, arg); this.commRegion.localDestroy(insertKey); return true; } catch (CacheWriterException ex) { Throwable cause = ex.getCause(); while (cause instanceof CacheWriterException) { // Propagated exception gets wrapped cause = cause.getCause(); } if (cause == null) { return false; } else if (cause instanceof RegionQueueException) { throw (RegionQueueException) cause; } else { String s = "While adding " + obj; throw new RegionQueueException(s, cause); } } catch (CacheException ex) { String s = "While adding " + obj; throw new RegionQueueException(s, ex); } } public boolean offer(Object o) { return addElement(o, null); } public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // Canonicalize on milliseconds timeout = TimeUnit.MILLISECONDS.convert(timeout, unit); return addElement(o, new Long(timeout)); } public int remainingCapacity() { return ((Integer) getElement(REMAINING_CAPACITY)).intValue(); } /** Throws an UnsupportedOperationException */ public Iterator iterator() { throw new UnsupportedOperationException("Not implemented yet"); } /** Throws an UnsupportedOperationException */ public int drainTo(Collection c) { throw new UnsupportedOperationException("Not implemented yet"); } /** Throws an UnsupportedOperationException */ public int drainTo(Collection c, int maxElements) { throw new UnsupportedOperationException("Not implemented yet"); } public String getName() { return this.name; } public String toString() { return "ClientQueue: " + getName() + " endpoints: " + this.bridgeEndpoints; } /////////////////////// Main Program /////////////////////// private static PrintStream out = System.out; private static PrintStream err = System.err; /** * Prints usage information about this program */ private static void usage(String s) { err.println("\n** " + s + "\n"); err.println("usage: java regionQueue.ClientQueue name (host:port)+"); err.println(" name Name of distributed queue"); err.println(" host:port Host and port of server queue"); err.println(""); err.println("Creates a client queue and allows the user to " + "interact with it"); err.println(""); System.exit(1); } /** * Prints help information for the interactive mode */ private static void help() { out.println(""); out.println("Available commands are:"); out.println(" offer string [timeout] Adds an element to the queue"); out.println(" timeout Seconds to wait for room in queue"); out.println(" put string Waits until an element can be added to the queue"); out.println(""); out.println(" poll [timeout] Removes an available element from the queue"); out.println(" timeout Seconds to wait for element"); out.println(" take Waits for an element queue to be available before"); out.println(" removing it."); out.println(" peek Returns the element at the head of queue, but doesn't"); out.println(" remove it."); out.println(" toArray Prints the entire contents of the queue"); out.println(""); out.println(" size Prints the size of the queue"); out.println(" isEmpty Prints whether or not the queue is empty"); out.println(" remaining Prints number of empty elements in the queu"); out.println(""); out.println(" exit Exits this program"); out.println(""); } /** * Main program that creates a ClientQueue and allows * the user to interact with it via the console. */ public static void main(String[] args) throws Throwable { String name = null; List servers = new ArrayList(); for (int i = 0; i < args.length; i++) { if (name == null) { name = args[i]; } else { servers.add(args[i]); } } if (name == null) { usage("Missing queue name"); } else if (servers.isEmpty()) { usage("Missing server host:port"); } String[] hosts = new String[servers.size()]; int[] ports = new int[servers.size()]; int i = 0; for (Iterator iter = servers.iterator(); iter.hasNext(); i++) { String server = (String) iter.next(); int index = server.indexOf(':'); if (index < 0) { usage("Malformed server: " + server); } String host = server.substring(0, index); if ("".equals(host)) { usage("Malformed server: " + server); } else { hosts[i] = host; } try { Integer port = new Integer(server.substring(index + 1)); ports[i] = port.intValue(); } catch (NumberFormatException ex) { usage("Malformed server: " + server); } } ClientQueue queue = new ClientQueue(name, hosts, ports); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while (true) { try { out.print(name + "> "); out.flush(); String line = in.readLine(); if (line == null /* EOF */) { System.exit(0); } else if ("".equals(line)) { continue; } StringTokenizer st = new StringTokenizer(line, " "); String[] command = new String[st.countTokens()]; for (i = 0; st.hasMoreTokens(); i++) { command[i] = st.nextToken(); } if (command[0].equalsIgnoreCase("offer")) { if (command.length < 2) { out.println("** Missing element to offer"); } else { String element = command[1]; boolean offered; if (command.length > 2) { long timeout; try { timeout = (new Long(command[2])).longValue(); } catch (NumberFormatException ex) { err.println("** Malformed timeout: " + command[2]); continue; } offered = queue.offer(element, timeout, TimeUnit.SECONDS); } else { offered = queue.offer(element); } if (offered) { out.println("Element " + element + " was successfully added to the queue"); } else { out.println("Element " + element + " was not added to the queue"); } } } else if (command[0].equalsIgnoreCase("put")) { if (command.length < 2) { out.println("** Missing element to offer"); } else { String element = command[1]; queue.put(element); } } else if (command[0].equalsIgnoreCase("poll")) { Object o; if (command.length > 1) { long timeout; try { timeout = (new Long(command[1])).longValue(); } catch (NumberFormatException ex) { err.println("** Malformed timeout: " + command[1]); continue; } o = queue.poll(timeout, TimeUnit.SECONDS); } else { o = queue.poll(); } out.println("poll() returned " + o); } else if (command[0].equalsIgnoreCase("take")) { Object o = queue.take(); out.println("take() returned " + o); } else if (command[0].equalsIgnoreCase("peek")) { Object o = queue.peek(); out.println("peek() returned " + o); } else if (command[0].equalsIgnoreCase("toArray")) { Object[] array = queue.toArray(); StringBuffer sb = new StringBuffer(); sb.append("["); for (i = 0; i < array.length; i++) { sb.append(array[i]); if (i < array.length - 1) { sb.append(", "); } } sb.append("]"); out.println("Queue contains " + array.length + " elements: " + sb); } else if (command[0].equalsIgnoreCase("isEmpty")) { boolean isEmpty = queue.isEmpty(); out.println("Queue is " + (isEmpty ? "" : "not ") + "empty"); } else if (command[0].equalsIgnoreCase("size")) { int size = queue.size(); out.println("Queue contains " + size + " elements"); } else if (command[0].equalsIgnoreCase("remaining")) { int remaining = queue.remainingCapacity(); out.println("Queue has " + remaining + " unused elements"); } else if (command[0].equalsIgnoreCase("exit") || command[0].equalsIgnoreCase("quit")) { System.exit(0); } else if (command[0].equalsIgnoreCase("help")) { help(); } else { out.println("** Unrecognized command: " + command[0]); help(); } } catch (Exception ex) { ex.printStackTrace(err); } } } }