package cacheOverflow; import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.cache.*; import com.gemstone.gemfire.distributed.DistributedSystem; import java.io.*; import java.util.*; /** * An example program that demonstrates the "overflow" feature of * GemFire cache {@linkplain Region regions}. It places a given * number of 8 kilobyte arrays of long into a region. In * addition to displaying its progress, the program also maintains * {@linkplain WorkerStats statistics} about the amount of work it has * done. * * @author GemStone Systems, Inc. * * @since 3.2 */ public class CacheOverflow { private static final PrintStream err = System.err; /** * Prints usage information about this program */ private static void usage(String s) { err.println("\n** " + s + "\n"); err.println("usage: java cacheOverflow.CacheOverflow [options] args"); err.println("\nWhere [options] are"); err.println(" -backup Backup the region data on disk (data " + "will be written"); err.println(" to disk as soon as it is added " + "to the region)"); err.println(" -synchronous Writes to disk are synchronous"); err.println(" -validate Instead of populating a region " + "validates its contents."); err.println(" Only useful with -backup"); err.println(" -disableRolling Save the old backup files instead of rolling over to "); err.println(" an archive and deleting the files"); err.println("\nWhere args are"); err.println(" threads Number of threads adding to region"); err.println(" arrays Number of 8 kilobyte arrays added by " + "each thread"); err.println(" overflowThreshold Number of megabytes of region " + "data that can reside"); err.println(" in the VM before overflowing " + "to disk"); err.println(" maxOplogSize Number of megabytes of region data that can be written"); err.println(" to a backup file before rolling over to a new file"); err.println(" dir+ One or more directories in which " + "to write region data"); err.println("\n"); System.exit(1); } /** * Parses the command line and launches one or more threads that * place data into a region. */ public static void main(String[] args) { int threadCount = -1; long arrays = -1; long overflowThreshold = -1; int maxOplogSize = -1; Collection dirs = new ArrayList(); boolean synchronous = false; boolean persistBackup = false; boolean validate = false; boolean disableRolling = false; for (int i = 0; i < args.length; i++) { if (args[i].equals("-backup")) { persistBackup = true; } else if (args[i].equals("-synchronous")) { synchronous = true; } else if (args[i].equals("-validate")) { validate = true; } else if (args[i].equals("-disableRolling")) { disableRolling = true; } else if (args[i].startsWith("-")) { usage("Unknown option: " + args[i]); } else if (threadCount == -1) { try { threadCount = Integer.parseInt(args[i]); } catch (NumberFormatException ex) { usage("Malformed thread count: " + args[i]); } } else if (arrays == -1) { try { arrays = Long.parseLong(args[i]); } catch (NumberFormatException ex) { usage("Malformed number of arrays: " + args[i]); } } else if (overflowThreshold == -1) { try { overflowThreshold = Long.parseLong(args[i]); } catch (NumberFormatException ex) { usage("Malformed overflow threshold: " + args[i]); } } else if (maxOplogSize == -1) { try { maxOplogSize = Integer.parseInt(args[i]); } catch (NumberFormatException ex) { usage("Malformed maxOplogSize: " + args[i]); } } else { dirs.add(new File(args[i])); } } if (threadCount == -1) { usage("Missing number of threads"); } else if (arrays == -1) { usage("Missing number of arrays"); } else if (overflowThreshold == -1) { usage("Missing overflow threshold"); } else if (maxOplogSize == -1) { usage("Missing maxOplogSize"); } else if (dirs.isEmpty()) { usage("Missing directories"); } // Create the region Properties p = new Properties(); // With the port set to 0, and with default settings for locators, // this VM runs as a "standalone" process. p.setProperty("mcast-port", "0"); p.setProperty("statistic-sampling-enabled", "true"); p.setProperty("statistic-archive-file", "statArchive.gfs"); final DistributedSystem system = DistributedSystem.connect(p); Region region; Cache cache; try { cache = CacheFactory.create(system); AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.LOCAL); factory.setEvictionAttributes(EvictionAttributes .createLRUMemoryAttributes((int) overflowThreshold, null /* sizer */, EvictionAction.OVERFLOW_TO_DISK)); factory.setDiskDirs((File[]) dirs.toArray(new File[dirs.size()])); factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); { DiskWriteAttributesFactory dwaf = new DiskWriteAttributesFactory(); dwaf.setSynchronous(synchronous); dwaf.setTimeInterval(1000); dwaf.setBytesThreshold(0); dwaf.setRollOplogs(!disableRolling); dwaf.setMaxOplogSize(maxOplogSize); DiskWriteAttributes dwa = dwaf.create(); factory.setDiskWriteAttributes(dwa); } region = cache.createRegion("CacheOverflow", factory.createRegionAttributes()); } catch (CacheException ex) { err.println("While creating the Cache and Region:"); ex.printStackTrace(err); System.exit(1); return; } Thread[] threads = new Thread[threadCount]; ThreadGroup tg = new ThreadGroup("Worker Threads") { public void uncaughtException(Thread t, Throwable e) { if (e instanceof VirtualMachineError) { SystemFailure.setFailure((VirtualMachineError)e); // don't throw } String s = "Uncaught exception in thread " + t; system.getLogWriter().severe(s, e); } }; for (int i = 0; i < threadCount; i++) { threads[i] = new Thread(tg, new Worker(i, region, arrays, validate), "Worker " + i); threads[i].start(); } for (int i = 0; i < threadCount; i++) { try { threads[i].join(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); String s = "Interrupted while joining on " + threads[i]; system.getLogWriter().severe(s, ex); break; // interrupted, just get out } } } }