00001
00002
00003
00004
00005
00006
00007
00008 #include "config.h"
00009
00010 #ifndef lint
00011 static const char revid[] = "$Id: db__join_8c-source.html,v 1.1 2008/06/08 10:17:49 sebdiaz Exp $";
00012 #endif
00013
00014 #ifndef NO_SYSTEM_INCLUDES
00015 #include <sys/types.h>
00016
00017 #include <errno.h>
00018 #include <stdlib.h>
00019 #include <string.h>
00020 #endif
00021
00022 #include "db_int.h"
00023 #include "db_page.h"
00024 #include "db_join.h"
00025 #include "db_am.h"
00026 #include "btree.h"
00027
00028 static int __db_join_close __P((DBC *));
00029 static int __db_join_cmp __P((const void *, const void *));
00030 static int __db_join_del __P((DBC *, u_int32_t));
00031 static int __db_join_get __P((DBC *, DBT *, DBT *, u_int32_t));
00032 static int __db_join_getnext __P((DBC *, DBT *, DBT *, u_int32_t));
00033 static int __db_join_put __P((DBC *, DBT *, DBT *, u_int32_t));
00034
00035
00036
00037
00038
/*
 * SORTED_SET --
 *	Non-zero when the database behind the idx-th cursor in
 *	jcurs->j_curslist has a duplicate-comparison function installed
 *	(its dup_compare pointer is non-NULL).
 */
#define	SORTED_SET(jcurs, idx)						\
	(NULL != (jcurs)->j_curslist[(idx)]->dbp->dup_compare)
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078 int
00079 CDB___db_join(primary, curslist, dbcp, flags)
00080 DB *primary;
00081 DBC **curslist, **dbcp;
00082 u_int32_t flags;
00083 {
00084 DB_ENV *dbenv;
00085 DBC *dbc;
00086 JOIN_CURSOR *jc;
00087 int ret;
00088 u_int32_t i, ncurs, nslots;
00089
00090 COMPQUIET(nslots, 0);
00091
00092 PANIC_CHECK(primary->dbenv);
00093
00094 if ((ret = CDB___db_joinchk(primary, flags)) != 0)
00095 return (ret);
00096
00097 if (curslist == NULL || curslist[0] == NULL)
00098 return (EINVAL);
00099
00100 dbc = NULL;
00101 jc = NULL;
00102 dbenv = primary->dbenv;
00103
00104 if ((ret = CDB___os_calloc(dbenv, 1, sizeof(DBC), &dbc)) != 0)
00105 goto err;
00106
00107 if ((ret = CDB___os_calloc(dbenv,
00108 1, sizeof(JOIN_CURSOR), &jc)) != 0)
00109 goto err;
00110
00111 if ((ret = CDB___os_malloc(dbenv, 256, NULL, &jc->j_key.data)) != 0)
00112 goto err;
00113 jc->j_key.ulen = 256;
00114 F_SET(&jc->j_key, DB_DBT_USERMEM);
00115
00116 for (jc->j_curslist = curslist;
00117 *jc->j_curslist != NULL; jc->j_curslist++)
00118 ;
00119
00120
00121
00122
00123
00124
00125 ncurs = jc->j_curslist - curslist;
00126 nslots = ncurs + 1;
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168 jc->j_curslist = NULL;
00169 jc->j_workcurs = NULL;
00170 jc->j_fdupcurs = NULL;
00171 jc->j_exhausted = NULL;
00172
00173 if ((ret = CDB___os_calloc(dbenv, nslots, sizeof(DBC *),
00174 &jc->j_curslist)) != 0)
00175 goto err;
00176 if ((ret = CDB___os_calloc(dbenv, nslots, sizeof(DBC *),
00177 &jc->j_workcurs)) != 0)
00178 goto err;
00179 if ((ret = CDB___os_calloc(dbenv, nslots, sizeof(DBC *),
00180 &jc->j_fdupcurs)) != 0)
00181 goto err;
00182 if ((ret = CDB___os_calloc(dbenv, nslots, sizeof(u_int8_t),
00183 &jc->j_exhausted)) != 0)
00184 goto err;
00185 for (i = 0; curslist[i] != NULL; i++) {
00186 jc->j_curslist[i] = curslist[i];
00187 jc->j_workcurs[i] = NULL;
00188 jc->j_fdupcurs[i] = NULL;
00189 jc->j_exhausted[i] = 0;
00190 }
00191 jc->j_ncurs = ncurs;
00192
00193
00194
00195
00196
00197 if (!LF_ISSET(DB_JOIN_NOSORT))
00198 qsort(jc->j_curslist, ncurs, sizeof(DBC *), __db_join_cmp);
00199
00200
00201
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211 if ((ret = jc->j_curslist[0]->c_dup(jc->j_curslist[0], jc->j_workcurs,
00212 DB_POSITIONI)) != 0)
00213 goto err;
00214
00215 dbc->c_close = __db_join_close;
00216 dbc->c_del = __db_join_del;
00217 dbc->c_get = __db_join_get;
00218 dbc->c_put = __db_join_put;
00219 dbc->internal = (DBC_INTERNAL *) jc;
00220 dbc->dbp = primary;
00221 jc->j_primary = primary;
00222
00223 *dbcp = dbc;
00224
00225 MUTEX_THREAD_LOCK(primary->mutexp);
00226 TAILQ_INSERT_TAIL(&primary->join_queue, dbc, links);
00227 MUTEX_THREAD_UNLOCK(primary->mutexp);
00228
00229 return (0);
00230
00231 err: if (jc != NULL) {
00232 if (jc->j_curslist != NULL)
00233 CDB___os_free(jc->j_curslist, nslots * sizeof(DBC *));
00234 if (jc->j_workcurs != NULL) {
00235 if (jc->j_workcurs[0] != NULL)
00236 CDB___os_free(jc->j_workcurs[0], sizeof(DBC));
00237 CDB___os_free(jc->j_workcurs, nslots * sizeof(DBC *));
00238 }
00239 if (jc->j_fdupcurs != NULL)
00240 CDB___os_free(jc->j_fdupcurs, nslots * sizeof(DBC *));
00241 if (jc->j_exhausted != NULL)
00242 CDB___os_free(jc->j_exhausted, nslots * sizeof(u_int8_t));
00243 CDB___os_free(jc, sizeof(JOIN_CURSOR));
00244 }
00245 if (dbc != NULL)
00246 CDB___os_free(dbc, sizeof(DBC));
00247 return (ret);
00248 }
00249
00250 static int
00251 __db_join_put(dbc, key, data, flags)
00252 DBC *dbc;
00253 DBT *key;
00254 DBT *data;
00255 u_int32_t flags;
00256 {
00257 PANIC_CHECK(dbc->dbp->dbenv);
00258
00259 COMPQUIET(key, NULL);
00260 COMPQUIET(data, NULL);
00261 COMPQUIET(flags, 0);
00262 return (EINVAL);
00263 }
00264
00265 static int
00266 __db_join_del(dbc, flags)
00267 DBC *dbc;
00268 u_int32_t flags;
00269 {
00270 PANIC_CHECK(dbc->dbp->dbenv);
00271
00272 COMPQUIET(flags, 0);
00273 return (EINVAL);
00274 }
00275
00276 static int
00277 __db_join_get(dbc, key_arg, data_arg, flags)
00278 DBC *dbc;
00279 DBT *key_arg, *data_arg;
00280 u_int32_t flags;
00281 {
00282 DBT *key_n, key_n_mem;
00283 DB *dbp;
00284 DBC *cp;
00285 JOIN_CURSOR *jc;
00286 int ret;
00287 u_int32_t i, j, operation;
00288
00289 dbp = dbc->dbp;
00290 jc = (JOIN_CURSOR *)dbc->internal;
00291
00292 PANIC_CHECK(dbp->dbenv);
00293
00294 operation = LF_ISSET(DB_OPFLAGS_MASK);
00295
00296 if ((ret = CDB___db_joingetchk(dbp, key_arg, flags)) != 0)
00297 return (ret);
00298
00299
00300
00301
00302
00303
00304
00305 if (F_ISSET(key_arg, DB_DBT_USERMEM) ||
00306 F_ISSET(key_arg, DB_DBT_MALLOC)) {
00307
00308 key_n = &key_n_mem;
00309 memset(key_n, 0, sizeof(DBT));
00310 } else {
00311
00312
00313
00314
00315 key_n = key_arg;
00316 }
00317
00318
00319
00320
00321
00322 if (F_ISSET(jc, JOIN_RETRY))
00323 goto samekey;
00324 F_CLR(jc, JOIN_RETRY);
00325
00326 retry: ret = jc->j_workcurs[0]->c_get(jc->j_workcurs[0],
00327 &jc->j_key, key_n, jc->j_exhausted[0] ? DB_NEXT_DUP : DB_CURRENT);
00328
00329 if (ret == ENOMEM) {
00330 jc->j_key.ulen <<= 1;
00331 if ((ret = CDB___os_realloc(dbp->dbenv,
00332 jc->j_key.ulen, NULL, &jc->j_key.data)) != 0)
00333 goto mem_err;
00334 goto retry;
00335 }
00336
00337
00338
00339
00340
00341
00342 if (ret != 0)
00343 goto err;
00344
00345
00346
00347
00348
00349
00350
00351
00352 for (i = 1; i < jc->j_ncurs; i++) {
00353 if (jc->j_fdupcurs[i] != NULL &&
00354 (ret = jc->j_fdupcurs[i]->c_close(jc->j_fdupcurs[i])) != 0)
00355 goto err;
00356 jc->j_fdupcurs[i] = NULL;
00357 }
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368 if (jc->j_curslist[1] == NULL)
00369 jc->j_exhausted[0] = 1;
00370 else
00371 jc->j_exhausted[0] = 0;
00372
00373
00374 for (i = 1; i < jc->j_ncurs; i++) {
00375 DB_ASSERT(jc->j_curslist[i] != NULL);
00376 if (jc->j_workcurs[i] == NULL)
00377
00378 if ((ret = jc->j_curslist[i]->c_dup(
00379 jc->j_curslist[i], jc->j_workcurs + i,
00380 DB_POSITIONI)) != 0)
00381 goto err;
00382
00383 retry2: cp = jc->j_workcurs[i];
00384
00385 if ((ret = __db_join_getnext(cp, &jc->j_key, key_n,
00386 jc->j_exhausted[i])) == DB_NOTFOUND) {
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398 --i;
00399 jc->j_exhausted[i] = 1;
00400
00401 if (i == 0) {
00402 for (j = 1; jc->j_workcurs[j] != NULL; j++) {
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431 if ((ret = jc->j_workcurs[j]->c_close(
00432 jc->j_workcurs[j])) != 0)
00433 goto err;
00434 if (!SORTED_SET(jc, 0) ||
00435 !SORTED_SET(jc, j) ||
00436 jc->j_fdupcurs[j] == NULL)
00437
00438
00439
00440
00441 jc->j_workcurs[j] = NULL;
00442 else
00443
00444 if ((jc->j_fdupcurs[j]->c_dup(
00445 jc->j_fdupcurs[j],
00446 &jc->j_workcurs[j],
00447 DB_POSITIONI)) != 0)
00448 goto err;
00449 jc->j_exhausted[j] = 0;
00450 }
00451 goto retry;
00452
00453 }
00454
00455
00456
00457
00458
00459
00460 for (j = i + 1;
00461 jc->j_workcurs[j] != NULL;
00462 j++) {
00463 if ((ret = jc->j_workcurs[j]->c_close(
00464 jc->j_workcurs[j])) != 0)
00465 goto err;
00466 jc->j_exhausted[j] = 0;
00467 if (jc->j_fdupcurs[j] != NULL &&
00468 (ret = jc->j_fdupcurs[j]->c_dup(
00469 jc->j_fdupcurs[j], &jc->j_workcurs[j],
00470 DB_POSITIONI)) != 0)
00471 goto err;
00472 else
00473 jc->j_workcurs[j] = NULL;
00474 }
00475 goto retry2;
00476
00477 }
00478
00479 if (ret == ENOMEM) {
00480 jc->j_key.ulen <<= 1;
00481 if ((ret = CDB___os_realloc(dbp->dbenv, jc->j_key.ulen,
00482 NULL, &jc->j_key.data)) != 0) {
00483 mem_err: CDB___db_err(dbp->dbenv,
00484 "Allocation failed for join key, len = %lu",
00485 (u_long)jc->j_key.ulen);
00486 goto err;
00487 }
00488 goto retry2;
00489 }
00490
00491 if (ret != 0)
00492 goto err;
00493
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504 if (i + 1 != jc->j_ncurs)
00505 jc->j_exhausted[i] = 0;
00506 else
00507 jc->j_exhausted[i] = 1;
00508
00509
00510
00511
00512
00513
00514
00515
00516 if (SORTED_SET(jc, i) && jc->j_fdupcurs[i] == NULL && (ret =
00517 cp->c_dup(cp, &jc->j_fdupcurs[i], DB_POSITIONI)) != 0)
00518 goto err;
00519
00520 }
00521
00522 err: if (ret != 0)
00523 return (ret);
00524
00525 if (0) {
00526 samekey:
00527
00528
00529
00530 if ((ret = jc->j_workcurs[0]->c_get(jc->j_workcurs[0],
00531 &jc->j_key, key_n, DB_CURRENT)) != 0)
00532 return (ret);
00533 F_CLR(jc, JOIN_RETRY);
00534 }
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546 if (F_ISSET(key_arg, DB_DBT_USERMEM) ||
00547 F_ISSET(key_arg, DB_DBT_MALLOC)) {
00548
00549
00550
00551
00552 if ((ret = CDB___db_retcopy(dbp,
00553 key_arg, key_n->data, key_n->size, NULL, NULL)) != 0) {
00554
00555
00556
00557
00558
00559 F_SET(jc, JOIN_RETRY);
00560 return (ret);
00561 }
00562 } else
00563 DB_ASSERT(key_n == key_arg);
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573 if (operation == DB_JOIN_ITEM)
00574 return (0);
00575
00576 if ((ret = jc->j_primary->get(jc->j_primary,
00577 jc->j_curslist[0]->txn, key_arg, data_arg, 0)) != 0)
00578
00579
00580
00581
00582
00583 F_SET(jc, JOIN_RETRY);
00584
00585 return (ret);
00586 }
00587
00588 static int
00589 __db_join_close(dbc)
00590 DBC *dbc;
00591 {
00592 DB *dbp;
00593 JOIN_CURSOR *jc;
00594 int ret, t_ret;
00595 u_int32_t i;
00596
00597 jc = (JOIN_CURSOR *)dbc->internal;
00598 dbp = dbc->dbp;
00599 ret = t_ret = 0;
00600
00601
00602
00603
00604
00605
00606 MUTEX_THREAD_LOCK(dbp->mutexp);
00607 TAILQ_REMOVE(&dbp->join_queue, dbc, links);
00608 MUTEX_THREAD_UNLOCK(dbp->mutexp);
00609
00610 PANIC_CHECK(dbc->dbp->dbenv);
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623 for (i = 0; i < jc->j_ncurs; i++) {
00624 if (jc->j_workcurs[i] != NULL && (t_ret =
00625 jc->j_workcurs[i]->c_close(jc->j_workcurs[i])) != 0)
00626 ret = t_ret;
00627 if (jc->j_fdupcurs[i] != NULL && (t_ret =
00628 jc->j_fdupcurs[i]->c_close(jc->j_fdupcurs[i])) != 0)
00629 ret = t_ret;
00630 }
00631
00632 CDB___os_free(jc->j_exhausted, 0);
00633 CDB___os_free(jc->j_curslist, 0);
00634 CDB___os_free(jc->j_workcurs, 0);
00635 CDB___os_free(jc->j_fdupcurs, 0);
00636 CDB___os_free(jc->j_key.data, jc->j_key.ulen);
00637 CDB___os_free(jc, sizeof(JOIN_CURSOR));
00638 CDB___os_free(dbc, sizeof(DBC));
00639
00640 return (ret);
00641 }
00642
00643
00644
00645
00646
00647
00648
00649
00650
00651
00652
00653
00654
00655
00656
00657
00658 static int
00659 __db_join_getnext(dbc, key, data, exhausted)
00660 DBC *dbc;
00661 DBT *key, *data;
00662 u_int32_t exhausted;
00663 {
00664 int ret, cmp;
00665 DB *dbp;
00666 DBT ldata;
00667 int (*func) __P((const DBT *, const DBT *));
00668
00669 dbp = dbc->dbp;
00670
00671 func = (dbp->dup_compare == NULL) ? CDB___bam_defcmp : dbp->dup_compare;
00672
00673 switch (exhausted) {
00674 case 0:
00675 memset(&ldata, 0, sizeof(DBT));
00676
00677 F_SET(&ldata, DB_DBT_MALLOC);
00678 if ((ret = dbc->c_get(dbc, key, &ldata, DB_CURRENT)) != 0)
00679 break;
00680 cmp = func(data, &ldata);
00681 if (cmp == 0) {
00682
00683
00684
00685
00686
00687 if ((ret = CDB___db_retcopy(dbp, data, ldata.data,
00688 ldata.size, &data->data, &data->size)) != 0)
00689 return (ret);
00690 CDB___os_free(ldata.data, 0);
00691 return (0);
00692 }
00693
00694
00695
00696
00697
00698
00699 CDB___os_free(ldata.data, 0);
00700
00701 case 1:
00702 ret = dbc->c_get(dbc, key, data, DB_GET_BOTHC);
00703 break;
00704 default:
00705 ret = EINVAL;
00706 break;
00707 }
00708
00709 return (ret);
00710 }
00711
00712
00713
00714
00715
00716
00717 static int
00718 __db_join_cmp(a, b)
00719 const void *a, *b;
00720 {
00721 DBC *dbca, *dbcb;
00722 db_recno_t counta, countb;
00723
00724
00725 counta = countb = 0;
00726
00727 dbca = *((DBC * const *)a);
00728 dbcb = *((DBC * const *)b);
00729
00730 if (dbca->c_count(dbca, &counta, 0) != 0 ||
00731 dbcb->c_count(dbcb, &countb, 0) != 0)
00732 return (0);
00733
00734 return (counta - countb);
00735 }