bt_cursor.c

Go to the documentation of this file.
00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1996, 1997, 1998, 1999, 2000
00005  *      Sleepycat Software.  All rights reserved.
00006  */
00007 
00008 #include "config.h"
00009 
00010 #ifndef lint
00011 static const char revid[] = "$Id: bt__cursor_8c-source.html,v 1.1 2008/06/08 10:13:31 sebdiaz Exp $";
00012 #endif /* not lint */
00013 
00014 #ifndef NO_SYSTEM_INCLUDES
00015 #include <sys/types.h>
00016 
00017 #include <errno.h>
00018 #include <stdlib.h>
00019 #include <string.h>
00020 #endif
00021 
00022 #include "db_int.h"
00023 #include "db_page.h"
00024 #include "db_shash.h"
00025 #include "btree.h"
00026 #include "lock.h"
00027 #include "qam.h"
00028 #include "common_ext.h"
00029 /* Forward declarations of this file's static (file-local) routines. */
00030 static int  __bam_c_close __P((DBC *, db_pgno_t, int *));
00031 static int  __bam_c_del __P((DBC *));
00032 static int  __bam_c_destroy __P((DBC *));
00033 static int  __bam_c_first __P((DBC *));
00034 static int  __bam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00035 static int  __bam_c_getstack __P((DBC *));
00036 static int  __bam_c_last __P((DBC *));
00037 static int  __bam_c_next __P((DBC *, int));
00038 static int  __bam_c_physdel __P((DBC *));
00039 static int  __bam_c_prev __P((DBC *));
00040 static int  __bam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00041 static void __bam_c_reset __P((BTREE_CURSOR *));
00042 static int  __bam_c_search __P((DBC *, const DBT *, u_int32_t, int *));
00043 static int  __bam_c_writelock __P((DBC *));
00044 static int  __bam_getboth_finddatum __P((DBC *, DBT *));
00045 static int  __bam_getbothc __P((DBC *, DBT *));
00046 static int  __bam_isopd __P((DBC *, db_pgno_t *));
00047 
00048 /*
00049  * Acquire a new page/lock.  If we hold a page/lock, discard the page, and
00050  * lock-couple the lock.
00051  *
00052  * !!!
00053  * We have to handle both where we have a lock to lock-couple and where we
00054  * don't -- we don't duplicate locks when we duplicate cursors if we are
00055  * running in a transaction environment as there's no point if locks are
00056  * never discarded.  This means that the cursor may or may not hold a lock.
       *
       * Arguments (expanded textually, several of them more than once, so pass
       * side-effect-free expressions):
       *   dbc    cursor; supplies (dbc)->dbp->mpf for the page put/get calls.
       *   mode   lock mode handed to CDB___db_lget.
       *   lpgno  page number to lock.
       *   lock   lock structure; its address is passed to CDB___db_lget.
       *   fpgno  page number to fetch; its address is passed to CDB_memp_fget.
       *   pagep  page pointer: put back and set to NULL if non-NULL, then
       *          refilled by the final CDB_memp_fget.
       *   ret    receives 0, or the first non-zero return of the calls made.
       *
       * Steps: put back pagep if set; if that succeeded and STD_LOCKING(dbc)
       * is true, get the lock (with LCK_COUPLE when `lock` is currently valid);
       * otherwise mark `lock` as LOCK_INVALID; finally, if ret is still 0,
       * fetch page fpgno into pagep.
       *
       * NOTE(review): the `else` branch is also taken when the page put
       * failed, so on that path a previously valid `lock` is marked
       * LOCK_INVALID without being released -- confirm this is intended.
00057  */
00058 #undef  ACQUIRE
00059 #define ACQUIRE(dbc, mode, lpgno, lock, fpgno, pagep, ret) {\
00060         if ((pagep) != NULL) {                                          \
00061                 ret = CDB_memp_fput((dbc)->dbp->mpf, pagep, 0);         \
00062                 pagep = NULL;                                           \
00063         } else                                                          \
00064                 ret = 0;                                                \
00065         if ((ret) == 0 && STD_LOCKING(dbc))                             \
00066                 ret = CDB___db_lget(dbc,                                        \
00067                     (lock).off == LOCK_INVALID ? 0 : LCK_COUPLE,        \
00068                     lpgno, mode, 0, &lock);                             \
00069         else                                                            \
00070                 (lock).off = LOCK_INVALID;                              \
00071         if ((ret) == 0)                                                 \
00072                 ret = CDB_memp_fget((dbc)->dbp->mpf, &(fpgno), 0, &(pagep));\
00073 }
00074 
00075 /* Acquire a new page/lock for a cursor.
       *
       * Expands ACQUIRE with the cursor's own fields: __cp->pgno is used both
       * as the page to lock and the page to fetch, and __cp->lock / __cp->page
       * are the lock and page pointer.  When ret ends up 0, `mode` is recorded
       * in __cp->lock_mode.  The expansion declares a local named __cp inside
       * its own braces.
       */
00076 #undef  ACQUIRE_CUR
00077 #define ACQUIRE_CUR(dbc, mode, ret) {                                   \
00078         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
00079         ACQUIRE(dbc, mode,                                              \
00080             __cp->pgno, __cp->lock, __cp->pgno, __cp->page, ret);       \
00081         if ((ret) == 0)                                                 \
00082                 __cp->lock_mode = (mode);                               \
00083 }
00084 
00085 /*
00086  * Acquire a new page/lock for a cursor, and move the cursor on success.
00087  * The reason that this is a separate macro is because we don't want to
00088  * set the pgno/indx fields in the cursor until we actually have the lock,
00089  * otherwise the cursor adjust routines will adjust the cursor even though
00090  * we're not really on the page.
       *
       * `p` is the target page number: it is used for both the lock and the
       * page fetch.  Only when ret is 0 are __cp->pgno set to p, __cp->indx
       * reset to 0 and __cp->lock_mode set to `mode`.
       *
       * NOTE(review): `p` is expanded unparenthesized and more than once (its
       * address is also taken inside ACQUIRE), so callers should pass a plain
       * variable.
00091  */
00092 #undef  ACQUIRE_CUR_SET
00093 #define ACQUIRE_CUR_SET(dbc, mode, p, ret) {                            \
00094         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
00095         ACQUIRE(dbc, mode, p, __cp->lock, p, __cp->page, ret);          \
00096         if ((ret) == 0) {                                               \
00097                 __cp->pgno = p;                                 \
00098                 __cp->indx = 0;                                 \
00099                 __cp->lock_mode = (mode);                               \
00100         }                                                               \
00101 }
00102 
00103 /*
00104  * Acquire a write lock if we don't already have one.
00105  *
00106  * !!!
00107  * See ACQUIRE macro on why we handle cursors that don't have locks.
       *
       * ret is first set to 0.  CDB___db_lget is called (for DB_LOCK_WRITE on
       * __cp->pgno, with LCK_COUPLE when the cursor's lock is currently valid)
       * only if STD_LOCKING(dbc) is true and __cp->lock_mode is not already
       * DB_LOCK_WRITE; ret then holds that call's return value.  On success
       * __cp->lock_mode becomes DB_LOCK_WRITE.  The cursor's page pointer is
       * not touched by this macro.
00108  */
00109 #undef  ACQUIRE_WRITE_LOCK
00110 #define ACQUIRE_WRITE_LOCK(dbc, ret) {                                  \
00111         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
00112         ret = 0;                                                        \
00113         if (STD_LOCKING(dbc) &&                                         \
00114             __cp->lock_mode != DB_LOCK_WRITE &&                         \
00115             ((ret) = CDB___db_lget(dbc,                                 \
00116             __cp->lock.off == LOCK_INVALID ? 0 : LCK_COUPLE,            \
00117             __cp->pgno, DB_LOCK_WRITE, 0, &__cp->lock)) == 0)           \
00118                 __cp->lock_mode = DB_LOCK_WRITE;                        \
00119 }
00120 
00121 /* Discard the current page/lock.
       *
       * If pagep is non-NULL it is put back with CDB_memp_fput and set to NULL;
       * ret gets that call's result, else 0.  If `lock` is valid it is then
       * released with __LPUT when ldiscard is non-zero, or __TLPUT when it is
       * zero, and is marked LOCK_INVALID whether or not the release succeeded.
       * A lock-release error is stored in ret only when ret was still 0, so the
       * page-put error takes precedence.
       *
       * NOTE(review): `ldiscard` is expanded unparenthesized as the condition
       * of a ?: expression; pass a simple value.
       */
00122 #undef  DISCARD
00123 #define DISCARD(dbc, ldiscard, lock, pagep, ret) {                      \
00124         int __t_ret;                                                    \
00125         if ((pagep) != NULL) {                                          \
00126                 ret = CDB_memp_fput((dbc)->dbp->mpf, pagep, 0);         \
00127                 pagep = NULL;                                           \
00128         } else                                                          \
00129                 ret = 0;                                                \
00130         if ((lock).off != LOCK_INVALID) {                               \
00131                 __t_ret = ldiscard ?                                    \
00132                     __LPUT((dbc), lock): __TLPUT((dbc), lock);          \
00133                 if (__t_ret != 0 && (ret) == 0)                         \
00134                         ret = __t_ret;                                  \
00135                 (lock).off = LOCK_INVALID;                              \
00136         }                                                               \
00137 }
00138 
00139 /* Discard the current page/lock for a cursor.
       *
       * Expands DISCARD on the cursor's own lock and page with ldiscard == 0,
       * i.e. the lock is released through __TLPUT.  __cp->lock_mode is reset to
       * DB_LOCK_NG only when ret is 0; on error it keeps its previous value
       * even though DISCARD has already marked the lock invalid.
       */
00140 #undef  DISCARD_CUR
00141 #define DISCARD_CUR(dbc, ret) {                                         \
00142         BTREE_CURSOR *__cp = (BTREE_CURSOR *)(dbc)->internal;           \
00143         DISCARD(dbc, 0, __cp->lock, __cp->page, ret);                   \
00144         if ((ret) == 0)                                                 \
00145                 __cp->lock_mode = DB_LOCK_NG;                           \
00146 }
00147 
00148 /* If on-page item is a deleted record.
       *
       * IS_DELETED tests B_DISSET on the type byte of the item at `indx`; on a
       * P_LBTREE page the item examined is the one at indx + O_INDX
       * (presumably the data item of the key/data pair -- confirm against the
       * page layout).  IS_CUR_DELETED applies the same test to the cursor's
       * current page and index; the cursor's page pointer must be valid.
       */
00149 #undef  IS_DELETED
00150 #define IS_DELETED(page, indx)                                          \
00151         B_DISSET(GET_BKEYDATA(page,                                     \
00152             (indx) + (TYPE(page) == P_LBTREE ? O_INDX : 0))->type)
00153 #undef  IS_CUR_DELETED
00154 #define IS_CUR_DELETED(dbc)                                             \
00155         IS_DELETED((dbc)->internal->page, (dbc)->internal->indx)
00156 
00157 /*
00158  * Test to see if two cursors could point to duplicates of the same key.
00159  * In the case of off-page duplicates they are the same, as the cursors
00160  * will be in the same off-page duplicate tree.  In the case of on-page
00161  * duplicates, the key index offsets must be the same.  For the last test,
00162  * as the original cursor may not have a valid page pointer, we use the
00163  * current cursor's.
       *
       * IS_DUPLICATE compares the page index entries inp[i1] and inp[i2] of the
       * cursor's current page; neither index is range-checked here, so both
       * must refer to valid entries.  IS_CUR_DUPLICATE is true for any DBC_OPD
       * cursor, and otherwise only when orig_pgno equals the cursor's page
       * number and the two index entries match.
00164  */
00165 #undef  IS_DUPLICATE
00166 #define IS_DUPLICATE(dbc, i1, i2)                                       \
00167             (((PAGE *)(dbc)->internal->page)->inp[i1] ==                \
00168              ((PAGE *)(dbc)->internal->page)->inp[i2])
00169 #undef  IS_CUR_DUPLICATE
00170 #define IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)                     \
00171         (F_ISSET(dbc, DBC_OPD) ||                                       \
00172             (orig_pgno == (dbc)->internal->pgno &&                      \
00173             IS_DUPLICATE(dbc, (dbc)->internal->indx, orig_indx)))
00174 
00175 /*
00176  * __bam_c_reset --
00177  *      Initialize internal cursor structure.
       *
       *      Empties the page stack (sp and csp point at the first stack
       *      slot, esp one past the last), marks the lock invalid with mode
       *      DB_LOCK_NG, sets recno to RECNO_OOB and clears all flags.  No
       *      other field of *cp is written here.
00178  */
00179 static void
00180 __bam_c_reset(cp)
00181         BTREE_CURSOR *cp;
00182 {
00183         cp->sp = cp->csp = cp->stack;
              /* esp is the end marker: element count of stack[] past its start. */
00184         cp->esp = cp->stack + sizeof(cp->stack) / sizeof(cp->stack[0]);
00185         cp->lock.off = LOCK_INVALID;
00186         cp->lock_mode = DB_LOCK_NG;
00187         cp->recno = RECNO_OOB;
00188         cp->flags = 0;
00189 }
00190 
00191 /*
00192  * CDB___bam_c_init --
00193  *      Initialize the access private portion of a cursor
00194  *
00195  * PUBLIC: int CDB___bam_c_init __P((DBC *, DBTYPE));
       *
       *      dbc:    cursor to set up.  If dbc->internal is NULL a
       *              BTREE_CURSOR is allocated for it, otherwise the existing
       *              one is reused; either way it is reset.
       *      dbtype: DB_BTREE installs the __bam_c_* del/get/put routines, any
       *              other value installs the CDB___ram_c_* ones.
       *
       *      Returns 0, or the error from CDB___os_malloc.
00196  */
00197 int
00198 CDB___bam_c_init(dbc, dbtype)
00199         DBC *dbc;
00200         DBTYPE dbtype;
00201 {
00202         BTREE *t;
00203         BTREE_CURSOR *cp;
00204         DB *dbp;
00205         u_int32_t minkey;
00206         int ret;
00207 
00208         dbp = dbc->dbp;
00209 
00210         /* Allocate/initialize the internal structure. */
00211         if (dbc->internal == NULL) {
00212                 if ((ret = CDB___os_malloc(dbp->dbenv,
00213                     sizeof(BTREE_CURSOR), NULL, &cp)) != 0)
00214                         return (ret);
00215                 dbc->internal = (DBC_INTERNAL *)cp;
00216         } else
00217                 cp = (BTREE_CURSOR *)dbc->internal;
00218         __bam_c_reset(cp);
00219 
00220         /* Initialize methods. */
00221         dbc->c_close = CDB___db_c_close;
00222         dbc->c_count = CDB___db_c_count;
00223         dbc->c_del = CDB___db_c_del;
00224         dbc->c_dup = CDB___db_c_dup;
00225         dbc->c_get = CDB___db_c_get;
00226         dbc->c_put = CDB___db_c_put;
              /*
               * Close, destroy and writelock are shared by both branches; only
               * del/get/put differ between btree and the other (recno) case.
               */
00227         if (dbtype == DB_BTREE) {
00228                 dbc->c_am_close = __bam_c_close;
00229                 dbc->c_am_del = __bam_c_del;
00230                 dbc->c_am_destroy = __bam_c_destroy;
00231                 dbc->c_am_get = __bam_c_get;
00232                 dbc->c_am_put = __bam_c_put;
00233                 dbc->c_am_writelock = __bam_c_writelock;
00234         } else {
00235                 dbc->c_am_close = __bam_c_close;
00236                 dbc->c_am_del = CDB___ram_c_del;
00237                 dbc->c_am_destroy = __bam_c_destroy;
00238                 dbc->c_am_get = CDB___ram_c_get;
00239                 dbc->c_am_put = CDB___ram_c_put;
00240                 dbc->c_am_writelock = __bam_c_writelock;
00241         }
00242 
00243         /*
00244          * The btree leaf page data structures require that two key/data pairs
00245          * (or four items) fit on a page, but other than that there's no fixed
00246          * requirement.  The btree off-page duplicates only require two items,
00247          * to be exact, but requiring four for them as well seems reasonable.
00248          *
00249          * Translate the number of items into the bytes a key/data pair can use
00250          * before being placed on an overflow page.  Assume every item requires
00251          * the maximum alignment for padding, out of sheer paranoia.
00252          *
00253          * Recno uses the btree bt_ovflsize value -- it's close enough.
               *
               * NOTE(review): minkey is used as a divisor below; this assumes
               * t->bt_minkey is never 0 -- confirm where it is set.
00254          */
00255         t = dbp->bt_internal;
00256         minkey = F_ISSET(dbc, DBC_OPD) ? 2 : t->bt_minkey;
00257         cp->ovflsize = (dbp->pgsize - P_OVERHEAD) / (minkey * P_INDX)
00258             - (BKEYDATA_PSIZE(0) + ALIGN(1, 4));
00259 
00260         return (0);
00261 }
00262 
00263 /*
00264  * CDB___bam_c_refresh
00265  *      When the cursor is reused, set things up properly
00266  * PUBLIC: int CDB___bam_c_refresh __P((DBC *));
       *
       *      Resets the cursor's internal structure, fills in the root page
       *      number if the caller left it as PGNO_INVALID, and sets the
       *      C_RECNUM / C_RENUMBER flags as appropriate.  Always returns 0.
       *      dbc->internal must already point at a BTREE_CURSOR.
00267  */
00268 int
00269 CDB___bam_c_refresh(dbc)
00270         DBC *dbc;
00271 {
00272         BTREE_CURSOR *cp;
00273         DB *dbp;
00274 
00275         dbp = dbc->dbp;
00276         cp = (BTREE_CURSOR *)dbc->internal;
              /* The reset does not write cp->root, so a caller-set root survives. */
00277         __bam_c_reset(cp);
00278 
00279         /*
00280          * If our caller set the root page number, it's because
00281          * the root was known.  This is always the case for off page
00282          * dup cursors.  Otherwise, pull it out of our internal information.
00283          */
00284         if (cp->root == PGNO_INVALID)
00285                 cp->root = ((BTREE *)dbp->bt_internal)->bt_root;
00286 
00287         /* Initialize for record numbers. */
00288         if (F_ISSET(dbc, DBC_OPD) ||
00289             dbc->dbtype == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
00290                 F_SET(cp, C_RECNUM);
00291 
00292                 /*
00293                  * All btrees that support record numbers, optionally standard
00294                  * recno trees, and all off-page duplicate recno trees have
00295                  * mutable record numbers.
00296                  */
00297                 if ((F_ISSET(dbc, DBC_OPD) && dbc->dbtype == DB_RECNO) ||
00298                     F_ISSET(dbp, DB_BT_RECNUM | DB_RE_RENUMBER))
00299                         F_SET(cp, C_RENUMBER);
00300         }
00301 
00302         return (0);
00303 }
00304 
00305 /*
00306  * __bam_c_close --
00307  *      Close down the cursor.
       *
       *      dbc:       cursor being closed: a primary cursor (optionally with
       *                 an off-page duplicate cursor in its opd field) or an
       *                 off-page duplicate cursor -- see the three cases below.
       *      root_pgno: root page of the off-page duplicate tree if the caller
       *                 knows it, else PGNO_INVALID.  In case #2 it is read
       *                 from the primary leaf page instead.
       *      rmroot:    set to 1 when the emptied duplicate tree's root page
       *                 was freed and dbc has no opd cursor, i.e. the caller
       *                 must clean up the primary page.  Only written on that
       *                 path.
       *
       *      Returns 0 or the first error encountered.  All paths except the
       *      unknown-type ones run the page/lock discard code at "done".
       *
       *      NOTE(review): the two unknown-type branches return directly and
       *      so skip the discard code -- confirm that is acceptable.
00308  */
00309 static int
00310 __bam_c_close(dbc, root_pgno, rmroot)
00311         DBC *dbc;
00312         db_pgno_t root_pgno;
00313         int *rmroot;
00314 {
00315         BTREE_CURSOR *cp, *cp_opd, *cp_c;
00316         DB *dbp;
00317         DBC *dbc_opd, *dbc_c;
00318         PAGE *h;
00319         u_int32_t num;
00320         int cdb_lock, ret, t_ret;
00321 
00322         dbp = dbc->dbp;
00323         cp = (BTREE_CURSOR *)dbc->internal;
00324         cp_opd = (dbc_opd = cp->opd) == NULL ?
00325             NULL : (BTREE_CURSOR *)dbc_opd->internal;
00326         cdb_lock = ret = 0;
00327 
00328         if (dbc_opd != NULL)
00329                 DB_ASSERT(cp_opd->csp == cp_opd->stack);
00330 
00331         /*
00332          * There are 3 ways this function is called:
00333          *
00334          * 1. Closing a primary cursor: we get called with a pointer to a
00335          *    primary cursor that has a NULL opd field.  This happens when
00336          *    closing a btree/recno database cursor without an associated
00337          *    off-page duplicate tree.
00338          *
00339          * 2. Closing a primary and an off-page duplicate cursor stack: we
00340          *    get called with a pointer to the primary cursor which has a
00341          *    non-NULL opd field.  This happens when closing a btree cursor
00342          *    into database with an associated off-page btree/recno duplicate
00343          *    tree. (It can't be a primary recno database, recno databases
00344          *    don't support duplicates.)
00345          *
00346          * 3. Closing an off-page duplicate cursor stack: we get called with
00347          *    a pointer to the off-page duplicate cursor.  This happens when
00348          *    closing a non-btree database that has an associated off-page
00349          *    btree/recno duplicate tree or for a btree database when the
00350          *    opd tree is not empty (root_pgno == PGNO_INVALID).
00351          *
00352          * If either the primary or off-page duplicate cursor deleted a btree
00353          * key/data pair, check to see if the item is still referenced by a
00354          * different cursor.  If it is, confirm that cursor's delete flag is
00355          * set and leave it to that cursor to do the delete.
00356          *
00357          * NB: The test for == 0 below is correct.  Our caller already removed
00358          * our cursor argument from the active queue, we won't find it when we
00359          * search the queue in CDB___bam_ca_delete().
00360          * NB: It can't be true that both the primary and off-page duplicate
00361          * cursors have deleted a btree key/data pair.  Either the primary
00362          * cursor may have deleted an item and there's no off-page duplicate
00363          * cursor, or there's an off-page duplicate cursor and it may have
00364          * deleted an item.
00365          *
00366          * Primary recno databases aren't an issue here.  Recno keys are either
00367          * deleted immediately or never deleted, and do not have to be handled
00368          * here.
00369          *
00370          * Off-page duplicate recno databases are an issue here, cases #2 and
00371          * #3 above can both be off-page recno databases.  The problem is the
00372          * same as the final problem for off-page duplicate btree databases.
00373          * If we no longer need the off-page duplicate tree, we want to remove
00374          * it.  For off-page duplicate btrees, we are done with the tree when
00375          * we delete the last item it contains, i.e., there can be no further
00376          * references to it when it's empty.  For off-page duplicate recnos,
00377          * we remove items from the tree as the application calls the remove
00378          * function, so we are done with the tree when we close the last cursor
00379          * that references it.
00380          *
00381          * We optionally take the root page number from our caller.  If the
00382          * primary database is a btree, we can get it ourselves because dbc
00383          * is the primary cursor.  If the primary database is not a btree,
00384          * the problem is that we may be dealing with a stack of pages.  The
00385          * cursor we're using to do the delete points at the bottom of that
00386          * stack and we need the top of the stack.
00387          */
00388         if (F_ISSET(cp, C_DELETED)) {
00389                 dbc_c = dbc;
00390                 switch (dbc->dbtype) {
00391                 case DB_BTREE:                          /* Case #1, #3. */
00392                         if (CDB___bam_ca_delete(dbp, cp->pgno, cp->indx, 1) == 0)
00393                                 goto lock;
00394                         goto done;
00395                 case DB_RECNO:
00396                         if (!F_ISSET(dbc, DBC_OPD))     /* Case #1. */
00397                                 goto done;
00398                                                         /* Case #3. */
00399                         if (CDB___ram_ca_delete(dbp, cp->root) == 0)
00400                                 goto lock;
00401                         goto done;
00402                 default:
00403                         return (CDB___db_unknown_type(dbp->dbenv,
00404                                 "__bam_c_close", dbc->dbtype));
00405                 }
00406         }
00407 
00408         if (dbc_opd == NULL)
00409                 goto done;
00410 
00411         if (F_ISSET(cp_opd, C_DELETED)) {               /* Case #2. */
00412                 /*
00413                  * We will not have been provided a root page number.  Acquire
00414                  * one from the primary database.
00415                  */
00416                 if ((ret = CDB_memp_fget(dbp->mpf, &cp->pgno, 0, &h)) != 0)
00417                         goto err;
00418                 root_pgno = GET_BOVERFLOW(h, cp->indx + O_INDX)->pgno;
00419                 if ((ret = CDB_memp_fput(dbp->mpf, h, 0)) != 0)
00420                         goto err;
00421 
00422                 dbc_c = dbc_opd;
00423                 switch (dbc_opd->dbtype) {
00424                 case DB_BTREE:
00425                         if (CDB___bam_ca_delete(
00426                             dbp, cp_opd->pgno, cp_opd->indx, 1) == 0)
00427                                 goto lock;
00428                         goto done;
00429                 case DB_RECNO:
00430                         if (CDB___ram_ca_delete(dbp, cp_opd->root) == 0)
00431                                 goto lock;
00432                         goto done;
00433                 default:
                              /* Report the type this switch is actually on. */
00434                         return (CDB___db_unknown_type(dbp->dbenv,
00435                                 "__bam_c_close", dbc_opd->dbtype));
00436                 }
00437         }
00438         goto done;
00439 
00440 lock:   cp_c = (BTREE_CURSOR *)dbc_c->internal;
00441 
00442         /*
00443          * If this is CDB, upgrade the lock if necessary.  While we acquired
00444          * the write lock to logically delete the record, we released it when
00445          * we returned from that call, and so may not be holding a write lock
00446          * at the moment.  NB: to get here in CDB we must either be holding a
00447          * write lock or be the only cursor that is permitted to acquire write
00448          * locks.  The reason is that there can never be more than a single CDB
00449          * write cursor (that cursor cannot be dup'd), and so that cursor must
00450          * be closed and the item therefore deleted before any other cursor
00451          * could acquire a reference to this item.
00452          *
00453          * Note that dbc may be an off-page dup cursor;  this is the sole
00454          * instance in which an OPD cursor does any locking, but it's necessary
00455          * because we may be closed by ourselves without a parent cursor
00456          * handy, and we have to do a lock upgrade on behalf of somebody.
00457          * If this is the case, the OPD has been given the parent's locking
00458          * info in CDB___db_c_get--the OPD is also a WRITEDUP.
00459          */
00460         if (LOCKING(dbp->dbenv)) {
00461                 DB_ASSERT(!F_ISSET(dbc, DBC_OPD) || F_ISSET(dbc, DBC_WRITEDUP));
00462                 if (!F_ISSET(dbc, DBC_WRITER)) {
00463                         if ((ret =
00464                             CDB_lock_get(dbp->dbenv, dbc->locker, DB_LOCK_UPGRADE,
00465                             &dbc->lock_dbt, DB_LOCK_WRITE, &dbc->mylock)) != 0)
00466                                 goto err;
                              /* Remember to downgrade the lock again on exit. */
00467                         cdb_lock = 1;
00468                 }
00469 
00470                 cp_c->lock.off = LOCK_INVALID;
00471                 if ((ret =
00472                     CDB_memp_fget(dbp->mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
00473                         goto err;
00474 
00475                 goto delete;
00476         }
00477 
00478         /*
00479          * The variable dbc_c has been initialized to reference the cursor in
00480          * which we're going to do the delete.  Initialize the cursor's page
00481          * and lock structures as necessary.
00482          *
00483          * First, we may not need to acquire any locks.  If we're in case #3,
00484          * that is, the primary database isn't a btree database, our caller
00485          * is responsible for acquiring any necessary locks before calling us.
00486          */
00487         if (F_ISSET(dbc, DBC_OPD)) {
00488                 cp_c->lock.off = LOCK_INVALID;
00489                 if ((ret =
00490                     CDB_memp_fget(dbp->mpf, &cp_c->pgno, 0, &cp_c->page)) != 0)
00491                         goto err;
00492                 goto delete;
00493         }
00494 
00495         /*
00496          * Otherwise, acquire a write lock.  If the cursor that did the initial
00497          * logical deletion (and which had a write lock) is not the same as the
00498          * cursor doing the physical deletion (which may have only ever had a
00499          * read lock on the item), we need to upgrade.  The confusion comes as
00500          * follows:
00501          *
00502          *      C1      created, acquires item read lock
00503          *      C2      dup C1, create C2, also has item read lock.
00504          *      C1      acquire write lock, delete item
00505          *      C1      close
00506          *      C2      close, needs a write lock to physically delete item.
00507          *
00508          * If we're in a TXN, we know that C2 will be able to acquire the write
00509          * lock, because no locker other than the one shared by C1 and C2 can
00510          * acquire a write lock -- the original write lock C1 acquired was never
00511          * discarded.
00512          *
00513          * If we're not in a TXN, it's nastier.  Other cursors might acquire
00514          * read locks on the item after C1 closed, discarding its write lock,
00515          * and such locks would prevent C2 from acquiring a write lock.  That's
00516          * OK, though, we'll simply wait until we can acquire a write lock, or
00517          * we'll deadlock.  (Which better not happen, since we're not in a TXN.)
00518          *
00519          * Lock the primary database page, regardless of whether we're deleting
00520          * an item on a primary database page or an off-page duplicates page.
00521          */
00522         ACQUIRE(dbc, DB_LOCK_WRITE,
00523             cp->pgno, cp_c->lock, cp_c->pgno, cp_c->page, ret);
00524         if (ret != 0)
00525                 goto err;
00526 
00527 delete: /*
00528          * If the delete occurred in a btree, delete the on-page physical item
00529          * referenced by the cursor.
00530          */
00531         if (dbc_c->dbtype == DB_BTREE && (ret = __bam_c_physdel(dbc_c)) != 0)
00532                 goto err;
00533 
00534         if (dbc_opd != NULL)
00535                 DB_ASSERT(cp_opd->csp == cp_opd->stack);
00536 
00537         /*
00538          * If we're not working in an off-page duplicate tree, then we're
00539          * done.
00540          */
00541         if (!F_ISSET(dbc_c, DBC_OPD) || root_pgno == PGNO_INVALID)
00542                 goto done;
00543 
00544         /*
00545          * We may have just deleted the last element in the off-page duplicate
00546          * tree, and closed the last cursor in the tree.  For an off-page btree
00547          * there are no other cursors in the tree by definition, if the tree is
00548          * empty.  For an off-page recno we know we have closed the last cursor
00549          * in the tree because the CDB___ram_ca_delete call above returned 0 only
00550          * in that case.  So, if the off-page duplicate tree is empty at this
00551          * point, we want to remove it.
00552          */
00553         if ((ret = CDB_memp_fget(dbp->mpf, &root_pgno, 0, &h)) != 0)
00554                 goto err;
00555         if ((num = NUM_ENT(h)) == 0) {
00556                 if ((ret = CDB___db_free(dbc, h)) != 0)
00557                         goto err;
00558         } else {
00559                 if ((ret = CDB_memp_fput(dbp->mpf, h, 0)) != 0)
00560                         goto err;
00561                 goto done;
00562         }
00563 
00564         /*
00565          * When removing the tree, we have to do one of two things.  If this is
00566          * case #2, that is, the primary tree is a btree, delete the key that's
00567          * associated with the tree from the btree leaf page.  We know we are
00568          * the only reference to it and we already have the correct lock.  We
00569          * detect this case because the cursor that was passed to us references
00570          * an off-page duplicate cursor.
00571          *
00572          * If this is case #3, that is, the primary tree isn't a btree, pass
00573          * the information back to our caller, it's their job to do cleanup on
00574          * the primary page.
00575          */
00576         if (dbc_opd != NULL) {
00577                 cp->lock.off = LOCK_INVALID;
00578                 if ((ret = CDB_memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
00579                         goto err;
00580                 if ((ret = __bam_c_physdel(dbc)) != 0)
00581                         goto err;
00582         } else
00583                 *rmroot = 1;
00584 err:
00585 done:   /*
00586          * Discard the page references and locks, and confirm that the stack
00587          * has been emptied.
00588          */
00589         if (dbc_opd != NULL) {
00590                 DISCARD_CUR(dbc_opd, t_ret);
00591                 if (t_ret != 0 && ret == 0)
00592                         ret = t_ret;
00593                 DB_ASSERT(cp_opd->csp == cp_opd->stack);
00594         }
00595         DISCARD_CUR(dbc, t_ret);
00596         if (t_ret != 0 && ret == 0)
00597                 ret = t_ret;
00598         DB_ASSERT(cp->csp == cp->stack);
00599 
00600         /* Downgrade any CDB lock we acquired. */
00601         if (cdb_lock)
00602                 (void)CDB___lock_downgrade(
00603                     dbp->dbenv, &dbc->mylock, DB_LOCK_IWRITE, 0);
00604 
00605         return (ret);
00606 }
00607 
00608 /*
00609  * __bam_c_destroy --
00610  *      Close a single cursor -- internal version.
       *
       *      Frees the BTREE_CURSOR hanging off dbc->internal and always
       *      returns 0.  dbc->internal is not reset to NULL here, so the
       *      caller must not use it afterwards.
00611  */
00612 static int
00613 __bam_c_destroy(dbc)
00614         DBC *dbc;
00615 {
00616         /* Discard the structures. */
00617         CDB___os_free(dbc->internal, sizeof(BTREE_CURSOR));
00618 
00619         return (0);
00620 }
00621 
00622 /*
00623  * CDB___bam_c_count --
00624  *      Return a count of on and off-page duplicates.
00625  *
00626  * PUBLIC: int CDB___bam_c_count __P((DBC *, db_recno_t *));
       *
       *      dbc:    top-level cursor, positioned on the item whose duplicates
       *              are to be counted.
       *      recnop: out; number of duplicates.  For on-page duplicates this
       *              is the length of the run of key slots sharing the
       *              cursor's key index entry; for off-page duplicates it is
       *              RE_NREC of the duplicate tree's root page.
       *
       *      Returns 0, or the error from fetching/releasing the page.  The
       *      page is fetched into cp->page for the duration of the call and
       *      cp->page is NULL again on a normal return.
00627  */
00628 int
00629 CDB___bam_c_count(dbc, recnop)
00630         DBC *dbc;
00631         db_recno_t *recnop;
00632 {
00633         BTREE_CURSOR *cp;
00634         DB *dbp;
00635         db_indx_t indx, top;
00636         db_recno_t recno;
00637         int ret;
00638 
00639         dbp = dbc->dbp;
00640         cp = (BTREE_CURSOR *)dbc->internal;
00641 
00642         /*
00643          * Called with the top-level cursor that may reference an off-page
00644          * duplicates page.  If it's a set of on-page duplicates, get the
00645          * page and count.  Otherwise, get the root page of the off-page
00646          * duplicate tree, and use the count.  We don't have to acquire any
00647          * new locks, we have to have a read lock to even get here.
00648          */
00649         if (cp->opd == NULL) {
00650                 if ((ret = CDB_memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
00651                         return (ret);
00652 
00653                 /*
00654                  * Move back to the beginning of the set of duplicates and
00655                  * then count forward.
                       *
                       * The forward scan stops at the last key slot
                       * (NUM_ENT - P_INDX): comparing that slot with
                       * indx + P_INDX would read an index entry beyond the
                       * page's valid entries, and a stale value there could
                       * inflate the count.  The cursor is positioned on an
                       * item, so the page is assumed to hold at least one
                       * key/data pair and the subtraction cannot wrap.
00656                  */
00657                 for (indx = cp->indx;; indx -= P_INDX)
00658                         if (indx == 0 ||
00659                             !IS_DUPLICATE(dbc, indx, indx - P_INDX))
00660                                 break;
00661                 for (recno = 1, top = NUM_ENT(cp->page) - P_INDX;
00662                     indx < top; ++recno, indx += P_INDX)
00663                         if (!IS_DUPLICATE(dbc, indx, indx + P_INDX))
00664                                 break;
00665                 *recnop = recno;
00666         } else {
00667                 if ((ret = CDB_memp_fget(dbp->mpf,
00668                     &cp->opd->internal->root, 0, &cp->page)) != 0)
00669                         return (ret);
00670 
00671                 *recnop = RE_NREC(cp->page);
00672         }
00673 
              /* Release the page; the cursor does not keep it pinned. */
00674         ret = CDB_memp_fput(dbp->mpf, cp->page, 0);
00675         cp->page = NULL;
00676 
00677         return (ret);
00678 }
00679 
00680 /*
00681  * __bam_c_del --
00682  *      Delete using a cursor.
       *
       *      The item under the cursor is only flagged as deleted on its page
       *      (B_DSET); nothing is removed from the page here.
       *
       *      dbc:    cursor positioned on the item to delete.
       *
       *      Returns 0 on success, DB_KEYEMPTY if the cursor's item is already
       *      flagged C_DELETED, or the first error reported by the locking,
       *      logging, memory pool or record-count calls below.
00683  */
00684 static int
00685 __bam_c_del(dbc)
00686         DBC *dbc;
00687 {
00688         BTREE_CURSOR *cp;
00689         DB *dbp;
00690         int ret, t_ret;
00691 
00692         dbp = dbc->dbp;
00693         cp = (BTREE_CURSOR *)dbc->internal;
00694         ret = 0;
00695 
00696         /* If the item was already deleted, return failure. */
00697         if (F_ISSET(cp, C_DELETED))
00698                 return (DB_KEYEMPTY);
00699 
00700         /*
00701          * We don't physically delete the record until the cursor moves, so
00702          * we have to have a long-lived write lock on the page instead of a
00703          * long-lived read lock.  Note, we have to have a read lock to even
00704          * get here.
00705          *
00706          * If we're maintaining record numbers, we lock the entire tree, else
00707          * we lock the single page.
00708          */
00709         if (F_ISSET(cp, C_RECNUM)) {
00710                 if ((ret = __bam_c_getstack(dbc)) != 0)
00711                         goto err;
00712         } else {
00713                 ACQUIRE_CUR(dbc, DB_LOCK_WRITE, ret);
00714                 if (ret != 0)
00715                         goto err;
00716         }
00717 
00718         /* Log the change. */
00719         if (DB_LOGGING(dbc) &&
00720             (ret = CDB___bam_cdel_log(dbp->dbenv, dbc->txn, &LSN(cp->page), 0,
00721             dbp->log_fileid, PGNO(cp->page), &LSN(cp->page), cp->indx)) != 0)
00722                 goto err;
00723 
00724         /* Set the intent-to-delete flag on the page and update all cursors. */
              /* On P_LBTREE pages the flag goes on the entry at indx + O_INDX. */
00725         if (TYPE(cp->page) == P_LBTREE)
00726                 B_DSET(GET_BKEYDATA(cp->page, cp->indx + O_INDX)->type);
00727         else
00728                 B_DSET(GET_BKEYDATA(cp->page, cp->indx)->type);
00729 
00730         /* Mark the page dirty. */
00731         ret = CDB_memp_fset(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
00732 
00733 err:    /*
00734          * If we've been successful so far and the tree has record numbers,
00735          * adjust the record counts.  Either way, release any acquired pages.
               *
               * The result of releasing the stack is deliberately ignored; a
               * failure from DISCARD_CUR is reported only if nothing failed
               * earlier.
00736          */
00737         if (F_ISSET(cp, C_RECNUM)) {
00738                 if (ret == 0)
00739                         ret = CDB___bam_adjust(dbc, -1);
00740                 (void)CDB___bam_stkrel(dbc, STK_CLRDBC);
00741         } else {
00742                 DISCARD_CUR(dbc, t_ret);
00743                 if (t_ret != 0 && ret == 0)
00744                         ret = t_ret;
00745         }
00746 
00747         /* Update the cursors last, after all chance of failure is past. */
00748         if (ret == 0)
00749                 (void)CDB___bam_ca_delete(dbp, cp->pgno, cp->indx, 1);
00750 
00751         return (ret);
00752 }
00753 
00754 /*
00755  * CDB___bam_c_dup --
00756  *      Copy the btree cursor state of orig_dbc into new_dbc, giving the
00757  *      copy its own lock when the original holds one outside a transaction.
00758  *
00759  * PUBLIC: int CDB___bam_c_dup __P((DBC *, DBC *));
00760  */
00761 int
00762 CDB___bam_c_dup(orig_dbc, new_dbc)
00763         DBC *orig_dbc, *new_dbc;
00764 {
00765         BTREE_CURSOR *src, *dst;
00766         int ret;
00767 
00768         src = (BTREE_CURSOR *)orig_dbc->internal;
00769         dst = (BTREE_CURSOR *)new_dbc->internal;
00770 
00771         /*
00772          * Locks taken inside a transaction are retained until it commits
00773          * or aborts, so the copy needs a lock of its own only when the
00774          * original holds one and no transaction is involved.  The lock is
00775          * requested for the page and mode already recorded in the copy.
00776          */
00777         if (orig_dbc->txn == NULL && src->lock.off != LOCK_INVALID &&
00778             (ret = CDB___db_lget(new_dbc,
00779             0, dst->pgno, dst->lock_mode, 0, &dst->lock)) != 0)
00780                 return (ret);
00781 
00782         /* Carry over the remaining btree-specific cursor fields. */
00783         dst->flags = src->flags;
00784         dst->recno = src->recno;
00785         dst->ovflsize = src->ovflsize;
00786         return (0);
00787 }
00788 
00789 /*
00790  * __bam_c_get --
00791  *      Get using a cursor (btree).
       *
       *      dbc:    cursor to position.
       *      key:    search key for DB_GET_BOTH (primary tree), DB_SET,
       *              DB_SET_RECNO and DB_SET_RANGE; flagged DB_DBT_ISSET after
       *              a successful DB_SET.
       *      data:   datum to match for DB_GET_BOTH and DB_GET_BOTHC.
       *      flags:  the positioning operation, one of the cases below.
       *      pgnop:  if not NULL, handed to __bam_isopd() so the caller can
       *              be told about an off-page duplicate tree.
       *
       *      Returns 0 on success, DB_KEYEMPTY for DB_CURRENT on a deleted
       *      item, DB_NOTFOUND when no matching item exists, or the error of
       *      the failing call.  Every failure leaves through err: so that the
       *      C_DELETED flag is cleared whenever the cursor has moved.
00792  */
00793 static int
00794 __bam_c_get(dbc, key, data, flags, pgnop)
00795         DBC *dbc;
00796         DBT *key, *data;
00797         u_int32_t flags;
00798         db_pgno_t *pgnop;
00799 {
00800         BTREE_CURSOR *cp;
00801         DB *dbp;
00802         db_pgno_t orig_pgno;
00803         db_indx_t orig_indx;
00804         int exact, newopd, ret;
00805 
00806         dbp = dbc->dbp;
00807         cp = (BTREE_CURSOR *)dbc->internal;
              /* Remember where we started so err: can tell if we moved. */
00808         orig_pgno = cp->pgno;
00809         orig_indx = cp->indx;
00810 
00811         newopd = 0;
00812         switch (flags) {
00813         case DB_CURRENT:
00814                 /* It's not possible to return a deleted record. */
00815                 if (F_ISSET(cp, C_DELETED)) {
00816                         ret = DB_KEYEMPTY;
00817                         goto err;
00818                 }
00819 
00820                 /*
00821                  * Acquire the current page.  We have at least a read-lock
00822                  * already.  The caller may have set DB_RMW asking for a
00823                  * write lock, but upgrading to a write lock has no better
00824                  * chance of succeeding now instead of later, so don't try.
00825                  */
00826                 if ((ret = CDB_memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
00827                         goto err;
00828                 break;
00829         case DB_FIRST:
00830                 newopd = 1;
00831                 if ((ret = __bam_c_first(dbc)) != 0)
00832                         goto err;
00833                 break;
00834         case DB_GET_BOTH:
00835                 /*
00836                  * There are two ways to get here based on DBcursor->c_get
00837                  * with the DB_GET_BOTH flag set:
00838                  *
00839                  * 1. Searching a sorted off-page duplicate tree: do a tree
00840                  * search.
00841                  *
00842                  * 2. Searching btree: do a tree search.  If it returns a
00843                  * reference to off-page duplicate tree, return immediately
00844                  * and let our caller deal with it.  If the search doesn't
00845                  * return a reference to off-page duplicate tree, start an
00846                  * on-page search.
00847                  */
00848                 if (F_ISSET(dbc, DBC_OPD)) {
00849                         if ((ret = __bam_c_search(
00850                             dbc, data, DB_GET_BOTH, &exact)) != 0)
00851                                 goto err;
00852                         if (!exact) {
00853                                 ret = DB_NOTFOUND;
00854                                 goto err;
00855                         }
00856                 } else {
00857                         if ((ret = __bam_c_search(
00858                             dbc, key, DB_GET_BOTH, &exact)) != 0)
00859                                 goto err;
00860                         if (!exact) {
00861                                 ret = DB_NOTFOUND;
00862                                 goto err;
00863                         }
00864 
00865                         if (pgnop != NULL && __bam_isopd(dbc, pgnop)) {
00866                                 newopd = 1;
00867                                 break;
00868                         }
00869                         if ((ret = __bam_getboth_finddatum(dbc, data)) != 0)
00870                                 goto err;
00871                 }
00872                 break;
00873         case DB_GET_BOTHC:
00874                 if ((ret = __bam_getbothc(dbc, data)) != 0)
00875                         goto err;
00876                 break;
00877         case DB_LAST:
00878                 newopd = 1;
00879                 if ((ret = __bam_c_last(dbc)) != 0)
00880                         goto err;
00881                 break;
00882         case DB_NEXT:
00883                 newopd = 1;
                      /* An unpositioned cursor starts from the first record. */
00884                 if (cp->pgno == PGNO_INVALID) {
00885                         if ((ret = __bam_c_first(dbc)) != 0)
00886                                 goto err;
00887                 } else
00888                         if ((ret = __bam_c_next(dbc, 1)) != 0)
00889                                 goto err;
00890                 break;
00891         case DB_NEXT_DUP:
00892                 if ((ret = __bam_c_next(dbc, 1)) != 0)
00893                         goto err;
00894                 if (!IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx)) {
00895                         ret = DB_NOTFOUND;
00896                         goto err;
00897                 }
00898                 break;
00899         case DB_NEXT_NODUP:
00900                 newopd = 1;
00901                 if (cp->pgno == PGNO_INVALID) {
00902                         if ((ret = __bam_c_first(dbc)) != 0)
00903                                 goto err;
00904                 } else
                              /* Step forward past duplicates of the start. */
00905                         do {
00906                                 if ((ret = __bam_c_next(dbc, 1)) != 0)
00907                                         goto err;
00908                         } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
00909                 break;
00910         case DB_PREV:
00911                 newopd = 1;
                      /* An unpositioned cursor starts from the last record. */
00912                 if (cp->pgno == PGNO_INVALID) {
00913                         if ((ret = __bam_c_last(dbc)) != 0)
00914                                 goto err;
00915                 } else
00916                         if ((ret = __bam_c_prev(dbc)) != 0)
00917                                 goto err;
00918                 break;
00919         case DB_PREV_NODUP:
00920                 newopd = 1;
00921                 if (cp->pgno == PGNO_INVALID) {
00922                         if ((ret = __bam_c_last(dbc)) != 0)
00923                                 goto err;
00924                 } else
                              /* Step backward past duplicates of the start. */
00925                         do {
00926                                 if ((ret = __bam_c_prev(dbc)) != 0)
00927                                         goto err;
00928                         } while (IS_CUR_DUPLICATE(dbc, orig_pgno, orig_indx));
00929                 break;
00930         case DB_SET:
00931         case DB_SET_RECNO:
00932                 newopd = 1;
00933                 if ((ret = __bam_c_search(dbc, key, flags, &exact)) != 0)
00934                         goto err;
00935                 break;
00936         case DB_SET_RANGE:
00937                 newopd = 1;
00938                 if ((ret = __bam_c_search(dbc, key, flags, &exact)) != 0)
00939                         goto err;
00940 
00941                 /*
00942                  * As we didn't require an exact match, the search function
00943                  * may have returned an entry past the end of the page.  Or,
00944                  * we may be referencing a deleted record.  If so, move to
00945                  * the next entry.
00946                  */
00947                 if (cp->indx == NUM_ENT(cp->page) || IS_CUR_DELETED(dbc))
00948                         if ((ret = __bam_c_next(dbc, 0)) != 0)
00949                                 goto err;
00950                 break;
00951         default:
00952                 ret = CDB___db_unknown_flag(dbp->dbenv, "__bam_c_get", flags);
00953                 goto err;
00954         }
00955 
00956         /*
00957          * We may have moved to an off-page duplicate tree.  Return that
00958          * information to our caller.
00959          */
00960         if (newopd && pgnop != NULL)
00961                 (void)__bam_isopd(dbc, pgnop);
00962 
00963         /* Don't return the key, it was passed to us */
00964         if (flags == DB_SET)
00965                 F_SET(key, DB_DBT_ISSET);
00966 
00967 err:    /*
00968          * Regardless of whether we were successful or not, if the cursor
00969          * moved, clear the delete flag, DBcursor->c_get never references
00970          * a deleted key, if it moved at all.
00971          */
00972         if (F_ISSET(cp, C_DELETED)
00973             && (cp->pgno != orig_pgno || cp->indx != orig_indx))
00974                 F_CLR(cp, C_DELETED);
00975 
00976         return (ret);
00977 }
00978 
00979 /*
00980  * __bam_getbothc --
00981  *      Search for a matching data item on a join.
       *
       *      Continues a DB_GET_BOTH style search from the cursor's current
       *      position, looking for data among the duplicates that follow it.
       *
       *      dbc:    cursor already positioned within a set of duplicates.
       *      data:   the datum to look for.
       *
       *      Returns 0 if a match is found, DB_NOTFOUND if there is none after
       *      the current position, or the error of the failing call.
00982  */
00983 static int
00984 __bam_getbothc(dbc, data)
00985         DBC *dbc;
00986         DBT *data;
00987 {
00988         BTREE_CURSOR *cp;
00989         DB *dbp;
00990         int cmp, exact, ret;
00991 
00992         dbp = dbc->dbp;
00993         cp = (BTREE_CURSOR *)dbc->internal;
00994 
00995         /*
00996          * Acquire the current page.  We have at least a read-lock
00997          * already.  The caller may have set DB_RMW asking for a
00998          * write lock, but upgrading to a write lock has no better
00999          * chance of succeeding now instead of later, so don't try.
01000          */
01001         if ((ret = CDB_memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
01002                 return (ret);
01003 
01004         /*
01005          * An off-page duplicate cursor.  Search the remaining duplicates
01006          * for one which matches (do a normal btree search, then verify
01007          * that the retrieved record is greater than the original one).
01008          */
01009         if (F_ISSET(dbc, DBC_OPD)) {
01010                 /*
01011                  * Check to make sure the desired item comes strictly after
01012                  * the current position;  if it doesn't, return DB_NOTFOUND.
                       *
                       * NOTE(review): the two early returns below leave
                       * cp->page set; presumably the caller releases it --
                       * confirm against the generic cursor code.
01013                  */
01014                 if ((ret = CDB___bam_cmp(dbp, data, cp->page, cp->indx,
01015                     dbp->dup_compare == NULL ? CDB___bam_defcmp : dbp->dup_compare,
01016                     &cmp)) != 0)
01017                         return (ret);
01018 
01019                 if (cmp <= 0)
01020                         return (DB_NOTFOUND);
01021 
01022                 /* Discard the current page, we're going to do a full search. */
01023                 if ((ret = CDB_memp_fput(dbp->mpf, cp->page, 0)) != 0)
01024                         return (ret);
01025                 cp->page = NULL;
01026 
                      /*
                       * NOTE(review): "exact" is not examined here, unlike the
                       * DB_GET_BOTH case in __bam_c_get; this assumes the search
                       * itself fails when there is no exact match -- confirm.
                       */
01027                 return (__bam_c_search(dbc, data, DB_GET_BOTH, &exact));
01028         }
01029 
01030         /*
01031          * We're doing a DBC->c_get(DB_GET_BOTHC) and we're already searching
01032          * a set of on-page duplicates (either sorted or unsorted).  Continue
01033          * a linear search from after the current position.
01034          *
01035          * (Note that we could have just finished a "set" of one duplicate,
01036          * i.e. not a duplicate at all, but the following check will always
01037          * return DB_NOTFOUND in this case, which is the desired behavior.)
01038          */
01039         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
01040             !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
01041                 return (DB_NOTFOUND);
              /* Step onto the next duplicate and search from there. */
01042         cp->indx += P_INDX;
01043 
01044         return (__bam_getboth_finddatum(dbc, data));
01045 }
01046 
01047 /*
01048  * __bam_getboth_finddatum --
01049  *      Find a matching on-page data item.
       *
       *      dbc:    cursor positioned on the first candidate of a set of
       *              on-page duplicates; cp->indx is advanced while searching.
       *      data:   the datum to look for.
       *
       *      Returns 0 with cp->indx on the matching item, DB_NOTFOUND if no
       *      item matches, or the error returned by CDB___bam_cmp.
01050  */
01051 static int
01052 __bam_getboth_finddatum(dbc, data)
01053         DBC *dbc;
01054         DBT *data;
01055 {
01056         BTREE_CURSOR *cp;
01057         DB *dbp;
01058         db_indx_t base, lim, top;
01059         int cmp, ret;
01060 
01061         dbp = dbc->dbp;
01062         cp = (BTREE_CURSOR *)dbc->internal;
01063 
01064         /*
01065          * Called (sometimes indirectly) from DBC->get to search on-page data
01066          * item(s) for a matching value.  If the original flag was DB_GET_BOTH,
01067          * the cursor argument is set to the first data item for the key.  If
01068          * the original flag was DB_GET_BOTHC, the cursor argument is set to
01069          * the first data item that we can potentially return.  In both cases,
01070          * there may or may not be additional duplicate data items to search.
01071          *
01072          * If the duplicates are not sorted, do a linear search.
01073          *
01074          * If the duplicates are sorted, do a binary search.  The reason for
01075          * this is that large pages and small key/data pairs result in large
01076          * numbers of on-page duplicates before they get pushed off-page.
01077          */
01078         if (dbp->dup_compare == NULL) {
01079                 for (;; cp->indx += P_INDX) {
                              /*
                               * Deleted items are never compared, and so can
                               * never match: "cmp" is only examined after it
                               * has been set for the current item.
                               */
01080                         if (!IS_CUR_DELETED(dbc)) {
01081                                 if ((ret = CDB___bam_cmp(dbp, data,
01082                                     cp->page, cp->indx + O_INDX,
01083                                     CDB___bam_defcmp, &cmp)) != 0)
01084                                         return (ret);
01085                                 if (cmp == 0)
01086                                         return (0);
                              }

01087                         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
01088                             !IS_DUPLICATE(dbc, cp->indx, cp->indx + P_INDX))
01089                                 break;
01090                 }
01091         } else {
01092                 /*
01093                  * Find the top and bottom of the duplicate set.  Binary search
01094                  * requires at least two items, don't loop if there's only one.
                       *
                       * NOTE(review): the single-item shortcut does not check
                       * whether the item is deleted, while the binary search
                       * below rejects a deleted match -- confirm intended.
01095                  */
01096                 for (base = top = cp->indx;
01097                     top < NUM_ENT(cp->page); top += P_INDX)
01098                         if (!IS_DUPLICATE(dbc, cp->indx, top))
01099                                 break;
01100                 if (base == (top - P_INDX)) {
01101                         if  ((ret = CDB___bam_cmp(dbp, data,
01102                             cp->page, cp->indx + O_INDX,
01103                             dbp->dup_compare, &cmp)) != 0)
01104                                return (ret);
01105                         return (cmp == 0 ? 0 : DB_NOTFOUND);
01106                 }
01107 
                      /*
                       * Binary search over [base, top): "lim" is the number of
                       * items left to consider, halved on every pass.
                       */
01108                 for (lim =
01109                     (top - base) / (db_indx_t)P_INDX; lim != 0; lim >>= 1) {
01110                         cp->indx = base + ((lim >> 1) * P_INDX);
01111                         if ((ret = CDB___bam_cmp(dbp, data, cp->page,
01112                             cp->indx + O_INDX, dbp->dup_compare, &cmp)) != 0)
01113                                 return (ret);
01114                         if (cmp == 0) {
01115                                 if (!IS_CUR_DELETED(dbc))
01116                                         return (0);
01117                                 break;
01118                         }
01119                         if (cmp > 0) {
01120                                 base = cp->indx + P_INDX;
01121                                 --lim;
01122                         }
01123                 }
01124         }
01125         return (DB_NOTFOUND);
01126 }
01127 
01128 /*
01129  * __bam_c_put --
01130  *      Put using a cursor.
       *
       *      dbc:    cursor to insert through.
       *      key:    key to search for with DB_KEYFIRST, DB_KEYLAST and
       *              DB_NODUPDATA in the primary tree; passed to the insert.
       *      data:   the datum to store (and the search key when the cursor
       *              is an off-page duplicate cursor).
       *      flags:  DB_AFTER, DB_BEFORE, DB_CURRENT, DB_KEYFIRST, DB_KEYLAST
       *              or DB_NODUPDATA.
       *      pgnop:  if not NULL, handed to __bam_isopd() when an existing key
       *              is found, so the caller can be told about an off-page
       *              duplicate tree; in that case nothing is inserted here.
       *
       *      Returns 0 on success or the error of the failing call.  If the
       *      insert reports DB_NEEDSPLIT the tree is split and the whole
       *      operation is retried.  Errors raised while a search stack is
       *      held leave through err: so that the stack is released.
01131  */
01132 static int
01133 __bam_c_put(dbc, key, data, flags, pgnop)
01134         DBC *dbc;
01135         DBT *key, *data;
01136         u_int32_t flags;
01137         db_pgno_t *pgnop;
01138 {
01139         BTREE_CURSOR *cp;
01140         DB *dbp;
01141         DBT dbt;
01142         u_int32_t iiop;
01143         int cmp, exact, needkey, ret, stack;
01144         void *arg;
01145 
01146         dbp = dbc->dbp;
01147         cp = (BTREE_CURSOR *)dbc->internal;
01148 
              /* Restart point after a split; "stack" records a held stack. */
01149 split:  needkey = ret = stack = 0;
01150         switch (flags) {
01151         case DB_AFTER:
01152         case DB_BEFORE:
01153         case DB_CURRENT:
01154                 needkey = 1;
01155                 iiop = flags;
01156 
01157                 /*
01158                  * If the tree has record numbers (and we're not just replacing
01159                  * an existing record), we need a complete stack so that we can
01160                  * adjust the record counts.
01161                  */
01162                 if (F_ISSET(cp, C_RECNUM) &&
01163                     (flags != DB_CURRENT || F_ISSET(cp, C_DELETED))) {
01164                         if ((ret = __bam_c_getstack(dbc)) != 0)
01165                                 goto err;
01166                         stack = 1;
01167                 } else {
01168                         /* Acquire the current page with a write lock. */
01169                         ACQUIRE_WRITE_LOCK(dbc, ret);
01170                         if (ret != 0)
01171                                 goto err;
01172                         if ((ret = CDB_memp_fget(
01173                             dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
01174                                 goto err;
01175                 }
01176                 break;
01177         case DB_KEYFIRST:
01178         case DB_KEYLAST:
01179         case DB_NODUPDATA:
01180                 /*
01181                  * Searching off-page, sorted duplicate tree: do a tree search
01182                  * for the correct item; __bam_c_search returns the smallest
01183                  * slot greater than the key, use it.
01184                  */
01185                 if (F_ISSET(dbc, DBC_OPD)) {
01186                         if ((ret =
01187                             __bam_c_search(dbc, data, flags, &exact)) != 0)
01188                                 goto err;
01189                         stack = 1;
01190 
01191                         /* Disallow "sorted" duplicate duplicates. */
01192                         if (exact) {
01193                                 ret = CDB___db_duperr(dbp, flags);
01194                                 goto err;
01195                         }
01196                         iiop = DB_BEFORE;
01197                         break;
01198                 }
01199 
01200                 /* Searching a btree. */
01201                 if ((ret = __bam_c_search(dbc, key,
01202                     flags == DB_KEYFIRST || dbp->dup_compare != NULL ?
01203                     DB_KEYFIRST : DB_KEYLAST, &exact)) != 0)
01204                         goto err;
01205                 stack = 1;
01206 
01207                 /*
01208                  * If we don't have an exact match, __bam_c_search returned
01209                  * the smallest slot greater than the key, use it.
01210                  */
01211                 if (!exact) {
01212                         iiop = DB_KEYFIRST;
01213                         break;
01214                 }
01215 
01216                 /*
01217                  * If duplicates aren't supported, replace the current item.
01218                  * (If implementing the DB->put function, our caller already
01219                  * checked the DB_NOOVERWRITE flag.)
01220                  */
01221                 if (!F_ISSET(dbp, DB_AM_DUP)) {
01222                         iiop = DB_CURRENT;
01223                         break;
01224                 }
01225 
01226                 /*
01227                  * If we find a matching entry, it may be an off-page duplicate
01228                  * tree.  Return the page number to our caller, we need a new
01229                  * cursor.
01230                  */
01231                 if (pgnop != NULL && __bam_isopd(dbc, pgnop))
01232                         goto done;
01233 
01234                 /* If the duplicates aren't sorted, move to the right slot. */
01235                 if (dbp->dup_compare == NULL) {
01236                         if (flags == DB_KEYFIRST)
01237                                 iiop = DB_BEFORE;
01238                         else
01239                                 for (;; cp->indx += P_INDX)
01240                                         if (cp->indx + P_INDX >=
01241                                             NUM_ENT(cp->page) ||
01242                                             !IS_DUPLICATE(dbc, cp->indx,
01243                                             cp->indx + P_INDX)) {
01244                                                 iiop = DB_AFTER;
01245                                                 break;
01246                                         }
01247                         break;
01248                 }
01249 
01250                 /*
01251                  * We know that we're looking at the first of a set of sorted
01252                  * on-page duplicates.  Walk the list to find the right slot.
                       *
                       * The search stack is held here, so a comparison failure
                       * has to leave through err: like every other failure.
01253                  */
01254                 for (;; cp->indx += P_INDX) {
01255                         if ((ret = CDB___bam_cmp(dbp, data, cp->page,
01256                             cp->indx + O_INDX, dbp->dup_compare, &cmp)) !=0)
01257                                 goto err;
01258                         if (cmp < 0) {
01259                                 iiop = DB_BEFORE;
01260                                 break;
01261                         }
01262 
01263                         /* Disallow "sorted" duplicate duplicates. */
01264                         if (cmp == 0) {
01265                                 ret = CDB___db_duperr(dbp, flags);
01266                                 goto err;
01267                         }
01268 
01269                         if (cp->indx + P_INDX >= NUM_ENT(cp->page) ||
01270                             ((PAGE *)cp->page)->inp[cp->indx] !=
01271                             ((PAGE *)cp->page)->inp[cp->indx + P_INDX]) {
01272                                 iiop = DB_AFTER;
01273                                 break;
01274                         }
01275                 }
01276                 break;
01277         default:
01278                 ret = CDB___db_unknown_flag(dbp->dbenv, "__bam_c_put", flags);
01279                 goto err;
01280         }
01281 
01282         switch (ret = CDB___bam_iitem(dbc, key, data, iiop, 0)) {
01283         case 0:
01284                 break;
01285         case DB_NEEDSPLIT:
01286                 /*
01287                  * To split, we need a key for the page.  Either use the key
01288                  * argument or get a copy of the key from the page.
01289                  */
01290                 if (flags == DB_AFTER ||
01291                     flags == DB_BEFORE || flags == DB_CURRENT) {
01292                         memset(&dbt, 0, sizeof(DBT));
01293                         if ((ret = CDB___db_ret(dbp, cp->page, 0, &dbt,
01294                             &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
01295                                 goto err;
01296                         arg = &dbt;
01297                 } else
01298                         arg = F_ISSET(dbc, DBC_OPD) ? data : key;
01299 
01300                 /*
01301                  * Discard any locks and pinned pages (the locks are discarded
01302                  * even if we're running with transactions, as they lock pages
01303                  * that we're sorry we ever acquired).  If stack is set and the
01304                  * cursor entries are valid, they point to the same entries as
01305                  * the stack, don't free them twice.
01306                  */
01307                 if (stack)
01308                         ret = CDB___bam_stkrel(dbc, STK_CLRDBC | STK_NOLOCK);
01309                 else
01310                         DISCARD_CUR(dbc, ret);
01311                 if (ret != 0)
01312                         goto err;
01313 
01314                 /* Split the tree. */
                      /* Nothing is held any more, so a failure returns directly. */
01315                 if ((ret = CDB___bam_split(dbc, arg)) != 0)
01316                         return (ret);
01317 
01318                 goto split;
01319         default:
01320                 goto err;
01321         }
01322 
01323 err:
01324 done:   /*
01325          * Discard any pages pinned in the tree and their locks, except for
01326          * the leaf page.  Note, the leaf page participated in any stack we
01327          * acquired, and so we have to adjust the stack as necessary.  If
01328          * there was only a single page on the stack, we don't have to free
01329          * further stack pages.
01330          */
01331         if (stack && BT_STK_POP(cp) != NULL)
01332                 (void)CDB___bam_stkrel(dbc, 0);
01333 
01334         /*
01335          * Regardless of whether we were successful or not, clear the delete
01336          * flag.  If we're successful, we either moved the cursor or the item
01337          * is no longer deleted.  If we're not successful, then we're just a
01338          * copy, no need to have the flag set.
01339          */
01340         F_CLR(cp, C_DELETED);
01341 
01342         return (ret);
01343 }
01344 
01345 /*
01346  * CDB___bam_c_rget --
01347  *      Return the record number for a cursor.
       *
       *      Copies the key under the cursor, searches the tree for that key
       *      while asking the search for a record number, and copies that
       *      number out through "data".
       *
       *      dbc:    positioned cursor.
       *      data:   receives the record number (a db_recno_t).
       *      flags:  unused.
       *
       *      Returns 0 on success or the error of the failing call.
01348  *
01349  * PUBLIC: int CDB___bam_c_rget __P((DBC *, DBT *, u_int32_t));
01350  */
01351 int
01352 CDB___bam_c_rget(dbc, data, flags)
01353         DBC *dbc;
01354         DBT *data;
01355         u_int32_t flags;
01356 {
01357         BTREE_CURSOR *cp;
01358         DB *dbp;
01359         DBT dbt;
01360         db_recno_t recno;
01361         int exact, ret;
01362 
01363         COMPQUIET(flags, 0);
01364         dbp = dbc->dbp;
01365         cp = (BTREE_CURSOR *)dbc->internal;
01366 
01367         /*
01368          * Get the page with the current item on it.
01369          * Get a copy of the key.
01370          * Release the page, making sure we don't release it twice.
               *
               * NOTE(review): if the key copy fails we jump to err with
               * cp->page still set; presumably the caller releases it --
               * confirm.
01371          */
01372         if ((ret = CDB_memp_fget(dbp->mpf, &cp->pgno, 0, &cp->page)) != 0)
01373                 return (ret);
01374         memset(&dbt, 0, sizeof(DBT));
01375         if ((ret = CDB___db_ret(dbp, cp->page,
01376             cp->indx, &dbt, &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
01377                 goto err;
01378         ret = CDB_memp_fput(dbp->mpf, cp->page, 0);
01379         cp->page = NULL;
01380         if (ret != 0)
01381                 return (ret);
01382 
              /* Search for the copied key; take write locks if DBC_RMW is set. */
01383         if ((ret = CDB___bam_search(dbc, &dbt,
01384             F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND,
01385             1, &recno, &exact)) != 0)
01386                 goto err;
01387 
01388         ret = CDB___db_retcopy(dbp, data,
01389             &recno, sizeof(recno), &dbc->rdata.data, &dbc->rdata.ulen);
01390 
01391         /* Release the stack. */
              /* NOTE(review): the result of the release is not checked. */
01392 err:    CDB___bam_stkrel(dbc, 0);
01393 
01394         return (ret);
01395 }
01396 
01397 /*
01398  * __bam_c_writelock --
01399  *      Make sure the cursor holds a write lock on its page.
01400  */
01401 static int
01402 __bam_c_writelock(dbc)
01403         DBC *dbc;
01404 {
01405         BTREE_CURSOR *cp;
01406         int ret;
01407 
01408         cp = (BTREE_CURSOR *)dbc->internal;
01409         ret = 0;
01410 
01411         /*
01412          * Nothing to do when the lock is already a write lock.  Otherwise
01413          * upgrade: writing to an off-page duplicate tree needs the page in
01414          * the primary tree locked, and the general DBC code calls us first
01415          * with the primary cursor so that we can acquire that lock.
01416          */
01417         if (cp->lock_mode != DB_LOCK_WRITE) {
01418                 ACQUIRE_WRITE_LOCK(dbc, ret);
01419         }
01420         return (ret);
01421 }
01422 
01423 /*
01424  * __bam_c_first --
01425  *      Return the first record.
       *
       *      Descends from cp->root through the first child of every internal
       *      page until a leaf page is reached, then skips forward if that
       *      leaf is empty or the cursor is on a deleted record.
       *
       *      dbc:    cursor to position.
       *
       *      Returns 0 on success or the error of the failing call.
01426  */
01427 static int
01428 __bam_c_first(dbc)
01429         DBC *dbc;
01430 {
01431         BTREE_CURSOR *cp;
01432         DB *dbp;
01433         db_pgno_t pgno;
01434         int ret;
01435 
01436         dbp = dbc->dbp;
01437         cp = (BTREE_CURSOR *)dbc->internal;
01438         ret = 0;
01439 
01440         /* Walk down the left-hand side of the tree. */
01441         for (pgno = cp->root;;) {
                      /*
                       * Move the cursor to page pgno under a read lock.
                       * NOTE(review): presumably this also resets cp->indx --
                       * confirm against the ACQUIRE_CUR_SET definition.
                       */
01442                 ACQUIRE_CUR_SET(dbc, DB_LOCK_READ, pgno, ret);
01443                 if (ret != 0)
01444                         return (ret);
01445 
01446                 /* If we find a leaf page, we're done. */
01447                 if (ISLEAF(cp->page))
01448                         break;
01449 
                      /* Otherwise follow the first entry of the internal page. */
01450                 pgno = GET_BINTERNAL(cp->page, 0)->pgno;
01451         }
01452 
01453         /* If we want a write lock instead of a read lock, get it now. */
01454         if (F_ISSET(dbc, DBC_RMW)) {
01455                 ACQUIRE_WRITE_LOCK(dbc, ret);
01456                 if (ret != 0)
01457                         return (ret);
01458         }
01459 
01460         /* If on an empty page or a deleted record, move to the next one. */
01461         if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))
01462                 if ((ret = __bam_c_next(dbc, 0)) != 0)
01463                         return (ret);
01464 
01465         return (0);
01466 }
01467 
01468 /*
01469  * __bam_c_last --
01470  *      Return the last record.
01471  */
01472 static int
01473 __bam_c_last(dbc)
01474         DBC *dbc;
01475 {
01476         BTREE_CURSOR *cp;
01477         DB *dbp;
01478         db_pgno_t pgno;
01479         int ret;
01480 
01481         dbp = dbc->dbp;
01482         cp = (BTREE_CURSOR *)dbc->internal;
01483         ret = 0;
01484 
01485         /* Walk down the right-hand side of the tree. */
01486         for (pgno = cp->root;;) {
01487                 ACQUIRE_CUR_SET(dbc, DB_LOCK_READ, pgno, ret);
01488                 if (ret != 0)
01489                         return (ret);
01490 
01491                 /* If we find a leaf page, we're done. */
01492                 if (ISLEAF(cp->page))
01493                         break;
01494 
01495                 pgno =
01496                     GET_BINTERNAL(cp->page, NUM_ENT(cp->page) - O_INDX)->pgno;
01497         }
01498 
01499         /* If we want a write lock instead of a read lock, get it now. */
01500         if (F_ISSET(dbc, DBC_RMW)) {
01501                 ACQUIRE_WRITE_LOCK(dbc, ret);
01502                 if (ret != 0)
01503                         return (ret);
01504         }
01505 
01506         cp->indx = NUM_ENT(cp->page) == 0 ? 0 :
01507             NUM_ENT(cp->page) -
01508             (TYPE(cp->page) == P_LBTREE ? P_INDX : O_INDX);
01509 
01510         /* If on an empty page or a deleted record, move to the previous one. */
01511         if (NUM_ENT(cp->page) == 0 || IS_CUR_DELETED(dbc))
01512                 if ((ret = __bam_c_prev(dbc)) != 0)
01513                         return (ret);
01514 
01515         return (0);
01516 }
01517 
01518 /*
01519  * __bam_c_next --
01520  *      Move to the next record.
01521  */
01522 static int
01523 __bam_c_next(dbc, initial_move)
01524         DBC *dbc;
01525         int initial_move;
01526 {
01527         BTREE_CURSOR *cp;
01528         DB *dbp;
01529         db_indx_t adjust;
01530         db_lockmode_t lock_mode;
01531         db_pgno_t pgno;
01532         int ret;
01533 
01534         dbp = dbc->dbp;
01535         cp = (BTREE_CURSOR *)dbc->internal;
01536         ret = 0;
01537 
01538         /*
01539          * We're either moving through a page of duplicates or a btree leaf
01540          * page.
01541          *
01542          * !!!
01543          * This code handles empty pages and pages with only deleted entries.
01544          */
01545         if (F_ISSET(dbc, DBC_OPD)) {
01546                 adjust = O_INDX;
01547                 lock_mode = DB_LOCK_NG;
01548         } else {
01549                 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
01550                 lock_mode =
01551                     F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
01552         }
01553         if (cp->page == NULL) {
01554                 ACQUIRE_CUR(dbc, lock_mode, ret);
01555                 if (ret != 0)
01556                         return (ret);
01557         }
01558 
01559         if (initial_move)
01560                 cp->indx += adjust;
01561 
01562         for (;;) {
01563                 /*
01564                  * If at the end of the page, move to a subsequent page.
01565                  *
01566                  * !!!
01567                  * Check for >= NUM_ENT.  If the original search landed us on
01568                  * NUM_ENT, we may have incremented indx before the test.
01569                  */
01570                 if (cp->indx >= NUM_ENT(cp->page)) {
01571                         if ((pgno
01572                             = NEXT_PGNO(cp->page)) == PGNO_INVALID)
01573                                 return (DB_NOTFOUND);
01574 
01575                         ACQUIRE_CUR_SET(dbc, lock_mode, pgno, ret);
01576                         if (ret != 0)
01577                                 return (ret);
01578                         continue;
01579                 }
01580                 if (IS_CUR_DELETED(dbc)) {
01581                         cp->indx += adjust;
01582                         continue;
01583                 }
01584                 break;
01585         }
01586         return (0);
01587 }
01588 
01589 /*
01590  * __bam_c_prev --
01591  *      Move to the previous record.
01592  */
01593 static int
01594 __bam_c_prev(dbc)
01595         DBC *dbc;
01596 {
01597         BTREE_CURSOR *cp;
01598         DB *dbp;
01599         db_indx_t adjust;
01600         db_lockmode_t lock_mode;
01601         db_pgno_t pgno;
01602         int ret;
01603 
01604         dbp = dbc->dbp;
01605         cp = (BTREE_CURSOR *)dbc->internal;
01606         ret = 0;
01607 
01608         /*
01609          * We're either moving through a page of duplicates or a btree leaf
01610          * page.
01611          *
01612          * !!!
01613          * This code handles empty pages and pages with only deleted entries.
01614          */
01615         if (F_ISSET(dbc, DBC_OPD)) {
01616                 adjust = O_INDX;
01617                 lock_mode = DB_LOCK_NG;
01618         } else {
01619                 adjust = dbc->dbtype == DB_BTREE ? P_INDX : O_INDX;
01620                 lock_mode =
01621                     F_ISSET(dbc, DBC_RMW) ? DB_LOCK_WRITE : DB_LOCK_READ;
01622         }
01623         if (cp->page == NULL) {
01624                 ACQUIRE_CUR(dbc, lock_mode, ret);
01625                 if (ret != 0)
01626                         return (ret);
01627         }
01628 
01629         for (;;) {
01630                 /* If at the beginning of the page, move to a previous one. */
01631                 if (cp->indx == 0) {
01632                         if ((pgno =
01633                             PREV_PGNO(cp->page)) == PGNO_INVALID)
01634                                 return (DB_NOTFOUND);
01635 
01636                         ACQUIRE_CUR_SET(dbc, lock_mode, pgno, ret);
01637                         if (ret != 0)
01638                                 return (ret);
01639 
01640                         if ((cp->indx = NUM_ENT(cp->page)) == 0)
01641                                 continue;
01642                 }
01643 
01644                 /* Ignore deleted records. */
01645                 cp->indx -= adjust;
01646                 if (IS_CUR_DELETED(dbc))
01647                         continue;
01648 
01649                 break;
01650         }
01651         return (0);
01652 }
01653 
01654 /*
01655  * __bam_c_search --
01656  *      Move to a specified record.
01657  */
01658 static int
01659 __bam_c_search(dbc, key, flags, exactp)
01660         DBC *dbc;
01661         const DBT *key;
01662         u_int32_t flags;
01663         int *exactp;
01664 {
01665         BTREE *t;
01666         BTREE_CURSOR *cp;
01667         DB *dbp;
01668         PAGE *h;
01669         db_indx_t indx;
01670         db_recno_t recno;
01671         u_int32_t sflags;
01672         int cmp, ret;
01673 
01674         dbp = dbc->dbp;
01675         cp = (BTREE_CURSOR *)dbc->internal;
01676         t = dbp->bt_internal;
01677         ret = 0;
01678 
01679         /*
01680          * Find an entry in the database.  Discard any lock we currently hold,
01681          * we're going to search the tree.
01682          */
01683         DISCARD_CUR(dbc, ret);
01684         if (ret != 0)
01685                 return (ret);
01686 
01687         switch (flags) {
01688         case DB_SET_RECNO:
01689                 if ((ret = CDB___ram_getno(dbc, key, &recno, 0)) != 0)
01690                         return (ret);
01691                 sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
01692                 if ((ret = CDB___bam_rsearch(dbc, &recno, sflags, 1, exactp)) != 0)
01693                         return (ret);
01694                 break;
01695         case DB_SET:
01696         case DB_GET_BOTH:
01697                 sflags = (F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND) | S_EXACT;
01698                 goto search;
01699         case DB_SET_RANGE:
01700                 sflags =
01701                     (F_ISSET(dbc, DBC_RMW) ? S_WRITE : S_READ) | S_DUPFIRST;
01702                 goto search;
01703         case DB_KEYFIRST:
01704                 sflags = S_KEYFIRST;
01705                 goto fast_search;
01706         case DB_KEYLAST:
01707         case DB_NODUPDATA:
01708                 sflags = S_KEYLAST;
01709 fast_search:    /*
01710                  * If the application has a history of inserting into the first
01711                  * or last pages of the database, we check those pages first to
01712                  * avoid doing a full search.
01713                  *
01714                  * If the tree has record numbers, we need a complete stack so
01715                  * that we can adjust the record counts, so fast_search isn't
01716                  * possible.
01717                  */
01718                 if (F_ISSET(cp, C_RECNUM))
01719                         goto search;
01720 
01721                 /*
01722                  * If the tree has no history of insertion, do it the slow way.
01723                  */
01724                 if (t->bt_lpgno == PGNO_INVALID)
01725                         goto search;
01726 
01727                 /* Lock and retrieve the page on which we last inserted. */
01728                 h = NULL;
01729                 ACQUIRE(dbc, DB_LOCK_WRITE,
01730                     t->bt_lpgno, cp->lock, t->bt_lpgno, h, ret);
01731                 if (ret != 0)
01732                         return (ret);
01733 
01734                 /*
01735                  * It's okay if the page type isn't right or it's empty, it
01736                  * just means that the world changed.
01737                  */
01738                 if (TYPE(h) != P_LBTREE || NUM_ENT(h) == 0)
01739                         goto fast_miss;
01740 
01741                 /*
01742                  * What we do here is test to see if we're at the beginning or
01743                  * end of the tree and if the new item sorts before/after the
01744                  * first/last page entry.  We don't try and catch inserts into
01745                  * the middle of the tree (although we could, as long as there
01746                  * were two keys on the page and we saved both the index and
01747                  * the page number of the last insert).
01748                  */
01749                 if (h->next_pgno == PGNO_INVALID) {
01750                         indx = NUM_ENT(h) - P_INDX;
01751                         if ((ret = CDB___bam_cmp(dbp,
01752                             key, h, indx, t->bt_compare, &cmp)) != 0)
01753                                 return (ret);
01754 
01755                         if (cmp < 0)
01756                                 goto try_begin;
01757                         if (cmp > 0) {
01758                                 indx += P_INDX;
01759                                 goto fast_hit;
01760                         }
01761 
01762                         /*
01763                          * Found a duplicate.  If doing DB_KEYLAST, we're at
01764                          * the correct position, otherwise, move to the first
01765                          * of the duplicates.  If we're looking at off-page
01766                          * duplicates, duplicate duplicates aren't permitted,
01767                          * so we're done.
01768                          */
01769                         if (flags == DB_KEYLAST)
01770                                 goto fast_hit;
01771                         for (;
01772                             indx > 0 && h->inp[indx - P_INDX] == h->inp[indx];
01773                             indx -= P_INDX)
01774                                 ;
01775                         goto fast_hit;
01776                 }
01777 try_begin:      if (h->prev_pgno == PGNO_INVALID) {
01778                         indx = 0;
01779                         if ((ret = CDB___bam_cmp(dbp,
01780                             key, h, indx, t->bt_compare, &cmp)) != 0)
01781                                 return (ret);
01782 
01783                         if (cmp > 0)
01784                                 goto fast_miss;
01785                         if (cmp < 0)
01786                                 goto fast_hit;
01787 
01788                         /*
01789                          * Found a duplicate.  If doing DB_KEYFIRST, we're at
01790                          * the correct position, otherwise, move to the last
01791                          * of the duplicates.  If we're looking at off-page
01792                          * duplicates, duplicate duplicates aren't permitted,
01793                          * so we're done.
01794                          */
01795                         if (flags == DB_KEYFIRST)
01796                                 goto fast_hit;
01797                         for (;
01798                             indx < (db_indx_t)(NUM_ENT(h) - P_INDX) &&
01799                             h->inp[indx] == h->inp[indx + P_INDX];
01800                             indx += P_INDX)
01801                                 ;
01802                         goto fast_hit;
01803                 }
01804                 goto fast_miss;
01805 
01806 fast_hit:       /* Set the exact match flag, we may have found a duplicate. */
01807                 *exactp = cmp == 0;
01808 
01809                 /*
01810                  * Insert the entry in the stack.  (Our caller is likely to
01811                  * call CDB___bam_stkrel() after our return.)
01812                  */
01813                 BT_STK_CLR(cp);
01814                 BT_STK_ENTER(dbp->dbenv,
01815                     cp, h, indx, cp->lock, cp->lock_mode, ret);
01816                 if (ret != 0)
01817                         return (ret);
01818                 break;
01819 
01820 fast_miss:      /*
01821                  * This was not the right page, so we do not need to retain
01822                  * the lock even in the presence of transactions.
01823                  */
01824                 DISCARD(dbc, 1, cp->lock, h, ret);
01825                 if (ret != 0)
01826                         return (ret);
01827 
01828 search:         if ((ret =
01829                     CDB___bam_search(dbc, key, sflags, 1, NULL, exactp)) != 0)
01830                         return (ret);
01831                 break;
01832         default:
01833                 return (CDB___db_unknown_flag(dbp->dbenv, "__bam_c_search", flags));
01834         }
01835 
01836         /* Initialize the cursor from the stack. */
01837         cp->page = cp->csp->page;
01838         cp->pgno = cp->csp->page->pgno;
01839         cp->indx = cp->csp->indx;
01840         cp->lock = cp->csp->lock;
01841         cp->lock_mode = cp->csp->lock_mode;
01842 
01843         /*
01844          * If we inserted a key into the first or last slot of the tree,
01845          * remember where it was so we can do it more quickly next time.
01846          */
01847         if (TYPE(cp->page) == P_LBTREE &&
01848             (flags == DB_KEYFIRST || flags == DB_KEYLAST))
01849                 t->bt_lpgno =
01850                     (NEXT_PGNO(cp->page) == PGNO_INVALID &&
01851                     cp->indx >= NUM_ENT(cp->page)) ||
01852                     (PREV_PGNO(cp->page) == PGNO_INVALID &&
01853                     cp->indx == 0) ? cp->pgno : PGNO_INVALID;
01854         return (0);
01855 }
01856 
01857 /*
01858  * __bam_c_physdel --
01859  *      Physically remove an item from the page.
01860  */
01861 static int
01862 __bam_c_physdel(dbc)
01863         DBC *dbc;
01864 {
01865         BTREE_CURSOR *cp;
01866         DB *dbp;
01867         DBT key;
01868         DB_LOCK lock;
01869         PAGE *h;
01870         db_pgno_t pgno;
01871         int delete_page, empty_page, exact, level, ret;
01872 
01873         dbp = dbc->dbp;
01874         cp = (BTREE_CURSOR *)dbc->internal;
01875         delete_page = empty_page = ret = 0;
01876 
01877         /* If the page is going to be emptied, consider deleting it. */
01878         delete_page = empty_page =
01879             NUM_ENT(cp->page) == (TYPE(cp->page) == P_LBTREE ? 2 : 1);
01880 
01881         /*
01882          * Check if the application turned off reverse splits.  Applications
01883          * can't turn off reverse splits in off-page duplicate trees, that
01884          * space will never be reused unless the exact same key is specified.
01885          */
01886         if (delete_page &&
01887             !F_ISSET(dbc, DBC_OPD) && F_ISSET(dbp, DB_BT_REVSPLIT))
01888                 delete_page = 0;
01889 
01890         /*
01891          * We never delete the last leaf page.  (Not really true -- we delete
01892          * the last leaf page of off-page duplicate trees, but that's handled
01893          * by our caller, not down here.)
01894          */
01895         if (delete_page && cp->pgno == cp->root)
01896                 delete_page = 0;
01897 
01898         /*
01899          * To delete a leaf page other than an empty root page, we need a
01900          * copy of a key from the page.  Use the 0th page index since it's
01901          * the last key the page held.
01902          */
01903         if (delete_page) {
01904                 memset(&key, 0, sizeof(DBT));
01905                 key.app_private = dbp->dbenv->app_private;
01906                 if ((ret = CDB___db_ret(dbp, cp->page,
01907                     0, &key, &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
01908                         return (ret);
01909         }
01910 
01911         /*
01912          * Delete the items.  If page isn't empty, we adjust the cursors.
01913          *
01914          * !!!
01915          * The following operations to delete a page may deadlock.  The easy
01916          * scenario is if we're deleting an item because we're closing cursors
01917          * because we've already deadlocked and want to call CDB_txn_abort().  If
01918          * we fail due to deadlock, we'll leave a locked, possibly empty page
01919          * in the tree, which won't be empty long because we'll undo the delete
01920          * when we undo the transaction's modifications.
01921          *
01922          * !!!
01923          * Delete the key item first, otherwise the on-page duplicate checks
01924          * in CDB___bam_ditem() won't work!
01925          */
01926         if (TYPE(cp->page) == P_LBTREE) {
01927                 if ((ret = CDB___bam_ditem(dbc, cp->page, cp->indx)) != 0)
01928                         return (ret);
01929                 if (!empty_page)
01930                         CDB___bam_ca_di(dbp, PGNO(cp->page), cp->indx, -1);
01931         }
01932         if ((ret = CDB___bam_ditem(dbc, cp->page, cp->indx)) != 0)
01933                 return (ret);
01934         if (!empty_page)
01935                 CDB___bam_ca_di(dbp, PGNO(cp->page), cp->indx, -1);
01936 
01937         /* If we're not going to try and delete the page, we're done. */
01938         if (!delete_page)
01939                 return (0);
01940 
01941         /*
01942          * Call CDB___bam_search to reacquire the empty leaf page, but this time
01943          * get both the leaf page and it's parent, locked.  Jump back up the
01944          * tree, until we have the top pair of pages that we want to delete.
01945          * Once we have the top page that we want to delete locked, lock the
01946          * underlying pages and check to make sure they're still empty.  If
01947          * they are, delete them.
01948          */
01949         for (level = LEAFLEVEL;; ++level) {
01950                 /* Acquire a page and its parent, locked. */
01951                 key.app_private = dbp->dbenv->app_private;
01952                 if ((ret = CDB___bam_search(
01953                     dbc, &key, S_WRPAIR, level, NULL, &exact)) != 0)
01954                         return (ret);
01955 
01956                 /*
01957                  * If we reach the root or the parent page isn't going to be
01958                  * empty when we delete one record, stop.
01959                  */
01960                 h = cp->csp[-1].page;
01961                 if (h->pgno == cp->root || NUM_ENT(h) != 1)
01962                         break;
01963 
01964                 /* Discard the stack, retaining no locks. */
01965                 (void)CDB___bam_stkrel(dbc, STK_NOLOCK);
01966         }
01967 
01968         /*
01969          * Move the stack pointer one after the last entry, we may be about
01970          * to push more items onto the page stack.
01971          */
01972         ++cp->csp;
01973 
01974         /*
01975          * cp->csp[-2].page is now the parent page, which we may or may not be
01976          * going to delete, and cp->csp[-1].page is the first page we know we
01977          * are going to delete.  Walk down the chain of pages, acquiring pages
01978          * until we've acquired a leaf page.  Generally, this shouldn't happen;
01979          * we should only see a single internal page with one item and a single
01980          * leaf page with no items.  The scenario where we could see something
01981          * else is if reverse splits were turned off for awhile and then turned
01982          * back on.  That could result in all sorts of strangeness, e.g., empty
01983          * pages in the tree, trees that looked like linked lists, and so on.
01984          *
01985          * !!!
01986          * Sheer paranoia: if we find any pages that aren't going to be emptied
01987          * by the delete, someone else added an item while we were walking the
01988          * tree, and we discontinue the delete.  Shouldn't be possible, but we
01989          * check regardless.
01990          */
01991         for (h = cp->csp[-1].page;;) {
01992                 if (ISLEAF(h)) {
01993                         if (NUM_ENT(h) != 0)
01994                                 break;
01995                         break;
01996                 } else
01997                         if (NUM_ENT(h) != 1)
01998                                 break;
01999 
02000                 /*
02001                  * Get the next page, write lock it and push it onto the stack.
02002                  * We know it's index 0, because it can only have one element.
02003                  */
02004                 switch (TYPE(h)) {
02005                 case P_IBTREE:
02006                         pgno = GET_BINTERNAL(h, 0)->pgno;
02007                         break;
02008                 case P_IRECNO:
02009                         pgno = GET_RINTERNAL(h, 0)->pgno;
02010                         break;
02011                 default:
02012                         return (CDB___db_pgfmt(dbp, PGNO(h)));
02013                 }
02014 
02015                 if ((ret =
02016                     CDB___db_lget(dbc, 0, pgno, DB_LOCK_WRITE, 0, &lock)) != 0)
02017                         break;
02018                 if ((ret = CDB_memp_fget(dbp->mpf, &pgno, 0, &h)) != 0)
02019                         break;
02020                 BT_STK_PUSH(dbp->dbenv, cp, h, 0, lock, DB_LOCK_WRITE, ret);
02021                 if (ret != 0)
02022                         break;
02023         }
02024 
02025         /* Adjust the cursor stack to reference the last page on the stack. */
02026         BT_STK_POP(cp);
02027 
02028         /*
02029          * If everything worked, delete the stack, otherwise, release the
02030          * stack and page locks without further damage.
02031          */
02032         if (ret == 0)
02033                 ret = CDB___bam_dpages(dbc, cp->sp);
02034         else
02035                 (void)CDB___bam_stkrel(dbc, 0);
02036 
02037         return (ret);
02038 }
02039 
02040 /*
02041  * __bam_c_getstack --
02042  *      Acquire a full stack for a cursor.
02043  */
02044 static int
02045 __bam_c_getstack(dbc)
02046         DBC *dbc;
02047 {
02048         BTREE_CURSOR *cp;
02049         DB *dbp;
02050         DBT dbt;
02051         PAGE *h;
02052         int exact, ret, t_ret;
02053 
02054         dbp = dbc->dbp;
02055         cp = (BTREE_CURSOR *)dbc->internal;
02056 
02057         /*
02058          * Get the page with the current item on it.  The caller of this
02059          * routine has to already hold a read lock on the page, so there
02060          * is no additional lock to acquire.
02061          */
02062         if ((ret = CDB_memp_fget(dbp->mpf, &cp->pgno, 0, &h)) != 0)
02063                 return (ret);
02064 
02065         /* Get a copy of a key from the page. */
02066         memset(&dbt, 0, sizeof(DBT));
02067         if ((ret = CDB___db_ret(dbp,
02068             h, 0, &dbt, &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
02069                 goto err;
02070 
02071         /* Get a write-locked stack for the page. */
02072         exact = 0;
02073         ret = CDB___bam_search(dbc, &dbt, S_KEYFIRST, 1, NULL, &exact);
02074 
02075 err:    /* Discard the key and the page. */
02076         if ((t_ret = CDB_memp_fput(dbp->mpf, h, 0)) != 0 && ret == 0)
02077                 ret = t_ret;
02078 
02079         if (ret == 0) {
02080                 /*
02081                  * Initialize the cursor from the stack.  We don't take the
02082                  * page number or page index.  The former is unchanged, but
02083                  * the latter may have been explicitly set by our caller and
02084                  * we can't change it.
02085                  */
02086                 cp->page = cp->csp->page;
02087                 cp->lock = cp->csp->lock;
02088                 cp->lock_mode = cp->csp->lock_mode;
02089         }
02090 
02091         return (ret);
02092 }
02093 
02094 /*
02095  * __bam_isopd --
02096  *      Return if the cursor references an off-page duplicate tree via its
02097  *      page number.
02098  */
02099 static int
02100 __bam_isopd(dbc, pgnop)
02101         DBC *dbc;
02102         db_pgno_t *pgnop;
02103 {
02104         BOVERFLOW *bo;
02105 
02106         if (TYPE(dbc->internal->page) != P_LBTREE)
02107                 return (0);
02108 
02109         bo = GET_BOVERFLOW(dbc->internal->page, dbc->internal->indx + O_INDX);
02110         if (B_TYPE(bo->type) == B_DUPLICATE) {
02111                 *pgnop = bo->pgno;
02112                 return (1);
02113         }
02114         return (0);
02115 }

Generated on Sun Jun 8 10:56:34 2008 for GNUmifluz by  doxygen 1.5.5