/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2006-2009 Oracle.  All rights reserved.
 *
 * $Id$
 */

#include "db_config.h"

#include "db_int.h"
#include "dbinc/log.h"
#include "dbinc/mp.h"
#include "dbinc/txn.h"

static int __pgno_cmp __P((const void *, const void *));

/*
 * __memp_bh_settxn --
 *	Set the transaction that owns the given buffer.
 *
 * PUBLIC: int __memp_bh_settxn __P((DB_MPOOL *, MPOOLFILE *mfp, BH *, void *));
 */
int
__memp_bh_settxn(dbmp, mfp, bhp, vtd)
	DB_MPOOL *dbmp;
	MPOOLFILE *mfp;
	BH *bhp;
	void *vtd;
{
	ENV *env;
	TXN_DETAIL *td;

	env = dbmp->env;
	td = (TXN_DETAIL *)vtd;

	if (td == NULL) {
		__db_errx(env,
		    "%s: non-transactional update to a multiversion file",
		    __memp_fns(dbmp, mfp));
		return (EINVAL);
	}

	if (bhp->td_off != INVALID_ROFF) {
		DB_ASSERT(env, BH_OWNER(env, bhp) == td);
		return (0);
	}

	bhp->td_off = R_OFFSET(&env->tx_handle->reginfo, td);
	return (__txn_add_buffer(env, td));
}

/*
 * __memp_skip_curadj --
 *	Indicate whether a cursor adjustment can be skipped for a snapshot
 *	cursor.
 *
 * PUBLIC: int __memp_skip_curadj __P((DBC *, db_pgno_t));
 */
int
__memp_skip_curadj(dbc, pgno)
	DBC *dbc;
	db_pgno_t pgno;
{
	BH *bhp;
	DB_MPOOL *dbmp;
	DB_MPOOLFILE *dbmfp;
	DB_MPOOL_HASH *hp;
	DB_TXN *txn;
	ENV *env;
	MPOOLFILE *mfp;
	REGINFO *infop;
	roff_t mf_offset;
	int ret, skip;
	u_int32_t bucket;

	env = dbc->env;
	dbmp = env->mp_handle;
	dbmfp = dbc->dbp->mpf;
	mfp = dbmfp->mfp;
	mf_offset = R_OFFSET(dbmp->reginfo, mfp);
	skip = 0;

	for (txn = dbc->txn; txn->parent != NULL; txn = txn->parent)
		;

	/*
	 * Determine the cache and hash bucket where this page lives and get
	 * local pointers to them.  These are reset on each pass through this
	 * code because the page number can change.
	 */
	MP_GET_BUCKET(env, mfp, pgno, &infop, hp, bucket, ret);
	if (ret != 0) {
		/* Panic: there is no way to return the error. */
		(void)__env_panic(env, ret);
		return (0);
	}

	SH_TAILQ_FOREACH(bhp, &hp->hash_bucket, hq, __bh) {
		if (bhp->pgno != pgno || bhp->mf_offset != mf_offset)
			continue;

		if (!BH_OWNED_BY(env, bhp, txn))
			skip = 1;
		break;
	}
	MUTEX_UNLOCK(env, hp->mtx_hash);

	return (skip);
}
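/*
 * Frozen buffers live in per-cache, per-bucket "freezer" files whose
 * format is implied by the reads and writes below.  A rough sketch of
 * the layout that __memp_bh_freeze and __memp_bh_thaw maintain (this
 * comment is illustrative, not a formal specification):
 *
 *	page 0:	u_int32_t magic		DB_FREEZER_MAGIC
 *		db_pgno_t free		first page on the free list, or 0
 *		db_pgno_t maxpgno	highest page number in the file
 *
 *	page N (N >= 1, pagesize bytes at offset N * pagesize):
 *		a frozen buffer image; if the page is free, its first
 *		db_pgno_t bytes instead hold the next free page number,
 *		with 0 terminating the list.
 */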
#define	DB_FREEZER_MAGIC 0x06102002

/*
 * __memp_bh_freeze --
 *	Save a buffer to temporary storage in case it is needed later by
 *	a snapshot transaction.  This function should be called with the
 *	buffer locked and will exit with it locked.  A BH_FROZEN buffer
 *	header is allocated to represent the frozen data in mpool.
 *
 * PUBLIC: int __memp_bh_freeze __P((DB_MPOOL *, REGINFO *, DB_MPOOL_HASH *,
 * PUBLIC:	BH *, int *));
 */
int
__memp_bh_freeze(dbmp, infop, hp, bhp, need_frozenp)
	DB_MPOOL *dbmp;
	REGINFO *infop;
	DB_MPOOL_HASH *hp;
	BH *bhp;
	int *need_frozenp;
{
	BH *frozen_bhp;
	BH_FROZEN_ALLOC *frozen_alloc;
	DB_FH *fhp;
	ENV *env;
	MPOOL *c_mp;
	MPOOLFILE *mfp;
	db_mutex_t mutex;
	db_pgno_t maxpgno, newpgno, nextfree;
	size_t nio;
	int created, h_locked, ret, t_ret;
	u_int32_t magic, nbucket, ncache, pagesize;
	char filename[100], *real_name;

	env = dbmp->env;
	c_mp = infop->primary;
	created = h_locked = ret = 0;
	/* Find the associated MPOOLFILE. */
	mfp = R_ADDR(dbmp->reginfo, bhp->mf_offset);
	pagesize = mfp->stat.st_pagesize;
	real_name = NULL;
	fhp = NULL;

	MVCC_MPROTECT(bhp->buf, pagesize, PROT_READ | PROT_WRITE);

	MPOOL_REGION_LOCK(env, infop);
	frozen_bhp = SH_TAILQ_FIRST(&c_mp->free_frozen, __bh);
	if (frozen_bhp != NULL) {
		SH_TAILQ_REMOVE(&c_mp->free_frozen, frozen_bhp, hq, __bh);
		*need_frozenp = SH_TAILQ_EMPTY(&c_mp->free_frozen);
	} else {
		*need_frozenp = 1;

		/* There might be a small amount of unallocated space. */
		if (__env_alloc(infop,
		    sizeof(BH_FROZEN_ALLOC) + sizeof(BH_FROZEN_PAGE),
		    &frozen_alloc) == 0) {
			frozen_bhp = (BH *)(frozen_alloc + 1);
			frozen_bhp->mtx_buf = MUTEX_INVALID;
			SH_TAILQ_INSERT_TAIL(&c_mp->alloc_frozen,
			    frozen_alloc, links);
		}
	}
	MPOOL_REGION_UNLOCK(env, infop);

	/*
	 * If we can't get a frozen buffer header, return ENOMEM immediately:
	 * we don't want to call __memp_alloc recursively.  __memp_alloc will
	 * turn the next free page it finds into frozen buffer headers.
	 */
	if (frozen_bhp == NULL) {
		ret = ENOMEM;
		goto err;
	}

	/*
	 * For now, keep things simple and have one file per page size per
	 * hash bucket.  This improves concurrency but can mean lots of files
	 * if there is lots of freezing.
	 */
	ncache = (u_int32_t)(infop - dbmp->reginfo);
	nbucket = (u_int32_t)(hp - (DB_MPOOL_HASH *)R_ADDR(infop, c_mp->htab));
	snprintf(filename, sizeof(filename), "__db.freezer.%lu.%lu.%luK",
	    (u_long)ncache, (u_long)nbucket, (u_long)pagesize / 1024);

	if ((ret = __db_appname(env,
	    DB_APP_NONE, filename, NULL, &real_name)) != 0)
		goto err;

	MUTEX_LOCK(env, hp->mtx_hash);
	h_locked = 1;
	DB_ASSERT(env, F_ISSET(bhp, BH_EXCLUSIVE) && !F_ISSET(bhp, BH_FROZEN));

	if (BH_REFCOUNT(bhp) > 1 || F_ISSET(bhp, BH_DIRTY)) {
		ret = EBUSY;
		goto err;
	}

	if ((ret = __os_open(env, real_name, pagesize,
	    DB_OSO_CREATE | DB_OSO_EXCL, env->db_mode, &fhp)) == 0) {
		/* We're creating the file -- initialize the metadata page. */
		created = 1;
		magic = DB_FREEZER_MAGIC;
		maxpgno = newpgno = 0;
		if ((ret = __os_write(env, fhp, &magic,
		    sizeof(u_int32_t), &nio)) != 0 ||
		    (ret = __os_write(env, fhp, &newpgno,
		    sizeof(db_pgno_t), &nio)) != 0 ||
		    (ret = __os_write(env, fhp, &maxpgno,
		    sizeof(db_pgno_t), &nio)) != 0 ||
		    (ret = __os_seek(env, fhp, 0, 0, 0)) != 0)
			goto err;
	} else if (ret == EEXIST)
		ret = __os_open(env,
		    real_name, pagesize, 0, env->db_mode, &fhp);
	if (ret != 0)
		goto err;

	if ((ret = __os_read(env, fhp, &magic,
	    sizeof(u_int32_t), &nio)) != 0 ||
	    (ret = __os_read(env, fhp, &newpgno,
	    sizeof(db_pgno_t), &nio)) != 0 ||
	    (ret = __os_read(env, fhp, &maxpgno,
	    sizeof(db_pgno_t), &nio)) != 0)
		goto err;
	if (magic != DB_FREEZER_MAGIC) {
		ret = EINVAL;
		goto err;
	}
	if (newpgno == 0) {
		newpgno = ++maxpgno;
		if ((ret = __os_seek(env, fhp,
		    0, 0, sizeof(u_int32_t) + sizeof(db_pgno_t))) != 0 ||
		    (ret = __os_write(env, fhp, &maxpgno,
		    sizeof(db_pgno_t), &nio)) != 0)
			goto err;
	} else {
		if ((ret = __os_seek(env, fhp, newpgno, pagesize, 0)) != 0 ||
		    (ret = __os_read(env, fhp, &nextfree,
		    sizeof(db_pgno_t), &nio)) != 0)
			goto err;
		if ((ret = __os_seek(env,
		    fhp, 0, 0, sizeof(u_int32_t))) != 0 ||
		    (ret = __os_write(env, fhp, &nextfree,
		    sizeof(db_pgno_t), &nio)) != 0)
			goto err;
	}

	/* Write the buffer to the allocated page. */
	if ((ret = __os_io(env, DB_IO_WRITE, fhp, newpgno, pagesize,
	    0, pagesize, bhp->buf, &nio)) != 0)
		goto err;

	ret = __os_closehandle(env, fhp);
	fhp = NULL;
	if (ret != 0)
		goto err;
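	/*
	 * To make the allocation above concrete (an illustrative trace,
	 * not additional logic): if the metadata page reads back with
	 * free == 0 and maxpgno == 3, the free list is empty, so the file
	 * grows and the buffer lands on new page 4 (maxpgno becomes 4).
	 * If instead free == 2, page 2 is reused: the next-free page
	 * number stored at the start of page 2 is copied into the
	 * metadata page as the new free-list head before page 2 is
	 * overwritten with the buffer image.
	 */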
	/*
	 * Set up the frozen_bhp with the freezer page number.  The original
	 * buffer header is about to be freed, so transfer resources to the
	 * frozen header here.
	 */
	mutex = frozen_bhp->mtx_buf;
#ifdef DIAG_MVCC
	memcpy(frozen_bhp, bhp, SSZ(BH, align_off));
#else
	memcpy(frozen_bhp, bhp, SSZA(BH, buf));
#endif
	atomic_init(&frozen_bhp->ref, 0);
	if (mutex != MUTEX_INVALID)
		frozen_bhp->mtx_buf = mutex;
	else if ((ret = __mutex_alloc(env, MTX_MPOOL_BH,
	    DB_MUTEX_SHARED, &frozen_bhp->mtx_buf)) != 0)
		goto err;
	F_SET(frozen_bhp, BH_FROZEN);
	F_CLR(frozen_bhp, BH_EXCLUSIVE);
	((BH_FROZEN_PAGE *)frozen_bhp)->spgno = newpgno;

	/*
	 * We're about to add the frozen buffer header to the version chain,
	 * so we have temporarily created another buffer for the owning
	 * transaction.
	 */
	if (frozen_bhp->td_off != INVALID_ROFF &&
	    (ret = __txn_add_buffer(env, BH_OWNER(env, frozen_bhp))) != 0) {
		(void)__env_panic(env, ret);
		goto err;
	}

	/*
	 * Add the frozen buffer to the version chain and update the hash
	 * bucket if this is the head revision.  The original buffer will be
	 * freed by __memp_alloc calling __memp_bhfree (assuming no other
	 * thread has blocked waiting for it while we were freezing).
	 */
	SH_CHAIN_INSERT_AFTER(bhp, frozen_bhp, vc, __bh);
	if (!SH_CHAIN_HASNEXT(frozen_bhp, vc)) {
		SH_TAILQ_INSERT_BEFORE(&hp->hash_bucket,
		    bhp, frozen_bhp, hq, __bh);
		SH_TAILQ_REMOVE(&hp->hash_bucket, bhp, hq, __bh);
	}
	MUTEX_UNLOCK(env, hp->mtx_hash);
	h_locked = 0;
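	/*
	 * A rough before/after picture of the splice above, assuming bhp
	 * was the head revision of a three-version chain (illustrative
	 * only):
	 *
	 *	before:	bucket hq: bhp -> v2 -> v1
	 *	after:	bucket hq: frozen_bhp -> bhp -> v2 -> v1
	 *
	 * frozen_bhp has taken bhp's place in the hash bucket; bhp stays
	 * on the version chain only until __memp_bhfree unlinks and frees
	 * it, after which frozen_bhp (backed by freezer page spgno) is the
	 * sole representative of this version.
	 */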
	/*
	 * Increment the file's block count -- freeing the original buffer
	 * will decrement it.
	 */
	MUTEX_LOCK(env, mfp->mutex);
	++mfp->block_cnt;
	MUTEX_UNLOCK(env, mfp->mutex);

	STAT(++hp->hash_frozen);

	if (0) {
err:		if (fhp != NULL &&
		    (t_ret = __os_closehandle(env, fhp)) != 0 && ret == 0)
			ret = t_ret;
		if (created) {
			DB_ASSERT(env, h_locked);
			if ((t_ret = __os_unlink(env, real_name, 0)) != 0 &&
			    ret == 0)
				ret = t_ret;
		}
		if (h_locked)
			MUTEX_UNLOCK(env, hp->mtx_hash);
		if (ret == 0)
			ret = EIO;
		if (frozen_bhp != NULL) {
			MPOOL_REGION_LOCK(env, infop);
			SH_TAILQ_INSERT_TAIL(&c_mp->free_frozen,
			    frozen_bhp, hq);
			MPOOL_REGION_UNLOCK(env, infop);
		}
	}
	if (real_name != NULL)
		__os_free(env, real_name);
	if (ret != 0 && ret != EBUSY && ret != ENOMEM)
		__db_err(env, ret, "__memp_bh_freeze");
	return (ret);
}

static int
__pgno_cmp(a, b)
	const void *a, *b;
{
	db_pgno_t *ap, *bp;

	ap = (db_pgno_t *)a;
	bp = (db_pgno_t *)b;

	/*
	 * Compare explicitly rather than returning the cast difference:
	 * db_pgno_t is unsigned, so a difference wider than INT_MAX would
	 * wrap when cast to int and misorder the pages.
	 */
	return (*ap < *bp ? -1 : (*ap > *bp ? 1 : 0));
}

/*
 * __memp_bh_thaw --
 *	Free a buffer header in temporary storage.  Optionally restore the
 *	buffer (if alloc_bhp != NULL).  This function should be called with
 *	the hash bucket locked and will return with it unlocked.
 *
 * PUBLIC: int __memp_bh_thaw __P((DB_MPOOL *, REGINFO *,
 * PUBLIC:	DB_MPOOL_HASH *, BH *, BH *));
 */
int
__memp_bh_thaw(dbmp, infop, hp, frozen_bhp, alloc_bhp)
	DB_MPOOL *dbmp;
	REGINFO *infop;
	DB_MPOOL_HASH *hp;
	BH *frozen_bhp, *alloc_bhp;
{
	DB_FH *fhp;
	ENV *env;
#ifdef DIAGNOSTIC
	DB_LSN vlsn;
#endif
	MPOOL *c_mp;
	MPOOLFILE *mfp;
	db_mutex_t mutex;
	db_pgno_t *freelist, *ppgno, freepgno, maxpgno, spgno;
	size_t nio;
	u_int32_t listsize, magic, nbucket, ncache, ntrunc, nfree, pagesize;
#ifdef HAVE_FTRUNCATE
	int i;
#endif
	int h_locked, needfree, ret, t_ret;
	char filename[100], *real_name;

	env = dbmp->env;
	fhp = NULL;
	c_mp = infop->primary;
	mfp = R_ADDR(dbmp->reginfo, frozen_bhp->mf_offset);
	freelist = NULL;
	pagesize = mfp->stat.st_pagesize;
	ret = 0;
	real_name = NULL;

	MUTEX_REQUIRED(env, hp->mtx_hash);
	DB_ASSERT(env, F_ISSET(frozen_bhp, BH_EXCLUSIVE) || alloc_bhp == NULL);
	h_locked = 1;

	DB_ASSERT(env, F_ISSET(frozen_bhp, BH_FROZEN) &&
	    !F_ISSET(frozen_bhp, BH_THAWED));
	DB_ASSERT(env, alloc_bhp != NULL ||
	    SH_CHAIN_SINGLETON(frozen_bhp, vc) ||
	    (SH_CHAIN_HASNEXT(frozen_bhp, vc) &&
	    BH_OBSOLETE(frozen_bhp, hp->old_reader, vlsn)));
	DB_ASSERT(env, alloc_bhp == NULL || !F_ISSET(alloc_bhp, BH_FROZEN));

	spgno = ((BH_FROZEN_PAGE *)frozen_bhp)->spgno;

	if (alloc_bhp != NULL) {
		mutex = alloc_bhp->mtx_buf;
#ifdef DIAG_MVCC
		memcpy(alloc_bhp, frozen_bhp, SSZ(BH, align_off));
#else
		memcpy(alloc_bhp, frozen_bhp, SSZA(BH, buf));
#endif
		alloc_bhp->mtx_buf = mutex;
		MUTEX_LOCK(env, alloc_bhp->mtx_buf);
		atomic_init(&alloc_bhp->ref, 1);
		F_CLR(alloc_bhp, BH_FROZEN);
	}

	/*
	 * For now, keep things simple and have one file per page size per
	 * hash bucket.  This improves concurrency but can mean lots of files
	 * if there is lots of freezing.
	 */
	ncache = (u_int32_t)(infop - dbmp->reginfo);
	nbucket = (u_int32_t)(hp - (DB_MPOOL_HASH *)R_ADDR(infop, c_mp->htab));
	snprintf(filename, sizeof(filename), "__db.freezer.%lu.%lu.%luK",
	    (u_long)ncache, (u_long)nbucket, (u_long)pagesize / 1024);

	if ((ret = __db_appname(env,
	    DB_APP_NONE, filename, NULL, &real_name)) != 0)
		goto err;
	if ((ret = __os_open(env,
	    real_name, pagesize, 0, env->db_mode, &fhp)) != 0)
		goto err;

	/*
	 * Read the first free page number -- we're about to free the page
	 * after we read it.
	 */
	if ((ret = __os_read(env, fhp, &magic,
	    sizeof(u_int32_t), &nio)) != 0 ||
	    (ret = __os_read(env, fhp, &freepgno,
	    sizeof(db_pgno_t), &nio)) != 0 ||
	    (ret = __os_read(env, fhp, &maxpgno,
	    sizeof(db_pgno_t), &nio)) != 0)
		goto err;
	if (magic != DB_FREEZER_MAGIC) {
		ret = EINVAL;
		goto err;
	}

	/* Read the buffer from the frozen page. */
	if (alloc_bhp != NULL) {
		DB_ASSERT(env, !F_ISSET(frozen_bhp, BH_FREED));
		if ((ret = __os_io(env, DB_IO_READ, fhp, spgno, pagesize,
		    0, pagesize, alloc_bhp->buf, &nio)) != 0)
			goto err;
	}

	/*
	 * Free the page from the file.  If it's the last page, truncate.
	 * Otherwise, update the free page linked list.
	 */
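	/*
	 * A worked example of the truncation path below (illustrative
	 * numbers only, in a HAVE_FTRUNCATE build): suppose maxpgno == 6,
	 * spgno == 6 and the on-disk free list is 2 -> 5 -> 4.  The list
	 * is read into freelist[], spgno is appended and the array is
	 * sorted: {2, 4, 5, 6}.  Scanning back from the tail, pages 4, 5
	 * and 6 are consecutive, so ntrunc == 3: the file is truncated to
	 * maxpgno == 3 and the free list is rewritten as the single entry
	 * 2.  Had all of pages 1..maxpgno been free, the file would have
	 * been removed instead.
	 */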
	needfree = 1;
	if (spgno == maxpgno) {
		listsize = 100;
		if ((ret = __os_malloc(env,
		    listsize * sizeof(db_pgno_t), &freelist)) != 0)
			goto err;
		nfree = 0;
		while (freepgno != 0) {
			if (nfree == listsize - 1) {
				listsize *= 2;
				if ((ret = __os_realloc(env,
				    listsize * sizeof(db_pgno_t),
				    &freelist)) != 0)
					goto err;
			}
			freelist[nfree++] = freepgno;
			if ((ret = __os_seek(env, fhp,
			    freepgno, pagesize, 0)) != 0 ||
			    (ret = __os_read(env, fhp, &freepgno,
			    sizeof(db_pgno_t), &nio)) != 0)
				goto err;
		}
		freelist[nfree++] = spgno;
		qsort(freelist, nfree, sizeof(db_pgno_t), __pgno_cmp);
		for (ppgno = &freelist[nfree - 1]; ppgno > freelist; ppgno--)
			if (*(ppgno - 1) != *ppgno - 1)
				break;
		ntrunc = (u_int32_t)(&freelist[nfree] - ppgno);
		if (ntrunc == (u_int32_t)maxpgno) {
			needfree = 0;
			ret = __os_closehandle(env, fhp);
			fhp = NULL;
			if (ret != 0 ||
			    (ret = __os_unlink(env, real_name, 0)) != 0)
				goto err;
		}
#ifdef HAVE_FTRUNCATE
		else {
			maxpgno -= (db_pgno_t)ntrunc;
			if ((ret = __os_truncate(env, fhp,
			    maxpgno + 1, pagesize)) != 0)
				goto err;

			/* Fix up the linked list. */
			freelist[nfree - ntrunc] = 0;
			if ((ret = __os_seek(env,
			    fhp, 0, 0, sizeof(u_int32_t))) != 0 ||
			    (ret = __os_write(env, fhp, &freelist[0],
			    sizeof(db_pgno_t), &nio)) != 0 ||
			    (ret = __os_write(env, fhp, &maxpgno,
			    sizeof(db_pgno_t), &nio)) != 0)
				goto err;

			for (i = 0; i < (int)(nfree - ntrunc); i++)
				if ((ret = __os_seek(env, fhp,
				    freelist[i], pagesize, 0)) != 0 ||
				    (ret = __os_write(env, fhp,
				    &freelist[i + 1], sizeof(db_pgno_t),
				    &nio)) != 0)
					goto err;
			needfree = 0;
		}
#endif
	}
	if (needfree) {
		if ((ret = __os_seek(env, fhp, spgno, pagesize, 0)) != 0 ||
		    (ret = __os_write(env, fhp, &freepgno,
		    sizeof(db_pgno_t), &nio)) != 0 ||
		    (ret = __os_seek(env,
		    fhp, 0, 0, sizeof(u_int32_t))) != 0 ||
		    (ret = __os_write(env, fhp, &spgno,
		    sizeof(db_pgno_t), &nio)) != 0)
			goto err;

		ret = __os_closehandle(env, fhp);
		fhp = NULL;
		if (ret != 0)
			goto err;
	}

	/*
	 * Add the thawed buffer (if any) to the version chain.  We can't
	 * do this any earlier, because we can't guarantee that another
	 * thread won't be waiting for it, which means we can't clean up
	 * if there are errors reading from the freezer.  We can't do it
	 * any later, because we're about to free frozen_bhp, and without
	 * it we would need to do another cache lookup to find out where
	 * the new page should live.
	 */
	MUTEX_REQUIRED(env, hp->mtx_hash);
	if (alloc_bhp != NULL) {
		alloc_bhp->priority = c_mp->lru_count;

		SH_CHAIN_INSERT_AFTER(frozen_bhp, alloc_bhp, vc, __bh);
		if (!SH_CHAIN_HASNEXT(alloc_bhp, vc)) {
			SH_TAILQ_INSERT_BEFORE(&hp->hash_bucket,
			    frozen_bhp, alloc_bhp, hq, __bh);
			SH_TAILQ_REMOVE(&hp->hash_bucket,
			    frozen_bhp, hq, __bh);
		}
	} else if (!SH_CHAIN_HASNEXT(frozen_bhp, vc)) {
		if (SH_CHAIN_HASPREV(frozen_bhp, vc))
			SH_TAILQ_INSERT_BEFORE(&hp->hash_bucket, frozen_bhp,
			    SH_CHAIN_PREV(frozen_bhp, vc, __bh), hq, __bh);
		SH_TAILQ_REMOVE(&hp->hash_bucket, frozen_bhp, hq, __bh);
	}
	SH_CHAIN_REMOVE(frozen_bhp, vc, __bh);

	if (alloc_bhp == NULL && frozen_bhp->td_off != INVALID_ROFF &&
	    (ret = __txn_remove_buffer(env,
	    BH_OWNER(env, frozen_bhp), MUTEX_INVALID)) != 0) {
		(void)__env_panic(env, ret);
		goto err;
	}
	frozen_bhp->td_off = INVALID_ROFF;
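	/*
	 * A rough picture of the splice above for the restore case,
	 * assuming frozen_bhp was the head revision (illustrative only):
	 *
	 *	before:	bucket hq: frozen_bhp -> v2 -> v1
	 *	after:	bucket hq: alloc_bhp -> v2 -> v1
	 *
	 * In the discard case (alloc_bhp == NULL), frozen_bhp is simply
	 * unlinked and, if it headed the bucket, its older neighbour, if
	 * any, takes its place.
	 */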
	/*
	 * If other threads are waiting for this buffer as well, they will
	 * have incremented the reference count and will be waiting on the
	 * mutex.  For that reason, we can't unconditionally free the memory
	 * here.
	 */
	needfree = (atomic_dec(env, &frozen_bhp->ref) == 0);
	if (!needfree)
		F_SET(frozen_bhp, BH_THAWED);
	MUTEX_UNLOCK(env, hp->mtx_hash);
	if (F_ISSET(frozen_bhp, BH_EXCLUSIVE))
		MUTEX_UNLOCK(env, frozen_bhp->mtx_buf);
	h_locked = 0;
	if (needfree) {
		MPOOL_REGION_LOCK(env, infop);
		SH_TAILQ_INSERT_TAIL(&c_mp->free_frozen, frozen_bhp, hq);
		MPOOL_REGION_UNLOCK(env, infop);
	}

#ifdef HAVE_STATISTICS
	if (alloc_bhp != NULL)
		++hp->hash_thawed;
	else
		++hp->hash_frozen_freed;
#endif

	if (0) {
err:		if (h_locked)
			MUTEX_UNLOCK(env, hp->mtx_hash);
		if (ret == 0)
			ret = EIO;
	}
	if (real_name != NULL)
		__os_free(env, real_name);
	if (freelist != NULL)
		__os_free(env, freelist);
	if (fhp != NULL &&
	    (t_ret = __os_closehandle(env, fhp)) != 0 && ret == 0)
		ret = t_ret;
	if (ret != 0)
		__db_err(env, ret, "__memp_bh_thaw");

	return (ret);
}
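/*
 * A short sketch of how these pieces fit together, inferred from the
 * comments above (the exact call sites live elsewhere in mpool): when
 * buffer allocation needs to evict an older version that a snapshot
 * transaction may still read, __memp_bh_freeze writes the page image to
 * a freezer file and leaves a small BH_FROZEN header on the version
 * chain; when a snapshot reader later needs that version, or the
 * version becomes obsolete, __memp_bh_thaw reads it back into a real
 * buffer (or simply discards it) and returns the freezer page to the
 * file's free list.
 */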