/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2009 Oracle. All rights reserved. * * $Id$ */ #ifndef _DB_STL_DBC_H #define _DB_STL_DBC_H #include #include #include "dbstl_common.h" #include "dbstl_dbt.h" #include "dbstl_exception.h" #include "dbstl_container.h" #include "dbstl_resource_manager.h" START_NS(dbstl) // Forward declarations. class db_container; class DbCursorBase; template class RandDbCursor; class DbstlMultipleKeyDataIterator; class DbstlMultipleRecnoDataIterator; using std::set; ///////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////// // // LazyDupCursor class template definition. // // This class allows us to make a shallow copy on construction. When the // cursor pointer is first dereferenced a deep copy is made. // // The allowed type for BaseType is DbCursor<> and RandDbCursor<> // The expected usage of this class is: // 1. Create an iterator in container::begin(), the iterator::pcsr.csr_ptr_ // points to an object, thus no need to duplicate. // 2. The iterator is created with default argument, thus the // iterator::pcsr.csr_ptr_ and dup_src_ is NULL, and this iterator is // copied using copy constructor for may be many times, but until the // cursor is really used, no cursor is duplicated. // // There is an informing mechanism between an instance of this class and // its dup_src_ cursor: when that cursor is about to change state, it will // inform all registered LazyDupCursor "listeners" of the change, so that // they will duplicate from the cursor before the change, because that // is the expected cursor state for the listeners. template class LazyDupCursor { // dup_src_ is used by this class internally to duplicate another // cursor and set to csr_ptr_, and it is assigned in the copy // constructor from another LazyDupCursor object's csr_ptr_; csr_ptr_ // is the acutual pointer that is used to perform cursor operations. // BaseType *csr_ptr_, *dup_src_; typedef LazyDupCursor self; public: //////////////////////////////////////////////////////////////////// // // Begin public constructors and destructor. // inline LazyDupCursor() { csr_ptr_ = NULL; dup_src_ = NULL; } // Used in all iterator types' constructors, dbcptr is created // solely for this object, and the cursor is not yet opened, so we // simply assign it to csr_ptr_. explicit inline LazyDupCursor(BaseType *dbcptr) { csr_ptr_ = dbcptr; // Already have pointer, do not need to duplicate. dup_src_ = NULL; } // Do not copy to csr_ptr_, shallow copy from dp2.csr_ptr_. LazyDupCursor(const self& dp2) { csr_ptr_ = NULL; if (dp2.csr_ptr_) dup_src_ = dp2.csr_ptr_; else dup_src_ = dp2.dup_src_; if (dup_src_) dup_src_->add_dupper(this); } ~LazyDupCursor() { // Not duplicated yet, remove from dup_src_. if (csr_ptr_ == NULL && dup_src_ != NULL) dup_src_->erase_dupper(this); if (csr_ptr_) delete csr_ptr_;// Delete the cursor. } //////////////////////////////////////////////////////////////////// // Deep copy. inline const self& operator=(const self &dp2) { BaseType *dcb; dcb = dp2.csr_ptr_ ? dp2.csr_ptr_ : dp2.dup_src_; this->operator=(dcb); return dp2; } // Deep copy. inline BaseType *operator=(BaseType *dcb) { if (csr_ptr_) { // Only dup_src_ will inform this, not csr_ptr_. delete csr_ptr_; csr_ptr_ = NULL; } if (dcb) csr_ptr_ = new BaseType(*dcb); if (dup_src_ != NULL) { dup_src_->erase_dupper(this); dup_src_ = NULL; } return dcb; } void set_cursor(BaseType *dbc) { assert(dbc != NULL); if (csr_ptr_) { // Only dup_src_ will inform this, not csr_ptr_. delete csr_ptr_; csr_ptr_ = NULL; } csr_ptr_ = dbc; if (dup_src_ != NULL) { dup_src_->erase_dupper(this); dup_src_ = NULL; } } // If dup_src_ is informing this object, pass false parameter. inline BaseType* duplicate(bool erase_dupper = true) { assert(dup_src_ != NULL); if (csr_ptr_) { // Only dup_src_ will inform this, not csr_ptr_. delete csr_ptr_; csr_ptr_ = NULL; } csr_ptr_ = new BaseType(*dup_src_); if (erase_dupper) dup_src_->erase_dupper(this); dup_src_ = NULL; return csr_ptr_; } inline BaseType* operator->() { if (csr_ptr_) return csr_ptr_; return duplicate(); } inline operator bool() { return csr_ptr_ != NULL; } inline bool operator!() { return !csr_ptr_; } inline bool operator==(void *p) { return csr_ptr_ == p; } inline BaseType* base_ptr(){ if (csr_ptr_) return csr_ptr_; return duplicate(); } }; ///////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////// // // DbCursorBase class definition. // // DbCursorBase is the base class for DbCursor<> class template, this class // wraps the Berkeley DB cursor, in order for the ResourceManager to close // the Berkeley DB cursor and set the pointer to null. // If we don't set the cursor to NULL, the handle could become valid again, // since Berkeley DB recycles handles. DB STL would then try to use the same // handle across different instances, which is not supported. // // In ResourceManager, whenver a cursor is opened, it stores the // DbCursorBase* pointer, so that when need to close the cursor, it calls // DbCursorBase::close() function. // class DbCursorBase { protected: Dbc *csr_; DbTxn *owner_txn_; Db *owner_db_; int csr_status_; public: enum DbcGetSkipOptions{SKIP_KEY, SKIP_DATA, SKIP_NONE}; inline DbTxn *get_owner_txn() const { return owner_txn_;} inline void set_owner_txn(DbTxn *otxn) { owner_txn_ = otxn;} inline Db *get_owner_db() const { return owner_db_;} inline void set_owner_db(Db *odb) { owner_db_ = odb;} inline Dbc *get_cursor() const { return csr_;} inline Dbc *&get_cursor_reference() { return csr_;} inline void set_cursor(Dbc*csr1) { if (csr_) ResourceManager::instance()->remove_cursor(this); csr_ = csr1; } inline int close() { int ret = 0; if (csr_ != NULL && (((DBC *)csr_)->flags & DBC_ACTIVE) != 0) { ret = csr_->close(); csr_ = NULL; } return ret; } DbCursorBase(){ owner_txn_ = NULL; owner_db_ = NULL; csr_ = NULL; csr_status_ = 0; } DbCursorBase(const DbCursorBase &csrbase) { this->operator=(csrbase); } const DbCursorBase &operator=(const DbCursorBase &csrbase) { owner_txn_ = csrbase.owner_txn_; owner_db_ = csrbase.owner_db_; csr_ = NULL; // Need to call DbCursor<>::dup to duplicate. csr_status_ = 0; return csrbase; } virtual ~DbCursorBase() { close(); } }; // DbCursorBase //////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////// // // DbCursor class template definition // // DbCursor is the connection between Berkeley DB and dbstl container classes // it is the wrapper class for Dbc* cursor of Berkeley Db, to be used for // iterator classes of Berkeley DB backed STL container classes. // Requirement: // 1. Deep copy using Dbc->dup. // 2. Dbc*cursor management via ResourceManager class. // 3. Provide methods to do increment, decrement and advance operations, // advance is only available for random access iterator from DB_RECNO // containers. // template class DbCursor : public DbCursorBase{ protected: // Lazy duplication support: store the LazyDupCursor objects which // will duplicate from this cursor. typedef LazyDupCursor > dupper_t; typedef LazyDupCursor > dupperr_t; typedef set >* > dupset_t; typedef set >* > dupsetr_t; set >* > sduppers1_; set >* > sduppers2_; // We must use DB_DBT_USERMEM for Dbc::get and Db::get if they are // used in multi-threaded application, so we use key_buf_ and // data_buf_ data members for get operations, and initialize them // to use user memory. Dbt key_buf_, data_buf_; // Similar to Berkeley DB C++ API's classes, used to iterate through // bulk retrieved key/data pairs. DbstlMultipleKeyDataIterator *multi_itr_; DbstlMultipleRecnoDataIterator *recno_itr_; // Whether to use bulk retrieval. If non-zero, do bulk retrieval, // bulk buffer size is this member, otherwise not bulk read. // By default this member is 0. u_int32_t bulk_retrieval_; // Whether to use DB_RMW flag in Dbc::get, by default false. bool rmw_get_; // Whether to poll data from cursor's current position on every // get_current_key/data call. // Note that curr_key_/curr_data_ members are always maintained // to contain current k/d value of the pair pointed to by csr_. // If doing bulk retrieval, this flag is ignored, we will always // read data from bulk buffer. bool directdb_get_; // Inform LazyDupCursor objects registered in this object to do // duplication because this cursor is to be changed. // This function should be called in any function of // DbCursor and RandDbCursor whenever the cursor is about to change // state(move/close, etc). inline void inform_duppers() { typename dupset_t::iterator i1; typename dupsetr_t::iterator i2; for (i1 = sduppers1_.begin(); i1 != sduppers1_.end(); i1++) (*i1)->duplicate(false); for (i2 = sduppers2_.begin(); i2 != sduppers2_.end(); i2++) (*i2)->duplicate(false); sduppers1_.clear(); sduppers2_.clear(); } public: friend class DataItem; // Current key/data pair pointed by "csr_" Dbc*cursor. They are both // maintained on cursor movement. If directdb_get_ is true, // they are both refreshed on every get_current{[_key][_data]} call and // the retrieved key/data pair is returned to user. DataItem curr_key_; DataItem curr_data_; typedef DbCursor self; // This function is used by all iterators to do equals comparison. // Random iterators will also use it to do less than/greater than // comparisons. // Internally, the page number difference or index difference is // returned, so for btree and hash databases, if two cursors point to // the same key/data pair, we will get 0 returned, meaning they are // equal; if return value is not 0, it means no more than that they // they are not equal. We can't assume any order information between // the two cursors. For recno databases, we use the recno to do less // than and greater than comparison. So we can get a reliable knowledge // of the relative position of two iterators from the return value. int compare(const self *csr2) const{ int res, ret; BDBOP(((DBC *)csr_)->cmp((DBC *)csr_, (DBC *)csr2->csr_, &res, 0), ret); return res; } //////////////////////////////////////////////////////////////////// // // Add and remove cursor change event listeners. // inline void add_dupper(dupper_t *dupper) { sduppers1_.insert(dupper); } inline void add_dupper(dupperr_t *dupper) { sduppers2_.insert(dupper); } inline void erase_dupper(dupper_t *dup1) { sduppers1_.erase(dup1); } inline void erase_dupper(dupperr_t *dup1) { sduppers2_.erase(dup1); } //////////////////////////////////////////////////////////////////// public: inline bool get_rmw() { return rmw_get_; } bool set_rmw(bool rmw, DB_ENV *env = NULL ) { u_int32_t flag = 0; DB_ENV *dbenv = NULL; int ret; if (env) dbenv = env; else dbenv = ((DBC*)csr_)->dbenv; BDBOP(dbenv->get_open_flags(dbenv, &flag), ret); // DB_RMW flag requires locking subsystem started. if (rmw && ((flag & DB_INIT_LOCK) || (flag & DB_INIT_CDB) || (flag & DB_INIT_TXN))) rmw_get_ = true; else rmw_get_ = false; return rmw_get_; } // Modify bulk buffer size. Bulk read is enabled when creating an // iterator, so users later can only modify the bulk buffer size // to another value, but can't enable/disable bulk read while an // iterator is already alive. // Returns true if succeeded, false otherwise. inline bool set_bulk_buffer(u_int32_t sz) { if (bulk_retrieval_ && sz) { normalize_bulk_bufsize(sz); bulk_retrieval_ = sz; return true; } return false; } inline u_int32_t get_bulk_bufsize() { return bulk_retrieval_; } inline void enlarge_dbt(Dbt &d, u_int32_t sz) { void *p; p = DbstlReAlloc(d.get_data(), sz); dbstl_assert(p != NULL); d.set_ulen(sz); d.set_data(p); d.set_size(sz); } // Move forward or backward, often by 1 key/data pair, we can use // different flags for Dbc::get function. Then update the key/data // pair and csr_status_ members. // int increment(int flag) { int ret = 0; Dbt &k = key_buf_, &d = data_buf_; u_int32_t sz, getflags = 0, bulk_bufsz; if (csr_ == NULL) return INVALID_ITERATOR_CURSOR; curr_key_.reset(); curr_data_.reset(); inform_duppers(); // Berkeley DB cursor flags are not bitwise set, so we can't // use bit operations here. // if (this->bulk_retrieval_ != 0) switch (flag) { case DB_PREV: case DB_PREV_DUP: case DB_PREV_NODUP: case DB_LAST: case DB_JOIN_ITEM: case DB_GET_RECNO: case DB_SET_RECNO: break; default: getflags |= DB_MULTIPLE_KEY; if (data_buf_.get_ulen() != bulk_retrieval_) enlarge_dbt(data_buf_, bulk_retrieval_); break; } if (this->rmw_get_) getflags |= DB_RMW; // Do not use BDBOP or BDBOP2 here because it is likely // that an iteration will step onto end() position. retry: ret = csr_->get(&k, &d, flag | getflags); if (ret == 0) { if (bulk_retrieval_ && (getflags & DB_MULTIPLE_KEY)) { // A new retrieval, so both multi_itr_ and // recno_itr_ must be NULL. if (((DBC*)csr_)->dbtype == DB_RECNO) { if (recno_itr_) { delete recno_itr_; recno_itr_ = NULL; } recno_itr_ = new DbstlMultipleRecnoDataIterator(d); } else { if (multi_itr_) { delete multi_itr_; multi_itr_ = NULL; } multi_itr_ = new DbstlMultipleKeyDataIterator(d); } } else { // Non bulk retrieval succeeded. curr_key_.set_dbt(k, false); curr_data_.set_dbt(d, false); limit_buf_size_after_use(); } } else if (ret == DB_BUFFER_SMALL) { // Either the key or data DBTs might trigger a // DB_KEYSMALL return. Only enlarge the DBT if it // is actually too small. if (((sz = d.get_size()) > 0) && (sz > d.get_ulen())) enlarge_dbt(d, sz); if (((sz = k.get_size()) > 0) && (sz > k.get_ulen())) enlarge_dbt(k, sz); goto retry; } else { if (ret == DB_NOTFOUND) { ret = INVALID_ITERATOR_POSITION; this->curr_key_.reset(); this->curr_data_.reset(); } else if (bulk_retrieval_ && (getflags & DB_MULTIPLE_KEY)){ BDBOP(((DBC*)csr_)->dbp-> get_pagesize(((DBC*)csr_)-> dbp, &bulk_bufsz), ret); if (bulk_bufsz > d.get_ulen()) {// buf size error normalize_bulk_bufsize(bulk_bufsz); bulk_retrieval_ = bulk_bufsz; enlarge_dbt(d, bulk_bufsz); goto retry; } else throw_bdb_exception( "DbCursor<>::increment", ret); } else throw_bdb_exception( "DbCursor<>::increment", ret); } csr_status_ = ret; return ret; } // After each use of key_buf_ and data_buf_, limit their buffer size to // a reasonable size so that they don't waste a big memory space. inline void limit_buf_size_after_use() { if (bulk_retrieval_) // Bulk buffer has to be huge, so don't check it. return; if (key_buf_.get_ulen() > DBSTL_MAX_KEY_BUF_LEN) { key_buf_.set_data(DbstlReAlloc(key_buf_.get_data(), DBSTL_MAX_KEY_BUF_LEN)); key_buf_.set_ulen(DBSTL_MAX_KEY_BUF_LEN); } if (data_buf_.get_ulen() > DBSTL_MAX_DATA_BUF_LEN) { data_buf_.set_data(DbstlReAlloc(data_buf_.get_data(), DBSTL_MAX_DATA_BUF_LEN)); data_buf_.set_ulen(DBSTL_MAX_DATA_BUF_LEN); } } // Duplicate this object's cursor and set it to dbc1. // inline int dup(DbCursor& dbc1) const { Dbc* pcsr = 0; int ret; if (csr_ != 0 && csr_->dup(&pcsr, DB_POSITION) == 0) { dbc1.set_cursor(pcsr); dbc1.set_owner_db(this->get_owner_db()); dbc1.set_owner_txn(this->get_owner_txn()); ResourceManager::instance()->add_cursor( this->get_owner_db(), &dbc1); ret = 0; } else ret = ITERATOR_DUP_ERROR; return ret; } public: // Open a cursor, do not move it, it is at an invalid position. // All cursors should be opened using this method. // inline int open(db_container *pdbc, int flags) { int ret; Db *pdb = pdbc->get_db_handle(); if (pdb == NULL) return 0; if (csr_) // Close before open. return 0; ret = ResourceManager::instance()-> open_cursor(this, pdb, flags); set_rmw(rmw_get_); this->csr_status_ = ret; return ret; } // Move Berkeley DB cursor to specified key k, by default use DB_SET, // but DB_SET_RANGE can and may also be used. // int move_to(const key_dt&k, u_int32_t flag = DB_SET) { Dbt &d1 = data_buf_; int ret; u_int32_t sz; DataItem k1(k, true); if (csr_ == NULL) return INVALID_ITERATOR_CURSOR; curr_key_.reset(); curr_data_.reset(); inform_duppers(); // It is likely that k is not in db, causing get(DB_SET) to // fail, we should not throw an exception because of this. // if (rmw_get_) flag |= DB_RMW; retry: ret = csr_->get(&k1.get_dbt(), &d1, flag); if (ret == 0) { curr_key_ = k1; curr_data_.set_dbt(d1, false); limit_buf_size_after_use(); } else if (ret == DB_BUFFER_SMALL) { sz = d1.get_size(); assert(sz > 0); enlarge_dbt(d1, sz); goto retry; } else { if (ret == DB_NOTFOUND) { ret = INVALID_ITERATOR_POSITION; // Invalidate current values because it is // at an invalid position. this->curr_key_.reset(); this->curr_data_.reset(); } else throw_bdb_exception("DbCursor<>::move_to", ret); } csr_status_ = ret; return ret; } // Returns the number of keys equal to the current one. inline size_t count() { int ret; db_recno_t cnt; BDBOP2(csr_->count(&cnt, 0), ret, close()); return (size_t)cnt; } int insert(const key_dt&k, const data_dt& d, int pos = DB_BEFORE) { // !!!XXX: // We do a deep copy of the input data into a local // variable. Apparently not doing so causes issues // when using gcc. Even though the put completes prior // to returning from this function call. // It would be best to avoid this additional copy. int ret; // (k, d) pair may be a temporary pair, so we must copy them. DataItem k1(k, false), d1(d, false); inform_duppers(); if (pos == DB_AFTER) { ret = this->csr_->put(&k1.get_dbt(), &d1.get_dbt(), pos); // May be using this flag for an empty database, // because begin() an iterator of an empty db_vector // equals its end() iterator, so use DB_KEYLAST to // retry. // if (ret == EINVAL || ret == 0) return ret; else if (ret) throw_bdb_exception("DbCursor<>::insert", ret); } if (pos == DB_NODUPDATA) BDBOP3(this->csr_->put(&k1.get_dbt(), &d1.get_dbt(), pos), ret, DB_KEYEXIST, close()); else BDBOP2(this->csr_->put(&k1.get_dbt(), &d1.get_dbt(), pos), ret, close()); this->csr_status_ = ret; if (ret == 0) { curr_key_ = k1; curr_data_ = d1; } // This cursor points to the new key/data pair now. return ret; } // Replace current cursor-pointed data item with d. inline int replace(const data_dt& d) { Dbt k1; int ret; // !!!XXX: // We do a deep copy of the input data into a local // variable. Apparently not doing so causes issues // when using gcc. Even though the put completes prior // to returning from this function call. // It would be best to avoid this additional copy. // d may be a temporary object, so we must copy it. DataItem d1(d, false); BDBOP2(this->csr_->put(&k1, &d1.get_dbt(), DB_CURRENT), ret, close()); curr_data_ = d1; // Update current data. this->csr_status_ = ret; return ret; } // Remove old key and insert new key-psuodo_data. First insert then // move to old key and remove it so that the cursor remains at the // old key's position, according to DB documentation. // But from practice I can see // the cursor after delete seems not at old position because a for // loop iteration exits prematurelly, not all elements are passed. // inline int replace_key(const key_dt&k) { data_dt d; key_dt k0; int ret; this->get_current_key_data(k0, d); if (k0 == k) return 0; DbCursor csr2; this->dup(csr2); // Delete current, then insert new key/data pair. ret = csr2.del(); ret = csr2.insert(k, d, DB_KEYLAST); this->csr_status_ = ret; // Now this->csr_ is sitting on an invalid position, its // iterator is invalidated. Must first move it to the next // position before using it. return ret; } inline int del() { int ret; inform_duppers(); BDBOP2(csr_->del(0), ret, close()); // By default pos.csr_ will stay at where it was after delete, // which now is an invalid position. So we need to move to // next to conform to stl specifications, but we don't move it // here, iterator::erase should move the iterator itself // forward. // this->csr_status_ = ret; return ret; } // Make sure the bulk buffer is large enough, and a multiple of 1KB. // This function may be called prior to cursor initialization, it is // not possible to verify that the buffer size is a multiple of the // page size here. u_int32_t normalize_bulk_bufsize(u_int32_t &bulksz) { if (bulksz == 0) return 0; while (bulksz < 16 * sizeof(data_dt)) bulksz *= 2; bulksz = bulksz + 1024 - bulksz % 1024; return bulksz; } //////////////////////////////////////////////////////////////////// // // Begin public constructors and destructor. // explicit DbCursor(u_int32_t b_bulk_retrieval = 0, bool brmw1 = false, bool directdbget = true) : DbCursorBase(), curr_key_(sizeof(key_dt)), curr_data_(sizeof(data_dt)) { u_int32_t bulksz = sizeof(data_dt); // non-bulk rmw_get_ = brmw1; this->bulk_retrieval_ = normalize_bulk_bufsize(b_bulk_retrieval); recno_itr_ = NULL; multi_itr_ = NULL; if (bulk_retrieval_) { if (bulksz <= bulk_retrieval_) bulksz = bulk_retrieval_; else { normalize_bulk_bufsize(bulksz); bulk_retrieval_ = bulksz; } } key_buf_.set_data(DbstlMalloc(sizeof(key_dt))); key_buf_.set_ulen(sizeof(key_dt)); key_buf_.set_flags(DB_DBT_USERMEM); data_buf_.set_data(DbstlMalloc(bulksz)); data_buf_.set_ulen(bulksz); data_buf_.set_flags(DB_DBT_USERMEM); directdb_get_ = directdbget; } // Copy constructor, duplicate cursor here. DbCursor(const DbCursor& dbc) : DbCursorBase(dbc), curr_key_(dbc.curr_key_), curr_data_(dbc.curr_data_) { void *pk, *pd; dbc.dup(*this); csr_status_ = dbc.csr_status_; if (csr_ || dbc.csr_) this->rmw_get_ = set_rmw(dbc.rmw_get_, ((DBC*)dbc.csr_)->dbenv); else rmw_get_ = dbc.rmw_get_; bulk_retrieval_ = dbc.bulk_retrieval_; // Now we have to copy key_buf_ and data_buf_ to support // multiple retrieval. key_buf_.set_data(pk = DbstlMalloc(dbc.key_buf_.get_ulen())); key_buf_.set_ulen(dbc.key_buf_.get_ulen()); key_buf_.set_size(dbc.key_buf_.get_size()); key_buf_.set_flags(DB_DBT_USERMEM); memcpy(pk, dbc.key_buf_.get_data(), key_buf_.get_ulen()); data_buf_.set_data(pd = DbstlMalloc(dbc.data_buf_.get_ulen())); data_buf_.set_ulen(dbc.data_buf_.get_ulen()); data_buf_.set_size(dbc.data_buf_.get_size()); data_buf_.set_flags(DB_DBT_USERMEM); memcpy(pd, dbc.data_buf_.get_data(), data_buf_.get_ulen()); if (dbc.recno_itr_) { recno_itr_ = new DbstlMultipleRecnoDataIterator( data_buf_); recno_itr_->set_pointer(dbc.recno_itr_->get_pointer()); } else recno_itr_ = NULL; if (dbc.multi_itr_) { multi_itr_ = new DbstlMultipleKeyDataIterator( data_buf_); multi_itr_->set_pointer(dbc.multi_itr_->get_pointer()); } else multi_itr_ = NULL; directdb_get_ = dbc.directdb_get_; // Do not copy sduppers, they are private to each DbCursor<> // object. } virtual ~DbCursor() { close(); // Call close() ahead of freeing following buffers. free(key_buf_.get_data()); free(data_buf_.get_data()); if (multi_itr_) delete multi_itr_; if (recno_itr_) delete recno_itr_; } //////////////////////////////////////////////////////////////////// const DbCursor& operator= (const DbCursor& dbc) { void *pk; u_int32_t ulen; DbCursorBase::operator =(dbc); dbc.dup(*this); curr_key_ = dbc.curr_key_; curr_data_ = dbc.curr_data_; rmw_get_ = dbc.rmw_get_; this->bulk_retrieval_ = dbc.bulk_retrieval_; this->directdb_get_ = dbc.directdb_get_; // Now we have to copy key_buf_ and data_buf_ to support // bulk retrieval. key_buf_.set_data(pk = DbstlReAlloc(key_buf_.get_data(), ulen = dbc.key_buf_.get_ulen())); key_buf_.set_ulen(ulen); key_buf_.set_size(dbc.key_buf_.get_size()); key_buf_.set_flags(DB_DBT_USERMEM); memcpy(pk, dbc.key_buf_.get_data(), ulen); data_buf_.set_data(pk = DbstlReAlloc(key_buf_.get_data(), ulen = dbc.key_buf_.get_ulen())); data_buf_.set_ulen(ulen); data_buf_.set_size(dbc.data_buf_.get_size()); data_buf_.set_flags(DB_DBT_USERMEM); memcpy(pk, dbc.key_buf_.get_data(), ulen); if (dbc.recno_itr_) { if (recno_itr_) { delete recno_itr_; recno_itr_ = NULL; } recno_itr_ = new DbstlMultipleRecnoDataIterator( data_buf_); recno_itr_->set_pointer(dbc.recno_itr_->get_pointer()); } else if (recno_itr_) { delete recno_itr_; recno_itr_ = NULL; } if (dbc.multi_itr_) { if (multi_itr_) { delete multi_itr_; multi_itr_ = NULL; } multi_itr_ = new DbstlMultipleKeyDataIterator( data_buf_); multi_itr_->set_pointer(dbc.multi_itr_->get_pointer()); } else if (multi_itr_) { delete multi_itr_; multi_itr_ = NULL; } return dbc; // Do not copy sduppers, they are private to each DbCursor<> // object. } // Move Dbc*cursor to next position. If doing bulk read, read from // the bulk buffer. If bulk buffer exhausted, do another bulk read // from database, and then read from the bulk buffer. Quit if no // more data in database. // int next(int flag = DB_NEXT) { Dbt k, d; db_recno_t recno; int ret; retry: if (bulk_retrieval_) { if (multi_itr_) { if (multi_itr_->next(k, d)) { curr_key_.set_dbt(k, false); curr_data_.set_dbt(d, false); return 0; } else { delete multi_itr_; multi_itr_ = NULL; } } if (recno_itr_) { if (recno_itr_->next(recno, d)) { curr_key_.set_dbt(k, false); curr_data_.set_dbt(d, false); return 0; } else { delete recno_itr_; recno_itr_ = NULL; } } } ret = increment(flag); if (bulk_retrieval_ && ret == 0) goto retry; return ret; } inline int prev(int flag = DB_PREV) { return increment(flag); } // Move Dbc*cursor to first element. If doing bulk read, read data // from bulk buffer. int first() { Dbt k, d; db_recno_t recno; int ret; ret = increment(DB_FIRST); if (bulk_retrieval_) { if (multi_itr_) { if (multi_itr_->next(k, d)) { curr_key_.set_dbt(k, false); curr_data_.set_dbt(d, false); return 0; } else { delete multi_itr_; multi_itr_ = NULL; } } if (recno_itr_) { if (recno_itr_->next(recno, d)) { curr_key_.set_dbt(k, false); curr_data_.set_dbt(d, false); return 0; } else { delete recno_itr_; recno_itr_ = NULL; } } } return ret; } inline int last() { return increment(DB_LAST); } // Get current key/data pair, shallow copy. Return 0 on success, // -1 if no data. inline int get_current_key_data(key_dt&k, data_dt&d) { if (directdb_get_) update_current_key_data_from_db( DbCursorBase::SKIP_NONE); if (curr_key_.get_data(k) == 0 && curr_data_.get_data(d) == 0) return 0; else return INVALID_KEY_DATA; } // Get current data, shallow copy. Return 0 on success, -1 if no data. inline int get_current_data(data_dt&d) { if (directdb_get_) update_current_key_data_from_db(DbCursorBase::SKIP_KEY); if (curr_data_.get_data(d) == 0) return 0; else return INVALID_KEY_DATA; } // Get current key, shallow copy. Return 0 on success, -1 if no data. inline int get_current_key(key_dt&k) { if (directdb_get_) update_current_key_data_from_db( DbCursorBase::SKIP_DATA); if (curr_key_.get_data(k) == 0) return 0; else return INVALID_KEY_DATA; } inline void close() { if (csr_) { inform_duppers(); ResourceManager::instance()->remove_cursor(this); } csr_ = NULL; } // Parameter skipkd specifies skip retrieving key or data: // If 0, don't skip, retrieve both; // If 1, skip retrieving key; // If 2, skip retrieving data. // Do not poll from db again if doing bulk retrieval. void update_current_key_data_from_db(DbcGetSkipOptions skipkd) { int ret; u_int32_t sz, sz1, kflags = DB_DBT_USERMEM, dflags = DB_DBT_USERMEM; // Do not poll from db again if doing bulk retrieval. if (this->bulk_retrieval_) return; if (this->csr_status_ != 0) { curr_key_.reset(); curr_data_.reset(); return; } // We will modify flags if skip key or data, so cache old // value and set it after get calls. if (skipkd != DbCursorBase::SKIP_NONE) { kflags = key_buf_.get_flags(); dflags = data_buf_.get_flags(); } if (skipkd == DbCursorBase::SKIP_KEY) { key_buf_.set_dlen(0); key_buf_.set_flags(DB_DBT_PARTIAL | DB_DBT_USERMEM); } if (skipkd == DbCursorBase::SKIP_DATA) { data_buf_.set_dlen(0); data_buf_.set_flags(DB_DBT_PARTIAL | DB_DBT_USERMEM); } retry: ret = csr_->get(&key_buf_, &data_buf_, DB_CURRENT); if (ret == 0) { if (skipkd != DbCursorBase::SKIP_KEY) curr_key_ = key_buf_; if (skipkd != DbCursorBase::SKIP_DATA) curr_data_ = data_buf_; limit_buf_size_after_use(); } else if (ret == DB_BUFFER_SMALL) { if ((sz = key_buf_.get_size()) > 0) enlarge_dbt(key_buf_, sz); if ((sz1 = data_buf_.get_size()) > 0) enlarge_dbt(data_buf_, sz1); if (sz == 0 && sz1 == 0) THROW0(InvalidDbtException); goto retry; } else { if (skipkd != DbCursorBase::SKIP_NONE) { key_buf_.set_flags(kflags); data_buf_.set_flags(dflags); } throw_bdb_exception( "DbCursor<>::update_current_key_data_from_db", ret); } if (skipkd != DbCursorBase::SKIP_NONE) { key_buf_.set_flags(kflags); data_buf_.set_flags(dflags); } } }; // DbCursor<> //////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// // // RandDbCursor class template definition // // RandDbCursor is a random accessible cursor wrapper for use by // db_vector_iterator, it derives from DbCursor<> class. It has a fixed key // data type, which is index_type. // typedef db_recno_t index_type; template class RandDbCursor : public DbCursor { protected: friend class DataItem; typedef ssize_t difference_type; public: typedef RandDbCursor self; typedef DbCursor base; // Return current csr_ pointed element's index in recno database // (i.e. the index starting from 1). csr_ must be open and // point to an existing key/data pair. // inline index_type get_current_index() const { index_type ndx; if (this->directdb_get_) ((self *)this)->update_current_key_data_from_db( DbCursorBase::SKIP_DATA); this->curr_key_.get_data(ndx); return ndx; } inline int compare(const self *csr2) const{ index_type i1, i2; i1 = this->get_current_index(); i2 = csr2->get_current_index(); return i1 - i2; } // Insert data d before/after current position. int insert(const data_dt& d, int pos = DB_BEFORE){ int k = 1, ret; //data_dt dta; // Inserting into empty db, must set key to 1. if (pos == DB_KEYLAST) k = 1; ret = base::insert(k, d, pos); // Inserting into a empty db using begin() itr, so flag is // DB_AFTER and surely failed, so change to use DB_KEYLAST // and try again. if (ret == EINVAL) { k = 1; pos = DB_KEYLAST; ret = base::insert(k, d, pos); } this->csr_status_ = ret; return ret; } /* * Move the cursor n positions, if reaches the beginning or end, * returns DB_NOTFOUND. */ int advance(difference_type n) { int ret = 0; index_type indx; u_int32_t sz, flags = 0; indx = this->get_current_index(); if (n == 0) return 0; index_type i = (index_type)n; indx += i; if (n < 0 && indx < 1) { // Index in recno db starts from 1. ret = INVALID_ITERATOR_POSITION; return ret; } this->inform_duppers(); // Do a search to determine whether new position is valid. Dbt k, &d = this->data_buf_; k.set_data(&indx); k.set_size(sizeof(indx)); if (this->rmw_get_) flags |= DB_RMW; retry: if (this->csr_ && ((ret = this->csr_->get(&k, &d, DB_SET)) == DB_NOTFOUND)) { this->csr_status_ = ret = INVALID_ITERATOR_POSITION; this->curr_key_.reset(); this->curr_data_.reset(); } else if (ret == DB_BUFFER_SMALL) { sz = d.get_size(); assert(sz > 0); this->enlarge_dbt(d, sz); goto retry; } else if (ret == 0) { this->curr_key_.set_dbt(k, false); this->curr_data_.set_dbt(d, false); this->limit_buf_size_after_use(); } else throw_bdb_exception("RandDbCursor<>::advance", ret); this->csr_status_ = ret; return ret; } // Return the last index of recno db (index starting from 1), // it will also move the underlying cursor to last key/data pair. // inline index_type last_index() { int ret; ret = this->last(); if (ret) return 0;// Invalid position. else return get_current_index(); } explicit RandDbCursor(u_int32_t b_bulk_retrieval = 0, bool b_rmw1 = false, bool directdbget = true) : base(b_bulk_retrieval, b_rmw1, directdbget) { } RandDbCursor(const RandDbCursor& rdbc) : base(rdbc) { } explicit RandDbCursor(Dbc* csr1, int posidx = 0) : base(csr1) { } virtual ~RandDbCursor() { } }; // RandDbCursor<> END_NS //ns dbstl #endif // !_DB_STL_DBC_H