hash.c

Go to the documentation of this file.
00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1996, 1997, 1998, 1999, 2000
00005  *      Sleepycat Software.  All rights reserved.
00006  */
00007 /*
00008  * Copyright (c) 1990, 1993, 1994
00009  *      Margo Seltzer.  All rights reserved.
00010  */
00011 /*
00012  * Copyright (c) 1990, 1993, 1994
00013  *      The Regents of the University of California.  All rights reserved.
00014  *
00015  * This code is derived from software contributed to Berkeley by
00016  * Margo Seltzer.
00017  *
00018  * Redistribution and use in source and binary forms, with or without
00019  * modification, are permitted provided that the following conditions
00020  * are met:
00021  * 1. Redistributions of source code must retain the above copyright
00022  *    notice, this list of conditions and the following disclaimer.
00023  * 2. Redistributions in binary form must reproduce the above copyright
00024  *    notice, this list of conditions and the following disclaimer in the
00025  *    documentation and/or other materials provided with the distribution.
00026  * 3. Neither the name of the University nor the names of its contributors
00027  *    may be used to endorse or promote products derived from this software
00028  *    without specific prior written permission.
00029  *
00030  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
00031  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00032  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00033  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
00034  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00035  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
00036  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
00037  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00038  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
00039  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
00040  * SUCH DAMAGE.
00041  */
00042 
00043 #include "config.h"
00044 
00045 #ifndef lint
00046 static const char revid[] = "$Id: hash_8c-source.html,v 1.1 2008/06/08 10:19:09 sebdiaz Exp $";
00047 #endif /* not lint */
00048 
00049 #ifndef NO_SYSTEM_INCLUDES
00050 #include <sys/types.h>
00051 
00052 #include <errno.h>
00053 #include <stdlib.h>
00054 #include <string.h>
00055 #endif
00056 
00057 #include "db_int.h"
00058 #include "db_page.h"
00059 #include "db_am.h"
00060 #include "db_ext.h"
00061 #include "db_shash.h"
00062 #include "db_swap.h"
00063 #include "hash.h"
00064 #include "btree.h"
00065 #include "log.h"
00066 #include "lock.h"
00067 #include "txn.h"
00068 
/*
 * Forward declarations for the routines that are private to this file.
 * All of them are static; the __P() wrapper keeps the prototypes usable
 * with the K&R-style definitions used throughout this file.
 */
00069 static int  __ham_c_close __P((DBC *, db_pgno_t, int *));
00070 static int  __ham_c_del __P((DBC *));
00071 static int  __ham_c_destroy __P((DBC *));
00072 static int  __ham_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00073 static int  __ham_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00074 static int  __ham_c_writelock __P((DBC *));
00075 static int  __ham_del_dups __P((DBC *, DBT *));
00076 static int  __ham_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
00077 static int  __ham_dup_return __P((DBC *, DBT *, u_int32_t));
00078 static int  __ham_expand_table __P((DBC *));
00079 static int  __ham_init_htab __P((DBC *,
00080                 const char *, db_pgno_t, u_int32_t, u_int32_t));
00081 static int  __ham_lookup __P((DBC *,
00082                 const DBT *, u_int32_t, db_lockmode_t, db_pgno_t *));
00083 static int  __ham_overwrite __P((DBC *, DBT *, u_int32_t));
00084 
00085 /*
00086  * CDB___ham_metachk --
00087  *
00088  * PUBLIC: int CDB___ham_metachk __P((DB *, const char *, HMETA *));
00089  */
00090 int
00091 CDB___ham_metachk(dbp, name, hashm)
00092         DB *dbp;
00093         const char *name;
00094         HMETA *hashm;
00095 {
00096         DB_ENV *dbenv;
00097         u_int32_t vers;
00098         int ret;
00099 
00100         dbenv = dbp->dbenv;
00101 
00102         /*
00103          * At this point, all we know is that the magic number is for a Hash.
00104          * Check the version, the database may be out of date.
00105          */
00106         vers = hashm->dbmeta.version;
00107         if (F_ISSET(dbp, DB_AM_SWAP))
00108                 M_32_SWAP(vers);
00109         switch (vers) {
00110         case 4:
00111         case 5:
00112         case 6:
00113                 CDB___db_err(dbenv,
00114                     "%s: hash version %lu requires a version upgrade",
00115                     name, (u_long)vers);
00116                 return (DB_OLD_VERSION);
00117         case 7:
00118                 break;
00119         default:
00120                 CDB___db_err(dbenv,
00121                     "%s: unsupported hash version: %lu", name, (u_long)vers);
00122                 return (EINVAL);
00123         }
00124 
00125         /* Swap the page if we need to. */
00126         if (F_ISSET(dbp, DB_AM_SWAP) && (ret = CDB___ham_mswap((PAGE *)hashm)) != 0)
00127                 return (ret);
00128 
00129         /* Check the type. */
00130         if (dbp->type != DB_HASH && dbp->type != DB_UNKNOWN)
00131                 return (EINVAL);
00132         dbp->type = DB_HASH;
00133         DB_ILLEGAL_METHOD(dbp, DB_OK_HASH);
00134 
00135         /*
00136          * Check application info against metadata info, and set info, flags,
00137          * and type based on metadata info.
00138          */
00139         if ((ret = CDB___db_fchk(dbenv,
00140             "DB->open", hashm->dbmeta.flags,
00141             DB_HASH_DUP | DB_HASH_SUBDB | DB_HASH_DUPSORT)) != 0)
00142                 return (ret);
00143 
00144         if (F_ISSET(&hashm->dbmeta, DB_HASH_DUP))
00145                 F_SET(dbp, DB_AM_DUP);
00146         else
00147                 if (F_ISSET(dbp, DB_AM_DUP)) {
00148                         CDB___db_err(dbenv,
00149                 "%s: DB_DUP specified to open method but not set in database",
00150                             name);
00151                         return (EINVAL);
00152                 }
00153 
00154         if (F_ISSET(&hashm->dbmeta, DB_HASH_SUBDB))
00155                 F_SET(dbp, DB_AM_SUBDB);
00156         else
00157                 if (F_ISSET(dbp, DB_AM_SUBDB)) {
00158                         CDB___db_err(dbenv,
00159             "%s: multiple databases specified but not supported in file",
00160                         name);
00161                         return (EINVAL);
00162                 }
00163 
00164         if (F_ISSET(&hashm->dbmeta, DB_HASH_DUPSORT)) {
00165                 if (dbp->dup_compare == NULL)
00166                         dbp->dup_compare = CDB___bam_defcmp;
00167         } else
00168                 if (dbp->dup_compare != NULL) {
00169                         CDB___db_err(dbenv,
00170                 "%s: duplicate sort function specified but not set in database",
00171                             name);
00172                         return (EINVAL);
00173                 }
00174 
00175         /* Set the page size. */
00176         dbp->pgsize = hashm->dbmeta.pagesize;
00177 
00178         /* Copy the file's ID. */
00179         memcpy(dbp->fileid, hashm->dbmeta.uid, DB_FILE_ID_LEN);
00180 
00181         return (0);
00182 }
00183 
00184 /*
00185  * CDB___ham_open --
 *      Hash-specific part of opening a database.  Installs the hash
 *      del/stat methods on the handle, reads the meta page at base_pgno
 *      through a temporary cursor, and then either checks an existing
 *      header (hash function compatibility, DUP/DUPSORT/SUBDB flags) or,
 *      when the page carries no hash magic number and the environment is
 *      not recovering, initializes a new header via __ham_init_htab and
 *      syncs the file.
 *
 *      dbp        handle being opened.
 *      name       file name, passed through to __ham_init_htab and the
 *                 recovery-test hook.
 *      base_pgno  page number of this database's meta-data page.
 *      flags      open flags; only DB_CREATE is examined here.
 *
 *      Returns 0 or the first error seen.  On any error
 *      CDB___ham_db_close(dbp) is called before returning.
00186  *
00187  * PUBLIC: int CDB___ham_open __P((DB *, const char *, db_pgno_t, u_int32_t));
00188  */
00189 int
00190 CDB___ham_open(dbp, name, base_pgno, flags)
00191         DB *dbp;
00192         const char *name;
00193         db_pgno_t base_pgno;
00194         u_int32_t flags;
00195 {
00196         DB_ENV *dbenv;
00197         DBC *dbc;
00198         HASH_CURSOR *hcp;
00199         HASH *hashp;
00200         int need_sync, ret, t_ret;
00201 
00202         dbc = NULL;
00203         dbenv = dbp->dbenv;
00204         need_sync = 0;          /* set only when a new header is written */
00205 
00206         /* Initialize the remaining fields/methods of the DB. */
00207         dbp->del = __ham_delete;
00208         dbp->stat = CDB___ham_stat;
00209 
00210         /* 
00211          * Get a cursor.  If DB_CREATE is specified, we may be creating
00212          * pages, and to do that safely in CDB we need a write cursor.
00213          * In STD_LOCKING mode, we'll synchronize using the meta page
00214          * lock instead.
         *
         * Note on precedence: && binds tighter than ?:, so the flag
         * argument is (DB_CREATE set && LOCKING) ? DB_WRITECURSOR : 0.
00215          */
00216         if ((ret = dbp->cursor(dbp,
00217             dbp->open_txn, &dbc, LF_ISSET(DB_CREATE) && LOCKING(dbenv) ? 
00218             DB_WRITECURSOR : 0)) != 0)
00219                 return (ret);
00220 
              /*
               * Record where the meta page lives before asking for it;
               * hcp->hdr is dereferenced immediately after a successful
               * CDB___ham_get_meta call.
               */
00221         hcp = (HASH_CURSOR *)dbc->internal;
00222         hashp = dbp->h_internal;
00223         hashp->meta_pgno = base_pgno;
00224         if ((ret = CDB___ham_get_meta(dbc)) != 0)
00225                 goto err1;      /* meta not held: only close the cursor */
00226 
00227         /*
00228          * If this is a new file, initialize it, and put it back dirty.
00229          *
00230          * Initialize the hdr structure.
00231          */
00232         if (hcp->hdr->dbmeta.magic == DB_HASHMAGIC) {
00233                 /* File exists, verify the data in the header. */
                      /*
                       * No application-supplied hash function: choose the
                       * built-in one from the on-disk version (files older
                       * than version 5 use CDB___ham_func4).
                       */
00234                 if (hashp->h_hash == NULL)
00235                         hashp->h_hash = hcp->hdr->dbmeta.version < 5
00236                         ? CDB___ham_func4 : CDB___ham_func5;
                      /*
                       * Hash the fixed CHARKEY test string and compare it
                       * with the value stored in the header; a mismatch
                       * means the file was built with a different hash
                       * function.
                       */
00237                 if (hashp->h_hash(CHARKEY, sizeof(CHARKEY)) !=
00238                     hcp->hdr->h_charkey) {
00239                         CDB___db_err(dbp->dbenv,
00240                             "hash: incompatible hash function");
00241                         ret = EINVAL;
00242                         goto err2;
00243                 }
                      /* Mirror the file's configuration flags onto the handle. */
00244                 if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_DUP))
00245                         F_SET(dbp, DB_AM_DUP);
00246                 if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_DUPSORT))
00247                         F_SET(dbp, DB_AM_DUPSORT);
00248                 if (F_ISSET(&hcp->hdr->dbmeta, DB_HASH_SUBDB))
00249                         F_SET(dbp, DB_AM_SUBDB);
00250         } else if (!IS_RECOVERING(dbenv)) {
00251                 /*
00252                  * File does not exist, we must initialize the header.  If
00253                  * locking is enabled that means getting a write lock first.
00254                  * During recovery the meta page will be in the log.
00255                  */
00256                 dbc->lock.pgno = base_pgno;
00257 
                      /*
                       * Standard locking: drop the lock currently in
                       * hcp->hlock and re-acquire it in write mode.  Either
                       * call failing jumps to err2.
                       *
                       * NOTE(review): the `else if` below pairs with this
                       * `if`, so it is evaluated both when STD_LOCKING is
                       * false and when the lock swap above succeeded.  This
                       * relies on STD_LOCKING and LOCKING never both being
                       * true -- confirm against their definitions.
                       */
00258                 if (STD_LOCKING(dbc) &&
00259                     ((ret = CDB_lock_put(dbenv, &hcp->hlock)) != 0 ||
00260                     (ret = CDB_lock_get(dbenv, dbc->locker,
00261                     DB_NONBLOCK(dbc) ? DB_LOCK_NOWAIT : 0,
00262                     &dbc->lock_dbt, DB_LOCK_WRITE, &hcp->hlock)) != 0))
00263                         goto err2;
00264                 else if (LOCKING(dbp->dbenv)) {
                              /* Upgrade the cursor's own lock to a write lock. */
00265                         DB_ASSERT(LF_ISSET(DB_CREATE));
00266                         if ((ret = CDB_lock_get(dbenv, dbc->locker,
00267                             DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
00268                             &dbc->mylock)) != 0)
00269                                 goto err2;
00270                 }
00271                 if ((ret = __ham_init_htab(dbc, name,
00272                     base_pgno, hashp->h_nelem, hashp->h_ffactor)) != 0)
00273                         goto err2;
00274 
00275                 need_sync = 1;
00276         }
00277 
      /*
       * Cleanup runs on success as well as failure.  t_ret is used so that
       * a cleanup error is reported only when nothing failed earlier.
       */
00278 err2:   /* Release the meta data page */
00279         if ((t_ret = CDB___ham_release_meta(dbc)) != 0 && ret == 0)
00280                 ret = t_ret;
00281 err1:   if ((t_ret  = dbc->c_close(dbc)) != 0 && ret == 0)
00282                 ret = t_ret;
00283 
00284         /* Sync the file so that we know that the meta data goes to disk. */
00285         if (ret == 0 && need_sync)
00286                 ret = dbp->sync(dbp, 0);
00287 #if CONFIG_TEST
              /* Test builds only: recovery-test hook after the sync. */
00288         if (ret == 0)
00289                 DB_TEST_RECOVERY(dbp, DB_TEST_POSTSYNC, ret, name);
00290 
00291 DB_TEST_RECOVERY_LABEL
00292 #endif
              /* Any failure: let the hash close routine clean up the handle. */
00293         if (ret != 0)
00294                 (void)CDB___ham_db_close(dbp);
00295 
00296         return (ret);
00297 }
00298 
00299 /************************** LOCAL CREATION ROUTINES **********************/
00300 /*
00301  * Returns 0 on No Error
00302  */
00303 static int
00304 __ham_init_htab(dbc, name, pgno, nelem, ffactor)
00305         DBC *dbc;
00306         const char *name;
00307         db_pgno_t pgno;
00308         u_int32_t nelem, ffactor;
00309 {
00310         DB *dbp;
00311         DB_LOCK metalock;
00312         DB_LSN orig_lsn;
00313         DBMETA *mmeta;
00314         HASH_CURSOR *hcp;
00315         HASH *hashp;
00316         PAGE *h;
00317         db_pgno_t mpgno;
00318         int32_t l2, nbuckets;
00319         int dirty_mmeta, i, ret, t_ret;
00320 
00321         hcp = (HASH_CURSOR *)dbc->internal;
00322         dbp = dbc->dbp;
00323         hashp = dbp->h_internal;
00324         mmeta = NULL;
00325         dirty_mmeta = 0;
00326         metalock.off = LOCK_INVALID;
00327 
00328         if (hashp->h_hash == NULL)
00329                 hashp->h_hash = DB_HASHVERSION < 5 ? CDB___ham_func4 : CDB___ham_func5;
00330 
00331         if (nelem != 0 && ffactor != 0) {
00332                 nelem = (nelem - 1) / ffactor + 1;
00333                 l2 = CDB___db_log2(nelem > 2 ? nelem : 2);
00334         } else
00335                 l2 = 1;
00336         nbuckets = 1 << l2;
00337 
00338         orig_lsn = hcp->hdr->dbmeta.lsn;
00339         memset(hcp->hdr, 0, sizeof(HMETA));
00340         ZERO_LSN(hcp->hdr->dbmeta.lsn);
00341         hcp->hdr->dbmeta.pgno = pgno;
00342         hcp->hdr->dbmeta.magic = DB_HASHMAGIC;
00343         hcp->hdr->dbmeta.version = DB_HASHVERSION;
00344         hcp->hdr->dbmeta.pagesize = dbp->pgsize;
00345         hcp->hdr->dbmeta.type = P_HASHMETA;
00346         hcp->hdr->dbmeta.free = PGNO_INVALID;
00347         hcp->hdr->max_bucket = hcp->hdr->high_mask = nbuckets - 1;
00348         hcp->hdr->low_mask = (nbuckets >> 1) - 1;
00349         hcp->hdr->ffactor = ffactor;
00350         hcp->hdr->h_charkey = hashp->h_hash(CHARKEY, sizeof(CHARKEY));
00351         memcpy(hcp->hdr->dbmeta.uid, dbp->fileid, DB_FILE_ID_LEN);
00352 
00353         if (F_ISSET(dbp, DB_AM_DUP))
00354                 F_SET(&hcp->hdr->dbmeta, DB_HASH_DUP);
00355         if (F_ISSET(dbp, DB_AM_SUBDB))
00356                 F_SET(&hcp->hdr->dbmeta, DB_HASH_SUBDB);
00357         if (dbp->dup_compare != NULL)
00358                 F_SET(&hcp->hdr->dbmeta, DB_HASH_DUPSORT);
00359 
00360         if ((ret = CDB___ham_dirty_page(dbp, (PAGE *)hcp->hdr)) != 0)
00361                 goto err;
00362 
00363         /*
00364          * Create the first and second buckets pages so that we have the
00365          * page numbers for them and we can store that page number
00366          * in the meta-data header (spares[0]).
00367          */
00368         hcp->hdr->spares[0] = nbuckets;
00369         if ((ret = CDB_memp_fget(dbp->mpf,
00370             &hcp->hdr->spares[0], DB_MPOOL_NEW_GROUP, &h)) != 0)
00371                 goto err;
00372 
00373         P_INIT(h, dbp->pgsize, hcp->hdr->spares[0], PGNO_INVALID,
00374             PGNO_INVALID, 0, P_HASH, 0);
00375 
00376         /* Fill in the last fields of the meta data page. */
00377         hcp->hdr->spares[0] -= (nbuckets - 1);
00378         for (i = 1; i <= l2; i++)
00379                 hcp->hdr->spares[i] = hcp->hdr->spares[0];
00380         for (; i < NCACHED; i++)
00381                 hcp->hdr->spares[i] = PGNO_INVALID;
00382 
00383         /*
00384          * Before we are about to put any dirty pages, we need to log
00385          * the meta-data page create.
00386          */
00387         ret = CDB___db_log_page(dbp, name, &orig_lsn, pgno, (PAGE *)hcp->hdr);
00388 
00389         if (dbp->open_txn != NULL) {
00390                 mmeta = (DBMETA *) hcp->hdr;
00391                 if (F_ISSET(dbp, DB_AM_SUBDB)) {
00392 
00393                         /*
00394                          * If this is a subdatabase, then we need to
00395                          * get the LSN off the master meta data page
00396                          * because that's where free pages are linked
00397                          * and during recovery we need to access
00398                          * that page and roll it backward/forward
00399                          * correctly with respect to LSN.
00400                          */
00401                         mpgno = PGNO_BASE_MD;
00402                         if ((ret = CDB___db_lget(dbc,
00403                            0, mpgno, DB_LOCK_WRITE, 0, &metalock)) != 0)
00404                                 return (ret);
00405                         if ((ret = CDB_memp_fget(dbp->mpf,
00406                             &mpgno, 0, (PAGE **)&mmeta)) != 0)
00407                                 goto err;
00408                 }
00409                 if ((t_ret = CDB___ham_groupalloc_log(dbp->dbenv,
00410                     dbp->open_txn, &LSN(mmeta), 0, dbp->log_fileid,
00411                     &LSN(mmeta), &mmeta->alloc_lsn, hcp->hdr->spares[0],
00412                     hcp->hdr->max_bucket + 1, mmeta->free)) != 0 && ret == 0)
00413                         ret = t_ret;
00414                 if (ret == 0) {
00415                         /* need to update real LSN for buffer manager */
00416                         mmeta->alloc_lsn = LSN(mmeta);
00417                         dirty_mmeta = 1;
00418                 }
00419 
00420         }
00421 
00422         DB_TEST_RECOVERY(dbp, DB_TEST_POSTLOG, ret, name);
00423 
00424 DB_TEST_RECOVERY_LABEL
00425         if ((t_ret = CDB_memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00426                 ret = t_ret;
00427 
00428 err:    if (F_ISSET(dbp, DB_AM_SUBDB) && mmeta != NULL)
00429                 if ((t_ret = CDB_memp_fput(dbp->mpf, mmeta,
00430                     dirty_mmeta ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
00431                         ret = t_ret;
00432         if (metalock.off != LOCK_INVALID)
00433                 (void)__TLPUT(dbc, metalock);
00434 
00435         return (ret);
00436 }
00437 
00438 static int
00439 __ham_delete(dbp, txn, key, flags)
00440         DB *dbp;
00441         DB_TXN *txn;
00442         DBT *key;
00443         u_int32_t flags;
00444 {
00445         DBC *dbc;
00446         HASH_CURSOR *hcp;
00447         db_pgno_t pgno;
00448         int ret, t_ret;
00449 
00450         /*
00451          * This is the only access method routine called directly from
00452          * the dbp, so we have to do error checking.
00453          */
00454 
00455         PANIC_CHECK(dbp->dbenv);
00456         DB_ILLEGAL_BEFORE_OPEN(dbp, "DB->del");
00457 
00458         if ((ret =
00459             CDB___db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
00460                 return (ret);
00461 
00462         if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
00463                 return (ret);
00464 
00465         DEBUG_LWRITE(dbc, txn, "ham_delete", key, NULL, flags);
00466 
00467         hcp = (HASH_CURSOR *)dbc->internal;
00468         if ((ret = CDB___ham_get_meta(dbc)) != 0)
00469                 goto out;
00470 
00471         pgno = PGNO_INVALID;
00472         if ((ret = __ham_lookup(dbc, key, 0, DB_LOCK_WRITE, &pgno)) == 0) {
00473                 if (F_ISSET(hcp, H_OK)) {
00474                         if (pgno == PGNO_INVALID)
00475                                 ret = CDB___ham_del_pair(dbc, 1);
00476                         else {
00477                                 /* When we close the cursor in __ham_del_dups,
00478                                  * that will make the off-page dup tree go
00479                                  * go away as well as our current entry.  When
00480                                  * it updates cursors, ours should get marked
00481                                  * as H_DELETED.
00482                                  */
00483                                 ret = __ham_del_dups(dbc, key);
00484                         }
00485                 } else
00486                         ret = DB_NOTFOUND;
00487         }
00488 
00489         if ((t_ret = CDB___ham_release_meta(dbc)) != 0 && ret == 0)
00490                 ret = t_ret;
00491 
00492 out:    if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
00493                 ret = t_ret;
00494         return (ret);
00495 }
00496 
00497 /* ****************** CURSORS ********************************** */
00498 /*
00499  * CDB___ham_c_init --
00500  *      Initialize the hash-specific portion of a cursor.
00501  *
00502  * PUBLIC: int CDB___ham_c_init __P((DBC *));
00503  */
00504 int
00505 CDB___ham_c_init(dbc)
00506         DBC *dbc;
00507 {
00508         DB_ENV *dbenv;
00509         HASH_CURSOR *new_curs;
00510         int ret;
00511 
00512         dbenv = dbc->dbp->dbenv;
00513         if ((ret = CDB___os_calloc(dbenv,
00514             1, sizeof(struct cursor_t), &new_curs)) != 0)
00515                 return (ret);
00516         if ((ret = CDB___os_malloc(dbenv,
00517             dbc->dbp->pgsize, NULL, &new_curs->split_buf)) != 0) {
00518                 CDB___os_free(new_curs, sizeof(*new_curs));
00519                 return (ret);
00520         }
00521 
00522         dbc->internal = (DBC_INTERNAL *) new_curs;
00523         dbc->c_close = CDB___db_c_close;
00524         dbc->c_count = CDB___db_c_count;
00525         dbc->c_del = CDB___db_c_del;
00526         dbc->c_dup = CDB___db_c_dup;
00527         dbc->c_get = CDB___db_c_get;
00528         dbc->c_put = CDB___db_c_put;
00529         dbc->c_am_close = __ham_c_close;
00530         dbc->c_am_del = __ham_c_del;
00531         dbc->c_am_destroy = __ham_c_destroy;
00532         dbc->c_am_get = __ham_c_get;
00533         dbc->c_am_put = __ham_c_put;
00534         dbc->c_am_writelock = __ham_c_writelock;
00535 
00536         CDB___ham_item_init(dbc);
00537 
00538         return (0);
00539 }
00540 
00541 /*
00542  * __ham_c_close --
00543  *      Close down the cursor from a single use.
 *
 *      If the cursor has an off-page duplicate cursor (internal->opd),
 *      that cursor's c_am_close is called first; when it reports (via
 *      doroot) that the root should go away, the current key/data pair is
 *      deleted from the hash page.  The cursor position is always reset
 *      with CDB___ham_item_init before returning.
 *
 *      dbc        cursor being closed.
 *      root_pgno  overwritten locally with the page number read from the
 *                 current off-page-duplicate item before it is used; the
 *                 caller's value is never read.
 *      rmroot     unused here (silenced with COMPQUIET).
 *
 *      Returns 0 or the first error seen.
00544  */
00545 static int
00546 __ham_c_close(dbc, root_pgno, rmroot)
00547         DBC *dbc;
00548         db_pgno_t root_pgno;
00549         int *rmroot;
00550 {
00551         HASH_CURSOR *hcp;
00552         HKEYDATA *dp;
00553         int dirty, doroot, gotmeta, ret, t_ret;
00554 
00555         COMPQUIET(rmroot, 0);
00556         dirty = doroot = gotmeta = ret = 0;
00557         hcp = (HASH_CURSOR *) dbc->internal;
00558 
00559         /* Check for off page dups. */
00560         if (dbc->internal->opd != NULL) {
00561                 if ((ret = CDB___ham_get_meta(dbc)) != 0)
00562                         goto done;      /* nothing acquired yet */
00563                 gotmeta = 1;
00564                 if ((ret = CDB___ham_get_cpage(dbc, DB_LOCK_READ)) != 0)
00565                         goto out;
                      /*
                       * The current item must be an off-page duplicate
                       * reference; copy the page number out of it with
                       * memcpy rather than through a typed pointer.
                       */
00566                 dp = (HKEYDATA *)H_PAIRDATA(hcp->page, hcp->indx);
00567                 DB_ASSERT(HPAGE_PTYPE(dp) == H_OFFDUP);
00568                 memcpy(&root_pgno, HOFFPAGE_PGNO(dp), sizeof(db_pgno_t));
00569 
00570                 /*
00571                  * If we're doing CDB locking, it's possible that in order to
00572                  * close, the off-page btree cursor will need to upgrade a
00573                  * write lock;  it may be pointing to a deleted item it needs
00574                  * to get rid of.  If so, it needs our locker information.
00575                  */
00576                 if (LOCKING(dbc->dbp->dbenv))
00577                         CDB___db_cdb_cdup(dbc, hcp->opd);
00578 
00579                 if ((ret =
00580                     hcp->opd->c_am_close(hcp->opd, root_pgno, &doroot)) != 0)
00581                         goto out;
                      /*
                       * The off-page cursor asked for the root to be
                       * removed: delete our pair and remember that the
                       * page must be put back dirty.
                       */
00582                 if (doroot != 0) {
00583                         if ((ret = CDB___ham_del_pair(dbc, 1)) != 0)
00584                                 goto out;
00585                         dirty = 1;
00586                 }
00587         }
00588 
      /*
       * Release whatever was acquired.  Cleanup errors are reported only
       * when no earlier error is pending.
       */
00589 out:    if (hcp->page != NULL &&
00590             (t_ret = CDB___ham_put_page(dbc->dbp,
00591             hcp->page, dirty)) != 0 && ret == 0)
00592                 ret = t_ret;
00593         if (gotmeta != 0 && (t_ret = CDB___ham_release_meta(dbc)) != 0 && ret == 0)
00594                 ret = t_ret;
00595 
00596 done:
              /* Reset the cursor position on every path, including errors. */
00597         CDB___ham_item_init(dbc);
00598         return (ret);
00599 }
00600 
00601 /*
00602  * __ham_c_destroy --
00603  *      Cleanup the access method private part of a cursor.
00604  */
00605 static int
00606 __ham_c_destroy(dbc)
00607         DBC *dbc;
00608 {
00609         HASH_CURSOR *hcp;
00610 
00611         hcp = (HASH_CURSOR *)dbc->internal;
00612         if (hcp->split_buf != NULL)
00613                 CDB___os_free(hcp->split_buf, dbc->dbp->pgsize);
00614         CDB___os_free(hcp, sizeof(HASH_CURSOR));
00615 
00616         return (0);
00617 }
00618 
00619 /*
00620  * CDB___ham_c_count --
00621  *      Return a count of on-page duplicates.
00622  *
00623  * PUBLIC: int CDB___ham_c_count __P((DBC *, db_recno_t *));
00624  */
00625 int
00626 CDB___ham_c_count(dbc, recnop)
00627         DBC *dbc;
00628         db_recno_t *recnop;
00629 {
00630         DB *dbp;
00631         HASH_CURSOR *hcp;
00632         db_indx_t len;
00633         db_recno_t recno;
00634         int ret, t_ret;
00635         u_int8_t *p, *pend;
00636 
00637         dbp = dbc->dbp;
00638         hcp = (HASH_CURSOR *) dbc->internal;
00639 
00640         recno = 0;
00641 
00642         if ((ret = CDB___ham_get_cpage(dbc, DB_LOCK_READ)) != 0)
00643                 return (ret);
00644 
00645         switch (HPAGE_PTYPE(H_PAIRDATA(hcp->page, hcp->indx))) {
00646         case H_KEYDATA:
00647         case H_OFFPAGE:
00648                 recno = 1;
00649                 break;
00650         case H_DUPLICATE:
00651                 p = HKEYDATA_DATA(H_PAIRDATA(hcp->page, hcp->indx));
00652                 pend = p +
00653                     LEN_HDATA(hcp->page, dbp->pgsize, hcp->indx);
00654                 for (; p < pend; recno++) {
00655                         /* p may be odd, so copy rather than just dereffing */
00656                         memcpy(&len, p, sizeof(db_indx_t));
00657                         p += 2 * sizeof(db_indx_t) + len;
00658                 }
00659 
00660                 break;
00661         default:
00662                 ret = CDB___db_unknown_type(dbp->dbenv, "CDB___ham_c_count",
00663                     HPAGE_PTYPE(H_PAIRDATA(hcp->page, hcp->indx)));
00664                 goto err;
00665         }
00666 
00667         *recnop = recno;
00668 
00669 err:    if ((t_ret =  CDB___ham_put_page(dbc->dbp, hcp->page, 0)) != 0 && ret == 0)
00670                 ret = t_ret;
00671         hcp->page = NULL;
00672         return (ret);
00673 }
00674 
00675 static int
00676 __ham_c_del(dbc)
00677         DBC *dbc;
00678 {
00679         DB *dbp;
00680         DBT repldbt;
00681         HASH_CURSOR *hcp;
00682         int ret, t_ret;
00683 
00684         dbp = dbc->dbp;
00685         hcp = (HASH_CURSOR *)dbc->internal;
00686 
00687         if (F_ISSET(hcp, H_DELETED))
00688                 return (DB_NOTFOUND);
00689 
00690         if ((ret = CDB___ham_get_meta(dbc)) != 0)
00691                 goto out;
00692 
00693         if ((ret = CDB___ham_get_cpage(dbc, DB_LOCK_WRITE)) != 0)
00694                 goto out;
00695 
00696         /* Off-page duplicates. */
00697         if (HPAGE_TYPE(hcp->page, H_DATAINDEX(hcp->indx)) == H_OFFDUP)
00698                 goto out;
00699 
00700         if (F_ISSET(hcp, H_ISDUP)) { /* On-page duplicate. */
00701                 if (hcp->dup_off == 0 &&
00702                     DUP_SIZE(hcp->dup_len) == LEN_HDATA(hcp->page,
00703                     hcp->hdr->dbmeta.pagesize, hcp->indx))
00704                         ret = CDB___ham_del_pair(dbc, 1);
00705                 else {
00706                         repldbt.flags = 0;
00707                         F_SET(&repldbt, DB_DBT_PARTIAL);
00708                         repldbt.doff = hcp->dup_off;
00709                         repldbt.dlen = DUP_SIZE(hcp->dup_len);
00710                         repldbt.size = 0;
00711                         repldbt.data = HKEYDATA_DATA(H_PAIRDATA(hcp->page,
00712                             hcp->indx));
00713                         ret = CDB___ham_replpair(dbc, &repldbt, 0);
00714                         hcp->dup_tlen -= DUP_SIZE(hcp->dup_len);
00715                         F_SET(hcp, H_DELETED);
00716                         CDB___ham_c_update(dbc, hcp->pgno,
00717                             DUP_SIZE(hcp->dup_len), 0, 1);
00718                 }
00719 
00720         } else /* Not a duplicate */
00721                 ret = CDB___ham_del_pair(dbc, 1);
00722 
00723 out:    if (ret == 0 && hcp->page != NULL &&
00724             (t_ret = CDB___ham_put_page(dbp, hcp->page, 1)) != 0)
00725                 ret = t_ret;
00726         hcp->page = NULL;
00727         if ((t_ret = CDB___ham_release_meta(dbc)) != 0 && ret == 0)
00728                 ret = t_ret;
00729         return (ret);
00730 }
00731 
00732 /*
00733  * CDB___ham_c_dup --
00734  *      Duplicate a hash cursor, such that the new one holds appropriate
00735  *      locks for the position of the original.
00736  *
00737  * PUBLIC: int CDB___ham_c_dup __P((DBC *, DBC *));
00738  */
00739 int
00740 CDB___ham_c_dup(orig_dbc, new_dbc)
00741         DBC *orig_dbc, *new_dbc;
00742 {
00743         HASH_CURSOR *orig, *new;
00744 
00745         orig = (HASH_CURSOR *)orig_dbc->internal;
00746         new = (HASH_CURSOR *)new_dbc->internal;
00747 
00748         new->bucket = orig->bucket;
00749         new->lbucket = orig->lbucket;
00750         new->dup_off = orig->dup_off;
00751         new->dup_len = orig->dup_len;
00752         new->dup_tlen = orig->dup_tlen;
00753 
00754         if (F_ISSET(orig, H_DELETED))
00755                 F_SET(new, H_DELETED);
00756         if (F_ISSET(orig, H_ISDUP))
00757                 F_SET(new, H_ISDUP);
00758 
00759         /*
00760          * If the old cursor held a lock and we're not in transactions, get one
00761          * for the new one.   The reason that we don't need a new lock if we're
00762          * in a transaction is because we already hold a lock and will continue
00763          * to do so until commit, so there is no point in reaquiring it. We
00764          * don't know if the old lock was a read or write lock, but it doesn't
00765          * matter. We'll get a read lock.  We know that this locker already
00766          * holds a lock of the correct type, so if we need a write lock and
00767          * request it, we know that we'll get it.
00768          */
00769         if (orig->lock.off == LOCK_INVALID || orig_dbc->txn != NULL)
00770                 return (0);
00771 
00772         return (CDB___ham_lock_bucket(new_dbc, DB_LOCK_READ));
00773 }
00774 
00775 static int
00776 __ham_c_get(dbc, key, data, flags, pgnop)
00777         DBC *dbc;
00778         DBT *key;
00779         DBT *data;
00780         u_int32_t flags;
00781         db_pgno_t *pgnop;
00782 {
00783         DB *dbp;
00784         HASH_CURSOR *hcp;
00785         db_lockmode_t lock_type;
00786         int get_key, ret, t_ret;
00787 
00788         hcp = (HASH_CURSOR *)dbc->internal;
00789         dbp = dbc->dbp;
00790 
00791         /* Clear OR'd in additional bits so we can check for flag equality. */
00792         if (F_ISSET(dbc, DBC_RMW))
00793                 lock_type = DB_LOCK_WRITE;
00794         else
00795                 lock_type = DB_LOCK_READ;
00796 
00797         if ((ret = CDB___ham_get_meta(dbc)) != 0)
00798                 return (ret);
00799         hcp->seek_size = 0;
00800 
00801         ret = 0;
00802         get_key = 1;
00803         switch (flags) {
00804         case DB_PREV_NODUP:
00805                 F_SET(hcp, H_NEXT_NODUP);
00806                 /* FALLTHROUGH */
00807         case DB_PREV:
00808                 if (hcp->bucket != BUCKET_INVALID) {
00809                         ret = CDB___ham_item_prev(dbc, lock_type, pgnop);
00810                         break;
00811                 }
00812                 /* FALLTHROUGH */
00813         case DB_LAST:
00814                 ret = CDB___ham_item_last(dbc, lock_type, pgnop);
00815                 break;
00816         case DB_NEXT_NODUP:
00817                 F_SET(hcp, H_NEXT_NODUP);
00818                 /* FALLTHROUGH */
00819         case DB_NEXT:
00820                 if (hcp->bucket != BUCKET_INVALID) {
00821                         ret = CDB___ham_item_next(dbc, lock_type, pgnop);
00822                         break;
00823                 }
00824                 /* FALLTHROUGH */
00825         case DB_FIRST:
00826                 ret = CDB___ham_item_first(dbc, lock_type, pgnop);
00827                 break;
00828         case DB_NEXT_DUP:
00829                 /* cgetchk has already determined that the cursor is set. */
00830                 F_SET(hcp, H_DUPONLY);
00831                 ret = CDB___ham_item_next(dbc, lock_type, pgnop);
00832                 break;
00833         case DB_SET:
00834         case DB_SET_RANGE:
00835         case DB_GET_BOTH:
00836                 ret = __ham_lookup(dbc, key, 0, lock_type, pgnop);
00837                 get_key = 0;
00838                 break;
00839         case DB_GET_BOTHC:
00840                 F_SET(hcp, H_DUPONLY);
00841 
00842                 ret = CDB___ham_item_next(dbc, lock_type, pgnop);
00843                 get_key = 0;
00844                 break;
00845         case DB_CURRENT:
00846                 /* cgetchk has already determined that the cursor is set. */
00847                 if (F_ISSET(hcp, H_DELETED)) {
00848                         ret = DB_KEYEMPTY;
00849                         goto err;
00850                 }
00851 
00852                 ret = CDB___ham_item(dbc, lock_type, pgnop);
00853                 break;
00854         }
00855 
00856         /*
00857          * Must always enter this loop to do error handling and
00858          * check for big key/data pair.
00859          */
00860         for (;;) {
00861                 if (ret != 0 && ret != DB_NOTFOUND)
00862                         goto err;
00863                 else if (F_ISSET(hcp, H_OK)) {
00864                         if (*pgnop == PGNO_INVALID)
00865                                 ret = __ham_dup_return(dbc, data, flags);
00866                         break;
00867                 } else if (!F_ISSET(hcp, H_NOMORE)) {
00868                         CDB___db_err(dbp->dbenv,
00869                              "H_NOMORE returned to __ham_c_get");
00870                         ret = EINVAL;
00871                         break;
00872                 }
00873 
00874                 /*
00875                  * Ran out of entries in a bucket; change buckets.
00876                  */
00877                 switch (flags) {
00878                         case DB_LAST:
00879                         case DB_PREV:
00880                         case DB_PREV_NODUP:
00881                                 ret = CDB___ham_put_page(dbp, hcp->page, 0);
00882                                 hcp->page = NULL;
00883                                 if (hcp->bucket == 0) {
00884                                         ret = DB_NOTFOUND;
00885                                         hcp->pgno = PGNO_INVALID;
00886                                         goto err;
00887                                 }
00888                                 F_CLR(hcp, H_ISDUP);
00889                                 hcp->bucket--;
00890                                 hcp->indx = NDX_INVALID;
00891                                 hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
00892                                 if (ret == 0)
00893                                         ret = CDB___ham_item_prev(dbc,
00894                                             lock_type, pgnop);
00895                                 break;
00896                         case DB_FIRST:
00897                         case DB_NEXT:
00898                         case DB_NEXT_NODUP:
00899                                 ret = CDB___ham_put_page(dbp, hcp->page, 0);
00900                                 hcp->page = NULL;
00901                                 hcp->indx = NDX_INVALID;
00902                                 hcp->bucket++;
00903                                 F_CLR(hcp, H_ISDUP);
00904                                 hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
00905                                 if (hcp->bucket > hcp->hdr->max_bucket) {
00906                                         ret = DB_NOTFOUND;
00907                                         hcp->pgno = PGNO_INVALID;
00908                                         goto err;
00909                                 }
00910                                 if (ret == 0)
00911                                         ret = CDB___ham_item_next(dbc,
00912                                             lock_type, pgnop);
00913                                 break;
00914                         case DB_GET_BOTH:
00915                         case DB_GET_BOTHC:
00916                         case DB_NEXT_DUP:
00917                         case DB_SET:
00918                         case DB_SET_RANGE:
00919                                 /* Key not found. */
00920                                 ret = DB_NOTFOUND;
00921                                 goto err;
00922                         case DB_CURRENT:
00923                                 /*
00924                                  * This should only happen if you are doing
00925                                  * deletes and reading with concurrent threads
00926                                  * and not doing proper locking.  We return
00927                                  * the same error code as we would if the
00928                                  * cursor were deleted.
00929                                  */
00930                                 ret = DB_KEYEMPTY;
00931                                 goto err;
00932                         default:
00933                                 DB_ASSERT(0);
00934                 }
00935         }
00936 
00937         if (get_key == 0)
00938                 F_SET(key, DB_DBT_ISSET);
00939 
00940 err:    if ((t_ret = CDB___ham_release_meta(dbc)) != 0 && ret == 0)
00941                 ret = t_ret;
00942 
00943         F_CLR(hcp, H_DUPONLY);
00944         F_CLR(hcp, H_NEXT_NODUP);
00945 
00946         return (ret);
00947 }
00948 
00949 static int
00950 __ham_c_put(dbc, key, data, flags, pgnop)
00951         DBC *dbc;
00952         DBT *key;
00953         DBT *data;
00954         u_int32_t flags;
00955         db_pgno_t *pgnop;
00956 {
00957         DB *dbp;
00958         DBT tmp_val, *myval;
00959         HASH_CURSOR *hcp;
00960         u_int32_t nbytes;
00961         int ret, t_ret;
00962 
00963         /*
00964          * The compiler doesn't realize that we only use this when ret is
00965          * equal to 0 and that if ret is equal to 0, that we must have set
00966          * myval.  So, we initialize it here to shut the compiler up.
00967          */
00968         COMPQUIET(myval, NULL);
00969 
00970         dbp = dbc->dbp;
00971         hcp = (HASH_CURSOR *)dbc->internal;
00972 
00973         if (F_ISSET(hcp, H_DELETED) &&
00974             flags != DB_KEYFIRST && flags != DB_KEYLAST)
00975                 return (DB_NOTFOUND);
00976 
00977         if ((ret = CDB___ham_get_meta(dbc)) != 0)
00978                 goto err1;
00979 
00980         switch (flags) {
00981         case DB_KEYLAST:
00982         case DB_KEYFIRST:
00983         case DB_NODUPDATA:
00984                 nbytes = (ISBIG(hcp, key->size) ? HOFFPAGE_PSIZE :
00985                     HKEYDATA_PSIZE(key->size)) +
00986                     (ISBIG(hcp, data->size) ? HOFFPAGE_PSIZE :
00987                     HKEYDATA_PSIZE(data->size));
00988                 if ((ret = __ham_lookup(dbc,
00989                     key, nbytes, DB_LOCK_WRITE, pgnop)) == DB_NOTFOUND) {
00990                         ret = 0;
00991                         if (hcp->seek_found_page != PGNO_INVALID &&
00992                             hcp->seek_found_page != hcp->pgno) {
00993                                 if ((ret = CDB___ham_put_page(dbp, hcp->page, 0))
00994                                     != 0)
00995                                         goto err2;
00996                                 hcp->page = NULL;
00997                                 hcp->pgno = hcp->seek_found_page;
00998                                 hcp->indx = NDX_INVALID;
00999                         }
01000 
01001                         if (F_ISSET(data, DB_DBT_PARTIAL) && data->doff != 0) {
01002                                 /*
01003                                  * A partial put, but the key does not exist
01004                                  * and we are not beginning the write at 0.
01005                                  * We must create a data item padded up to doff
01006                                  * and then write the new bytes represented by
01007                                  * val.
01008                                  */
01009                                 if ((ret = CDB___ham_init_dbt(dbp->dbenv,
01010                                     &tmp_val, data->size + data->doff,
01011                                     &dbc->rdata.data, &dbc->rdata.ulen)) == 0) {
01012                                         memset(tmp_val.data, 0, data->doff);
01013                                         memcpy((u_int8_t *)tmp_val.data +
01014                                             data->doff, data->data, data->size);
01015                                         myval = &tmp_val;
01016                                 }
01017                         } else
01018                                 myval = (DBT *)data;
01019 
01020                         if (ret == 0)
01021                                 ret = CDB___ham_add_el(dbc, key, myval, H_KEYDATA);
01022                         goto done;
01023                 }
01024                 break;
01025         case DB_BEFORE:
01026         case DB_AFTER:
01027         case DB_CURRENT:
01028                 ret = CDB___ham_item(dbc, DB_LOCK_WRITE, pgnop);
01029                 break;
01030         }
01031 
01032         if (*pgnop == PGNO_INVALID && ret == 0) {
01033                 if (flags == DB_CURRENT ||
01034                     ((flags == DB_KEYFIRST ||
01035                     flags == DB_KEYLAST || flags == DB_NODUPDATA) &&
01036                     !(F_ISSET(dbp, DB_AM_DUP) || F_ISSET(key, DB_DBT_DUPOK))))
01037                         ret = __ham_overwrite(dbc, data, flags);
01038                 else
01039                         ret = CDB___ham_add_dup(dbc, data, flags, pgnop);
01040         }
01041 
01042 done:   if (ret == 0 && F_ISSET(hcp, H_EXPAND)) {
01043                 ret = __ham_expand_table(dbc);
01044                 F_CLR(hcp, H_EXPAND);
01045         }
01046 
01047         if (ret == 0 && (t_ret = CDB___ham_dirty_page(dbp, hcp->page)) != 0)
01048                 ret = t_ret;
01049 
01050 err2:   if ((t_ret = CDB___ham_release_meta(dbc)) != 0 && ret == 0)
01051                 ret = t_ret;
01052 
01053 err1:   return (ret);
01054 }
01055 
01056 /********************************* UTILITIES ************************/
01057 
01058 /*
01059  * __ham_expand_table --
01060  */
01061 static int
01062 __ham_expand_table(dbc)
01063         DBC *dbc;
01064 {
01065         DB *dbp;
01066         PAGE *h;
01067         HASH_CURSOR *hcp;
01068         db_pgno_t pgno;
01069         u_int32_t old_bucket, new_bucket;
01070         int ret;
01071 
01072         dbp = dbc->dbp;
01073         hcp = (HASH_CURSOR *)dbc->internal;
01074         if ((ret = CDB___ham_dirty_meta(dbc)) != 0)
01075                 return (ret);
01076 
01077         /*
01078          * If the split point is about to increase, make sure that we
01079          * have enough extra pages.  The calculation here is weird.
01080          * We'd like to do this after we've upped max_bucket, but it's
01081          * too late then because we've logged the meta-data split.  What
01082          * we'll do between then and now is increment max bucket and then
01083          * see what the log of one greater than that is; here we have to
01084          * look at the log of max + 2.  VERY NASTY STUFF.
01085          *
01086          * It just got even nastier.  With subdatabases, we have to request
01087          * a chunk of contiguous pages, so we do that here using an
01088          * undocumented feature of mpool (the MPOOL_NEW_GROUP flag) to
01089          * give us a number of contiguous pages.  Ouch.
01090          */
01091         if (hcp->hdr->max_bucket == hcp->hdr->high_mask) {
01092                 /*
01093                  * Ask mpool to give us a set of contiguous page numbers
01094                  * large enough to contain the next doubling.
01095                  *
01096                  * Figure out how many new pages we need.   This will return
01097                  * us the last page.  We calculate its page number, initialize
01098                  * the page and then write it back to reserve all the pages
01099                  * in between.  It is possible that the allocation of new pages
01100                  * has already been done, but the tranaction aborted.  Since
01101                  * we don't undo the allocation, check for a valid pgno before
01102                  * doing the allocation.
01103                  */
01104                 pgno = hcp->hdr->max_bucket + 1;
01105                 if (hcp->hdr->spares[CDB___db_log2(pgno) + 1] == PGNO_INVALID)
01106                         /* Allocate a group of pages. */
01107                         ret = CDB_memp_fget(dbp->mpf,
01108                             &pgno, DB_MPOOL_NEW_GROUP, &h);
01109                 else {
01110                         /* Just read in the last page of the batch */
01111                         pgno = hcp->hdr->spares[CDB___db_log2(pgno) + 1] +
01112                             hcp->hdr->max_bucket + 1;
01113                         ret = CDB_memp_fget(dbp->mpf,
01114                             &pgno, DB_MPOOL_CREATE, &h);
01115                 }
01116                 if (ret != 0)
01117                         return (ret);
01118 
01119                 P_INIT(h, dbp->pgsize, pgno,
01120                     PGNO_INVALID, PGNO_INVALID, 0, P_HASH, 0);
01121                 pgno -= hcp->hdr->max_bucket;
01122         } else {
01123                 pgno = BUCKET_TO_PAGE(hcp, hcp->hdr->max_bucket + 1);
01124                 if ((ret =
01125                     CDB_memp_fget(dbp->mpf, &pgno, DB_MPOOL_CREATE, &h)) != 0)
01126                         return (ret);
01127         }
01128 
01129         /* Now we can log the meta-data split. */
01130         if (DB_LOGGING(dbc)) {
01131                 if ((ret = CDB___ham_metagroup_log(dbp->dbenv,
01132                     dbc->txn, &h->lsn, 0, dbp->log_fileid,
01133                     hcp->hdr->max_bucket, pgno, &hcp->hdr->dbmeta.lsn,
01134                     &h->lsn)) != 0)
01135                         return (ret);
01136 
01137                 hcp->hdr->dbmeta.lsn = h->lsn;
01138         }
01139 
01140         /* If we allocated some new pages, write out the last page. */
01141         if ((ret = CDB_memp_fput(dbp->mpf, h, DB_MPOOL_DIRTY)) != 0)
01142                 return (ret);
01143 
01144         new_bucket = ++hcp->hdr->max_bucket;
01145         old_bucket = (hcp->hdr->max_bucket & hcp->hdr->low_mask);
01146 
01147         /*
01148          * If we started a new doubling, fill in the spares array with
01149          * the starting page number negatively offset by the bucket number.
01150          */
01151         if (new_bucket > hcp->hdr->high_mask) {
01152                 /* Starting a new doubling */
01153                 hcp->hdr->low_mask = hcp->hdr->high_mask;
01154                 hcp->hdr->high_mask = new_bucket | hcp->hdr->low_mask;
01155                 if (hcp->hdr->spares[CDB___db_log2(new_bucket) + 1] == PGNO_INVALID)
01156                         hcp->hdr->spares[CDB___db_log2(new_bucket) + 1] =
01157                             pgno - new_bucket;
01158         }
01159 
01160         /* Relocate records to the new bucket */
01161         return (CDB___ham_split_page(dbc, old_bucket, new_bucket));
01162 }
01163 
01164 /*
01165  * PUBLIC: u_int32_t CDB___ham_call_hash __P((DBC *, u_int8_t *, int32_t));
01166  */
01167 u_int32_t
01168 CDB___ham_call_hash(dbc, k, len)
01169         DBC *dbc;
01170         u_int8_t *k;
01171         int32_t len;
01172 {
01173         u_int32_t n, bucket;
01174         HASH_CURSOR *hcp;
01175         HASH *hashp;
01176 
01177         hcp = (HASH_CURSOR *)dbc->internal;
01178         hashp = dbc->dbp->h_internal;
01179 
01180         n = (u_int32_t)(hashp->h_hash(k, len));
01181 
01182         bucket = n & hcp->hdr->high_mask;
01183         if (bucket > hcp->hdr->max_bucket)
01184                 bucket = bucket & hcp->hdr->low_mask;
01185         return (bucket);
01186 }
01187 
01188 /*
01189  * Check for duplicates, and call CDB___db_ret appropriately.  Release
01190  * everything held by the cursor.
01191  */
01192 static int
01193 __ham_dup_return(dbc, val, flags)
01194         DBC *dbc;
01195         DBT *val;
01196         u_int32_t flags;
01197 {
01198         DB *dbp;
01199         HASH_CURSOR *hcp;
01200         PAGE *pp;
01201         DBT *myval, tmp_val;
01202         db_indx_t ndx;
01203         db_pgno_t pgno;
01204         u_int32_t off, tlen;
01205         u_int8_t *hk, type;
01206         int cmp, ret;
01207         db_indx_t len;
01208 
01209         /* Check for duplicate and return the first one. */
01210         dbp = dbc->dbp;
01211         hcp = (HASH_CURSOR *)dbc->internal;
01212         ndx = H_DATAINDEX(hcp->indx);
01213         type = HPAGE_TYPE(hcp->page, ndx);
01214         pp = hcp->page;
01215         myval = val;
01216 
01217         /*
01218          * There are 4 cases:
01219          * 1. We are not in duplicate, simply return; the upper layer
01220          *    will do the right thing.
01221          * 2. We are looking at keys and stumbled onto a duplicate.
01222          * 3. We are in the middle of a duplicate set. (ISDUP set)
01223          * 4. We need to check for particular data match.
01224          */
01225 
01226         /* We should never get here with off-page dups. */
01227         DB_ASSERT(type != H_OFFDUP);
01228 
01229         /* Case 1 */
01230         if (type != H_DUPLICATE &&
01231             flags != DB_GET_BOTH && flags != DB_GET_BOTHC)
01232                 return (0);
01233 
01234         /*
01235          * Here we check for the case where we just stumbled onto a
01236          * duplicate.  In this case, we do initialization and then
01237          * let the normal duplicate code handle it. (Case 2)
01238          */
01239         if (!F_ISSET(hcp, H_ISDUP) && type == H_DUPLICATE) {
01240                 F_SET(hcp, H_ISDUP);
01241                 hcp->dup_tlen = LEN_HDATA(hcp->page,
01242                     hcp->hdr->dbmeta.pagesize, hcp->indx);
01243                 hk = H_PAIRDATA(hcp->page, hcp->indx);
01244                 if (flags == DB_LAST
01245                     || flags == DB_PREV || flags == DB_PREV_NODUP) {
01246                         hcp->dup_off = 0;
01247                         do {
01248                                 memcpy(&len,
01249                                     HKEYDATA_DATA(hk) + hcp->dup_off,
01250                                     sizeof(db_indx_t));
01251                                 hcp->dup_off += DUP_SIZE(len);
01252                         } while (hcp->dup_off < hcp->dup_tlen);
01253                         hcp->dup_off -= DUP_SIZE(len);
01254                 } else {
01255                         memcpy(&len,
01256                             HKEYDATA_DATA(hk), sizeof(db_indx_t));
01257                         hcp->dup_off = 0;
01258                 }
01259                 hcp->dup_len = len;
01260         }
01261 
01262         /*
01263          * If we are retrieving a specific key/data pair, then we
01264          * may need to adjust the cursor before returning data.
01265          * Case 4
01266          */
01267         if (flags == DB_GET_BOTH || flags == DB_GET_BOTHC) {
01268                 if (F_ISSET(hcp, H_ISDUP)) {
01269                         /*
01270                          * If we're doing a join, search forward from the
01271                          * current position, not the beginning of the dup set.
01272                          */
01273                         if (flags == DB_GET_BOTHC)
01274                                 F_SET(hcp, H_CONTINUE);
01275 
01276                         CDB___ham_dsearch(dbc, val, &off, &cmp);
01277 
01278                         /*
01279                          * This flag is set nowhere else and is safe to
01280                          * clear unconditionally.
01281                          */
01282                         F_CLR(hcp, H_CONTINUE);
01283                         hcp->dup_off = off;
01284                 } else {
01285                         hk = H_PAIRDATA(hcp->page, hcp->indx);
01286                         if (((HKEYDATA *)hk)->type == H_OFFPAGE) {
01287                                 memcpy(&tlen,
01288                                     HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
01289                                 memcpy(&pgno,
01290                                     HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
01291                                 if ((ret = CDB___db_moff(dbp, val,
01292                                     pgno, tlen, dbp->dup_compare, &cmp)) != 0)
01293                                         return (ret);
01294                         } else {
01295                                 /*
01296                                  * We do not zero tmp_val since the comparison
01297                                  * routines may only look at data and size.
01298                                  */
01299                                 tmp_val.data = HKEYDATA_DATA(hk);
01300                                 tmp_val.size = LEN_HDATA(hcp->page,
01301                                     dbp->pgsize, hcp->indx);
01302                                 cmp = dbp->dup_compare == NULL ?
01303                                     CDB___bam_defcmp(&tmp_val, val) :
01304                                     dbp->dup_compare(&tmp_val, val);
01305                         }
01306                 }
01307 
01308                 if (cmp != 0)
01309                         return (DB_NOTFOUND);
01310         }
01311 
01312         /*
01313          * Now, everything is initialized, grab a duplicate if
01314          * necessary.
01315          */
01316         if (F_ISSET(hcp, H_ISDUP)) {    /* Case 3 */
01317                 /*
01318                  * Copy the DBT in case we are retrieving into user
01319                  * memory and we need the parameters for it.  If the
01320                  * user requested a partial, then we need to adjust
01321                  * the user's parameters to get the partial of the
01322                  * duplicate which is itself a partial.
01323                  */
01324                 memcpy(&tmp_val, val, sizeof(*val));
01325                 if (F_ISSET(&tmp_val, DB_DBT_PARTIAL)) {
01326                         /*
01327                          * Take the user's length unless it would go
01328                          * beyond the end of the duplicate.
01329                          */
01330                         if (tmp_val.doff + hcp->dup_off > hcp->dup_len)
01331                                 tmp_val.dlen = 0;
01332                         else if (tmp_val.dlen + tmp_val.doff >
01333                             hcp->dup_len)
01334                                 tmp_val.dlen =
01335                                     hcp->dup_len - tmp_val.doff;
01336 
01337                         /*
01338                          * Calculate the new offset.
01339                          */
01340                         tmp_val.doff += hcp->dup_off;
01341                 } else {
01342                         F_SET(&tmp_val, DB_DBT_PARTIAL);
01343                         tmp_val.dlen = hcp->dup_len;
01344                         tmp_val.doff = hcp->dup_off + sizeof(db_indx_t);
01345                 }
01346                 myval = &tmp_val;
01347         }
01348 
01349         /*
01350          * Finally, if we had a duplicate, pp, ndx, and myval should be
01351          * set appropriately.
01352          */
01353         if ((ret = CDB___db_ret(dbp, pp, ndx, myval, &dbc->rdata.data,
01354             &dbc->rdata.ulen)) != 0)
01355                 return (ret);
01356 
01357         /*
01358          * In case we sent a temporary off to db_ret, set the real
01359          * return values.
01360          */
01361         val->data = myval->data;
01362         val->size = myval->size;
01363 
01364         F_SET(val, DB_DBT_ISSET);
01365 
01366         return (0);
01367 }
01368 
01369 static int
01370 __ham_overwrite(dbc, nval, flags)
01371         DBC *dbc;
01372         DBT *nval;
01373         u_int32_t flags;
01374 {
01375         HASH_CURSOR *hcp;
01376         DBT *myval, tmp_val, tmp_val2;
01377         void *newrec;
01378         u_int8_t *hk, *p;
01379         u_int32_t len, nondup_size;
01380         db_indx_t newsize;
01381         int ret;
01382 
01383         hcp = (HASH_CURSOR *)dbc->internal;
01384         if (F_ISSET(hcp, H_ISDUP)) {
01385                 /*
01386                  * This is an overwrite of a duplicate. We should never
01387                  * be off-page at this point.
01388                  */
01389                 DB_ASSERT(hcp->opd == NULL);
01390                 /* On page dups */
01391                 if (F_ISSET(nval, DB_DBT_PARTIAL)) {
01392                         /*
01393                          * We're going to have to get the current item, then
01394                          * construct the record, do any padding and do a
01395                          * replace.
01396                          */
01397                         memset(&tmp_val, 0, sizeof(tmp_val));
01398                         if ((ret =
01399                             __ham_dup_return(dbc, &tmp_val, DB_CURRENT)) != 0)
01400                                 return (ret);
01401 
01402                         /* Figure out new size. */
01403                         nondup_size = tmp_val.size;
01404                         newsize = nondup_size;
01405 
01406                         /*
01407                          * Three cases:
01408                          * 1. strictly append (may need to allocate space
01409                          *      for pad bytes; really gross).
01410                          * 2. overwrite some and append.
01411                          * 3. strictly overwrite.
01412                          */
01413                         if (nval->doff > nondup_size)
01414                                 newsize +=
01415                                     (nval->doff - nondup_size + nval->size);
01416                         else if (nval->doff + nval->dlen > nondup_size)
01417                                 newsize += nval->size -
01418                                     (nondup_size - nval->doff);
01419                         else
01420                                 newsize += nval->size - nval->dlen;
01421 
01422                         /*
01423                          * Make sure that the new size doesn't put us over
01424                          * the onpage duplicate size in which case we need
01425                          * to convert to off-page duplicates.
01426                          */
01427                         if (ISBIG(hcp, hcp->dup_tlen - nondup_size + newsize)) {
01428                                 if ((ret = CDB___ham_dup_convert(dbc)) != 0)
01429                                         return (ret);
01430                                 return(hcp->opd->c_am_put(hcp->opd,
01431                                     NULL, nval, flags, NULL));
01432                         }
01433 
01434                         if ((ret = CDB___os_malloc(dbc->dbp->dbenv,
01435                             DUP_SIZE(newsize), NULL, &newrec)) != 0)
01436                                 return (ret);
01437                         memset(&tmp_val2, 0, sizeof(tmp_val2));
01438                         F_SET(&tmp_val2, DB_DBT_PARTIAL);
01439 
01440                         /* Construct the record. */
01441                         p = newrec;
01442                         /* Initial size. */
01443                         memcpy(p, &newsize, sizeof(db_indx_t));
01444                         p += sizeof(db_indx_t);
01445 
01446                         /* First part of original record. */
01447                         len = nval->doff > tmp_val.size
01448                             ? tmp_val.size : nval->doff;
01449                         memcpy(p, tmp_val.data, len);
01450                         p += len;
01451 
01452                         if (nval->doff > tmp_val.size) {
01453                                 /* Padding */
01454                                 memset(p, 0, nval->doff - tmp_val.size);
01455                                 p += nval->doff - tmp_val.size;
01456                         }
01457 
01458                         /* New bytes */
01459                         memcpy(p, nval->data, nval->size);
01460                         p += nval->size;
01461 
01462                         /* End of original record (if there is any) */
01463                         if (nval->doff + nval->dlen < tmp_val.size) {
01464                                 len = tmp_val.size - nval->doff - nval->dlen;
01465                                 memcpy(p, (u_int8_t *)tmp_val.data +
01466                                     nval->doff + nval->dlen, len);
01467                                 p += len;
01468                         }
01469 
01470                         /* Final size. */
01471                         memcpy(p, &newsize, sizeof(db_indx_t));
01472 
01473                         /*
01474                          * Make sure that the caller isn't corrupting
01475                          * the sort order.
01476                          */
01477                         if (dbc->dbp->dup_compare != NULL) {
01478                                 tmp_val2.data =
01479                                     (u_int8_t *)newrec + sizeof(db_indx_t);
01480                                 tmp_val2.size = newsize;
01481                                 if (dbc->dbp->dup_compare(&tmp_val, &tmp_val2)
01482                                     != 0) {
01483                                         (void)CDB___os_free(newrec,
01484                                             DUP_SIZE(newsize));
01485                                         return (CDB___db_duperr(dbc->dbp, flags));
01486                                 }
01487                         }
01488 
01489                         tmp_val2.data = newrec;
01490                         tmp_val2.size = DUP_SIZE(newsize);
01491                         tmp_val2.doff = hcp->dup_off;
01492                         tmp_val2.dlen = DUP_SIZE(hcp->dup_len);
01493 
01494                         ret = CDB___ham_replpair(dbc, &tmp_val2, 0);
01495                         (void)CDB___os_free(newrec, DUP_SIZE(newsize));
01496 
01497                         /* Update cursor */
01498                         if (ret != 0)
01499                                 return (ret);
01500 
01501                         if (newsize > nondup_size)
01502                                 hcp->dup_tlen += (newsize - nondup_size);
01503                         else
01504                                 hcp->dup_tlen -= (nondup_size - newsize);
01505                         hcp->dup_len = DUP_SIZE(newsize);
01506                         return (0);
01507                 } else {
01508                         /* Check whether we need to convert to off page. */
01509                         if (ISBIG(hcp,
01510                             hcp->dup_tlen - hcp->dup_len + nval->size)) {
01511                                 if ((ret = CDB___ham_dup_convert(dbc)) != 0)
01512                                         return (ret);
01513                                 return(hcp->opd->c_am_put(hcp->opd,
01514                                     NULL, nval, flags, NULL));
01515                         }
01516 
01517                         /* Make sure we maintain sort order. */
01518                         if (dbc->dbp->dup_compare != NULL) {
01519                                 tmp_val2.data =
01520                                     HKEYDATA_DATA(H_PAIRDATA(hcp->page,
01521                                     hcp->indx)) + hcp->dup_off +
01522                                     sizeof(db_indx_t);
01523                                 tmp_val2.size = hcp->dup_len;
01524                                 if (dbc->dbp->dup_compare(nval, &tmp_val2) != 0)
01525                                         return (EINVAL);
01526                         }
01527                         /* Overwriting a complete duplicate. */
01528                         if ((ret =
01529                             CDB___ham_make_dup(dbc->dbp->dbenv, nval,
01530                             &tmp_val, &dbc->rdata.data, &dbc->rdata.ulen)) != 0)
01531                                 return (ret);
01532                         /* Now fix what we are replacing. */
01533                         tmp_val.doff = hcp->dup_off;
01534                         tmp_val.dlen = DUP_SIZE(hcp->dup_len);
01535 
01536                         /* Update cursor */
01537                         if (nval->size > hcp->dup_len)
01538                                 hcp->dup_tlen += (nval->size - hcp->dup_len);
01539                         else
01540                                 hcp->dup_tlen -= (hcp->dup_len - nval->size);
01541                         hcp->dup_len = DUP_SIZE(nval->size);
01542                 }
01543                 myval = &tmp_val;
01544         } else if (!F_ISSET(nval, DB_DBT_PARTIAL)) {
01545                 /* Put/overwrite */
01546                 memcpy(&tmp_val, nval, sizeof(*nval));
01547                 F_SET(&tmp_val, DB_DBT_PARTIAL);
01548                 tmp_val.doff = 0;
01549                 hk = H_PAIRDATA(hcp->page, hcp->indx);
01550                 if (HPAGE_PTYPE(hk) == H_OFFPAGE)
01551                         memcpy(&tmp_val.dlen,
01552                             HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
01553                 else
01554                         tmp_val.dlen = LEN_HDATA(hcp->page,
01555                             hcp->hdr->dbmeta.pagesize, hcp->indx);
01556                 myval = &tmp_val;
01557         } else
01558                 /* Regular partial put */
01559                 myval = nval;
01560 
01561         return (CDB___ham_replpair(dbc, myval, 0));
01562 }
01563 
01564 /*
01565  * Given a key and a cursor, sets the cursor to the page/ndx on which
01566  * the key resides.  If the key is found, the cursor H_OK flag is set
01567  * and the pagep, bndx, pgno (dpagep, dndx, dpgno) fields are set.
01568  * If the key is not found, the H_OK flag is not set.  If the sought
01569  * field is non-0, the pagep, bndx, pgno (dpagep, dndx, dpgno) fields
01570  * are set indicating where an add might take place.  If it is 0,
01571  * non of the cursor pointer field are valid.
01572  */
01573 static int
01574 __ham_lookup(dbc, key, sought, mode, pgnop)
01575         DBC *dbc;
01576         const DBT *key;
01577         u_int32_t sought;
01578         db_lockmode_t mode;
01579         db_pgno_t *pgnop;
01580 {
01581         DB *dbp;
01582         HASH_CURSOR *hcp;
01583         db_pgno_t pgno;
01584         u_int32_t tlen;
01585         int match, ret;
01586         u_int8_t *hk, *dk;
01587 
01588         dbp = dbc->dbp;
01589         hcp = (HASH_CURSOR *)dbc->internal;
01590         /*
01591          * Set up cursor so that we're looking for space to add an item
01592          * as we cycle through the pages looking for the key.
01593          */
01594         if ((ret = CDB___ham_item_reset(dbc)) != 0)
01595                 return (ret);
01596         hcp->seek_size = sought;
01597 
01598         hcp->bucket = CDB___ham_call_hash(dbc, (u_int8_t *)key->data, key->size);
01599         hcp->pgno = BUCKET_TO_PAGE(hcp, hcp->bucket);
01600 
01601         while (1) {
01602                 *pgnop = PGNO_INVALID;
01603                 if ((ret = CDB___ham_item_next(dbc, mode, pgnop)) != 0)
01604                         return (ret);
01605 
01606                 if (F_ISSET(hcp, H_NOMORE))
01607                         break;
01608 
01609                 hk = H_PAIRKEY(hcp->page, hcp->indx);
01610                 switch (HPAGE_PTYPE(hk)) {
01611                 case H_OFFPAGE:
01612                         memcpy(&tlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t));
01613                         if (tlen == key->size) {
01614                                 memcpy(&pgno,
01615                                     HOFFPAGE_PGNO(hk), sizeof(db_pgno_t));
01616                                 if ((ret = CDB___db_moff(dbp,
01617                                     key, pgno, tlen, NULL, &match)) != 0)
01618                                         return (ret);
01619                                 if (match == 0)
01620                                         goto found_key;
01621                         }
01622                         break;
01623                 case H_KEYDATA:
01624                         if (key->size ==
01625                             LEN_HKEY(hcp->page, dbp->pgsize, hcp->indx) &&
01626                             memcmp(key->data,
01627                             HKEYDATA_DATA(hk), key->size) == 0) {
01628                                 /* Found the key, check for data type. */
01629 found_key:                      F_SET(hcp, H_OK);
01630                                 dk = H_PAIRDATA(hcp->page, hcp->indx);
01631                                 if (HPAGE_PTYPE(dk) == H_OFFDUP)
01632                                         memcpy(pgnop, HOFFDUP_PGNO(dk),
01633                                             sizeof(db_pgno_t));
01634                                 return (0);
01635                         }
01636                         break;
01637                 case H_DUPLICATE:
01638                 case H_OFFDUP:
01639                         /*
01640                          * These are errors because keys are never
01641                          * duplicated, only data items are.
01642                          */
01643                         return (CDB___db_pgfmt(dbp, PGNO(hcp->page)));
01644                 }
01645         }
01646 
01647         /*
01648          * Item was not found.
01649          */
01650 
01651         if (sought != 0)
01652                 return (ret);
01653 
01654         return (ret);
01655 }
01656 
01657 /*
01658  * CDB___ham_init_dbt --
01659  *      Initialize a dbt using some possibly already allocated storage
01660  *      for items.
01661  *
01662  * PUBLIC: int CDB___ham_init_dbt __P((DB_ENV *,
01663  * PUBLIC:     DBT *, u_int32_t, void **, u_int32_t *));
01664  */
01665 int
01666 CDB___ham_init_dbt(dbenv, dbt, size, bufp, sizep)
01667         DB_ENV *dbenv;
01668         DBT *dbt;
01669         u_int32_t size;
01670         void **bufp;
01671         u_int32_t *sizep;
01672 {
01673         int ret;
01674 
01675         memset(dbt, 0, sizeof(*dbt));
01676         if (*sizep < size) {
01677                 if ((ret = CDB___os_realloc(dbenv, size, NULL, bufp)) != 0) {
01678                         *sizep = 0;
01679                         return (ret);
01680                 }
01681                 *sizep = size;
01682         }
01683         dbt->data = *bufp;
01684         dbt->size = size;
01685         return (0);
01686 }
01687 
01688 /*
01689  * Adjust the cursor after an insert or delete.  The cursor passed is
01690  * the one that was operated upon; we just need to check any of the
01691  * others.
01692  *
01693  * len indicates the length of the item added/deleted
01694  * add indicates if the item indicated by the cursor has just been
01695  * added (add == 1) or deleted (add == 0).
01696  * dup indicates if the addition occurred into a duplicate set.
01697  *
01698  * PUBLIC: void CDB___ham_c_update
01699  * PUBLIC:    __P((DBC *, db_pgno_t, u_int32_t, int, int));
01700  */
01701 void
01702 CDB___ham_c_update(dbc, chg_pgno, len, add, is_dup)
01703         DBC *dbc;
01704         db_pgno_t chg_pgno;
01705         u_int32_t len;
01706         int add, is_dup;
01707 {
01708         DB *dbp;
01709         DBC *cp;
01710         HASH_CURSOR *hcp, *lcp;
01711         int page_deleted;
01712 
01713         dbp = dbc->dbp;
01714         hcp = (HASH_CURSOR *)dbc->internal;
01715 
01716         /*
01717          * Regular adds are always at the end of a given page, so we never
01718          * have to adjust anyone's cursor after a regular add.
01719          */
01720         if (!is_dup && add)
01721                 return;
01722 
01723         /*
01724          * Determine if a page was deleted.    If this is a regular update
01725          * (i.e., not is_dup) then the deleted page's number will be that in
01726          * chg_pgno, and the pgno in the cursor will be different.  If this
01727          * was an onpage-duplicate, then the same conditions apply.  If this
01728          * was an off-page duplicate, then we need to verify if hcp->dpgno
01729          * is the same (no delete) or different (delete) than chg_pgno.
01730          */
01731         page_deleted = chg_pgno != PGNO_INVALID && chg_pgno != hcp->pgno;
01732 
01733         MUTEX_THREAD_LOCK(dbp->mutexp);
01734 
01735         for (cp = TAILQ_FIRST(&dbp->active_queue); cp != NULL;
01736             cp = TAILQ_NEXT(cp, links)) {
01737                 if (cp == dbc)
01738                         continue;
01739 
01740                 lcp = (HASH_CURSOR *)cp->internal;
01741 
01742                 if (lcp->pgno != chg_pgno)
01743                         continue;
01744 
01745                 if (is_dup && F_ISSET(hcp, H_DELETED) && lcp->pgno != chg_pgno)
01746                         continue;
01747 
01748                 if (page_deleted) {
01749                         lcp->pgno = hcp->pgno;
01750                         lcp->indx = hcp->indx;
01751                         lcp->bucket = hcp->bucket;
01752                         F_CLR(lcp, H_ISDUP);
01753                         continue;
01754                 }
01755 
01756                 if (lcp->indx == NDX_INVALID)
01757                         continue;
01758 
01759                 if (!is_dup && lcp->indx > hcp->indx)
01760                         lcp->indx -= 2;
01761                 else if (!is_dup && lcp->indx == hcp->indx) {
01762 
01763                         if (add)
01764                                 lcp->indx += 2;
01765                         else
01766                                 F_SET(lcp, H_DELETED);
01767                 } else if (is_dup && lcp->pgno == chg_pgno &&
01768                     lcp->indx == hcp->indx) {
01769                         /* On-page duplicate. */
01770                         if (add) {
01771                                 lcp->dup_tlen += len;
01772                                 if (lcp->dup_off >= hcp->dup_off)
01773                                         lcp->dup_off += len;
01774                         } else {
01775                                 lcp->dup_tlen -= len;
01776                                 if (lcp->dup_off > hcp->dup_off)
01777                                         lcp->dup_off -= len;
01778                                 else if (lcp->dup_off == hcp->dup_off)
01779                                         F_SET(lcp, H_DELETED);
01780                         }
01781                 }
01782         }
01783 
01784         MUTEX_THREAD_UNLOCK(dbp->mutexp);
01785 }
01786 
01787 /*
01788  * CDB___ham_get_clist --
01789  *
01790  * Get a list of cursors either on a particular bucket or on a particular
01791  * page and index combination.  The former is so that we can update
01792  * cursors on a split.  The latter is so we can update cursors when we
01793  * move items off page.
01794  *
01795  * PUBLIC: int CDB___ham_get_clist __P((DB *,
01796  * PUBLIC:     db_pgno_t, u_int32_t, DBC ***));
01797  */
01798 int
01799 CDB___ham_get_clist(dbp, bucket, indx, listp)
01800         DB *dbp;
01801         db_pgno_t bucket;
01802         u_int32_t indx;
01803         DBC ***listp;
01804 {
01805         DBC *cp;
01806         int nalloc, nused, ret;
01807 
01808         /*
01809          * Assume that finding anything is the exception, so optimize for
01810          * the case where there aren't any.
01811          */
01812         nalloc = nused = 0;
01813         *listp = NULL;
01814 
01815         MUTEX_THREAD_LOCK(dbp->mutexp);
01816 
01817         for (cp = TAILQ_FIRST(&dbp->active_queue); cp != NULL;
01818             cp = TAILQ_NEXT(cp, links))
01819                 if (cp->dbtype == DB_HASH &&
01820                     ((indx == NDX_INVALID &&
01821                     ((HASH_CURSOR *)(cp->internal))->bucket == bucket) ||
01822                     (indx != NDX_INVALID &&
01823                     cp->internal->pgno == bucket &&
01824                     cp->internal->indx == indx))) {
01825                         if (nused >= nalloc) {
01826                                 nalloc += 10;
01827                                 if ((ret = CDB___os_realloc(dbp->dbenv,
01828                                     nalloc * sizeof(HASH_CURSOR *),
01829                                     NULL, listp)) != 0)
01830                                         return (ret);
01831                         }
01832                         (*listp)[nused++] = cp;
01833                 }
01834 
01835         MUTEX_THREAD_UNLOCK(dbp->mutexp);
01836         if (listp != NULL) {
01837                 if (nused >= nalloc) {
01838                         nalloc++;
01839                         if ((ret = CDB___os_realloc(dbp->dbenv,
01840                             nalloc * sizeof(HASH_CURSOR *), NULL, listp)) != 0)
01841                                 return (ret);
01842                 }
01843                 (*listp)[nused] = NULL;
01844         }
01845         return (0);
01846 }
01847 
01848 static int
01849 __ham_del_dups(orig_dbc, key)
01850         DBC *orig_dbc;
01851         DBT *key;
01852 {
01853         DBC *dbc;
01854         DBT data, lkey;
01855         int ret, t_ret;
01856 
01857         /* Allocate a cursor. */
01858         if ((ret = orig_dbc->c_dup(orig_dbc, &dbc, 0)) != 0)
01859                 return (ret);
01860 
01861         /*
01862          * Walk a cursor through the key/data pairs, deleting as we go.  Set
01863          * the DB_DBT_USERMEM flag, as this might be a threaded application
01864          * and the flags checking will catch us.  We don't actually want the
01865          * keys or data, so request a partial of length 0.
01866          */
01867         memset(&lkey, 0, sizeof(lkey));
01868         F_SET(&lkey, DB_DBT_USERMEM | DB_DBT_PARTIAL);
01869         memset(&data, 0, sizeof(data));
01870         F_SET(&data, DB_DBT_USERMEM | DB_DBT_PARTIAL);
01871 
01872         /* Walk through the set of key/data pairs, deleting as we go. */
01873         if ((ret = dbc->c_get(dbc, key, &data, DB_SET)) != 0) {
01874                 if (ret == DB_NOTFOUND)
01875                         ret = 0;
01876                 goto err;
01877         }
01878 
01879         for (;;) {
01880                 if ((ret = dbc->c_del(dbc, 0)) != 0)
01881                         goto err;
01882                 if ((ret = dbc->c_get(dbc, &lkey, &data, DB_NEXT_DUP)) != 0) {
01883                         if (ret == DB_NOTFOUND) {
01884                                 ret = 0;
01885                                 break;
01886                         }
01887                         goto err;
01888                 }
01889         }
01890 
01891 err:    /*
01892          * Discard the cursor.  This will cause the underlying off-page dup
01893          * tree to go away as well as the actual entry on the page.
01894          */
01895         if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
01896                 ret = t_ret;
01897 
01898         return (ret);
01899 
01900 }
01901 
01902 static int
01903 __ham_c_writelock(dbc)
01904         DBC *dbc;
01905 {
01906         HASH_CURSOR *hcp;
01907         DB_LOCK tmp_lock;
01908         int ret;
01909 
01910         /*
01911          * All we need do is acquire the lock and let the off-page
01912          * dup tree do its thing.
01913          */
01914         if (!STD_LOCKING(dbc))
01915                 return (0);
01916 
01917         hcp = (HASH_CURSOR *)dbc->internal;
01918         if ((hcp->lock.off == LOCK_INVALID || hcp->lock_mode == DB_LOCK_READ)) {
01919                 tmp_lock = hcp->lock;
01920                 if ((ret = CDB___ham_lock_bucket(dbc, DB_LOCK_WRITE)) != 0)
01921                         return (ret);
01922                 if (tmp_lock.off != LOCK_INVALID &&
01923                     (ret = CDB_lock_put(dbc->dbp->dbenv, &tmp_lock)) != 0)
01924                         return (ret);
01925         }
01926         return (0);
01927 }
01928 
01929 /*
01930  * CDB___ham_c_chgpg --
01931  *
01932  * Adjust the cursors after moving an item from one page to another.
01933  * If the old_index is NDX_INVALID, that means that we copied the
01934  * page wholesale and we're leaving indices intact and just changing
01935  * the page number.
01936  *
01937  * PUBLIC: void CDB___ham_c_chgpg
01938  * PUBLIC:    __P((DBC *, db_pgno_t, u_int32_t, db_pgno_t, u_int32_t));
01939  */
01940 void
01941 CDB___ham_c_chgpg(dbc, old_pgno, old_index, new_pgno, new_index)
01942         DBC *dbc;
01943         db_pgno_t old_pgno, new_pgno;
01944         u_int32_t old_index, new_index;
01945 {
01946         DB *dbp;
01947         DBC *cp;
01948         HASH_CURSOR *hcp;
01949 
01950         dbp = dbc->dbp;
01951 
01952         MUTEX_THREAD_LOCK(dbp->mutexp);
01953 
01954         for (cp = TAILQ_FIRST(&dbp->active_queue); cp != NULL;
01955             cp = TAILQ_NEXT(cp, links)) {
01956                 if (cp == dbc)
01957                         continue;
01958 
01959                 hcp = (HASH_CURSOR *)cp->internal;
01960                 if (hcp->pgno == old_pgno) {
01961                         if (old_index == NDX_INVALID)
01962                                 hcp->pgno = new_pgno;
01963                         else if (hcp->indx == old_index) {
01964                                 hcp->pgno = new_pgno;
01965                                 hcp->indx = new_index;
01966                         }
01967                 }
01968         }
01969 
01970         MUTEX_THREAD_UNLOCK(dbp->mutexp);
01971 }

Generated on Sun Jun 8 10:56:37 2008 for GNUmifluz by  doxygen 1.5.5