/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2001-2009 Oracle. All rights reserved. * * $Id$ */ /* * In this application, we specify all communication via the command line. In * a real application, we would expect that information about the other sites * in the system would be maintained in some sort of configuration file. The * critical part of this interface is that we assume at startup that we can * find out * 1) what our Berkeley DB home environment is, * 2) what host/port we wish to listen on for connections; and * 3) an optional list of other sites we should attempt to connect to. * * These pieces of information are expressed by the following flags. * -h home (required; h stands for home directory) * -l host:port (required; l stands for local) * -C or -M (optional; start up as client or master) * -r host:port (optional; r stands for remote; any number of these may be * specified) * -R host:port (optional; R stands for remote peer; only one of these may * be specified) * -a all|quorum (optional; a stands for ack policy) * -b (optional; b stands for bulk) * -n nsites (optional; number of sites in replication group; defaults to 0 * to try to dynamically compute nsites) * -p priority (optional; defaults to 100) * -v (optional; v stands for verbose) */ #include #include #include #include #include "StlRepConfigInfo.h" #include "dbstl_map.h" using std::cout; using std::cin; using std::cerr; using std::endl; using std::flush; using std::istream; using std::istringstream; using std::string; using std::getline; using namespace dbstl; #define CACHESIZE (10 * 1024 * 1024) #define DATABASE "quote.db" const char *progname = "exstl_repquote"; #include #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN #include #define snprintf _snprintf #define sleep(s) Sleep(1000 * (s)) extern "C" { extern int getopt(int, char * const *, const char *); extern char *optarg; extern int optind; } typedef HANDLE thread_t; typedef DWORD thread_exit_status_t; #define thread_create(thrp, attr, func, arg) \ (((*(thrp) = CreateThread(NULL, 0, \ (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0) #define thread_join(thr, statusp) \ ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \ GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1) #else /* !_WIN32 */ #include typedef pthread_t thread_t; typedef void* thread_exit_status_t; #define thread_create(thrp, attr, func, arg) \ pthread_create((thrp), (attr), (func), (arg)) #define thread_join(thr, statusp) pthread_join((thr), (statusp)) #endif // Struct used to store information in Db app_private field. typedef struct { bool app_finished; bool in_client_sync; bool is_master; } APP_DATA; static void log(const char *); void *checkpoint_thread (void *); void *log_archive_thread (void *); class RepQuoteExample { public: typedef db_map > str_map_t; RepQuoteExample(); void init(RepConfigInfo* config); void doloop(); int terminate(); static void event_callback(DbEnv * dbenv, u_int32_t which, void *info); private: // disable copy constructor. RepQuoteExample(const RepQuoteExample &); void operator = (const RepQuoteExample &); // internal data members. APP_DATA app_data; RepConfigInfo *app_config; DbEnv *cur_env; Db *dbp; str_map_t *strmap; thread_t ckp_thr; thread_t lga_thr; // private methods. void print_stocks(); void prompt(); bool open_db(bool creating); void close_db(){ delete strmap; strmap = NULL; dbstl::close_db(dbp); dbp = NULL; } static void close_db(Db *&);// Close an unregistered Db handle. }; bool RepQuoteExample::open_db(bool creating) { int ret; if (dbp) return true; dbp = new Db(cur_env, DB_CXX_NO_EXCEPTIONS); u_int32_t flags = DB_AUTO_COMMIT | DB_THREAD; if (creating) flags |= DB_CREATE; ret = dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0); switch (ret) { case 0: register_db(dbp); if (strmap) delete strmap; strmap = new str_map_t(dbp, cur_env); return (true); case DB_LOCK_DEADLOCK: // Fall through case DB_REP_HANDLE_DEAD: log("\nFailed to open stock db."); break; default: if (ret == DB_REP_LOCKOUT) break; // Fall through else if (ret == ENOENT && !creating) log("\nStock DB does not yet exist\n"); else { DbException ex(ret); throw ex; } } // switch // (All retryable errors fall through to here.) // log("\nPlease retry the operation"); close_db(dbp); return (false); } void RepQuoteExample::close_db(Db *&dbp) { if (dbp) { try { dbp->close(0); delete dbp; dbp = 0; } catch (...) { delete dbp; dbp = 0; throw; } } } RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(NULL) { app_data.app_finished = 0; app_data.in_client_sync = 0; app_data.is_master = 0; // assume I start out as client cur_env = new DbEnv(DB_CXX_NO_EXCEPTIONS); strmap = NULL; dbp = NULL; } void RepQuoteExample::init(RepConfigInfo *config) { app_config = config; cur_env->set_app_private(&app_data); cur_env->set_errfile(stderr); cur_env->set_errpfx(progname); cur_env->set_event_notify(event_callback); // Configure bulk transfer to send groups of records to clients // in a single network transfer. This is useful for master sites // and clients participating in client-to-client synchronization. // if (app_config->bulk) cur_env->rep_set_config(DB_REP_CONF_BULK, 1); // Set the total number of sites in the replication group. // This is used by repmgr internal election processing. // if (app_config->totalsites > 0) cur_env->rep_set_nsites(app_config->totalsites); // Turn on debugging and informational output if requested. if (app_config->verbose) cur_env->set_verbose(DB_VERB_REPLICATION, 1); // Set replication group election priority for this environment. // An election first selects the site with the most recent log // records as the new master. If multiple sites have the most // recent log records, the site with the highest priority value // is selected as master. // cur_env->rep_set_priority(app_config->priority); // Set the policy that determines how master and client sites // handle acknowledgement of replication messages needed for // permanent records. The default policy of "quorum" requires only // a quorum of electable peers sufficient to ensure a permanent // record remains durable if an election is held. The "all" option // requires all clients to acknowledge a permanent replication // message instead. // cur_env->repmgr_set_ack_policy(app_config->ack_policy); // Set the threshold for the minimum and maximum time the client // waits before requesting retransmission of a missing message. // Base these values on the performance and load characteristics // of the master and client host platforms as well as the round // trip message time. // cur_env->rep_set_request(20000, 500000); // Configure deadlock detection to ensure that any deadlocks // are broken by having one of the conflicting lock requests // rejected. DB_LOCK_DEFAULT uses the lock policy specified // at environment creation time or DB_LOCK_RANDOM if none was // specified. // cur_env->set_lk_detect(DB_LOCK_DEFAULT); // The following base replication features may also be useful to your // application. See Berkeley DB documentation for more details. // - Master leases: Provide stricter consistency for data reads // on a master site. // - Timeouts: Customize the amount of time Berkeley DB waits // for such things as an election to be concluded or a master // lease to be granted. // - Delayed client synchronization: Manage the master site's // resources by spreading out resource-intensive client // synchronizations. // - Blocked client operations: Return immediately with an error // instead of waiting indefinitely if a client operation is // blocked by an ongoing client synchronization. cur_env->repmgr_set_local_site(app_config->this_host.host, app_config->this_host.port, 0); for ( REP_HOST_INFO *cur = app_config->other_hosts; cur != NULL; cur = cur->next) { cur_env->repmgr_add_remote_site(cur->host, cur->port, NULL, cur->peer ? DB_REPMGR_PEER : 0); } // Configure heartbeat timeouts so that repmgr monitors the // health of the TCP connection. Master sites broadcast a heartbeat // at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout. // Client sites wait for message activity the length of the // DB_REP_HEARTBEAT_MONITOR timeout before concluding that the // connection to the master is lost. The DB_REP_HEARTBEAT_MONITOR // timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout. // cur_env->rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000); cur_env->rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000); // The following repmgr features may also be useful to your // application. See Berkeley DB documentation for more details. // - Two-site strict majority rule - In a two-site replication // group, require both sites to be available to elect a new // master. // - Timeouts - Customize the amount of time repmgr waits // for such things as waiting for acknowledgements or attempting // to reconnect to other sites. // - Site list - return a list of sites currently known to repmgr. // We can now open our environment, although we're not ready to // begin replicating. However, we want to have a dbenv around // so that we can send it into any of our message handlers. cur_env->set_cachesize(0, CACHESIZE, 0); cur_env->set_flags(DB_TXN_NOSYNC, 1); cur_env->open(app_config->home, DB_CREATE | DB_RECOVER | DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN, 0); // Start checkpoint and log archive support threads. (void)thread_create(&ckp_thr, NULL, checkpoint_thread, cur_env); (void)thread_create(&lga_thr, NULL, log_archive_thread, cur_env); dbstl::register_db_env(cur_env); cur_env->repmgr_start(3, app_config->start_policy); } int RepQuoteExample::terminate() { try { // Wait for checkpoint and log archive threads to finish. // Windows does not allow NULL pointer for exit code variable. thread_exit_status_t exstat; (void)thread_join(lga_thr, &exstat); (void)thread_join(ckp_thr, &exstat); // We have used the DB_TXN_NOSYNC environment flag for // improved performance without the usual sacrifice of // transactional durability, as discussed in the // "Transactional guarantees" page of the Reference // Guide: if one replication site crashes, we can // expect the data to exist at another site. However, // in case we shut down all sites gracefully, we push // out the end of the log here so that the most // recent transactions don't mysteriously disappear. cur_env->log_flush(NULL); } catch (DbException dbe) { cout << "\nerror closing environment: " << dbe.what() << endl; } return 0; } void RepQuoteExample::prompt() { cout << "QUOTESERVER"; if (!app_data.is_master) cout << "(read-only)"; cout << "> " << flush; } void log(const char *msg) { cerr << msg << endl; } // Simple command-line user interface: // - enter " " to insert or update a record in the // database; // - just press Return (i.e., blank input line) to print out the contents of // the database; // - enter "quit" or "exit" to quit. // void RepQuoteExample::doloop() { string input; while (prompt(), getline(cin, input)) { istringstream is(input); string token1, token2; // Read 0, 1 or 2 tokens from the input. // int count = 0; if (is >> token1) { count++; if (is >> token2) count++; } if (count == 1) { if (token1 == "exit" || token1 == "quit") { app_data.app_finished = 1; break; } else { log("\nFormat: \n"); continue; } } // Here we know count is either 0 or 2, so we're about to try a // DB operation. // // Open database with DB_CREATE only if this is a master // database. A client database uses polling to attempt // to open the database without DB_CREATE until it is // successful. // // This DB_CREATE polling logic can be simplified under // some circumstances. For example, if the application can // be sure a database is already there, it would never need // to open it with DB_CREATE. // if (!open_db(app_data.is_master)) continue; try { if (count == 0) if (app_data.in_client_sync) log( "Cannot read data during client initialization - please try again."); else print_stocks(); else if (!app_data.is_master) log("\nCan't update at client\n"); else { char *symbol = new char[token1.length() + 1]; strcpy(symbol, token1.c_str()); char *price = new char[token2.length() + 1]; strcpy(price, token2.c_str()); begin_txn(0, cur_env); strmap->insert(make_pair(symbol, price)); commit_txn(cur_env); delete symbol; delete price; } } catch (DbDeadlockException e) { log("\nplease retry the operation\n"); close_db(); } catch (DbRepHandleDeadException e) { log("\nplease retry the operation\n"); close_db(); } catch (DbException e) { if (e.get_errno() == DB_REP_LOCKOUT) { log("\nplease retry the operation\n"); close_db(); } else throw; } } close_db(); } void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info) { APP_DATA *app = (APP_DATA*)dbenv->get_app_private(); info = NULL; /* Currently unused. */ switch (which) { case DB_EVENT_REP_MASTER: app->in_client_sync = 0; app->is_master = 1; break; case DB_EVENT_REP_CLIENT: app->is_master = 0; app->in_client_sync = 1; break; case DB_EVENT_REP_STARTUPDONE: app->in_client_sync = 0; break; case DB_EVENT_REP_NEWMASTER: app->in_client_sync = 1; break; case DB_EVENT_REP_PERM_FAILED: // Did not get enough acks to guarantee transaction // durability based on the configured ack policy. This // transaction will be flushed to the master site's // local disk storage for durability. // log( "Insufficient acknowledgements to guarantee transaction durability."); break; default: dbenv->errx("\nignoring event %d", which); } } void RepQuoteExample::print_stocks() { #define MAXKEYSIZE 10 #define MAXDATASIZE 20 cout << "\tSymbol\tPrice" << endl << "\t======\t=====" << endl; str_map_t::iterator itr; if (strmap == NULL) strmap = new str_map_t(dbp, cur_env); begin_txn(0, cur_env); for (itr = strmap->begin(); itr != strmap->end(); ++itr) cout<<"\t"<first<<"\t"<second<get_app_private(); for (;;) { // Wait for one minute, polling once per second to see if // application has finished. When application has finished, // terminate this thread. // for (i = 0; i < 60; i++) { sleep(1); if (app->app_finished == 1) return ((void *)EXIT_SUCCESS); } // Perform a checkpoint. if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) { env->err(ret, "Could not perform checkpoint.\n"); return ((void *)EXIT_FAILURE); } } } // This is a simple log archive thread. Once per minute, it removes all but // the most recent 3 logs that are safe to remove according to a call to // DBENV->log_archive(). // // Log cleanup is needed to conserve disk space, but aggressive log cleanup // can cause more frequent client initializations if a client lags too far // behind the current master. This can happen in the event of a slow client, // a network partition, or a new master that has not kept as many logs as the // previous master. // // The approach in this routine balances the need to mitigate against a // lagging client by keeping a few more of the most recent unneeded logs // with the need to conserve disk space by regularly cleaning up log files. // Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE // flag) is not recommended for replication due to the risk of frequent // client initializations. // void *log_archive_thread(void *args) { DbEnv *env; APP_DATA *app; char **begin, **list; int i, listlen, logs_to_keep, minlog, ret; env = (DbEnv *)args; app = (APP_DATA *)env->get_app_private(); logs_to_keep = 3; for (;;) { // Wait for one minute, polling once per second to see if // application has finished. When application has finished, // terminate this thread. // for (i = 0; i < 60; i++) { sleep(1); if (app->app_finished == 1) return ((void *)EXIT_SUCCESS); } // Get the list of unneeded log files. if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) { env->err(ret, "Could not get log archive list."); return ((void *)EXIT_FAILURE); } if (list != NULL) { listlen = 0; // Get the number of logs in the list. for (begin = list; *begin != NULL; begin++, listlen++); // Remove all but the logs_to_keep most recent // unneeded log files. // minlog = listlen - logs_to_keep; for (begin = list, i= 0; i < minlog; list++, i++) { if ((ret = unlink(*list)) != 0) { env->err(ret, "logclean: remove %s", *list); env->errx( "logclean: Error remove %s", *list); free(begin); return ((void *)EXIT_FAILURE); } } free(begin); } } }