qam.c

Go to the documentation of this file.
00001 /*-
00002  * See the file LICENSE for redistribution information.
00003  *
00004  * Copyright (c) 1999, 2000
00005  *      Sleepycat Software.  All rights reserved.
00006  */
00007 
00008 #include "config.h"
00009 
00010 #ifndef lint
00011 static const char revid[] = "$Id: qam_8c-source.html,v 1.1 2008/06/08 10:21:34 sebdiaz Exp $";
00012 #endif /* not lint */
00013 
00014 #ifndef NO_SYSTEM_INCLUDES
00015 #include <sys/types.h>
00016 
00017 #include <errno.h>
00018 #include <string.h>
00019 #endif
00020 
00021 #include "db_int.h"
00022 #include "db_page.h"
00023 #include "db_shash.h"
00024 #include "db_am.h"
00025 #include "lock.h"
00026 #include "btree.h"
00027 #include "qam.h"
00028 
00029 static int __qam_c_close __P((DBC *, db_pgno_t, int *));
00030 static int __qam_c_del __P((DBC *));
00031 static int __qam_c_destroy __P((DBC *));
00032 static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00033 static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
00034 static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
00035 static int __qam_nrecs __P((DBC *, db_recno_t *, db_recno_t *));
00036 static int __qam_position
00037                __P((DBC *, db_recno_t *, db_lockmode_t, db_recno_t, int *));
00038 
00039 /*
00040  * __qam_nrecs --
00041  *      Return the record number for the head and tail of the queue.
00042  */
00043 static int
00044 __qam_nrecs(dbc, rep, start)
00045         DBC *dbc;
00046         db_recno_t *rep, *start;
00047 {
00048         DB *dbp;
00049         DB_LOCK lock;
00050         QMETA *meta;
00051         db_pgno_t pg;
00052         int ret, t_ret;
00053 
00054         dbp = dbc->dbp;
00055 
00056         pg = ((QUEUE *)dbp->q_internal)->q_meta;
00057         if ((ret = CDB___db_lget(dbc, 0, pg,  DB_LOCK_READ, 0, &lock)) != 0)
00058                 return (ret);
00059         if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
00060                 /* We did not fetch it, we can release the lock. */
00061                 (void)__LPUT(dbc, lock);
00062                 return (ret);
00063         }
00064 
00065         *rep = meta->cur_recno;
00066         *start = meta->start;
00067 
00068         ret = CDB_memp_fput(dbp->mpf, meta, 0);
00069 
00070         /* Don't hold the meta page long term. */
00071         if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
00072                 ret = t_ret;
00073 
00074         return (ret);
00075 }
00076 
00077 /*
00078  * __qam_position --
00079  *      Position a queued access method cursor at a record.  This returns
00080  *      the page locked.  *exactp will be set if the record is valid.
00081  */
00082 static int
00083 __qam_position(dbc, recnop, lock_mode, start, exactp)
00084         DBC *dbc;               /* open cursor */
00085         db_recno_t *recnop;     /* pointer to recno to find */
00086         db_lockmode_t lock_mode;/* locking: read or write */
00087         db_recno_t  start;      /* meta.start */
00088         int *exactp;            /* indicate if it was found */
00089 {
00090         QUEUE_CURSOR *cp;
00091         DB *dbp;
00092         QAMDATA  *qp;
00093         db_pgno_t pg;
00094         int ret;
00095 
00096         dbp = dbc->dbp;
00097         cp = (QUEUE_CURSOR *)dbc->internal;
00098 
00099         /* Fetch the page for this recno. */
00100         pg = QAM_RECNO_PAGE(dbp, start, *recnop);
00101 
00102         if ((ret = CDB___db_lget(dbc, 0, pg,  lock_mode, 0, &cp->lock)) != 0)
00103                 return (ret);
00104         if ((ret = CDB_memp_fget(dbp->mpf, &pg,
00105             lock_mode == DB_LOCK_WRITE ? DB_MPOOL_CREATE : 0,
00106             &cp->page)) != 0) {
00107                 /* We did not fetch it, we can release the lock. */
00108                 (void)__LPUT(dbc, cp->lock);
00109                 cp->lock.off = LOCK_INVALID;
00110                 return (ret);
00111         }
00112         cp->pgno = pg;
00113         cp->indx = QAM_RECNO_INDEX(dbp, pg, start, *recnop);
00114 
00115         if (PGNO(cp->page) == 0) {
00116                 if (F_ISSET(dbp, DB_AM_RDONLY)) {
00117                         *exactp = 0;
00118                         return (0);
00119                 }
00120                 PGNO(cp->page) = pg;
00121                 TYPE_SET(cp->page, P_QAMDATA);
00122         }
00123 
00124         qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
00125         *exactp = F_ISSET(qp, QAM_VALID);
00126 
00127         return (ret);
00128 }
00129 
00130 /*
00131  * CDB___qam_pitem --
00132  *      Put an item on a queue page.  Copy the data to the page and set the
00133  *      VALID and SET bits.  If logging and the record was previously set,
00134  *      log that data, otherwise just log the new data.
00135  *
00136  *   pagep must be write locked
00137  *
00138  * PUBLIC: int CDB___qam_pitem
00139  * PUBLIC:     __P((DBC *,  QPAGE *, u_int32_t, db_recno_t, DBT *));
00140  */
00141 int
00142 CDB___qam_pitem(dbc, pagep, indx, recno, data)
00143         DBC *dbc;
00144         QPAGE *pagep;
00145         u_int32_t indx;
00146         db_recno_t recno;
00147         DBT *data;
00148 {
00149         DB *dbp;
00150         DBT olddata, pdata, *datap;
00151         QAMDATA *qp;
00152         QUEUE *t;
00153         u_int32_t size;
00154         u_int8_t *dest, *p;
00155         int alloced, ret;
00156 
00157         alloced = ret = 0;
00158 
00159         dbp = dbc->dbp;
00160         t = (QUEUE *)dbp->q_internal;
00161 
00162         if (data->size > t->re_len)
00163                 goto len_err;
00164 
00165         qp = QAM_GET_RECORD(dbp, pagep, indx);
00166 
00167         p = qp->data;
00168         size = data->size;
00169         datap = data;
00170         if (F_ISSET(data, DB_DBT_PARTIAL)) {
00171                 if (data->doff + data->dlen > t->re_len) {
00172                         alloced = data->dlen;
00173                         goto len_err;
00174                 }
00175                 if (data->size != data->dlen) {
00176 len_err:                CDB___db_err(dbp->dbenv,
00177                             "Length improper for fixed length record %lu",
00178                             (u_long)(alloced ? alloced : data->size));
00179                         return (EINVAL);
00180                 }
00181                 if (data->size == t->re_len)
00182                         goto no_partial;
00183 
00184                 /*
00185                  * If we are logging, then we have to build the record
00186                  * first, otherwise, we can simply drop the change
00187                  * directly on the page.  After this clause, make
00188                  * sure that datap and p are set up correctly so that
00189                  * copying datap into p does the right thing.
00190                  *
00191                  * Note, I am changing this so that if the existing
00192                  * record is not valid, we create a complete record
00193                  * to log so that both this and the recovery code is simpler.
00194                  */
00195 
00196                 if (DB_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
00197                         datap = &pdata;
00198                         memset(datap, 0, sizeof(*datap));
00199 
00200                         if ((ret = CDB___os_malloc(dbp->dbenv,
00201                             t->re_len, NULL, &datap->data)) != 0)
00202                                 return (ret);
00203                         alloced = 1;
00204                         datap->size = t->re_len;
00205 
00206                         /*
00207                          * Construct the record if it's valid, otherwise set it
00208                          * all to the pad character.
00209                          */
00210                         dest = datap->data;
00211                         if (F_ISSET(qp, QAM_VALID))
00212                                 memcpy(dest, p, t->re_len);
00213                         else
00214                                 memset(dest, t->re_pad, t->re_len);
00215 
00216                         dest += data->doff;
00217                         memcpy(dest, data->data, data->size);
00218                 } else {
00219                         datap = data;
00220                         p += data->doff;
00221                 }
00222         }
00223 
00224 no_partial:
00225         if (DB_LOGGING(dbc)) {
00226                 olddata.size = 0;
00227                 if (F_ISSET(qp, QAM_SET)) {
00228                         olddata.data = qp->data;
00229                         olddata.size = t->re_len;
00230                 }
00231                 if ((ret = CDB___qam_add_log(dbp->dbenv, dbc->txn, &LSN(pagep),
00232                     0, dbp->log_fileid, &LSN(pagep), pagep->pgno,
00233                     indx, recno, datap, qp->flags,
00234                     olddata.size == 0 ? NULL : &olddata)) != 0)
00235                         goto err;
00236         }
00237 
00238         F_SET(qp, QAM_VALID | QAM_SET);
00239         memcpy(p, datap->data, datap->size);
00240         if (!F_ISSET(data, DB_DBT_PARTIAL))
00241                 memset(p + datap->size,  t->re_pad, t->re_len - datap->size);
00242 
00243 err:    if (alloced)
00244                 CDB___os_free(datap->data, t->re_len);
00245 
00246         return (ret);
00247 }
00248 /*
00249  * __qam_c_put
00250  *      Cursor put for queued access method.
00251  *      BEFORE and AFTER cannot be specified.
00252  */
00253 static int
00254 __qam_c_put(dbc, key, data, flags, pgnop)
00255         DBC *dbc;
00256         DBT *key, *data;
00257         u_int32_t flags;
00258         db_pgno_t *pgnop;
00259 {
00260         QUEUE_CURSOR *cp;
00261         DB *dbp;
00262         DB_LOCK lock;
00263         QMETA *meta;
00264         db_pgno_t pg;
00265         db_recno_t new_cur, new_first;
00266         u_int32_t opcode;
00267         int exact, ret, t_ret;
00268 
00269         COMPQUIET(key, NULL);
00270 
00271         dbp = dbc->dbp;
00272         if (pgnop != NULL)
00273                 *pgnop = PGNO_INVALID;
00274 
00275         cp = (QUEUE_CURSOR *)dbc->internal;
00276 
00277         /* Write lock the record. */
00278         if ((ret = CDB___db_lget(dbc,
00279             0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
00280                 return (ret);
00281 
00282         if ((ret = __qam_position(dbc,
00283             &cp->recno, DB_LOCK_WRITE, cp->start, &exact)) != 0) {
00284                 /* We could not get the page, we can release the record lock. */
00285                 __LPUT(dbc, lock);
00286                 return (ret);
00287         }
00288 
00289         if (exact && flags == DB_NOOVERWRITE) {
00290                 ret = __TLPUT(dbc, lock);
00291                 /* Doing record locking, release the page lock */
00292                 if ((t_ret = __LPUT(dbc, cp->lock)) == 0)
00293                         cp->lock.off = LOCK_INVALID;
00294                 else
00295                         if (ret == 0)
00296                                 ret = t_ret;
00297                 if ((t_ret = CDB_memp_fput(dbp->mpf, cp->page, 0)) != 0 && ret == 0)
00298                         ret = t_ret;
00299                 cp->page = NULL;
00300                 return (ret == 0 ? DB_KEYEXIST : ret);
00301         }
00302 
00303         /* Put the item on the page. */
00304         ret = CDB___qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
00305 
00306         /* Doing record locking, release the page lock */
00307         if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
00308                 ret = t_ret;
00309         if ((t_ret = CDB_memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY)) && ret == 0)
00310                 ret = t_ret;
00311         cp->page = NULL;
00312         cp->lock = lock;
00313         cp->lock_mode = DB_LOCK_WRITE;
00314         if (ret != 0)
00315                 return (ret);
00316 
00317         /* We may need to reset the head or tail of the queue. */
00318         pg = ((QUEUE *)dbp->q_internal)->q_meta;
00319         if ((ret = CDB___db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
00320                 return (ret);
00321         if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
00322                 /* We did not fetch it, we can release the lock. */
00323                 (void)__LPUT(dbc, lock);
00324                 return (ret);
00325         }
00326 
00327         opcode = 0;
00328         new_cur = new_first = 0;
00329 
00330         if (cp->recno > meta->cur_recno) {
00331                 new_cur = cp->recno;
00332                 opcode |= QAM_SETCUR;
00333         }
00334         if (cp->recno < meta->first_recno || meta->first_recno < meta->start) {
00335                 new_first = cp->recno;
00336                 opcode |= QAM_SETFIRST;
00337         }
00338 
00339         if (opcode != 0 && DB_LOGGING(dbc)) {
00340                 ret = CDB___qam_mvptr_log(dbp->dbenv, dbc->txn, &meta->dbmeta.lsn,
00341                     0, opcode, dbp->log_fileid, meta->first_recno, new_first,
00342                     meta->cur_recno, new_cur, &meta->dbmeta.lsn);
00343         }
00344 
00345         if (opcode & QAM_SETCUR)
00346                 meta->cur_recno = cp->recno;
00347         if (opcode & QAM_SETFIRST)
00348                 meta->first_recno = cp->recno;
00349 
00350         if ((t_ret =
00351             CDB_memp_fput(dbp->mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 &&
00352             ret == 0)
00353                 ret = t_ret;
00354 
00355         /* Don't hold the meta page long term. */
00356         if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
00357                 ret = t_ret;
00358         return (ret);
00359 }
00360 
00361 /*
00362  * CDB___qam_put --
00363  *      Add a record to the queue.
00364  *      If we are doing anything but appending, just call qam_c_put to do the
00365  *      work.  Otherwise we fast path things here.
00366  *
00367  * PUBLIC: int CDB___qam_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
00368  */
00369 int
00370 CDB___qam_put(dbp, txn, key, data, flags)
00371         DB *dbp;
00372         DB_TXN *txn;
00373         DBT *key, *data;
00374         u_int32_t flags;
00375 {
00376         QUEUE_CURSOR *cp;
00377         DBC *dbc;
00378         DB_LOCK lock;
00379         QMETA *meta;
00380         QPAGE *page;
00381         db_pgno_t pg;
00382         db_recno_t recno, start, total;
00383         int ret, t_ret;
00384 
00385         PANIC_CHECK(dbp->dbenv);
00386 
00387         /* Allocate a cursor. */
00388         if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
00389                 return (ret);
00390 
00391         DEBUG_LWRITE(dbc, txn, "qam_put", key, data, flags);
00392 
00393         cp = (QUEUE_CURSOR *)dbc->internal;
00394 
00395         /* Check for invalid flags. */
00396         if ((ret = CDB___db_putchk(dbp,
00397             key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0)
00398                 goto done;
00399 
00400         /* If not appending, then just call the cursor routine */
00401         if (flags != DB_APPEND) {
00402                 if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
00403                         goto done;
00404                 __qam_nrecs(dbc, &total, &cp->start);
00405 
00406                 ret = __qam_c_put(dbc, NULL, data, flags, NULL);
00407                 goto done;
00408         }
00409 
00410         /* Write lock the meta page. */
00411         pg = ((QUEUE *)dbp->q_internal)->q_meta;
00412         if ((ret = CDB___db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
00413                 goto done;
00414         if ((ret = CDB_memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
00415                 /* We did not fetch it, we can release the lock. */
00416                 (void)__LPUT(dbc, lock);
00417                 goto done;
00418         }
00419 
00420         /* Record that we are going to allocate a record. */
00421         if (DB_LOGGING(dbc)) {
00422                 CDB___qam_inc_log(dbp->dbenv,
00423                     txn, &meta->dbmeta.lsn,
00424                     0, dbp->log_fileid, &meta->dbmeta.lsn);
00425         }
00426 
00427         /* Get the next record number. */
00428         recno = ++meta->cur_recno;
00429         start = meta->start;
00430 
00431         if (meta->first_recno < meta->start || meta->first_recno > recno)
00432                 meta->first_recno = recno;
00433 
00434         /* Release the meta page. */
00435         if ((ret = CDB_memp_fput(dbp->mpf, meta, DB_MPOOL_DIRTY)) != 0) {
00436                 (void)__LPUT(dbc, lock);
00437                 goto done;
00438         }
00439 
00440         /* Lock the record and release meta page lock. */
00441         if ((ret = CDB___db_lget(dbc,
00442             1, recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
00443                 goto done;
00444 
00445         cp->lock = lock;
00446         cp->lock_mode = DB_LOCK_WRITE;
00447 
00448         pg = QAM_RECNO_PAGE(dbp, start, recno);
00449 
00450         /* Fetch and write lock the data page. */
00451         if ((ret = CDB___db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
00452                 goto done;
00453         if ((ret = CDB_memp_fget(dbp->mpf, &pg, DB_MPOOL_CREATE, &page)) != 0) {
00454                 /* We did not fetch it, we can release the lock. */
00455                 (void)__LPUT(dbc, lock);
00456                 goto done;
00457         }
00458 
00459         /* See if this is a new page. */
00460         if (page->pgno == 0) {
00461                 page->pgno = pg;
00462                 page->type = P_QAMDATA;
00463         }
00464 
00465         /* Put the item on the page and log it. */
00466         ret = CDB___qam_pitem(dbc, page,
00467             QAM_RECNO_INDEX(dbp, pg, start, recno), recno, data);
00468 
00469         /* Doing record locking, release the page lock */
00470         if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
00471                 ret = t_ret;
00472 
00473         if ((t_ret
00474             = CDB_memp_fput(dbp->mpf, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
00475                 ret = t_ret;
00476 
00477 
00478         /* Return the record number to the user. */
00479          ret = CDB___db_retcopy(dbp, key,
00480              &recno, sizeof(recno), &dbc->rkey.data, &dbc->rkey.ulen);
00481 
00482 done:
00483         /* Discard the cursor. */
00484         if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
00485                 ret = t_ret;
00486 
00487         return (ret);
00488 }
00489 
00490 /*
00491  * __qam_c_del --
00492  *      Qam cursor->am_del function
00493  */
00494 static int
00495 __qam_c_del(dbc)
00496         DBC *dbc;
00497 {
00498         QUEUE_CURSOR *cp;
00499         DB *dbp;
00500         DBT data;
00501         DB_LOCK lock;
00502         PAGE *pagep;
00503         QAMDATA *qp;
00504         db_recno_t start;
00505         db_recno_t total;
00506         int exact, ret, t_ret;
00507 
00508         dbp = dbc->dbp;
00509         cp = (QUEUE_CURSOR *)dbc->internal;
00510         ret = 0;
00511 
00512         __qam_nrecs(dbc, &total, &cp->start);
00513         start = cp->start;
00514 
00515         if (cp->recno > total) {
00516                 ret = DB_NOTFOUND;
00517                 return (ret);
00518         }
00519 
00520         if ((ret = CDB___db_lget(dbc,
00521             0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
00522                 return (ret);
00523 
00524         cp->lock_mode = DB_LOCK_WRITE;
00525         /* Find the record ; delete only deletes exact matches. */
00526         if ((ret = __qam_position(dbc,
00527             &cp->recno, DB_LOCK_WRITE, start, &exact)) != 0) {
00528                 cp->lock = lock;
00529                 return (ret);
00530         }
00531         if (!exact) {
00532                 ret = DB_NOTFOUND;
00533                 goto err1;
00534         }
00535 
00536         pagep = cp->page;
00537         qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
00538 
00539         if (DB_LOGGING(dbc)) {
00540                 data.size = ((QUEUE *)dbp->q_internal)->re_len;
00541                 data.data = qp->data;
00542                 if ((ret = CDB___qam_del_log(dbp->dbenv, dbc->txn,
00543                     &LSN(pagep), 0, dbp->log_fileid, &LSN(pagep),
00544                     pagep->pgno, cp->indx, cp->recno)) != 0)
00545                         goto err1;
00546         }
00547 
00548         F_CLR(qp, QAM_VALID);
00549 
00550 err1:
00551         if ((t_ret = CDB_memp_fput(
00552             dbp->mpf, cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0)
00553                 return (ret ? ret : t_ret);
00554         cp->page = NULL;
00555         /* Doing record locking, release the page lock */
00556         if ((t_ret = __LPUT(dbc, cp->lock)) != 0) {
00557                 cp->lock = lock;
00558                 return (ret ? ret : t_ret);
00559         }
00560         cp->lock = lock;
00561         return (ret);
00562 }
00563 
00564 /*
00565  * CDB___qam_delete --
00566  *      Queue db->del function.
00567  *
00568  * PUBLIC: int CDB___qam_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
00569  */
00570 int
00571 CDB___qam_delete(dbp, txn, key, flags)
00572         DB *dbp;
00573         DB_TXN *txn;
00574         DBT *key;
00575         u_int32_t flags;
00576 {
00577         QUEUE_CURSOR *cp;
00578         DBC *dbc;
00579         int ret, t_ret;
00580 
00581         PANIC_CHECK(dbp->dbenv);
00582 
00583         /* Check for invalid flags. */
00584         if ((ret =
00585             CDB___db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
00586                 return (ret);
00587 
00588         /* Acquire a cursor. */
00589         if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
00590                 return (ret);
00591 
00592         DEBUG_LWRITE(dbc, txn, "qam_delete", key, NULL, flags);
00593 
00594         cp = (QUEUE_CURSOR *)dbc->internal;
00595         if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
00596                 goto err;
00597 
00598         ret = __qam_c_del(dbc);
00599 
00600         /* Release the cursor. */
00601 err:    if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
00602                 ret = t_ret;
00603 
00604         return (ret);
00605 }
00606 
00607 /*
00608  * __qam_c_get --
00609  *      Queue cursor->c_get function.
00610  */
00611 static int
00612 __qam_c_get(dbc, key, data, flags, pgnop)
00613         DBC *dbc;
00614         DBT *key, *data;
00615         u_int32_t flags;
00616         db_pgno_t *pgnop;
00617 {
00618         QUEUE_CURSOR *cp;
00619         DB *dbp;
00620         DB_LOCK lock, pglock, metalock, save_lock;
00621         DBT tmp;
00622         PAGE *pg;
00623         QAMDATA *qp;
00624         QMETA *meta;
00625         db_indx_t save_indx;
00626         db_lockmode_t lock_mode;
00627         db_pgno_t metapno, save_page;
00628         db_recno_t start, first, skipped, save_recno;
00629         int exact, is_first, locked, ret, t_ret, with_delete;
00630         int put_mode, meta_dirty;
00631 
00632         cp = (QUEUE_CURSOR *)dbc->internal;
00633         dbp = dbc->dbp;
00634 
00635         PANIC_CHECK(dbp->dbenv);
00636 
00637         with_delete = 0;
00638         lock_mode = DB_LOCK_READ;
00639         put_mode = 0;
00640         t_ret = 0;
00641         *pgnop = 0;
00642 
00643         if (F_ISSET(dbc, DBC_RMW))
00644                 lock_mode = DB_LOCK_WRITE;
00645 
00646         if (flags == DB_CONSUME) {
00647                 with_delete = 1;
00648                 flags = DB_FIRST;
00649                 lock_mode = DB_LOCK_WRITE;
00650         }
00651 
00652         DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
00653             flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
00654 
00655         is_first = 0;
00656 
00657         /* get the meta page */
00658         metapno = ((QUEUE *)dbp->q_internal)->q_meta;
00659         if ((ret = CDB___db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
00660                 return (ret);
00661         locked = 1;
00662         if ((ret = CDB_memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
00663                 /* We did not fetch it, we can release the lock. */
00664                 (void)__LPUT(dbc, metalock);
00665                 return (ret);
00666         }
00667 
00668         skipped = 0;
00669 
00670         /* Make lint and friends happy. */
00671         first = 0;
00672         meta_dirty = 0;
00673 
00674         /* Release any previous lock if not in a transaction. */
00675         if (cp->lock.off != LOCK_INVALID) {
00676                 (void)__TLPUT(dbc, cp->lock);
00677                 cp->lock.off = LOCK_INVALID;
00678         }
00679 
00680 retry:  /* Update the record number. */
00681         cp->start = start = meta->start;
00682         switch (flags) {
00683         case DB_CURRENT:
00684                 break;
00685         case DB_NEXT_DUP:
00686                 ret = DB_NOTFOUND;
00687                 goto err;
00688                 /* NOTREACHED */
00689         case DB_NEXT:
00690         case DB_NEXT_NODUP:
00691                 if (cp->recno != RECNO_OOB) {
00692                         ++cp->recno;
00693                         break;
00694                 }
00695                 /* FALLTHROUGH */
00696         case DB_FIRST:
00697                 flags = DB_NEXT;
00698                 is_first = 1;
00699 
00700                 /* get the first record number */
00701                 cp->recno = first = meta->first_recno;
00702 
00703                 /* if we will delete it, then increment */
00704                 if (with_delete && first < meta->cur_recno) {
00705                         if (DB_LOGGING(dbc))
00706                                 CDB___qam_incfirst_log(dbp->dbenv, dbc->txn,
00707                                         &LSN(meta), 0,
00708                                         dbp->log_fileid, first);
00709                         meta->first_recno++;
00710                         meta_dirty = 1;
00711                 }
00712 
00713                 break;
00714         case DB_PREV:
00715         case DB_PREV_NODUP:
00716                 if (cp->recno != RECNO_OOB) {
00717                         if (cp->recno <= meta->first_recno) {
00718                                 ret = DB_NOTFOUND;
00719                                 goto err;
00720                         }
00721                         --cp->recno;
00722                         break;
00723                 }
00724                 /* FALLTHROUGH */
00725         case DB_LAST:
00726                 cp->recno = meta->cur_recno;
00727                 if (cp->recno == 0) {
00728                         ret = DB_NOTFOUND;
00729                         goto err;
00730                 }
00731                 break;
00732         case DB_GET_BOTH:
00733         case DB_SET:
00734         case DB_SET_RANGE:
00735                 if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
00736                         goto err;
00737                 break;
00738         default:
00739                 ret = CDB___db_unknown_flag(dbp->dbenv, "__qam_c_get", flags);
00740                 goto err;
00741         }
00742 
00743         if (cp->recno > meta->cur_recno || cp->recno < start) {
00744                 ret = DB_NOTFOUND;
00745                 pg = NULL;
00746                 if (skipped)
00747                         goto undo_meta;
00748                 goto err;
00749         }
00750 
00751         /* Don't hold the meta page long term. */
00752         if (locked) {
00753                 if ((ret = __LPUT(dbc, metalock)) != 0)
00754                         goto err;
00755                 locked = 0;
00756         }
00757 
00758         /* Lock the record. */
00759         if ((ret = CDB___db_lget(dbc, 0, cp->recno, lock_mode,
00760             with_delete ? DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
00761             &lock)) == DB_LOCK_NOTGRANTED && with_delete) {
00762                 /*
00763                  * In the DB_CONSUME case we skip the locked
00764                  * record, someone else will pick it up.
00765                  *
00766                  */
00767                 is_first = 0;
00768                 if (skipped == 0)
00769                         skipped = cp->recno;
00770                 goto retry;
00771         }
00772 
00773         if (ret != 0)
00774                 goto err;
00775 
00776         /*
00777          * In the DB_FIRST or DB_LAST cases we must wait and then start over
00778          * since the first/last may have moved while we slept.
00779          * We release our locks and try again.
00780          */
00781         if ((!with_delete && is_first) || flags == DB_LAST) {
00782                 if ((ret =
00783                     CDB___db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
00784                         goto err;
00785                 if (cp->recno !=
00786                     (is_first ? meta->first_recno : meta->cur_recno)) {
00787                         __LPUT(dbc, lock);
00788                         if (is_first)
00789                                 flags = DB_FIRST;
00790                         locked = 1;
00791                         goto retry;
00792                 }
00793                 /* Don't hold the meta page long term. */
00794                 if ((ret = __LPUT(dbc, metalock)) != 0)
00795                         goto err;
00796         }
00797 
00798         /* Position the cursor on the record. */
00799         if ((ret =
00800             __qam_position(dbc, &cp->recno, lock_mode, start, &exact)) != 0) {
00801                 /* We cannot get the page, release the record lock. */
00802                 (void)__LPUT(dbc, lock);
00803                 goto err;
00804         }
00805 
00806         pg = cp->page;
00807         pglock = cp->lock;
00808         cp->lock = lock;
00809         cp->lock_mode = lock_mode;
00810 
00811         if (!exact) {
00812                 if (flags == DB_NEXT || flags == DB_NEXT_NODUP
00813                     || flags == DB_PREV || flags == DB_PREV_NODUP
00814                     || flags == DB_LAST) {
00815                         /* Release locks and try again. */
00816                         (void)CDB_memp_fput(dbp->mpf, cp->page, 0);
00817                         cp->page = NULL;
00818                         (void)__LPUT(dbc, pglock);
00819                         (void)__LPUT(dbc, cp->lock);
00820                         cp->lock.off = LOCK_INVALID;
00821                         if (flags == DB_LAST)
00822                                 flags = DB_PREV;
00823                         if (!with_delete)
00824                                 is_first = 0;
00825                         goto retry;
00826                 }
00827                 /* this is for the SET and SET_RANGE cases */
00828                 ret = DB_KEYEMPTY;
00829                 goto err1;
00830         }
00831 
00832         /* Return the key if the user didn't give us one. */
00833         if (flags != DB_SET && flags != DB_GET_BOTH &&
00834             (ret = CDB___db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
00835             &dbc->rkey.data, &dbc->rkey.ulen)) != 0) {
00836                 if (with_delete)
00837                         goto undo_meta;
00838                 else
00839                         goto err1;
00840         }
00841         F_SET(key, DB_DBT_ISSET);
00842 
00843         qp = QAM_GET_RECORD(dbp, pg, cp->indx);
00844 
00845         /* Return the data item. */
00846         if (flags == DB_GET_BOTH) {
00847                 /*
00848                  * Need to compare
00849                  */
00850                 tmp.data = qp->data;
00851                 tmp.size = ((QUEUE *)dbp->q_internal)->re_len;
00852                 if ((ret = CDB___bam_defcmp(data, &tmp)) != 0) {
00853                         ret = DB_NOTFOUND;
00854                         goto err1;
00855                 }
00856         }
00857         if ((ret = CDB___db_retcopy(dbp, data, qp->data,
00858             ((QUEUE *)dbp->q_internal)->re_len,
00859             &dbc->rdata.data, &dbc->rdata.ulen)) != 0) {
00860                 if (with_delete)
00861                         goto undo_meta;
00862                 else
00863                         goto err1;
00864         }
00865         F_SET(data, DB_DBT_ISSET);
00866 
00867         /* Finally, if we are doing DB_CONSUME mark the record. */
00868         if (with_delete) {
00869                 if (DB_LOGGING(dbc))
00870                         if ((ret = CDB___qam_del_log(dbp->dbenv, dbc->txn,
00871                             &LSN(pg), 0, dbp->log_fileid, &LSN(pg),
00872                             pg->pgno, cp->indx, cp->recno)) != 0)
00873                                 goto undo_meta;
00874 
00875                 F_CLR(qp, QAM_VALID);
00876                 put_mode = DB_MPOOL_DIRTY;
00877 
00878                 /*
00879                  * This code is responsible for correcting metadata.
00880                  * There are 3 cases.
00881                  *      1) We moved ahead more than one record.
00882                  *      2) We did not actually delete cp->recno.
00883                  *      3) We encountered at least one locked.
00884                  *              record and skipped them.
00885                  */
00886                 if (cp->recno != first) {
00887                         if (0) {
00888 undo_meta:                      is_first = 0;
00889                         }
00890                         if (locked == 0 && (t_ret = CDB___db_lget(
00891                             dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
00892                                 goto err1;
00893 
00894                         if (is_first) {
00895                         /*
00896                          * Check to see if we moved past the first record,
00897                          * if so update meta so others can start past the
00898                          * deleted records.
00899                          */
00900                                 if (meta->first_recno > first) {
00901                                         meta->first_recno = cp->recno;
00902                                         meta_dirty = 1;
00903                                 }
00904                         }
00905                         else if (skipped == 0) {
00906                                 /*
00907                                  * Error case: we did not actually delete the
00908                                  * record, restore meta_first so that it is at
00909                                  * least at or before cp->recno
00910                                  */
00911                                 if (meta->first_recno > cp->recno) {
00912                                         meta->first_recno = cp->recno;
00913                                         meta_dirty = 1;
00914                                 }
00915                         }
00916                         else if (meta->first_recno > skipped) {
00917                                 /*
00918                                  * We skipped some records because they were
00919                                  * locked. If the meta-data page reflects a
00920                                  * starting pointer after the skipped records
00921                                  * we need to move it back to the first record
00922                                  * that is not deleted or is sill locked.
00923                                  * Release locks as we go, we are only
00924                                  * reading to optimize future fetches.
00925                                  */
00926                                 first = meta->first_recno;
00927                                 /* Don't hold the meta page long term. */
00928                                 __LPUT(dbc, metalock);
00929                                 locked = 0;
00930 
00931                                 /* reverify the skipped record */
00932                                 save_page = cp->pgno;
00933                                 save_indx = cp->indx;
00934                                 save_recno = cp->recno;
00935                                 save_lock = cp->lock;
00936                                 do {
00937                                         t_ret = CDB___db_lget(dbc, 0, skipped,
00938                                             DB_LOCK_READ,
00939                                             DB_LOCK_NOWAIT | DB_LOCK_RECORD,
00940                                             &lock);
00941                                         if (t_ret == DB_LOCK_NOTGRANTED)
00942                                                 break;
00943                                         if (t_ret != 0)
00944                                                 goto err1;
00945                                         if ((t_ret =
00946                                             __qam_position(dbc, &skipped,
00947                                             DB_LOCK_READ,
00948                                             start, &exact)) != 0) {
00949                                                 (void)__LPUT(dbc, lock);
00950                                                 goto err1;
00951                                         }
00952                                         if ((t_ret = CDB_memp_fput(dbp->mpf,
00953                                              cp->page, put_mode)) != 0)
00954                                                 goto err1;
00955                                         if ((t_ret =__LPUT(dbc, lock)) != 0)
00956                                                 goto err1;
00957                                         if ((t_ret =
00958                                             __LPUT(dbc, cp->lock)) != 0)
00959                                                 goto err1;
00960                                         if (exact)
00961                                                 break;
00962                                 } while (++skipped <= first);
00963 
00964                                 t_ret = 0;
00965                                 if ((t_ret = CDB___db_lget(
00966                                     dbc, 0, metapno,
00967                                     lock_mode, 0, &metalock)) != 0)
00968                                         goto err1;
00969 
00970                                 if (meta->first_recno > skipped) {
00971                                         meta->first_recno = skipped;
00972                                         meta_dirty = 1;
00973                                 }
00974                                 cp->pgno = save_page;
00975                                 cp->indx = save_indx;
00976                                 cp->recno = save_recno;
00977                                 cp->lock = save_lock;
00978                         }
00979                         locked = 1;
00980                 }
00981         }
00982 
00983 err1:
00984         cp->page = NULL;
00985         if (pg != NULL) {
00986                 if (!ret)
00987                         ret = t_ret;
00988                 t_ret = CDB_memp_fput(dbp->mpf, pg, put_mode);
00989 
00990                 if (!ret)
00991                         ret = t_ret;
00992                 /* Doing record locking, release the page lock */
00993                 t_ret = __LPUT(dbc, pglock);
00994         }
00995 
00996 err:
00997         if (meta) {
00998                 if (!ret)
00999                         ret = t_ret;
01000 
01001                 /* release the meta page */
01002                 t_ret = CDB_memp_fput(
01003                     dbp->mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);
01004 
01005                 if (!ret)
01006                         ret = t_ret;
01007 
01008                 /* Don't hold the meta page long term. */
01009                 if (locked)
01010                         t_ret = __LPUT(dbc, metalock);
01011         }
01012 
01013         return (ret ? ret : t_ret);
01014 }
01015 
01016 /*
01017  * __qam_c_close --
01018  *      Close down the cursor from a single use.
01019  */
01020 static int
01021 __qam_c_close(dbc, root_pgno, rmroot)
01022         DBC *dbc;
01023         db_pgno_t root_pgno;
01024         int *rmroot;
01025 {
01026         QUEUE_CURSOR *cp;
01027 
01028         COMPQUIET(root_pgno, 0);
01029         COMPQUIET(rmroot, NULL);
01030 
01031         cp = (QUEUE_CURSOR *)dbc->internal;
01032 
01033         /* Discard any locks not acquired inside of a transaction. */
01034         if (cp->lock.off != LOCK_INVALID) {
01035                 (void)__TLPUT(dbc, cp->lock);
01036                 cp->lock.off = LOCK_INVALID;
01037         }
01038 
01039         cp->page = NULL;
01040         cp->pgno = PGNO_INVALID;
01041         cp->indx = 0;
01042         cp->lock.off = LOCK_INVALID;
01043         cp->lock_mode = DB_LOCK_NG;
01044         cp->recno = RECNO_OOB;
01045         cp->flags = 0;
01046 
01047         return (0);
01048 }
01049 
01050 /*
01051  * CDB___qam_c_dup --
01052  *      Duplicate a queue cursor, such that the new one holds appropriate
01053  *      locks for the position of the original.
01054  *
01055  * PUBLIC: int CDB___qam_c_dup __P((DBC *, DBC *));
01056  */
01057 int
01058 CDB___qam_c_dup(orig_dbc, new_dbc)
01059         DBC *orig_dbc, *new_dbc;
01060 {
01061         QUEUE_CURSOR *orig, *new;
01062 
01063         orig = (QUEUE_CURSOR *)orig_dbc->internal;
01064         new = (QUEUE_CURSOR *)new_dbc->internal;
01065 
01066         new->recno = orig->recno;
01067         new->start = orig->start;
01068 
01069         /* reget the long term lock if we are not in a xact */
01070         if (orig_dbc->txn != NULL ||
01071             !STD_LOCKING(orig_dbc) || orig->lock.off == LOCK_INVALID)
01072                 return (0);
01073 
01074         return (CDB___db_lget(new_dbc,
01075             0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
01076 }
01077 
01078 /*
01079  * CDB___qam_c_init
01080  *
01081  * PUBLIC: int CDB___qam_c_init __P((DBC *));
01082  */
01083 int
01084 CDB___qam_c_init(dbc)
01085         DBC *dbc;
01086 {
01087         QUEUE_CURSOR *cp;
01088         DB *dbp;
01089         int ret;
01090 
01091         dbp = dbc->dbp;
01092 
01093         /* Allocate the internal structure. */
01094         cp = (QUEUE_CURSOR *)dbc->internal;
01095         if (cp == NULL) {
01096                 if ((ret =
01097                     CDB___os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
01098                         return (ret);
01099                 dbc->internal = (DBC_INTERNAL *)cp;
01100         }
01101 
01102         /* Initialize methods. */
01103         dbc->c_close = CDB___db_c_close;
01104         dbc->c_count = CDB___db_c_count;
01105         dbc->c_del = CDB___db_c_del;
01106         dbc->c_dup = CDB___db_c_dup;
01107         dbc->c_get = CDB___db_c_get;
01108         dbc->c_put = CDB___db_c_put;
01109         dbc->c_am_close = __qam_c_close;
01110         dbc->c_am_del = __qam_c_del;
01111         dbc->c_am_destroy = __qam_c_destroy;
01112         dbc->c_am_get = __qam_c_get;
01113         dbc->c_am_put = __qam_c_put;
01114         dbc->c_am_writelock = NULL;
01115 
01116         return (0);
01117 }
01118 
01119 /*
01120  * __qam_c_destroy --
01121  *      Close a single cursor -- internal version.
01122  */
01123 static int
01124 __qam_c_destroy(dbc)
01125         DBC *dbc;
01126 {
01127         /* Discard the structures. */
01128         CDB___os_free(dbc->internal, sizeof(QUEUE_CURSOR));
01129 
01130         return (0);
01131 }
01132 
01133 /*
01134  * __qam_getno --
01135  *      Check the user's record number.
01136  */
01137 static int
01138 __qam_getno(dbp, key, rep)
01139         DB *dbp;
01140         const DBT *key;
01141         db_recno_t *rep;
01142 {
01143         if ((*rep = *(db_recno_t *)key->data) == 0) {
01144                 CDB___db_err(dbp->dbenv, "illegal record number of 0");
01145                 return (EINVAL);
01146         }
01147         return (0);
01148 }

Generated on Sun Jun 8 10:56:38 2008 for GNUmifluz by  doxygen 1.5.5