/*- * See the file LICENSE for redistribution information. * * Copyright (c) 2009 Oracle. All rights reserved. * * $Id$ */ #include "test.h" // tests containers in multiple threads, this thread class // runs the thread function in specified manner---manipulating // a specified type of container, with some thread inserting, // updating, and deleting, some others reading specified amount // of key/data pairs. // class WorkerThread { public: bool txnal/* if true, will create txn */, verbose; wt_job job; ContainerType cntnr_type; int total_insert; int strlenmin, strlenmax; rand_str_dbt rand_str_maker; Db *pdb; DbEnv *penv; WorkerThread() { pdb = NULL; penv = NULL; } WorkerThread(const WorkerThread& wt) { memcpy(this, &wt, sizeof(wt)); } void run() { WorkerThread *p = this; db_threadid_t tid; int cnt = 0, i, total = total_insert, ndx; DbstlDbt dbt, dbt2; string str, str2; bool iscds = false; u_int32_t oflags; size_t dlcnt = 0; dbt.set_flags(DB_DBT_USERMEM); dbt.set_data(DbstlMalloc(strlenmax + 10)); dbt.set_ulen(strlenmax + 10); dbt2.set_flags(DB_DBT_USERMEM); dbt2.set_data(DbstlMalloc(strlenmax + 10)); dbt2.set_ulen(strlenmax + 10); penv->get_open_flags(&oflags); if (oflags & DB_INIT_CDB) iscds = true; __os_id(NULL, 0, &tid); dbstl::register_db(p->pdb); dbstl::register_db_env(penv); if (p->cntnr_type == ct_vector) { typedef db_vector container_t; container_t vi(p->pdb, penv); const container_t& vi0 = vi; srand((unsigned int)tid); switch (p->job) { case wt_read: { while (cnt < total - 1) { cnt = 0; try { if (p->txnal) dbstl::begin_txn(0, penv); for (container_t::const_iterator itr = vi0.begin(); itr != vi0.end() && cnt < total - 1; itr++, cnt++) { str = (char*)((*itr).get_data()); if (verbose) cout<<"\n [ "<get_data()); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } } cout<<"\n[ "<get_data())<<" ===> "; *itr = dbt; i++; if (verbose) cout<<(char*)dbt.get_data(); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } } cout<<"\n[ "<first<<"--> "<txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 1, 0); // sleep 1 sec } if (iscds) // The update and delete thread will block until insert thread exits in cds mode g_StopInsert = 1; cout<<"\nthread "< "<first<<" : "<<(char*)(itr->second.get_data()); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 0, 500000); } cout<<"\n[ "<second.get_data())<<" ===> "; itr->second = dbt; i++; if (verbose) cout<<(char*)dbt.get_data(); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 0, 500000); } cout<<"\n[ "<first<<"--> "<txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 1, 0); // sleep 1 sec } if (iscds) // The update and delete thread will block until insert thread exits in cds mode g_StopInsert = 1; cout<<"\nthread "< "<first<<" : "<<(char*)(itr->second.get_data()); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 0, 500000); } cout<<"\n[ "<second.get_data())<<" ===> "; itr->second = dbt; i++; if (verbose) cout<<(char*)dbt.get_data(); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 0, 500000); } cout<<"\n[ "<txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 1, 0); // sleep 1 sec } if (iscds) // The update and delete thread will block until insert thread exits in cds mode g_StopInsert = 1; cout<<"\nthread "<get_data()); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 0, 500000); } cout<<"\n[ "<get_data())<<" ===> "; *itr = dbt; i++; if (verbose) cout<<(char*)dbt.get_data(); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 0, 500000); } cout<<"\n[ "<txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 1, 0); // sleep 1 sec } if (iscds) // The update and delete thread will block until insert thread exits in cds mode g_StopInsert = 1; cout<<"\nthread "<get_data()); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 0, 500000); } cout<<"\n[ "<get_data())<<" ===> "; *itr = dbt; i++; if (verbose) cout<<(char*)dbt.get_data(); } else { if (vi.size() == 0) return; __os_yield(NULL, 0, 500000); } } if (p->txnal) dbstl::commit_txn(penv); } catch (DbException e) { dlcnt++; dbstl::abort_txn(penv); if (e.get_errno() != DB_LOCK_DEADLOCK) throw e; else continue; } __os_yield(NULL, 0, 500000); } cout<<"\n[ "<