package regionQueue;

import com.gemstone.gemfire.cache.*;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache.util.CacheWriterAdapter;
import com.gemstone.gemfire.cache.util.BridgeEventCallbackArgument;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.bp.edu.emory.mathcs.backport.java.util.AbstractQueue;
import com.gemstone.bp.edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import com.gemstone.bp.edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.io.*;
import java.util.*;

/**
 * The server side of a distributed queue.  The server holds the
 * contents of a queue in a {@link RegionQueue} that may be backed up
 * in other servers.  It receives clients' requests to add and remove
 * items from the queue via a "communication region" that has a
 * special {@link CacheLoader} and {@link CacheWriter} that interact
 * with the queue.
 *
 * <P>
 *
 * This class implements the <code>BlockingQueue</code> interface so
 * it can be treated as a queue.  However, it delegates all of the
 * queue-related functionality to the underlying
 * <code>RegionQueue</code>.
 *
 * @author GemStone Systems, Inc.
 * @since 4.1
 */
public class ServerQueue extends AbstractQueue implements BlockingQueue {

  /** The name of the subregion used for communication.  It is package
   * protected so that other classes can share it. */
  static final String COMM_REGION_NAME = "Communications";

  /** How long (in seconds) the constructor waits for the bridge
   * server to report that it is running. */
  private static final long START_TIMEOUT_SECONDS = 5;

  /** How long (in milliseconds) to sleep between checks of whether
   * the bridge server is running. */
  private static final long START_POLL_MILLIS = 10;

  //////////////////////  Instance Fields  /////////////////////////

  /** The name of this queue */
  protected final String name;

  /** The region queue that stores the contents of this distributed
   * queue. */
  protected final RegionQueue regionQueue;

  /** The region that is used by clients to communicate with this
   * distributed queue. */
  private final Region commRegion;

  /** Is this queue closed?  Only accessed while synchronized on
   * <code>this</code>. */
  private boolean isClosed = false;

  //////////////////////////  Constructors  /////////////////////////

  /**
   * Creates a new <code>ServerQueue</code> whose contents is
   * available to <code>ClientQueue</code> via the given port.
   *
   * @param name
   *        The name of the distributed queue
   * @param capacity
   *        The maximum number of elements allowed in the queue
   * @param system
   *        The distributed system to which the queue belongs
   * @param port
   *        The port on which clients connect to the queue
   *
   * @throws TimeoutException
   *         If the bridge server is not running within
   *         {@link #START_TIMEOUT_SECONDS} seconds of being started
   * @throws IllegalStateException
   *         If the state of the cache prevents the queue from being
   *         created, if the bridge server cannot be started, or if
   *         the calling thread is interrupted while waiting for the
   *         bridge server to start
   */
  public ServerQueue(String name, int capacity,
                     DistributedSystem system, int port)
    throws TimeoutException {

    this.name = name;

    Cache cache;
    try {
      cache = CacheFactory.create(system);

    } catch (CacheExistsException ex) {
      // Reuse the cache that is already open for this system
      cache = CacheFactory.getInstance(system);

    } catch (CacheException ex) {
      // Something unexpected happened
      String s = "While creating the Cache";
      IllegalStateException ex2 = new IllegalStateException(s);
      ex2.initCause(ex);
      throw ex2;
    }

    Region root;
    try {
      AttributesFactory factory = new AttributesFactory();
      factory.setScope(Scope.DISTRIBUTED_NO_ACK);
      root = cache.createRegion("DistributedQueues", factory.create());

    } catch (RegionExistsException ex) {
      // Another queue in this VM already created the root region
      root = ex.getRegion();
    }

    // Make the queue region mirrored with distributed ack scope to
    // ensure high availability
    AttributesFactory factory = new AttributesFactory();
    factory.setDataPolicy(DataPolicy.REPLICATE);
    factory.setScope(Scope.DISTRIBUTED_ACK);

    try {
      Region queueRegion = root.createSubregion(name, factory.create());
      this.regionQueue = new RegionQueue(capacity, queueRegion);

    } catch (RegionExistsException ex) {
      String s = "Cannot create queue \"" + name + "\" because it " +
        "already exists";
      throw new IllegalStateException(s);
    }

    // The communication region can be a local region because it is
    // "point-to-point" with the queue client
    factory = new AttributesFactory();
    factory.setScope(Scope.LOCAL);

    // There is no need to keep the entries in this region, since all
    // we are interested in is the invocation of the cachewriter and
    // cacheloader.  LRU happens after the events have been delivered.
    factory.setEvictionAttributes(EvictionAttributes
      .createLRUEntryAttributes(1, EvictionAction.LOCAL_DESTROY));
    factory.setCacheLoader(new QueueLoader());
    factory.setCacheWriter(new QueueWriter());

    try {
      this.commRegion =
        root.createSubregion(name + "-" + COMM_REGION_NAME,
                             factory.create());

    } catch (RegionExistsException ex) {
      String s = "Cannot create queue \"" + name + "\" because it " +
        "already exists";
      throw new IllegalStateException(s);
    }

    // Start a bridge server on the desired port
    BridgeServer server = cache.addBridgeServer();
    server.setPort(port);
    try {
      server.start();

    } catch (IOException ex) {
      String s = "Could not start bridge server on port " + port;
      IllegalStateException ex2 = new IllegalStateException(s);
      ex2.initCause(ex);
      throw ex2;
    }

    awaitRunning(server);
  }

  /**
   * Waits until the given bridge server reports that it is running.
   * The server is polled with a short sleep between checks so that
   * the waiting thread does not spin.
   *
   * @throws TimeoutException
   *         If the server is not running after
   *         {@link #START_TIMEOUT_SECONDS} seconds
   * @throws IllegalStateException
   *         If the calling thread is interrupted while waiting.  The
   *         thread's interrupt status is restored before throwing.
   */
  private static void awaitRunning(BridgeServer server)
    throws TimeoutException {

    long limitMillis = START_TIMEOUT_SECONDS * 1000;
    long start = System.currentTimeMillis();
    while (System.currentTimeMillis() - start < limitMillis) {
      if (server.isRunning()) {
        return;
      }

      try {
        Thread.sleep(START_POLL_MILLIS);

      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        String s = "Interrupted while waiting for bridge server to start";
        IllegalStateException ex2 = new IllegalStateException(s);
        ex2.initCause(ex);
        throw ex2;
      }
    }

    // The server may have come up during the final sleep
    if (server.isRunning()) {
      return;
    }

    String s = "Cache server did not start in " + START_TIMEOUT_SECONDS +
      " seconds";
    throw new TimeoutException(s);
  }

  /**
   * Closes this <code>ServerQueue</code> and releases all resources
   * held by it including the <code>Cache</code> and all regions.
   * Calling this method on a queue that is already closed has no
   * effect.
   */
  public void close() {
    synchronized (this) {
      if (this.isClosed) {
        return;
      }

      Cache cache = this.commRegion.getCache();

      // Stop bridge servers first so that clients can fail over.
      // Note that this stops every bridge server of the cache.
      for (Iterator iter = cache.getBridgeServers().iterator();
           iter.hasNext(); ) {
        BridgeServer bridge = (BridgeServer) iter.next();
        bridge.stop();
      }

      this.regionQueue.close();
      this.commRegion.localDestroyRegion();
      cache.close();

      this.isClosed = true;
    }
  }

  ///////////////////////  Instance Methods  ///////////////////////

  /**
   * Returns the name of this distributed queue.
   */
  public String getName() {
    return this.name;
  }

  public String toString() {
    return "ServerQueue: " + getName() + " with " + this.regionQueue;
  }

  /**
   * Delegates to the region queue.  Throws
   * <code>InterruptedException</code> up front (clearing the
   * interrupt status) if the calling thread is already interrupted.
   */
  public void put(Object o) throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    this.regionQueue.put(o);
  }

  /**
   * Delegates to the region queue.  Throws
   * <code>InterruptedException</code> up front (clearing the
   * interrupt status) if the calling thread is already interrupted.
   */
  public Object take() throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    return this.regionQueue.take();
  }

  /** Delegates to the region queue. */
  public Object peek() {
    return this.regionQueue.peek();
  }

  /** Delegates to the region queue. */
  public boolean isEmpty() {
    return this.regionQueue.isEmpty();
  }

  /** Delegates to the region queue. */
  public Object poll() {
    return this.regionQueue.poll();
  }

  /** Delegates to the region queue. */
  public int size() {
    return this.regionQueue.size();
  }

  /** Delegates to the region queue. */
  public Object[] toArray(Object[] array) {
    return this.regionQueue.toArray(array);
  }

  /** Delegates to the region queue. */
  public Object[] toArray() {
    return this.regionQueue.toArray();
  }

  /**
   * Delegates to the region queue.  Throws
   * <code>InterruptedException</code> up front (clearing the
   * interrupt status) if the calling thread is already interrupted.
   */
  public Object poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    return this.regionQueue.poll(timeout, unit);
  }

  /** Delegates to the region queue. */
  public boolean offer(Object o) {
    return this.regionQueue.offer(o);
  }

  /**
   * Delegates to the region queue.  Throws
   * <code>InterruptedException</code> up front (clearing the
   * interrupt status) if the calling thread is already interrupted.
   */
  public boolean offer(Object o, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    return this.regionQueue.offer(o, timeout, unit);
  }

  /** Delegates to the region queue. */
  public int remainingCapacity() {
    return this.regionQueue.remainingCapacity();
  }

  /** Delegates to the region queue. */
  public Iterator iterator() {
    return this.regionQueue.iterator();
  }

  /** Delegates to the region queue. */
  public int drainTo(Collection c) {
    return this.regionQueue.drainTo(c);
  }

  /** Delegates to the region queue. */
  public int drainTo(Collection c, int maxElements) {
    return this.regionQueue.drainTo(c, maxElements);
  }

  /////////////////////////  Inner Classes  /////////////////////////

  /**
   * A <code>CacheLoader</code> that performs some "get"-related
   * operation on the <code>RegionQueue</code>.  The operation is
   * specified by the argument passed to {@link Region#get(Object)}.
   *
   * @see LoaderHelper#getArgument
   */
  class QueueLoader implements CacheLoader {

    /**
     * Performs the queue operation selected by the loader argument
     * and returns its result:
     * <ul>
     * <li><code>null</code>: <code>poll()</code></li>
     * <li>a <code>Long</code>: <code>poll</code> with that timeout in
     *     milliseconds</li>
     * <li><code>ClientQueue.PEEK</code>, <code>TAKE</code>,
     *     <code>IS_EMPTY</code>, <code>SIZE</code>,
     *     <code>REMAINING_CAPACITY</code>, <code>TO_ARRAY</code>: the
     *     correspondingly named queue operation</li>
     * <li>an <code>Object[]</code>: <code>toArray(array)</code></li>
     * </ul>
     *
     * @throws CacheLoaderException
     *         If the argument is not recognized, if the thread is
     *         interrupted while waiting on the queue, or if the
     *         region queue reports a failure
     */
    public Object load(LoaderHelper helper)
      throws CacheLoaderException {

      Object arg = helper.getArgument();
      RegionQueue queue = ServerQueue.this.regionQueue;

      try {
        if (arg == null) {
          return queue.poll();

        } else if (arg instanceof Long) {
          long timeout = ((Long) arg).longValue();
          return queue.poll(timeout, TimeUnit.MILLISECONDS);

        } else if (ClientQueue.PEEK.equals(arg)) {
          return queue.peek();

        } else if (ClientQueue.TAKE.equals(arg)) {
          return queue.take();

        } else if (ClientQueue.IS_EMPTY.equals(arg)) {
          return Boolean.valueOf(queue.isEmpty());

        } else if (ClientQueue.SIZE.equals(arg)) {
          return new Integer(queue.size());

        } else if (ClientQueue.REMAINING_CAPACITY.equals(arg)) {
          return new Integer(queue.remainingCapacity());

        } else if (ClientQueue.TO_ARRAY.equals(arg)) {
          return queue.toArray();

        } else if (arg instanceof Object[]) {
          return queue.toArray((Object[]) arg);

        } else {
          String s = "Unrecognized callback argument: " + arg;
          throw new CacheLoaderException(s);
        }

      } catch (InterruptedException ex) {
        // Preserve the interrupt for code further up the stack
        Thread.currentThread().interrupt();
        String s = "Interrupted while taking from queue " + name;
        throw new CacheLoaderException(s, ex);

      } catch (RegionQueueException ex) {
        String s = "While removing an element from queue " + name;
        throw new CacheLoaderException(s, ex.getCause());
      }
    }

    public void close() {
      // Nothing to release
    }
  }

  /**
   * A <code>CacheWriter</code> that adds an element to the tail of
   * the queue.
   */
  class QueueWriter extends CacheWriterAdapter {

    /**
     * Do the same work as <code>beforeUpdate</code>
     */
    public void beforeCreate(EntryEvent event)
      throws CacheWriterException {
      beforeUpdate(event);
    }

    /**
     * Add the new value of the region entry to queue region.  The
     * event's callback argument selects the operation:
     * <code>null</code> means <code>offer</code>, a <code>Long</code>
     * means <code>offer</code> with that timeout in milliseconds, and
     * <code>ClientQueue.PUT</code> means <code>put</code>.  If the
     * callback argument is a
     * <code>BridgeEventCallbackArgument</code>, the original argument
     * it wraps is used; otherwise the argument is used as-is.
     *
     * @throws CacheWriterException
     *         If the element was not added to the queue.  If the
     *         queue was full, then the cause of the exception is
     *         <code>null</code>.
     */
    public void beforeUpdate(EntryEvent event)
      throws CacheWriterException {

      // Ignore any data the QueueLoader generates
      Operation operation = event.getOperation();
      if (operation.isLoad()) {
        return;
      }

      Object o = event.getNewValue();

      // Unwrap only when the argument really is a bridge wrapper so
      // that a null or plain argument does not cause an NPE or a
      // ClassCastException.
      Object arg = event.getCallbackArgument();
      if (arg instanceof BridgeEventCallbackArgument) {
        arg = ((BridgeEventCallbackArgument) arg).getOriginalCallbackArg();
      }

      RegionQueue queue = ServerQueue.this.regionQueue;

      try {
        if (arg == null) {
          if (!queue.offer(o)) {
            String s = "Element " + o + " was not offered";
            throw new CacheWriterException(s, null /* null cause */);
          }

        } else if (arg instanceof Long) {
          long timeout = ((Long) arg).longValue();
          if (!queue.offer(o, timeout, TimeUnit.MILLISECONDS)) {
            String s = "Element " + o + " was not offered";
            throw new CacheWriterException(s, null /* null cause */);
          }

        } else if (ClientQueue.PUT.equals(arg)) {
          queue.put(o);

        } else {
          String s = "Unrecognized callback argument: " + arg;
          throw new CacheWriterException(s);
        }

      } catch (InterruptedException ex) {
        // Preserve the interrupt for code further up the stack
        Thread.currentThread().interrupt();
        String s = "Interrupted while adding to queue " + name;
        throw new CacheWriterException(s, ex);

      } catch (RegionQueueException ex) {
        String s = "While adding element \"" + o + "\" to queue \"" +
          name + "\"";
        throw new CacheWriterException(s, ex.getCause());
      }
    }
  }

  ///////////////////////  Main Program  ///////////////////////

  private static final PrintStream err = System.err;
  private static final PrintStream out = System.out;

  /**
   * Prints usage information about this program and exits the VM
   * with status 1.
   */
  private static void usage(String s) {
    err.println("\n** " + s + "\n");
    err.println("usage: java regionQueue.ServerQueue name capacity port");
    err.println(" name Name of distributed queue");
    err.println(" capacity Maximum number of elements in the queue");
    err.println(" port Port on which clients connect");
    err.println("");
    err.println("Hosts a Server Queue");
    err.println("");

    System.exit(1);
  }

  /**
   * A main program that creates a <code>ServerQueue</code>.  The
   * first three command line arguments are taken as the queue name,
   * the capacity, and the port; any further arguments are ignored.
   * The queue is hosted until a line is read from standard input.
   */
  public static void main(String[] args) throws Throwable {
    String name = null;
    Integer capacity = null;
    Integer port = null;

    for (int i = 0; i < args.length; i++) {
      if (name == null) {
        name = args[i];

      } else if (capacity == null) {
        try {
          capacity = Integer.valueOf(args[i]);

        } catch (NumberFormatException ex) {
          usage("Malformed capacity: " + args[i]);
        }

      } else if (port == null) {
        try {
          port = Integer.valueOf(args[i]);

        } catch (NumberFormatException ex) {
          usage("Malformed port: " + args[i]);
        }
      }
    }

    // usage() does not return; it exits the VM
    if (name == null) {
      usage("Missing queue name");

    } else if (capacity == null) {
      usage("Missing capacity");

    } else if (port == null) {
      usage("Missing port");
    }

    Properties props = new Properties();
    props.setProperty("name", "ServerQueue");
    DistributedSystem system = DistributedSystem.connect(props);

    out.println("Creating ServerQueue...");
    ServerQueue queue =
      new ServerQueue(name, capacity.intValue(), system,
                      port.intValue());

    out.println("");
    out.println("Press <Enter> to quit");
    out.println("\nNote that some exceptions may be logged.");
    out.println("They are not necessarily errors.");
    out.println("");

    (new BufferedReader(new InputStreamReader(System.in))).readLine();
    queue.close();
    System.exit(0);
  }
}