/* * The Execute Function QuickStart Example. * * This example takes the following steps: * * 1. Connect to a GemFire Distributed System. * 2. Create a GemFire Cache. * 3. Get the example Region from the Cache. * 4. Populate some objects on the Region. * 5. Create Execute Objects * 6. Execute Functions * 7. Close the Cache. * 8. Disconnect from the Distributed System. * */ // Include the GemFire library. #include // Include the Execute function headers. #include #include #include // Use the "gemfire" namespace. using namespace gemfire; const char * poolRegNames[] = { (char*)"partition_region",(char*)"PoolRegion2" }; const char* poolName = (char*)"__QUICK_START_POOL1__"; const char * serverGroup = (char*)"ServerGroup1"; char* getFuncIName = (char*)"MultiGetFunctionI"; char* putFuncIName = (char*)"MultiPutFunctionI"; char* getFuncName = (char*)"MultiGetFunction"; char* putFuncName = (char*)"MultiPutFunction"; const char * hostNames[] = {(char*)"localhost", (char*)"localhost"}; const uint16_t hostPorts[] = {50505, 40404}; void addServerLocatorEPs( const char* hLists[], const uint16_t hPorts[], size_t len, PoolFactoryPtr pfPtr, bool poolLocators = true ) { for (size_t i=0; i < len; i++) { if ( poolLocators ) { pfPtr->addLocator(hLists[i], hPorts[i]); } else { pfPtr->addServer(hLists[i], hPorts[i]); } } } PoolPtr createPool(const char* poolName, const char* serverGroup, bool locator = true, bool server = false, int redundancy = 0, bool clientNotification = false, int subscriptionAckInterval = -1) { PoolFactoryPtr poolFacPtr = PoolManager::createFactory(); if( server == true )//with explicit server list { addServerLocatorEPs( hostNames, hostPorts, 2, poolFacPtr, false ); //do region creation with end } else if(locator == true) //with locator { addServerLocatorEPs( hostNames, hostPorts, 2, poolFacPtr ); if(serverGroup){ poolFacPtr->setServerGroup(serverGroup); } } else //neither server nor locator return NULLPTR { return PoolPtr( NULLPTR); } poolFacPtr->setSubscriptionRedundancy(redundancy); poolFacPtr->setSubscriptionEnabled(clientNotification); if ( subscriptionAckInterval != -1 ) { poolFacPtr->setSubscriptionAckInterval( subscriptionAckInterval ); } return poolFacPtr->create(poolName); } RegionPtr createRegionAndAttachPool(CachePtr cache, const char* name, bool ack, const char* poolName, bool caching = true, int ettl = 0, int eit = 0, int rttl = 0, int rit = 0,int lel = 0, ExpirationAction::Action action = ExpirationAction::DESTROY ) { AttributesFactory af; af.setScope( ack ? ScopeType::DISTRIBUTED_ACK : ScopeType::DISTRIBUTED_NO_ACK ); af.setCachingEnabled( caching ); af.setLruEntriesLimit(lel); af.setEntryIdleTimeout(action, eit); af.setEntryTimeToLive(action, ettl); af.setRegionIdleTimeout(action, rit); af.setRegionTimeToLive(action, rttl); af.setPoolName(poolName); RegionAttributesPtr rattrsPtr = af.createRegionAttributes( ); return cache->createRegion( name, rattrsPtr ); } // The Execute Function QuickStart example. int main(int argc, char ** argv) { try { // Connect to the GemFire Distributed System using the settings from the gfcpp.properties file by default. DistributedSystemPtr dSysPtr = DistributedSystem::connect("ExampleDistributedSystem"); LOGINFO("Connected to the GemFire Distributed System"); // Create a GemFire Cache with the "clientCqQuery.xml" Cache XML file. CachePtr cachePtr = CacheFactory::create("ExampleCache", dSysPtr, "XMLs/clientExecuteFunctions.xml"); LOGINFO("Created the GemFire Cache"); PoolPtr pptr = createPool(poolName, serverGroup, false, true, 0, true ); RegionPtr regPtr0 = createRegionAndAttachPool(cachePtr, poolRegNames[0],true, poolName); regPtr0->registerAllKeys(); char buf[128]; for(int i=0; i < 34; i++) { sprintf(buf, "VALUE--%d", i); CacheablePtr value(CacheableString::create(buf)); sprintf(buf, "KEY--%d", i); CacheableKeyPtr key = CacheableKey::create(buf); regPtr0->put(key, value); } bool getResult = true; CacheableVectorPtr routingObj = CacheableVector::create(); for(int i=0; i < 34; i++) { if(i%2==0) continue; sprintf(buf, "KEY--%d", i); CacheableKeyPtr key = CacheableKey::create(buf); routingObj->push_back(key); } //test data independant function with result on one server LOGINFO("test data independant function with result on one server"); CacheablePtr args = routingObj; ExecutionPtr exc = FunctionService::onServer(pptr); CacheableVectorPtr executeFunctionResult = exc->withArgs(args)->execute(getFuncIName, getResult)->getResult(); if(executeFunctionResult==NULLPTR) { LOGINFO("get executeFunctionResult is NULL"); } else { sprintf(buf, "get result count = %d", executeFunctionResult->size()); LOGINFO(buf); for(size_t i=0; i < executeFunctionResult->size(); i++) { sprintf(buf, "get result[%d]=%s", i, dynCast(executeFunctionResult->operator[](i))->asChar()); LOGINFO(buf); } } LOGINFO("test data independant function without result on one server"); getResult = false; exc->withArgs(args)->execute(putFuncIName, getResult); LOGINFO("test data independant function with result on all servers"); getResult = true; exc = FunctionService::onServers(pptr); executeFunctionResult = exc->withArgs(args)->execute(getFuncIName, getResult)->getResult(); if(executeFunctionResult==NULLPTR) { LOGINFO("get executeFunctionResult is NULL"); } else { sprintf(buf, "get result count = %d", executeFunctionResult->size()); LOGINFO(buf); for(size_t i=0; i < executeFunctionResult->size(); i++) { sprintf(buf, "get result[%d]=%s", i, dynCast(executeFunctionResult->operator[](i))->asChar()); LOGINFO(buf); } } getResult = false; LOGINFO("test data independant function without result on all servers"); exc->withArgs(args)->execute(putFuncIName, getResult); LOGINFO("test data dependant function with result"); getResult = true; args = CacheableBoolean::create( 1 ); exc = FunctionService::onRegion(regPtr0); executeFunctionResult = exc->withFilter(routingObj)->withArgs(args)->execute(getFuncName, getResult)->getResult(); if(executeFunctionResult==NULLPTR) { LOGINFO( "execute on region: executeFunctionResult is NULL"); } else { LOGINFO( "Execute on Region: result count = %d", executeFunctionResult->size()); for(size_t i=0; i < executeFunctionResult->size(); i++) { sprintf(buf, "Execute on Region: result[%d]=%s", i, dynCast(executeFunctionResult->operator[](i))->asChar()); LOGINFO(buf); } } LOGINFO("test data dependant function without result"); getResult = false; exc->withFilter(routingObj)->withArgs(args)->execute(putFuncName, getResult); // Close the GemFire Cache. cachePtr->close(); LOGINFO("Closed the GemFire Cache"); // Disconnect from the GemFire Distributed System. dSysPtr->disconnect(); LOGINFO("Disconnected from the GemFire Distributed System"); } // An exception should not occur catch(const Exception & gemfireExcp) { LOGERROR("Function Execution GemFire Exception: %s", gemfireExcp.getMessage()); } }