bt_split.c

Go to the documentation of this file.
00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1996, 1997, 1998, 1999, 2000
00005  *      Sleepycat Software.  All rights reserved.
00006  */
00007 /*
00008  * Copyright (c) 1990, 1993, 1994, 1995, 1996
00009  *      Keith Bostic.  All rights reserved.
00010  */
00011 /*
00012  * Copyright (c) 1990, 1993, 1994, 1995
00013  *      The Regents of the University of California.  All rights reserved.
00014  *
00015  * Redistribution and use in source and binary forms, with or without
00016  * modification, are permitted provided that the following conditions
00017  * are met:
00018  * 1. Redistributions of source code must retain the above copyright
00019  *    notice, this list of conditions and the following disclaimer.
00020  * 2. Redistributions in binary form must reproduce the above copyright
00021  *    notice, this list of conditions and the following disclaimer in the
00022  *    documentation and/or other materials provided with the distribution.
00023  * 3. Neither the name of the University nor the names of its contributors
00024  *    may be used to endorse or promote products derived from this software
00025  *    without specific prior written permission.
00026  *
00027  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
00028  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00029  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00030  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
00031  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00032  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00033  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00034  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00035  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00036  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00037  * SUCH DAMAGE.
00038  */
00039 
00040 #include "config.h"
00041 
00042 #ifndef lint
00043 static const char revid[] = "$Id: bt__split_8c-source.html,v 1.1 2008/06/08 10:13:45 sebdiaz Exp $";
00044 #endif /* not lint */
00045 
00046 #ifndef NO_SYSTEM_INCLUDES
00047 #include <sys/types.h>
00048 
00049 #include <errno.h>
00050 #include <limits.h>
00051 #include <string.h>
00052 #endif
00053 
00054 #include "db_int.h"
00055 #include "db_page.h"
00056 #include "db_shash.h"
00057 #include "lock.h"
00058 #include "btree.h"
00059 
00060 #ifdef DEBUG
00061 #include "WordMonitor.h"
00062 #endif /* DEBUG */
00063 
00064 static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
00065 static int __bam_page __P((DBC *, EPG *, EPG *));
00066 static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *, int));
00067 static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
00068 static int __bam_root __P((DBC *, EPG *));
00069 static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
00070 
00071 /*
00072  * CDB___bam_split --
00073  *      Split a page.
00074  *
00075  * PUBLIC: int CDB___bam_split __P((DBC *, void *));
00076  */
00077 int
00078 CDB___bam_split(dbc, arg)
00079         DBC *dbc;
00080         void *arg;
00081 {
00082         BTREE *t;
00083         BTREE_CURSOR *cp;
00084         DB *dbp;
00085         enum { UP, DOWN } dir;
00086         db_pgno_t root_pgno;
00087         int exact, level, ret;
00088 
00089         dbp = dbc->dbp;
00090         cp = (BTREE_CURSOR *)dbc->internal;
00091         root_pgno = cp->root;
00092 
00093         /*
00094          * The locking protocol we use to avoid deadlock to acquire locks by
00095          * walking down the tree, but we do it as lazily as possible, locking
00096          * the root only as a last resort.  We expect all stack pages to have
00097          * been discarded before we're called; we discard all short-term locks.
00098          *
00099          * When CDB___bam_split is first called, we know that a leaf page was too
00100          * full for an insert.  We don't know what leaf page it was, but we
00101          * have the key/recno that caused the problem.  We call XX_search to
00102          * reacquire the leaf page, but this time get both the leaf page and
00103          * its parent, locked.  We then split the leaf page and see if the new
00104          * internal key will fit into the parent page.  If it will, we're done.
00105          *
00106          * If it won't, we discard our current locks and repeat the process,
00107          * only this time acquiring the parent page and its parent, locked.
00108          * This process repeats until we succeed in the split, splitting the
00109          * root page as the final resort.  The entire process then repeats,
00110          * as necessary, until we split a leaf page.
00111          *
00112          * XXX
00113          * A traditional method of speeding this up is to maintain a stack of
00114          * the pages traversed in the original search.  You can detect if the
00115          * stack is correct by storing the page's LSN when it was searched and
00116          * comparing that LSN with the current one when it's locked during the
00117          * split.  This would be an easy change for this code, but I have no
00118          * numbers that indicate it's worthwhile.
00119          */
00120         t = dbp->bt_internal;
00121         for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
00122                 /*
00123                  * Acquire a page and its parent, locked.
00124                  */
00125                 if ((ret = (dbc->dbtype == DB_BTREE ?
00126                     CDB___bam_search(dbc, arg, S_WRPAIR, level, NULL, &exact) :
00127                     CDB___bam_rsearch(dbc,
00128                         (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
00129                         return (ret);
00130 
00131                 /*
00132                  * Split the page if it still needs it (it's possible another
00133                  * thread of control has already split the page).  If we are
00134                  * guaranteed that two items will fit on the page, the split
00135                  * is no longer necessary.
00136                  */
00137                 if (cp->ovflsize * 2 <=
00138                     (db_indx_t)P_FREESPACE(cp->csp[0].page)) {
00139                         CDB___bam_stkrel(dbc, STK_NOLOCK);
00140                         return (0);
00141                 }
00142                 ret = cp->csp[0].page->pgno == root_pgno ?
00143                     __bam_root(dbc, &cp->csp[0]) :
00144                     __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
00145                 BT_STK_CLR(cp);
00146 
00147                 switch (ret) {
00148                 case 0:
00149                         /* Once we've split the leaf page, we're done. */
00150                         if (level == LEAFLEVEL)
00151                                 return (0);
00152 
00153                         /* Switch directions. */
00154                         if (dir == UP)
00155                                 dir = DOWN;
00156                         break;
00157                 case DB_NEEDSPLIT:
00158                         /*
00159                          * It's possible to fail to split repeatedly, as other
00160                          * threads may be modifying the tree, or the page CDB_usage
00161                          * is sufficiently bad that we don't get enough space
00162                          * the first time.
00163                          */
00164                         if (dir == DOWN)
00165                                 dir = UP;
00166                         break;
00167                 default:
00168                         return (ret);
00169                 }
00170         }
00171         /* NOTREACHED */
00172 }
00173 
00174 /*
00175  * __bam_root --
00176  *      Split the root page of a btree.
00177  */
00178 static int
00179 __bam_root(dbc, cp)
00180         DBC *dbc;
00181         EPG *cp;
00182 {
00183         DB *dbp;
00184         DBT log_dbt;
00185         DB_LSN log_lsn;
00186         PAGE *lp, *rp;
00187         db_indx_t split;
00188         u_int32_t opflags;
00189         int ret;
00190 
00191         dbp = dbc->dbp;
00192 
00193         /* Yeah, right. */
00194         if (cp->page->level >= MAXBTREELEVEL) {
00195                 CDB___db_err(dbp->dbenv,
00196                     "Too many btree levels: %d", cp->page->level);
00197                 ret = ENOSPC;
00198                 goto err;
00199         }
00200 
00201         /* Create new left and right pages for the split. */
00202         lp = rp = NULL;
00203         if ((ret = CDB___db_new(dbc, TYPE_TAGS(cp->page), &lp)) != 0 ||
00204             (ret = CDB___db_new(dbc, TYPE_TAGS(cp->page), &rp)) != 0)
00205                 goto err;
00206         P_INIT(lp, dbp->pgsize, lp->pgno,
00207             PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
00208             cp->page->level, TYPE(cp->page), TAGS(cp->page));
00209         P_INIT(rp, dbp->pgsize, rp->pgno,
00210             ISINTERNAL(cp->page) ?  PGNO_INVALID : lp->pgno, PGNO_INVALID,
00211             cp->page->level, TYPE(cp->page), TAGS(cp->page));
00212 
00213         /* Split the page. */
00214         if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
00215                 goto err;
00216 
00217         /* Log the change. */
00218         if (DB_LOGGING(dbc)) {
00219                 memset(&log_dbt, 0, sizeof(log_dbt));
00220                 log_dbt.data = cp->page;
00221                 log_dbt.size = dbp->pgsize;
00222                 ZERO_LSN(log_lsn);
00223                 opflags = F_ISSET(
00224                     (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
00225                 if ((ret = CDB___bam_split_log(dbp->dbenv, dbc->txn,
00226                     &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
00227                     PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &log_lsn,
00228                     dbc->internal->root, &log_dbt, opflags)) != 0)
00229                         goto err;
00230                 LSN(lp) = LSN(rp) = LSN(cp->page);
00231         }
00232 
00233         /* Clean up the new root page. */
00234         if ((ret = (dbc->dbtype == DB_RECNO ?
00235             __ram_root(dbc, cp->page, lp, rp) :
00236             __bam_broot(dbc, cp->page, lp, rp))) != 0)
00237                 goto err;
00238 
00239         /* Adjust any cursors.  Do it last so we don't have to undo it. */
00240         CDB___bam_ca_split(dbp, cp->page->pgno, lp->pgno, rp->pgno, split, 1);
00241 
00242         /* Success -- write the real pages back to the store. */
00243         (void)CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
00244         (void)__TLPUT(dbc, cp->lock);
00245         (void)CDB_memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
00246         (void)CDB_memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
00247 
00248         return (0);
00249 
00250 err:    if (lp != NULL)
00251                 (void)CDB___db_free(dbc, lp);
00252         if (rp != NULL)
00253                 (void)CDB___db_free(dbc, rp);
00254         (void)CDB_memp_fput(dbp->mpf, cp->page, 0);
00255         (void)__TLPUT(dbc, cp->lock);
00256         return (ret);
00257 }
00258 
00259 /*
00260  * __bam_page --
00261  *      Split the non-root page of a btree.
00262  */
00263 static int
00264 __bam_page(dbc, pp, cp)
00265         DBC *dbc;
00266         EPG *pp, *cp;
00267 {
00268         BTREE_CURSOR *bc;
00269         DBT log_dbt;
00270         DB_LSN log_lsn;
00271         DB *dbp;
00272         DB_LOCK tplock;
00273         DB_LSN save_lsn;
00274         PAGE *lp, *rp, *alloc_rp, *tp;
00275         db_indx_t split;
00276         u_int32_t opflags;
00277         int ret, t_ret;
00278 
00279         dbp = dbc->dbp;
00280         alloc_rp = lp = rp = tp = NULL;
00281         tplock.off = LOCK_INVALID;
00282         ret = -1;
00283 
00284         /*
00285          * Create a new right page for the split, and fill in everything
00286          * except its LSN and page number.
00287          *
00288          * We malloc space for both the left and right pages, so we don't get
00289          * a new page from the underlying buffer pool until we know the split
00290          * is going to succeed.  The reason is that we can't release locks
00291          * acquired during the get-a-new-page process because metadata page
00292          * locks can't be discarded on failure since we may have modified the
00293          * free list.  So, if you assume that we're holding a write lock on the
00294          * leaf page which ran out of space and started this split (e.g., we
00295          * have already written records to the page, or we retrieved a record
00296          * from it with the DB_RMW flag set), failing in a split with both a
00297          * leaf page locked and the metadata page locked can potentially lock
00298          * up the tree badly, because we've violated the rule of always locking
00299          * down the tree, and never up.
00300          */
00301         if ((ret = CDB___os_malloc(dbp->dbenv, dbp->pgsize, NULL, &rp)) != 0)
00302                 goto err;
00303         P_INIT(rp, dbp->pgsize, 0,
00304             ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
00305             ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
00306             cp->page->level, TYPE(cp->page), TAGS(cp->page));
00307 
00308         /*
00309          * Create new left page for the split, and fill in everything
00310          * except its LSN and next-page page number.
00311          */
00312         if ((ret = CDB___os_malloc(dbp->dbenv, dbp->pgsize, NULL, &lp)) != 0)
00313                 goto err;
00314         P_INIT(lp, dbp->pgsize, PGNO(cp->page),
00315             ISINTERNAL(cp->page) ?  PGNO_INVALID : PREV_PGNO(cp->page),
00316             ISINTERNAL(cp->page) ?  PGNO_INVALID : 0,
00317             cp->page->level, TYPE(cp->page), TAGS(cp->page));
00318 
00319         /*
00320          * Split right.
00321          *
00322          * Only the indices are sorted on the page, i.e., the key/data pairs
00323          * aren't, so it's simpler to copy the data from the split page onto
00324          * two new pages instead of copying half the data to a new right page
00325          * and compacting the left page in place.  Since the left page can't
00326          * change, we swap the original and the allocated left page after the
00327          * split.
00328          */
00329         if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
00330                 goto err;
00331 
00332         /*
00333          * Test to see if we are going to be able to insert the new pages into
00334          * the parent page.  The interesting failure here is that the parent
00335          * page can't hold the new keys, and has to be split in turn, in which
00336          * case we want to release all the locks we can.
00337          */
00338         if ((ret = __bam_pinsert(dbc, pp, lp, rp, 1)) != 0)
00339                 goto err;
00340 
00341         /*
00342          * Fix up the previous pointer of any leaf page following the split
00343          * page.
00344          *
00345          * There's interesting deadlock situations here as we try to write-lock
00346          * a page that's not in our direct ancestry.  Consider a cursor walking
00347          * backward through the leaf pages, that has our following page locked,
00348          * and is waiting on a lock for the page we're splitting.  In that case
00349          * we're going to deadlock here .  It's probably OK, stepping backward
00350          * through the tree isn't a common operation.
00351          */
00352         if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID) {
00353                 if ((ret = CDB___db_lget(dbc,
00354                     0, NEXT_PGNO(cp->page), DB_LOCK_WRITE, 0, &tplock)) != 0)
00355                         goto err;
00356                 if ((ret =
00357                     CDB_memp_fget(dbp->mpf, &NEXT_PGNO(cp->page), 0, &tp)) != 0)
00358                         goto err;
00359         }
00360 
00361         /*
00362          * We've got everything locked down we need, and we know the split
00363          * is going to succeed.  Go and get the additional page we'll need.
00364          */
00365         if ((ret = CDB___db_new(dbc, TYPE_TAGS(cp->page), &alloc_rp)) != 0)
00366                 goto err;
00367 
00368         /*
00369          * Fix up the page numbers we didn't have before.  We have to do this
00370          * before calling __bam_pinsert because it may copy a page number onto
00371          * the parent page and it takes the page number from its page argument.
00372          */
00373         PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);
00374 
00375         /* Actually update the parent page. */
00376         if ((ret = __bam_pinsert(dbc, pp, lp, rp, 0)) != 0)
00377                 goto err;
00378 
00379         bc = (BTREE_CURSOR *)dbc->internal;
00380         /* Log the change. */
00381         if (DB_LOGGING(dbc)) {
00382                 memset(&log_dbt, 0, sizeof(log_dbt));
00383                 log_dbt.data = cp->page;
00384                 log_dbt.size = dbp->pgsize;
00385                 if (tp == NULL)
00386                         ZERO_LSN(log_lsn);
00387                 opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
00388                 if ((ret = CDB___bam_split_log(dbp->dbenv, dbc->txn,
00389                     &LSN(cp->page), 0, dbp->log_fileid, PGNO(cp->page),
00390                     &LSN(cp->page), PGNO(alloc_rp), &LSN(alloc_rp),
00391                     (u_int32_t)NUM_ENT(lp),
00392                     tp == NULL ? 0 : PGNO(tp),
00393                     tp == NULL ? &log_lsn : &LSN(tp),
00394                     bc->root, &log_dbt, opflags)) != 0)
00395                         goto err;
00396 
00397                 /* Update the LSNs for all involved pages. */
00398                 LSN(alloc_rp) = LSN(lp) = LSN(rp) = LSN(cp->page);
00399                 if (tp != NULL)
00400                         LSN(tp) = LSN(cp->page);
00401         }
00402 
00403         /*
00404          * Copy the left and right pages into place.  There are two paths
00405          * through here.  Either we are logging and we set the LSNs in the
00406          * logging path.  However, if we are not logging, then we do not
00407          * have valid LSNs on lp or rp.  The correct LSNs to use are the
00408          * ones on the page we got from CDB___db_new or the one that was
00409          * originally on cp->page.  In both cases, we save the LSN from the
00410          * real database page (not a malloc'd one) and reapply it after we
00411          * do the copy.
00412          */
00413         save_lsn = alloc_rp->lsn;
00414         memcpy(alloc_rp, rp, LOFFSET(rp));
00415         memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
00416             (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
00417         alloc_rp->lsn = save_lsn;
00418 
00419         save_lsn = cp->page->lsn;
00420         memcpy(cp->page, lp, LOFFSET(lp));
00421         memcpy((u_int8_t *)cp->page + HOFFSET(lp),
00422             (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
00423         cp->page->lsn = save_lsn;
00424 
00425         /* Fix up the next-page link. */
00426         if (tp != NULL)
00427                 PREV_PGNO(tp) = PGNO(rp);
00428 
00429         /* Adjust any cursors.  Do it last so we don't have to undo it. */
00430         CDB___bam_ca_split(dbp, PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0);
00431 
00432         CDB___os_free(lp, dbp->pgsize);
00433         CDB___os_free(rp, dbp->pgsize);
00434 
00435         /*
00436          * Success -- write the real pages back to the store.  As we never
00437          * acquired any sort of lock on the new page, we release it before
00438          * releasing locks on the pages that reference it.  We're finished
00439          * modifying the page so it's not really necessary, but it's neater.
00440          */
00441         if ((t_ret =
00442             CDB_memp_fput(dbp->mpf, alloc_rp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00443                 ret = t_ret;
00444         if ((t_ret =
00445             CDB_memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00446                 ret = t_ret;
00447         (void)__TLPUT(dbc, pp->lock);
00448         if ((t_ret =
00449             CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00450                 ret = t_ret;
00451         (void)__TLPUT(dbc, cp->lock);
00452         if (tp != NULL) {
00453                 if ((t_ret =
00454                     CDB_memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00455                         ret = t_ret;
00456                 (void)__TLPUT(dbc, tplock);
00457         }
00458         return (ret);
00459 
00460 err:    if (lp != NULL)
00461                 CDB___os_free(lp, dbp->pgsize);
00462         if (rp != NULL)
00463                 CDB___os_free(rp, dbp->pgsize);
00464         if (alloc_rp != NULL)
00465                 (void)CDB___db_free(dbc, alloc_rp);
00466 
00467         if (tp != NULL)
00468                 (void)CDB_memp_fput(dbp->mpf, tp, 0);
00469         if (tplock.off != LOCK_INVALID)
00470                 /* We never updated the next page, we can release it. */
00471                 (void)__LPUT(dbc, tplock);
00472 
00473         (void)CDB_memp_fput(dbp->mpf, pp->page, 0);
00474         if (ret == DB_NEEDSPLIT)
00475                 (void)__LPUT(dbc, pp->lock);
00476         else
00477                 (void)__TLPUT(dbc, pp->lock);
00478 
00479         (void)CDB_memp_fput(dbp->mpf, cp->page, 0);
00480         if (ret == DB_NEEDSPLIT)
00481                 (void)__LPUT(dbc, cp->lock);
00482         else
00483                 (void)__TLPUT(dbc, cp->lock);
00484 
00485         return (ret);
00486 }
00487 
00488 /*
00489  * __bam_broot --
00490  *      Fix up the btree root page after it has been split.
00491  */
00492 static int
00493 __bam_broot(dbc, rootp, lp, rp)
00494         DBC *dbc;
00495         PAGE *rootp, *lp, *rp;
00496 {
00497         BINTERNAL bi, *child_bi;
00498         BKEYDATA *child_bk;
00499         BTREE_CURSOR *cp;
00500         DB *dbp;
00501         DBT hdr, data;
00502         db_pgno_t root_pgno;
00503         int ret;
00504 
00505         dbp = dbc->dbp;
00506         cp = (BTREE_CURSOR *)dbc->internal;
00507 
00508         /*
00509          * If the root page was a leaf page, change it into an internal page.
00510          * We copy the key we split on (but not the key's data, in the case of
00511          * a leaf page) to the new root page.
00512          */
00513         root_pgno = cp->root;
00514         P_INIT(rootp, dbp->pgsize,
00515             root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE, TAGS(lp));
00516 #ifdef DEBUG
00517         word_monitor_set(DB_MONITOR(dbp->dbenv), WORD_MONITOR_LEVEL, LEVEL(rootp));
00518 #endif /* DEBUG */
00519         
00520         memset(&data, 0, sizeof(data));
00521         memset(&hdr, 0, sizeof(hdr));
00522 
00523         /*
00524          * The btree comparison code guarantees that the left-most key on any
00525          * internal btree page is never used, so it doesn't need to be filled
00526          * in.  Set the record count if necessary.
00527          */
00528         memset(&bi, 0, sizeof(bi));
00529         bi.len = 0;
00530         B_TSET(bi.type, B_KEYDATA, 0);
00531         bi.pgno = lp->pgno;
00532         if (F_ISSET(cp, C_RECNUM)) {
00533                 bi.nrecs = CDB___bam_total(lp);
00534                 RE_NREC_SET(rootp, bi.nrecs);
00535         }
00536         hdr.data = &bi;
00537         hdr.size = SSZA(BINTERNAL, data);
00538         if ((ret =
00539             CDB___db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
00540                 return (ret);
00541 
00542         switch (TYPE(rp)) {
00543         case P_IBTREE:
00544                 /* Copy the first key of the child page onto the root page. */
00545                 child_bi = GET_BINTERNAL(rp, 0);
00546 
00547                 bi.len = child_bi->len;
00548                 B_TSET(bi.type, child_bi->type, 0);
00549                 bi.pgno = rp->pgno;
00550                 if (F_ISSET(cp, C_RECNUM)) {
00551                         bi.nrecs = CDB___bam_total(rp);
00552                         RE_NREC_ADJ(rootp, bi.nrecs);
00553                 }
00554                 hdr.data = &bi;
00555                 hdr.size = SSZA(BINTERNAL, data);
00556                 data.data = child_bi->data;
00557                 data.size = child_bi->len;
00558                 if ((ret = CDB___db_pitem(dbc, rootp, 1,
00559                     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
00560                         return (ret);
00561 
00562                 /* Increment the overflow ref count. */
00563                 if (B_TYPE(child_bi->type) == B_OVERFLOW)
00564                         if ((ret = CDB___db_ovref(dbc,
00565                             ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
00566                                 return (ret);
00567                 break;
00568         case P_LDUP:
00569         case P_LBTREE:
00570                 /* Copy the first key of the child page onto the root page. */
00571                 child_bk = GET_BKEYDATA(rp, 0);
00572                 switch (B_TYPE(child_bk->type)) {
00573                 case B_KEYDATA:
00574                         bi.len = child_bk->len;
00575                         B_TSET(bi.type, child_bk->type, 0);
00576                         bi.pgno = rp->pgno;
00577                         if (F_ISSET(cp, C_RECNUM)) {
00578                                 bi.nrecs = CDB___bam_total(rp);
00579                                 RE_NREC_ADJ(rootp, bi.nrecs);
00580                         }
00581                         hdr.data = &bi;
00582                         hdr.size = SSZA(BINTERNAL, data);
00583                         data.data = child_bk->data;
00584                         data.size = child_bk->len;
00585                         if ((ret = CDB___db_pitem(dbc, rootp, 1,
00586                             BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
00587                                 return (ret);
00588                         break;
00589                 case B_DUPLICATE:
00590                 case B_OVERFLOW:
00591                         bi.len = BOVERFLOW_SIZE;
00592                         B_TSET(bi.type, child_bk->type, 0);
00593                         bi.pgno = rp->pgno;
00594                         if (F_ISSET(cp, C_RECNUM)) {
00595                                 bi.nrecs = CDB___bam_total(rp);
00596                                 RE_NREC_ADJ(rootp, bi.nrecs);
00597                         }
00598                         hdr.data = &bi;
00599                         hdr.size = SSZA(BINTERNAL, data);
00600                         data.data = child_bk;
00601                         data.size = BOVERFLOW_SIZE;
00602                         if ((ret = CDB___db_pitem(dbc, rootp, 1,
00603                             BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
00604                                 return (ret);
00605 
00606                         /* Increment the overflow ref count. */
00607                         if (B_TYPE(child_bk->type) == B_OVERFLOW)
00608                                 if ((ret = CDB___db_ovref(dbc,
00609                                     ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
00610                                         return (ret);
00611                         break;
00612                 default:
00613                         return (CDB___db_pgfmt(dbp, rp->pgno));
00614                 }
00615                 break;
00616         default:
00617                 return (CDB___db_pgfmt(dbp, rp->pgno));
00618         }
00619         return (0);
00620 }
00621 
00622 /*
00623  * __ram_root --
00624  *      Fix up the recno root page after it has been split.
00625  */
00626 static int
00627 __ram_root(dbc, rootp, lp, rp)
00628         DBC *dbc;
00629         PAGE *rootp, *lp, *rp;
00630 {
00631         DB *dbp;
00632         DBT hdr;
00633         RINTERNAL ri;
00634         db_pgno_t root_pgno;
00635         int ret;
00636 
00637         dbp = dbc->dbp;
00638         root_pgno = dbc->internal->root;
00639 
00640         /* Initialize the page. */
00641         P_INIT(rootp, dbp->pgsize,
00642             root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO, TAGS(lp));
00643 
00644         /* Initialize the header. */
00645         memset(&hdr, 0, sizeof(hdr));
00646         hdr.data = &ri;
00647         hdr.size = RINTERNAL_SIZE;
00648 
00649         /* Insert the left and right keys, set the header information. */
00650         ri.pgno = lp->pgno;
00651         ri.nrecs = CDB___bam_total(lp);
00652         if ((ret = CDB___db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
00653                 return (ret);
00654         RE_NREC_SET(rootp, ri.nrecs);
00655         ri.pgno = rp->pgno;
00656         ri.nrecs = CDB___bam_total(rp);
00657         if ((ret = CDB___db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
00658                 return (ret);
00659         RE_NREC_ADJ(rootp, ri.nrecs);
00660         return (0);
00661 }
00662 
00663 /*
00664  * __bam_pinsert --
00665  *      Insert a new key into a parent page, completing the split.
00666  */
00667 static int
00668 __bam_pinsert(dbc, parent, lchild, rchild, space_check)
00669         DBC *dbc;
00670         EPG *parent;
00671         PAGE *lchild, *rchild;
00672         int space_check;
00673 {
00674         BINTERNAL bi, *child_bi;
00675         BKEYDATA *child_bk, *tmp_bk;
00676         BTREE *t;
00677         BTREE_CURSOR *cp;
00678         DB *dbp;
00679         DBT a, b, hdr, data;
00680         PAGE *ppage;
00681         RINTERNAL ri;
00682         db_indx_t off;
00683         db_recno_t nrecs;
00684         size_t (*func) __P((const DBT *, const DBT *));
00685         u_int32_t n, nbytes, nksize;
00686         int ret;
00687 
00688         dbp = dbc->dbp;
00689         cp = (BTREE_CURSOR *)dbc->internal;
00690         t = dbp->bt_internal;
00691         ppage = parent->page;
00692 
00693         /* If handling record numbers, count records split to the right page. */
00694         nrecs = F_ISSET(cp, C_RECNUM) && !space_check ? CDB___bam_total(rchild) : 0;
00695 
00696         /*
00697          * Now we insert the new page's first key into the parent page, which
00698          * completes the split.  The parent points to a PAGE and a page index
00699          * offset, where the new key goes ONE AFTER the index, because we split
00700          * to the right.
00701          *
00702          * XXX
00703          * Some btree algorithms replace the key for the old page as well as
00704          * the new page.  We don't, as there's no reason to believe that the
00705          * first key on the old page is any better than the key we have, and,
00706          * in the case of a key being placed at index 0 causing the split, the
00707          * key is unavailable.
00708          */
00709         off = parent->indx + O_INDX;
00710 
00711         /*
00712          * Calculate the space needed on the parent page.
00713          *
00714          * Prefix trees: space hack used when inserting into BINTERNAL pages.
00715          * Retain only what's needed to distinguish between the new entry and
00716          * the LAST entry on the page to its left.  If the keys compare equal,
00717          * retain the entire key.  We ignore overflow keys, and the entire key
00718          * must be retained for the next-to-leftmost key on the leftmost page
00719          * of each level, or the search will fail.  Applicable ONLY to internal
00720          * pages that have leaf pages as children.  Further reduction of the
00721          * key between pairs of internal pages loses too much information.
00722          */
00723         switch (TYPE(rchild)) {
00724         case P_IBTREE:
00725                 child_bi = GET_BINTERNAL(rchild, 0);
00726                 nbytes = BINTERNAL_PSIZE(child_bi->len);
00727 
00728                 if (P_FREESPACE(ppage) < nbytes)
00729                         return (DB_NEEDSPLIT);
00730                 if (space_check)
00731                         return (0);
00732 
00733                 /* Add a new record for the right page. */
00734                 memset(&bi, 0, sizeof(bi));
00735                 bi.len = child_bi->len;
00736                 B_TSET(bi.type, child_bi->type, 0);
00737                 bi.pgno = rchild->pgno;
00738                 bi.nrecs = nrecs;
00739                 memset(&hdr, 0, sizeof(hdr));
00740                 hdr.data = &bi;
00741                 hdr.size = SSZA(BINTERNAL, data);
00742                 memset(&data, 0, sizeof(data));
00743                 data.data = child_bi->data;
00744                 data.size = child_bi->len;
00745                 if ((ret = CDB___db_pitem(dbc, ppage, off,
00746                     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
00747                         return (ret);
00748 
00749                 /* Increment the overflow ref count. */
00750                 if (B_TYPE(child_bi->type) == B_OVERFLOW)
00751                         if ((ret = CDB___db_ovref(dbc,
00752                             ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
00753                                 return (ret);
00754                 break;
00755         case P_LDUP:
00756         case P_LBTREE:
00757                 child_bk = GET_BKEYDATA(rchild, 0);
00758                 switch (B_TYPE(child_bk->type)) {
00759                 case B_KEYDATA:
00760                         /*
00761                          * We set t->bt_prefix to NULL if we have a comparison
00762                          * callback but no prefix compression callback.  But,
00763                          * if we're splitting in an off-page duplicates tree,
00764                          * we still have to do some checking.  If using the
00765                          * default off-page duplicates comparison routine we
00766                          * can use the default prefix compression callback. If
00767                          * not using the default off-page duplicates comparison
00768                          * routine, we can't do any kind of prefix compression
00769                          * as there's no way for an application to specify a
00770                          * prefix compression callback that corresponds to its
00771                          * comparison callback.
00772                          */
00773                         if (F_ISSET(dbc, DBC_OPD)) {
00774                                 if (dbp->dup_compare == CDB___bam_defcmp)
00775                                         func = CDB___bam_defpfx;
00776                                 else
00777                                         func = NULL;
00778                         } else
00779                                 func = t->bt_prefix;
00780 
00781                         nbytes = BINTERNAL_PSIZE(child_bk->len);
00782                         nksize = child_bk->len;
00783                         if (func == NULL)
00784                                 goto noprefix;
00785                         if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
00786                                 goto noprefix;
00787                         tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) -
00788                             (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
00789                         if (B_TYPE(tmp_bk->type) != B_KEYDATA)
00790                                 goto noprefix;
00791                         memset(&a, 0, sizeof(a));
00792                         a.size = tmp_bk->len;
00793                         a.data = tmp_bk->data;
00794                         memset(&b, 0, sizeof(b));
00795                         b.size = child_bk->len;
00796                         b.data = child_bk->data;
00797                         nksize = func(&a, &b);
00798                         if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
00799                                 nbytes = n;
00800                         else
00801 noprefix:                       nksize = child_bk->len;
00802 
00803                         if (P_FREESPACE(ppage) < nbytes)
00804                                 return (DB_NEEDSPLIT);
00805                         if (space_check)
00806                                 return (0);
00807 
00808                         memset(&bi, 0, sizeof(bi));
00809                         bi.len = nksize;
00810                         B_TSET(bi.type, child_bk->type, 0);
00811                         bi.pgno = rchild->pgno;
00812                         bi.nrecs = nrecs;
00813                         memset(&hdr, 0, sizeof(hdr));
00814                         hdr.data = &bi;
00815                         hdr.size = SSZA(BINTERNAL, data);
00816                         memset(&data, 0, sizeof(data));
00817                         data.data = child_bk->data;
00818                         data.size = nksize;
00819                         if ((ret = CDB___db_pitem(dbc, ppage, off,
00820                             BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
00821                                 return (ret);
00822                         break;
00823                 case B_DUPLICATE:
00824                 case B_OVERFLOW:
00825                         nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);
00826 
00827                         if (P_FREESPACE(ppage) < nbytes)
00828                                 return (DB_NEEDSPLIT);
00829                         if (space_check)
00830                                 return (0);
00831 
00832                         memset(&bi, 0, sizeof(bi));
00833                         bi.len = BOVERFLOW_SIZE;
00834                         B_TSET(bi.type, child_bk->type, 0);
00835                         bi.pgno = rchild->pgno;
00836                         bi.nrecs = nrecs;
00837                         memset(&hdr, 0, sizeof(hdr));
00838                         hdr.data = &bi;
00839                         hdr.size = SSZA(BINTERNAL, data);
00840                         memset(&data, 0, sizeof(data));
00841                         data.data = child_bk;
00842                         data.size = BOVERFLOW_SIZE;
00843                         if ((ret = CDB___db_pitem(dbc, ppage, off,
00844                             BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
00845                                 return (ret);
00846 
00847                         /* Increment the overflow ref count. */
00848                         if (B_TYPE(child_bk->type) == B_OVERFLOW)
00849                                 if ((ret = CDB___db_ovref(dbc,
00850                                     ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
00851                                         return (ret);
00852                         break;
00853                 default:
00854                         return (CDB___db_pgfmt(dbp, rchild->pgno));
00855                 }
00856                 break;
00857         case P_IRECNO:
00858         case P_LRECNO:
00859                 nbytes = RINTERNAL_PSIZE;
00860 
00861                 if (P_FREESPACE(ppage) < nbytes)
00862                         return (DB_NEEDSPLIT);
00863                 if (space_check)
00864                         return (0);
00865 
00866                 /* Add a new record for the right page. */
00867                 memset(&hdr, 0, sizeof(hdr));
00868                 hdr.data = &ri;
00869                 hdr.size = RINTERNAL_SIZE;
00870                 ri.pgno = rchild->pgno;
00871                 ri.nrecs = nrecs;
00872                 if ((ret = CDB___db_pitem(dbc,
00873                     ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
00874                         return (ret);
00875                 break;
00876         default:
00877                 return (CDB___db_pgfmt(dbp, rchild->pgno));
00878         }
00879 
00880         /*
00881          * If a Recno or Btree with record numbers AM page, or an off-page
00882          * duplicates tree, adjust the parent page's left page record count.
00883          */
00884         if (F_ISSET(cp, C_RECNUM)) {
00885                 /* Log the change. */
00886                 if (DB_LOGGING(dbc) &&
00887                     (ret = CDB___bam_cadjust_log(dbp->dbenv, dbc->txn,
00888                     &LSN(ppage), 0, dbp->log_fileid, PGNO(ppage),
00889                     &LSN(ppage), parent->indx, -(int32_t)nrecs, 0)) != 0)
00890                         return (ret);
00891 
00892                 /* Update the left page count. */
00893                 if (dbc->dbtype == DB_RECNO)
00894                         GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
00895                 else
00896                         GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
00897         }
00898 
00899         return (0);
00900 }
00901 
00902 /*
00903  * __bam_psplit --
00904  *      Do the real work of splitting the page.
       *
       *      Chooses a split index (splitp) on cp->page, copies entries
       *      [0, splitp) to lp and [splitp, NUM_ENT) to rp with CDB___bam_copy,
       *      and reports the chosen index through *splitret.
       *
       *      dbc       Cursor; only dbc->dbp is read here.
       *      cp        Names the page to split (cp->page) and an index on it
       *                (cp->indx).  The index is only used to recognize an
       *                insert at the very start or very end of a level.
       *      lp, rp    Destination pages for the left and right parts.
       *      splitret  Out: the index the page was split at.  Written only
       *                when both copies succeed.
       *
       *      Returns 0 on success; otherwise the result of CDB___db_pgfmt
       *      (page type not handled by the scan below) or the non-zero
       *      result of CDB___bam_copy.
00905  */
00906 static int
00907 __bam_psplit(dbc, cp, lp, rp, splitret)
00908         DBC *dbc;
00909         EPG *cp;
00910         PAGE *lp, *rp;
00911         db_indx_t *splitret;
00912 {
00913         DB *dbp;
00914         PAGE *pp;
00915         db_indx_t half, nbytes, off, splitp, top;
00916         int adjust, cnt, iflag, isbigkey, ret;
00917 
00918         dbp = dbc->dbp;
00919         pp = cp->page;
              /*
               * Distance between consecutive logical entries: P_INDX on
               * P_LBTREE pages (each entry is consumed as two index slots
               * in the scan below), O_INDX on every other page type.
               */
00920         adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
00921 
00922         /*
00923          * If we're splitting the first (last) page on a level because we're
00924          * inserting (appending) a key to it, it's likely that the data is
00925          * sorted.  Moving a single item to the new page is less work and can
00926          * push the fill factor higher than normal.  If we're wrong it's not
00927          * a big deal, we'll just do the split the right way next time.
00928          */
00929         off = 0;
              /*
               * Last page of the level and the index is at the end: only the
               * final logical entry goes to the right page.
               */
00930         if (NEXT_PGNO(pp) == PGNO_INVALID &&
00931             ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
00932             (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
00933                 off = NUM_ENT(cp->page) - adjust;
              /*
               * First page of the level and the index is 0: only the first
               * logical entry stays on the left page.
               */
00934         else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
00935                 off = adjust;
00936 
              /*
               * A non-zero off means one of the shortcuts above picked the
               * split point, so the byte-balancing scan is skipped.
               */
00937         if (off != 0)
00938                 goto sort;
00939 
00940         /*
00941          * Split the data to the left and right pages.  Try not to split on
00942          * an overflow key.  (Overflow keys on internal pages will slow down
00943          * searches.)  Refuse to split in the middle of a set of duplicates.
00944          *
00945          * First, find the optimum place to split.
00946          *
00947          * It's possible to try and split past the last record on the page if
00948          * there's a very large record at the end of the page.  Make sure this
00949          * doesn't happen by bounding the check at the next-to-last entry on
00950          * the page.
00951          *
00952          * Note, we try and split half the data present on the page.  This is
00953          * because another process may have already split the page and left
00954          * it half empty.  We don't try and skip the split -- we don't know
00955          * how much space we're going to need on the page, and we may need up
00956          * to half the page for a big item, so there's no easy test to decide
00957          * if we need to split or not.  Besides, if two threads are inserting
00958          * data into the same place in the database, we're probably going to
00959          * need more space soon anyway.
00960          */
              /*
               * top is the last index the scan may start an entry at; half is
               * half of the bytes between HOFFSET(pp) and the end of the page.
               * The loop adds up entry sizes from index 0 and stops as soon as
               * the running total reaches half or off reaches top.
               */
00961         top = NUM_ENT(pp) - adjust;
00962         half = (dbp->pgsize - HOFFSET(pp)) / 2;
00963         for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
00964                 switch (TYPE(pp)) {
00965                 case P_IBTREE:
                              /* Non-B_KEYDATA entries are sized as a BOVERFLOW. */
00966                         if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
00967                                 nbytes +=
00968                                    BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
00969                         else
00970                                 nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
00971                         break;
00972                 case P_LBTREE:
                              /*
                               * Count the item at off, then step to the next
                               * slot and fall through to count that one too;
                               * together with the loop's own ++off this moves
                               * two slots per iteration.
                               */
00973                         if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
00974                                 nbytes +=
00975                                     BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
00976                         else
00977                                 nbytes += BOVERFLOW_SIZE;
00978 
00979                         ++off;
00980                         /* FALLTHROUGH */
00981                 case P_LDUP:
00982                 case P_LRECNO:
00983                         if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
00984                                 nbytes +=
00985                                     BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
00986                         else
00987                                 nbytes += BOVERFLOW_SIZE;
00988                         break;
00989                 case P_IRECNO:
00990                         nbytes += RINTERNAL_SIZE;
00991                         break;
00992                 default:
00993                         return (CDB___db_pgfmt(dbp, pp->pgno));
00994                 }
      /* Both the shortcut path and the scan arrive here with the candidate in off. */
00995 sort:   splitp = off;
00996 
00997         /*
00998          * Splitp is either at or just past the optimum split point.  If the
00999          * tree type is such that we're going to promote a key to an internal
01000          * page, and our current choice is an overflow key, look for something
01001          * close by that's smaller.
01002          */
              /*
               * iflag records whether entries must be read as BINTERNAL
               * (P_IBTREE) or as BKEYDATA; isbigkey is set when the entry at
               * the candidate index is not B_KEYDATA.
               */
01003         switch (TYPE(pp)) {
01004         case P_IBTREE:
01005                 iflag = 1;
01006                 isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
01007                 break;
01008         case P_LBTREE:
01009         case P_LDUP:
01010                 iflag = 0;
01011                 isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
01012                 break;
01013         default:
                      /* Every other page type: never search for a smaller key. */
01014                 iflag = isbigkey = 0;
01015         }
              /*
               * Probe up to three logical entries away, alternating forward
               * then backward at each distance, and take the first B_KEYDATA
               * entry found.  If none is found splitp is left unchanged.
               *
               * NOTE(review): in the forward probe, when iflag is set and the
               * BINTERNAL entry is not B_KEYDATA, the || falls through and
               * the same slot is re-read through GET_BKEYDATA.  The backward
               * probe uses ?: and never mixes the two views.  Presumably
               * harmless if both structures keep "type" at the same offset --
               * confirm against the page structure definitions.
               */
01016         if (isbigkey)
01017                 for (cnt = 1; cnt <= 3; ++cnt) {
01018                         off = splitp + cnt * adjust;
01019                         if (off < (db_indx_t)NUM_ENT(pp) &&
01020                             ((iflag &&
01021                             B_TYPE(GET_BINTERNAL(pp,off)->type) == B_KEYDATA) ||
01022                             B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
01023                                 splitp = off;
01024                                 break;
01025                         }
                              /*
                               * Stepping back cnt * adjust would reach index 0
                               * or go below it; skip the backward probe.
                               */
01026                         if (splitp <= (db_indx_t)(cnt * adjust))
01027                                 continue;
01028                         off = splitp - cnt * adjust;
01029                         if (iflag ?
01030                             B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
01031                             B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
01032                                 splitp = off;
01033                                 break;
01034                         }
01035                 }
01036 
01037         /*
01038          * We can't split in the middle of a set of duplicates.  We know that
01039          * no duplicate set can take up more than about 25% of the page,
01040          * because that's the point where we push it off onto a duplicate
01041          * page set.  So, this loop can't be unbounded.
01042          */
              /*
               * Two key slots holding the same inp[] value are treated as the
               * same key (CDB___bam_copy in this file handles duplicates the
               * same way).  If the entry at splitp has the same offset as the
               * one before it, move splitp to the nearest boundary of that
               * run: forward to the first slot with a different offset, or
               * backward to the first slot of the run.
               *
               * NOTE(review): the entry test reads inp[splitp - adjust] with
               * no check that splitp >= adjust -- assumes splitp is never 0
               * on a P_LBTREE page here; TODO confirm.
               * NOTE(review): the loop itself has no bound on cnt; it ends
               * only through the 25% guarantee described above.
               */
01043         if (TYPE(pp) == P_LBTREE &&
01044             pp->inp[splitp] == pp->inp[splitp - adjust])
01045                 for (cnt = 1;; ++cnt) {
01046                         off = splitp + cnt * adjust;
01047                         if (off < NUM_ENT(pp) &&
01048                             pp->inp[splitp] != pp->inp[off]) {
01049                                 splitp = off;
01050                                 break;
01051                         }
01052                         if (splitp <= (db_indx_t)(cnt * adjust))
01053                                 continue;
01054                         off = splitp - cnt * adjust;
                              /*
                               * off is the nearest earlier slot with a
                               * different offset, so the run starts one
                               * logical entry after it.
                               */
01055                         if (pp->inp[splitp] != pp->inp[off]) {
01056                                 splitp = off + adjust;
01057                                 break;
01058                         }
01059                 }
01060 
01061         /* We're going to split at splitp. */
01062         if ((ret = CDB___bam_copy(dbp, pp, lp, 0, splitp)) != 0)
01063                 return (ret);
01064         if ((ret = CDB___bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
01065                 return (ret);
01066 
01067         *splitret = splitp;
01068         return (0);
01069 }
01070 
01071 /*
01072  * CDB___bam_copy --
01073  *      Copy a set of records from one page to another.
       *
       *      Copies the entries at source indices [nxt, stop) of pp onto cp.
       *      Destination slots are filled starting at index 0 of cp, and
       *      NUM_ENT(cp) is incremented once per source index processed.
       *
       *      dbp   Database handle; used only for the CDB___db_pgfmt error.
       *      pp    Source page.
       *      cp    Destination page.  NOTE(review): slot numbering starts at
       *            0 regardless of NUM_ENT(cp), so cp is presumably expected
       *            to be empty on entry -- confirm against callers.
       *      nxt   First source index to copy.
       *      stop  One past the last source index to copy.
       *
       *      Returns 0, or the result of CDB___db_pgfmt if TYPE(pp) is not one
       *      of the page types handled below.
01074  *
01075  * PUBLIC: int CDB___bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
01076  */
01077 int
01078 CDB___bam_copy(dbp, pp, cp, nxt, stop)
01079         DB *dbp;
01080         PAGE *pp, *cp;
01081         u_int32_t nxt, stop;
01082 {
01083         db_indx_t nbytes, off;
01084 
01085         /*
01086          * Copy the requested range of entries to the target page.  Nxt is
01087          * the index of the next entry to read from the source page; off is
       * the index of the slot it is written to on the target page.
01088          */
01089         for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
                      /* Work out how many bytes the entry at nxt occupies. */
01090                 switch (TYPE(pp)) {
01091                 case P_IBTREE:
01092                         if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
01093                                 nbytes =
01094                                     BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
01095                         else
01096                                 nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
01097                         break;
01098                 case P_LBTREE:
01099                         /*
01100                          * If we're on a key and it's a duplicate, just copy
01101                          * the offset.
               *
               * A key slot is one whose source index is a multiple
               * of P_INDX; it is a duplicate when its inp[] value
               * equals that of the previous key slot.  In that
               * case no bytes are copied: the target slot reuses
               * the offset of the previous key slot on cp.  The
               * continue still runs the loop's increment step, so
               * NUM_ENT(cp) is bumped for this slot as well.  The
               * off != 0 test keeps the first copied entry from
               * referring back before the start of cp.
               *
               * NOTE(review): off - P_INDX assumes off and nxt
               * have the same parity, i.e. that the copy started
               * on a key slot -- confirm against callers.
01102                          */
01103                         if (off != 0 && (nxt % P_INDX) == 0 &&
01104                             pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
01105                                 cp->inp[off] = cp->inp[off - P_INDX];
01106                                 continue;
01107                         }
01108                         /* FALLTHROUGH */
01109                 case P_LDUP:
01110                 case P_LRECNO:
                              /* Non-B_KEYDATA items are sized as a BOVERFLOW. */
01111                         if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
01112                                 nbytes =
01113                                     BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
01114                         else
01115                                 nbytes = BOVERFLOW_SIZE;
01116                         break;
01117                 case P_IRECNO:
01118                         nbytes = RINTERNAL_SIZE;
01119                         break;
01120                 default:
01121                         return (CDB___db_pgfmt(dbp, pp->pgno));
01122                 }
                      /*
                       * Lower HOFFSET(cp) by nbytes, store the new value in the
                       * slot, then copy the entry's bytes.  The slot is written
                       * first because P_ENTRY(cp, off) presumably resolves
                       * through cp->inp[off] -- confirm against the macro.
                       */
01123                 cp->inp[off] = HOFFSET(cp) -= nbytes;
01124                 memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
01125         }
01126         return (0);
01127 }

Generated on Sun Jun 8 10:56:35 2008 for GNUmifluz by  doxygen 1.5.5