package regionQueue; import com.gemstone.gemfire.cache.*; import com.gemstone.gemfire.cache.util.BridgeServer; import com.gemstone.gemfire.cache.util.CacheWriterAdapter; import com.gemstone.gemfire.cache.util.BridgeEventCallbackArgument; import com.gemstone.gemfire.distributed.DistributedSystem; import com.gemstone.bp.edu.emory.mathcs.backport.java.util.AbstractQueue; import com.gemstone.bp.edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; import com.gemstone.bp.edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import java.io.*; import java.util.*; /** * The server side of a distributed queue. The server holds the * contents of a queue in a {@link RegionQueue} that may be backed up * in other servers. It receives clients' requests to add and remove * items from the queue via a "communication region" that has a * special {@link CacheLoader} and {@link CacheWriter} that interact * with the queue. * *
*
* This class implements the BlockingQueue interface so
* it can be treated as a queue. However, it delegates all of the
* queue-related functionality to the underlying
* RegionQueue.
*
* @author GemStone Systems, Inc.
* @since 4.1
*/
public class ServerQueue extends AbstractQueue
implements BlockingQueue {
/** The name of the subregion used for communication. It is package
* protected so that other classe scan share it. */
static final String COMM_REGION_NAME = "Communications";
////////////////////// Instance Fields /////////////////////////
/** The name of this queue */
protected final String name;
/** The region queue that stores the contents of this distributed
* queue. */
protected final RegionQueue regionQueue;
/** The region that is used by clients to communicate with this
* distributed queue. */
private final Region commRegion;
/** Is this queue closed? */
private boolean isClosed = false;
////////////////////////// Constructors /////////////////////////
/**
* Creates a new ServerQueue whose contents is
* available to ClientQueue via the given port.
*
* @param name
* The name of the distributed queue
* @param system
* The distributed system to which the queue belongs
* @param port
* The port on which clients connect to the queue
* @param capacity
* The maximum number of elements allowed in the queue
*
* @throws TimeoutException
* If a cache access times out which creating the queue
* @throws IllegalStateException
* If the state of the cache prevents the queue from being
* created
*/
public ServerQueue(String name, int capacity,
DistributedSystem system, int port)
throws TimeoutException {
this.name = name;
Cache cache;
try {
cache = CacheFactory.create(system);
} catch (CacheExistsException ex) {
cache = CacheFactory.getInstance(system);
} catch (CacheException ex) {
// Something unexpected happened
String s = "While creating the Cache";
IllegalStateException ex2 = new IllegalStateException(s);
ex2.initCause(ex);
throw ex2;
}
Region root;
try {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_NO_ACK);
root = cache.createRegion("DistributedQueues",
factory.create());
} catch (RegionExistsException ex) {
root = ex.getRegion();
}
// Make the queue region mirroed with distributed ack scope to
// ensure high availability
AttributesFactory factory = new AttributesFactory();
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setScope(Scope.DISTRIBUTED_ACK);
try {
Region queueRegion =
root.createSubregion(name, factory.create());
this.regionQueue = new RegionQueue(capacity, queueRegion);
} catch (RegionExistsException ex) {
String s = "Cannot create queue \"" + name + "\" because it " +
"already exists";
throw new IllegalStateException(s);
}
// The communication region can be a local region because it is
// "point-to-point" with the queue client
factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
// There is no need to keep the entries in this region, since all
// we are interested in is the invocation of the cachewriter and
// cacheloader. LRU happens after the events have been delivered.
factory.setEvictionAttributes(EvictionAttributes
.createLRUEntryAttributes(1, EvictionAction.LOCAL_DESTROY));
factory.setCacheLoader(new QueueLoader());
factory.setCacheWriter(new QueueWriter());
try {
this.commRegion =
root.createSubregion(name + "-" + COMM_REGION_NAME,
factory.create());
} catch (RegionExistsException ex) {
String s = "Cannot create queue \"" + name + "\" because it " +
"already exists";
throw new IllegalStateException(s);
}
// Start a bridge server on the desired port
BridgeServer server = cache.addBridgeServer();
server.setPort(port);
try {
server.start();
} catch (IOException ex) {
String s = "Could not start bridge server on port " + port;
IllegalStateException ex2 = new IllegalStateException(s);
ex2.initCause(ex);
throw ex2;
}
long timeout = 5;
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout * 1000) {
if (server.isRunning()) {
return;
}
}
String s = "Cache server did not start in " + timeout +
" seconds";
throw new TimeoutException(s);
}
/**
* Closes this ServerQueue and releases all resources
* held by it including the Cache and all regions.
*/
public void close() {
synchronized (this) {
if (this.isClosed) {
return;
}
Cache cache = this.commRegion.getCache();
// Stop bridge servers first so that clients can fail over
for (Iterator iter = cache.getBridgeServers().iterator();
iter.hasNext(); ) {
BridgeServer bridge = (BridgeServer) iter.next();
bridge.stop();
}
this.regionQueue.close();
this.commRegion.localDestroyRegion();
cache.close();
this.isClosed = true;
}
}
/////////////////////// Instance Methods ///////////////////////
public void put(Object o) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
this.regionQueue.put(o);
}
public Object take() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
return this.regionQueue.take();
}
public Object peek() {
return this.regionQueue.peek();
}
public boolean isEmpty() {
return this.regionQueue.isEmpty();
}
public Object poll() {
return this.regionQueue.poll();
}
public int size() {
return this.regionQueue.size();
}
public Object[] toArray(Object[] array) {
return this.regionQueue.toArray(array);
}
public Object[] toArray() {
return this.regionQueue.toArray();
}
public Object poll(long timeout, TimeUnit unit)
throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
return this.regionQueue.poll(timeout, unit);
}
public boolean offer(Object o) {
return this.regionQueue.offer(o);
}
public boolean offer(Object o, long timeout, TimeUnit unit)
throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
return this.regionQueue.offer(o, timeout, unit);
}
public int remainingCapacity() {
return this.regionQueue.remainingCapacity();
}
public Iterator iterator() {
return this.regionQueue.iterator();
}
public int drainTo(Collection c) {
return this.regionQueue.drainTo(c);
}
public int drainTo(Collection c, int maxElements) {
return this.regionQueue.drainTo(c, maxElements);
}
///////////////////////// Inner Classes /////////////////////////
/**
* A CacheLoader that perform some "get"-related
* operation on the RegionQueue. The operation is
* specified by the argument passed to {@link Region#get(Object)}.
*
* @see LoaderHelper#getArgument
*/
class QueueLoader implements CacheLoader {
/**
* Remove the element at the head of the queue
*/
public Object load(LoaderHelper helper)
throws CacheLoaderException {
Object arg = helper.getArgument();
try {
if (arg == null) {
// ServerQueue.this.logger.info("calling poll");
return ServerQueue.this.regionQueue.poll();
} else if (arg instanceof Long) {
long timeout = ((Long) arg).longValue();
TimeUnit unit = TimeUnit.MILLISECONDS;
return ServerQueue.this.regionQueue.poll(timeout, unit);
} else if (ClientQueue.PEEK.equals(arg)) {
return ServerQueue.this.regionQueue.peek();
} else if (ClientQueue.TAKE.equals(arg)) {
return ServerQueue.this.regionQueue.take();
} else if (ClientQueue.IS_EMPTY.equals(arg)) {
return new Boolean(ServerQueue.this.regionQueue.isEmpty());
} else if (ClientQueue.SIZE.equals(arg)) {
return new Integer(ServerQueue.this.regionQueue.size());
} else if (ClientQueue.REMAINING_CAPACITY.equals(arg)) {
return new Integer(ServerQueue.this.regionQueue.remainingCapacity());
} else if (ClientQueue.TO_ARRAY.equals(arg)) {
return ServerQueue.this.regionQueue.toArray();
} else if (arg instanceof Object[]) {
Object[] array = (Object[]) arg;
return ServerQueue.this.regionQueue.toArray(array);
} else {
String s = "Unrecognized callback argument: " + arg;
throw new CacheLoaderException(s);
}
} catch (InterruptedException ex) {
String s = "Interrupted while taking from queue " + name;
throw new CacheLoaderException(s, ex);
} catch (RegionQueueException ex) {
String s = "While removing an element from queue " + name;
throw new CacheLoaderException(s, ex.getCause());
}
}
public void close() {
}
}
/**
* A CacheWriter that adds an element to the tail of
* the queue.
*/
class QueueWriter extends CacheWriterAdapter {
/**
* Do the same work as beforeUpdate
*/
public void beforeCreate(EntryEvent event)
throws CacheWriterException {
beforeUpdate(event);
}
/**
* Add the new value of the region entry to queue region.
*
* @throws CacheWriterException
* If the element was not added to the queue. If the
* queue was full, then the cause of the exception is
* null.
*/
public void beforeUpdate(EntryEvent event)
throws CacheWriterException {
event.getKey();
// Ignore any data the QueueLoader generates
Operation operation = event.getOperation();
if (operation.isLoad()) {
// We don't care about other entries that are updated
return;
}
Object o = event.getNewValue();
Object arg = ((BridgeEventCallbackArgument)(event.getCallbackArgument())).getOriginalCallbackArg();
try {
if (arg == null) {
boolean offered = ServerQueue.this.regionQueue.offer(o);
if (!offered) {
String s = "Element " + o + " was not offered";
throw new CacheWriterException(s, null /* null cause */);
}
} else if (arg instanceof Long) {
long timeout = ((Long) arg).longValue();
TimeUnit unit = TimeUnit.MILLISECONDS;
boolean offered =
ServerQueue.this.regionQueue.offer(o, timeout, unit);
if (!offered) {
String s = "Element " + o + " was not offered";
throw new CacheWriterException(s, null /* null cause */);
}
} else if (ClientQueue.PUT.equals(arg)) {
ServerQueue.this.regionQueue.put(o);
} else {
String s = "Unrecognized callback argument: " + arg;
throw new CacheWriterException(s);
}
} catch (InterruptedException ex) {
String s = "Interrupted while adding to queue " + name;
throw new CacheWriterException(s, ex);
} catch (RegionQueueException ex) {
String s = "While adding element \"" + o +
"\" to queue \"" + name + "\"";
throw new CacheWriterException(s, ex.getCause());
}
}
}
/////////////////////// Main Program ///////////////////////
private static final PrintStream err = System.err;
private static final PrintStream out = System.out;
/**
* Prints usage information about this program
*/
private static void usage(String s) {
err.println("\n** " + s +"\n");
err.println("usage: java regionQueue.ServerQueue name capacity port");
err.println(" name Name of distributed queue");
err.println(" capacity Maximum number of elements in the queue");
err.println(" port Port on which clients connect");
err.println("");
err.println("Hosts a Server Queue");
err.println("");
System.exit(1);
}
public String getName() {
return this.name;
}
public String toString() {
return "ServerQueue: " + getName()
+ " with " + this.regionQueue;
}
/**
* A main program that creates a ServerQueue.
*/
public static void main(String[] args) throws Throwable {
String name = null;
Integer capacity = null;
Integer port = null;
for (int i = 0; i < args.length; i++) {
if (name == null) {
name = args[i];
} else if (capacity == null) {
try {
capacity = new Integer(args[i]);
} catch (NumberFormatException ex) {
usage("Malformed capacity: " + args[i]);
}
} else if (port == null) {
try {
port = new Integer(args[i]);
} catch (NumberFormatException ex) {
usage("Malformed port: " + args[i]);
}
}
}
if (name == null) {
usage("Missing queue name");
} else if (capacity == null) {
usage("Missing capacity");
} else if (port == null) {
usage("Missing port");
}
Properties props = new Properties();
props.setProperty("name", "ServerQueue");
DistributedSystem system = DistributedSystem.connect(props);
out.println("Creating ServerQueue...");
ServerQueue queue =
new ServerQueue(name, capacity.intValue(), system, port.intValue());
out.println("");
out.println("Press