/* * The Pool Continuous Query QuickStart Example. * * This example takes the following steps: * * 1. Connect to a GemFire Distributed System. * 2. Create a GemFire Cache. * 3. Get the example Region from the Cache. * 4. Populate some query objects on the Region. * 5. Get the Query Service from pool. * 6. Register a cqQuery listener * 7. Execute a cqQuery with initial Results * 8. Close the Cache. * 9. Disconnect from the Distributed System. * */ // Include the GemFire library. #include // Include the Query headers. #include #include #include // Include our Query objects, viz. Portfolio and Position. #include "queryobjects/Portfolio.hpp" #include "queryobjects/Position.hpp" // Use the "gemfire" namespace. using namespace gemfire; // Use the "testobject" namespace for the query objects. using namespace testobject; class MyCqListener : public CqListener { public: void onEvent(const CqEvent& cqe){ char* opStr = (char*)"Default"; PortfolioPtr portfolio( dynamic_cast (cqe.getNewValue().ptr() )); CacheableStringPtr key( dynamic_cast (cqe.getKey().ptr() )); switch (cqe.getQueryOperation()) { case CqOperation::OP_TYPE_CREATE: { opStr = (char*)"CREATE"; break; } case CqOperation::OP_TYPE_UPDATE: { opStr = (char*)"UPDATE"; break; } case CqOperation::OP_TYPE_DESTROY: { opStr = (char*)"UPDATE"; break; } default: break; } LOGINFO("MyCqListener::OnEvent called with %s, key[%s], value=(%ld,%s)", opStr, key->asChar(), portfolio->getID(), portfolio->getPkid()->asChar()); } void onError(const CqEvent& cqe){ LOGINFO("MyCqListener::OnError called"); } void close(){ LOGINFO("MyCqListener::close called"); } }; // The PoolCqQuery QuickStart example. int main(int argc, char ** argv) { try { // Connect to the GemFire Distributed System using the settings from the gfcpp.properties file by default. DistributedSystemPtr dSysPtr = DistributedSystem::connect("ExampleDistributedSystem"); LOGINFO("Connected to the GemFire Distributed System"); // Create a GemFire Cache with the "clientPoolCqQuery.xml" Cache XML file. CachePtr cachePtr = CacheFactory::create("ExampleCache", dSysPtr, "XMLs/clientPoolCqQuery.xml"); LOGINFO("Created the GemFire Cache"); // Get the example Region from the Cache which is declared in the Cache XML file. RegionPtr regionPtr = cachePtr->getRegion("/root/Portfolios"); LOGINFO("Obtained the Region from the Cache"); // Register our Serializable/Cacheable Query objects, viz. Portfolio and Position. Serializable::registerType( Portfolio::createDeserializable); Serializable::registerType( Position::createDeserializable); LOGINFO("Registered Serializable Query Objects"); // Populate the Region with some Portfolio objects. PortfolioPtr port1Ptr(new Portfolio(1 /*ID*/, 10 /*size*/)); PortfolioPtr port2Ptr(new Portfolio(2 /*ID*/, 20 /*size*/)); PortfolioPtr port3Ptr(new Portfolio(3 /*ID*/, 30 /*size*/)); regionPtr->put("Key1", port1Ptr); regionPtr->put("Key2", port2Ptr); regionPtr->put("Key3", port3Ptr); LOGINFO("Populated some Portfolio Objects"); //find the pool PoolPtr poolPtr = PoolManager::find("examplePool"); // Get the QueryService from the Pool. QueryServicePtr qrySvcPtr = poolPtr->getQueryService(); LOGINFO("Got the QueryService from the Pool"); //Create CqAttributes and Install Listener CqAttributesFactory cqFac; CqListenerPtr cqLstner (new MyCqListener()); cqFac.addCqListener(cqLstner); CqAttributesPtr cqAttr = cqFac.create(); //create a new Cq Query const char* qryStr = "select * from /root/Portfolios p where p.ID < 5"; CqQueryPtr qry = qrySvcPtr->newCq((char*)"MyCq", qryStr, cqAttr); //execute Cq Query with initial Results SelectResultsPtr resultsPtr = qry->executeWithInitialResults(); //make change to generate cq events regionPtr->put("Key3", port1Ptr); regionPtr->put("Key2", port2Ptr); regionPtr->put("Key1", port3Ptr); LOGINFO("ResultSet Query returned %d rows", resultsPtr->size()); // Iterate through the rows of the query result. SelectResultsIterator iter = resultsPtr->getIterator(); while (iter.hasNext()) { SerializablePtr ser = iter.next(); PortfolioPtr portfolio( dynamic_cast (ser.ptr() )); PositionPtr position(dynamic_cast (ser.ptr() )); if( portfolio != NULLPTR ) { LOGINFO(" query pulled portfolio object ID %ld, pkid %s\n", portfolio->getID(), portfolio->getPkid()->asChar()); } else if( position != NULLPTR ) { LOGINFO(" query pulled position object secId %s, shares %d\n", position->getSecId()->asChar(), position->getSharesOutstanding()); } else { if( ser != NULLPTR ) { LOGINFO (" query pulled object %s\n", ser->toString()->asChar() ); } else LOGINFO(" query pulled bad object\n"); } } // Stop the GemFire Continuous query. qry->stop(); // Close the GemFire Continuous query. qry->close(); // Close the GemFire Cache. cachePtr->close(); LOGINFO("Closed the GemFire Cache"); // Disconnect from the GemFire Distributed System. dSysPtr->disconnect(); LOGINFO("Disconnected from the GemFire Distributed System"); } // An exception should not occur catch(const Exception & gemfireExcp) { LOGERROR("PoolCqQuery GemFire Exception: %s", gemfireExcp.getMessage()); } }