WordDBCache.cc

Go to the documentation of this file.
00001 //
00002 // Part of the ht://Dig package   <http://www.htdig.org/>
00003 // Copyright (c) 1999, 2000, 2001 The ht://Dig Group
00004 // For copyright details, see the file COPYING in your distribution
00005 // or the GNU General Public License version 2 or later
00006 // <http://www.gnu.org/copyleft/gpl.html>
00007 //
00008 #ifdef HAVE_CONFIG_H
00009 #include "config.h"
00010 #endif /* HAVE_CONFIG_H */
00011 
00012 #include <stdio.h>
00013 #include <unistd.h>
00014 #include <stdlib.h>
00015 #include <sys/stat.h>
00016 #include <errno.h>
00017 
00018 #include "WordKey.h"
00019 #include "WordDB.h"
00020 #include "WordDBCache.h"
00021 #include "WordMeta.h"
00022 #include "ber.h"
00023 
00024 int WordDBCaches::Add(char* key, int key_size, char* data, int data_size)
00025 {
00026   int ret;
00027   if((ret = cache.Allocate(key_size + data_size)) == ENOMEM) {
00028     if((ret = CacheFlush()) != 0) return ret;
00029     if((ret = cache.Allocate(key_size + data_size))) return ret;
00030   }
00031 
00032   return cache.Add(key, key_size, data, data_size);
00033 }
00034 
00035 int WordDBCaches::AddFile(String& filename)
00036 {
00037   char tmp[32];
00038   unsigned int serial;
00039   words->Meta()->Serial(WORD_META_SERIAL_FILE, serial);
00040   if(serial == WORD_META_SERIAL_INVALID)
00041     return NOTOK;
00042   filename = words->Filename();
00043   sprintf(tmp, "C%08d", serial - 1);
00044   filename << tmp;
00045 
00046   //
00047   // Recalculate the total size of the cache by summing all the temporary
00048   // file sizes.
00049   //
00050   size = 0;
00051   {
00052     String filename;
00053     String dummy;
00054     WordDBCursor* cursor = files->Cursor();
00055     struct stat stat_buf;
00056     int i;
00057     int ret;
00058     for(i = 0; (ret = cursor->Get(filename, dummy, DB_NEXT)) == 0; i++) {
00059       //
00060       // ENOENT is an acceptable error condition. Other processes may have
00061       // a temporary file that is not yet full and therefore not yet written
00062       // to disk, or even created. The file name was allocated but the file
00063       // does not exist and this is perfectly ok.
00064       //
00065       if(stat((char*)filename, &stat_buf) == 0) {
00066         size += stat_buf.st_size;
00067       } else if(errno != ENOENT) {
00068         const String message = String("WordDBCaches::AddFile: cannot stat ") + filename;
00069         perror((const char*)message);
00070         return NOTOK;
00071       }
00072     }
00073     delete cursor;
00074   }
00075 
00076   String dummy;
00077   if(files->Put(0, filename, dummy, 0) != 0)
00078     return NOTOK;
00079 
00080   return OK;
00081 }
00082 
00083 int WordDBCaches::CacheFlush()
00084 {
00085   if(cache.Empty()) return OK;
00086 
00087   if(cache.Sort() != OK) return NOTOK;
00088   String filename;
00089   int locking = 0;
00090   if(!lock) {
00091     words->Meta()->Lock("cache", lock);
00092     locking = 1;
00093   }
00094   if(AddFile(filename) != OK) return NOTOK;
00095   if(CacheWrite(filename) != OK) return NOTOK;
00096 
00097   unsigned int serial;
00098   words->Meta()->GetSerial(WORD_META_SERIAL_FILE, serial);
00099   if(serial >= (unsigned int)file_max || Full())
00100     if(Merge() != OK) return NOTOK;
00101   if(locking) words->Meta()->Unlock("cache", lock);
00102 
00103   return OK;
00104 }
00105 
00106 static int merge_cmp_size(WordDBCaches* , WordDBCacheFile* a, WordDBCacheFile* b)
00107 {
00108   return b->size - a->size;
00109 }
00110 
00111 int WordDBCaches::Merge()
00112 {
00113   if(CacheFlush() != OK) return NOTOK;
00114 
00115   int locking = 0;
00116   if(!lock) {
00117     words->Meta()->Lock("cache", lock);
00118     locking = 1;
00119   }
00120   unsigned int serial;
00121   words->Meta()->GetSerial(WORD_META_SERIAL_FILE, serial);
00122   if(serial <= 1) return OK;
00123   
00124   //
00125   // heap lists all the files in decreasing size order (biggest first)
00126   //
00127   WordDBCacheFile* heap = new WordDBCacheFile[serial];
00128   {
00129     String filename;
00130     String dummy;
00131     WordDBCursor* cursor = files->Cursor();
00132     struct stat stat_buf;
00133     int i;
00134     int ret;
00135     for(i = 0; (ret = cursor->Get(filename, dummy, DB_NEXT)) == 0; i++) {
00136       WordDBCacheFile& file = heap[i];
00137       file.filename = filename;
00138       if(stat((char*)file.filename, &stat_buf) == 0) {
00139         file.size = stat_buf.st_size;
00140       } else {
00141         const String message = String("WordDBCaches::Merge: cannot stat ") + file.filename;
00142         perror((const char*)message);
00143         return NOTOK;
00144       }
00145       cursor->Del();
00146     }
00147     delete cursor;
00148     myqsort((void*)heap, serial, sizeof(WordDBCacheFile), (myqsort_cmp)merge_cmp_size, (void*)this);
00149   }
00150 
00151   String tmpname = words->Filename() + String("C.tmp");
00152 
00153   while(serial > 1) {
00154     WordDBCacheFile* a = &heap[serial - 1];
00155     WordDBCacheFile* b = &heap[serial - 2];
00156 
00157     if(Merge(a->filename, b->filename, tmpname) != OK) return NOTOK;
00158 
00159     //
00160     // Remove file a
00161     //
00162     if(unlink((char*)a->filename) != 0) {
00163       const String message = String("WordDBCaches::Merge: unlink ") + a->filename;
00164       perror((const char*)message);
00165       return NOTOK;
00166     }
00167 
00168     //
00169     // Remove file b
00170     //
00171     if(unlink((char*)b->filename) != 0) {
00172       const String message = String("WordDBCaches::Merge: unlink ") + b->filename;
00173       perror((const char*)message);
00174       return NOTOK;
00175     }
00176 
00177     //
00178     // Rename tmp file into file b
00179     //
00180     if(rename((char*)tmpname, (char*)b->filename) != 0) {
00181       const String message = String("WordDBCaches::Merge: rename ") + tmpname + String(" ") + b->filename;
00182       perror((const char*)message);
00183       return NOTOK;
00184     }
00185 
00186     //
00187     // Update b file size. The size need not be accurate number as long
00188     // as it reflects the relative size of each file.
00189     //
00190     b->size += a->size;
00191     
00192     serial--;
00193     //
00194     // update heap
00195     //
00196     myqsort((void*)heap, serial, sizeof(WordDBCacheFile), (myqsort_cmp)merge_cmp_size, (void*)this);
00197   }
00198 
00199   {
00200     String newname(words->Filename());
00201     newname << "C00000000";
00202 
00203     if(rename((char*)heap[0].filename, (char*)newname) != 0) {
00204       const String message = String("WordDBCaches::Merge: rename ") + heap[0].filename + String(" ") + newname;
00205       perror((const char*)message);
00206       return NOTOK;
00207     }
00208 
00209     String dummy;
00210     if(files->Put(0, newname, dummy, 0) != 0)
00211       return NOTOK;
00212     words->Meta()->SetSerial(WORD_META_SERIAL_FILE, serial);
00213   }
00214   if(locking) words->Meta()->Unlock("cache", lock);
00215 
00216   return OK;
00217 }
00218 
00219 int WordDBCaches::Merge(const String& filea, const String& fileb, const String& tmpname)
00220 {
00221   FILE* ftmp = fopen((const char*)tmpname, "w");
00222   FILE* fa = fopen((const char*)filea, "r");
00223   FILE* fb = fopen((const char*)fileb, "r");
00224 
00225   unsigned int buffertmp_size = 128;
00226   unsigned char* buffertmp = (unsigned char*)malloc(buffertmp_size);
00227   unsigned int buffera_size = 128;
00228   unsigned char* buffera = (unsigned char*)malloc(buffera_size);
00229   unsigned int bufferb_size = 128;
00230   unsigned char* bufferb = (unsigned char*)malloc(bufferb_size);
00231 
00232   unsigned int entriesa_length;
00233   if(ber_file2value(fa, entriesa_length) < 1) return NOTOK;
00234   unsigned int entriesb_length;
00235   if(ber_file2value(fb, entriesb_length) < 1) return NOTOK;
00236 
00237   if(ber_value2file(ftmp, entriesa_length + entriesb_length) < 1) return NOTOK;
00238 
00239   WordDBCacheEntry entrya;
00240   WordDBCacheEntry entryb;
00241 
00242   if(entriesa_length > 0 && entriesb_length > 0) {
00243     
00244     if(ReadEntry(fa, entrya, buffera, buffera_size) != OK) return NOTOK;
00245     if(ReadEntry(fb, entryb, bufferb, bufferb_size) != OK) return NOTOK;
00246 
00247     while(entriesa_length > 0 && entriesb_length > 0) {
00248       if(WordKey::Compare(words->GetContext(), (const unsigned char*)entrya.key, entrya.key_size, (const unsigned char*)entryb.key, entryb.key_size) < 0) {
00249         if(WriteEntry(ftmp, entrya, buffertmp, buffertmp_size) != OK) return NOTOK;
00250         if(--entriesa_length > 0)
00251           if(ReadEntry(fa, entrya, buffera, buffera_size) != OK) return NOTOK;
00252       } else {
00253         if(WriteEntry(ftmp, entryb, buffertmp, buffertmp_size) != OK) return NOTOK;
00254         if(--entriesb_length > 0)
00255           if(ReadEntry(fb, entryb, bufferb, bufferb_size) != OK) return NOTOK;
00256       }
00257     }
00258   }
00259 
00260   if(entriesa_length > 0 || entriesb_length > 0) {
00261     FILE* fp = entriesa_length > 0 ? fa : fb;
00262     unsigned int& entries_length = entriesa_length > 0 ? entriesa_length : entriesb_length;
00263     WordDBCacheEntry& entry = entriesa_length > 0 ? entrya : entryb;
00264     while(entries_length > 0) {
00265       if(WriteEntry(ftmp, entry, buffertmp, buffertmp_size) != OK) return NOTOK;
00266       if(--entries_length > 0)
00267         if(ReadEntry(fp, entry, buffera, buffera_size) != OK) return NOTOK;
00268     }
00269   }
00270 
00271   free(buffera);
00272   free(bufferb);
00273   free(buffertmp);
00274 
00275   fclose(fa);
00276   fclose(fb);
00277   fclose(ftmp);
00278 
00279   return OK;
00280 }
00281 
00282 int WordDBCaches::Merge(WordDB& db)
00283 {
00284   int locking = 0;
00285   if(!lock) {
00286     words->Meta()->Lock("cache", lock);
00287     locking = 1;
00288   }
00289   if(Merge() != OK) return NOTOK;
00290 
00291   String filename;
00292   String dummy;
00293   WordDBCursor* cursor = files->Cursor();
00294   if(cursor->Get(filename, dummy, DB_FIRST) != 0) {
00295     delete cursor;
00296     return NOTOK;
00297   }
00298   cursor->Del();
00299   delete cursor;
00300 
00301   FILE* fp = fopen((char*)filename, "r");
00302 
00303   unsigned int buffer_size = 128;
00304   unsigned char* buffer = (unsigned char*)malloc(buffer_size);
00305 
00306   unsigned int entries_length;
00307   if(ber_file2value(fp, entries_length) < 1) return NOTOK;
00308 
00309   WordDBCacheEntry entry;
00310 
00311   unsigned int i;
00312   for(i = 0; i < entries_length; i++) {
00313     if(ReadEntry(fp, entry, buffer, buffer_size) != OK) return NOTOK;
00314     void* user_data = words->GetContext();
00315     WORD_DBT_INIT(rkey, (void*)entry.key, entry.key_size);
00316     WORD_DBT_INIT(rdata, (void*)entry.data, entry.data_size);
00317     db.db->put(db.db, 0, &rkey, &rdata, 0);
00318   }
00319 
00320   if(unlink((char*)filename) != 0) {
00321     const String message = String("WordDBCaches::Merge: unlink ") + filename;
00322     perror((const char*)message);
00323     return NOTOK;
00324   }
00325 
00326   words->Meta()->SetSerial(WORD_META_SERIAL_FILE, 0);
00327   if(locking) words->Meta()->Unlock("cache", lock);
00328   size = 0;
00329   free(buffer);
00330   fclose(fp);
00331 
00332   return OK;
00333 }
00334 
00335 int WordDBCaches::CacheWrite(const String& filename)
00336 {
00337   FILE* fp = fopen(filename, "w");
00338   if(!fp) {
00339     String message;
00340     message << "WordDBCaches::CacheWrite()" << filename << "): ";
00341     perror((char*)message);
00342     return NOTOK;
00343   }
00344 
00345   int entries_length;
00346   WordDBCacheEntry* entries;
00347   int ret;
00348   if((ret = cache.Entries(entries, entries_length)) != 0)
00349     return ret;
00350 
00351   if(ber_value2file(fp, entries_length) < 1) return NOTOK;
00352 
00353   unsigned int buffer_size = 1024;
00354   unsigned char* buffer = (unsigned char*)malloc(buffer_size);
00355   int i;
00356   for(i = 0; i < entries_length; i++) {
00357     if(WriteEntry(fp, entries[i], buffer, buffer_size) != OK) return NOTOK;
00358   }
00359   free(buffer);
00360   fclose(fp);
00361 
00362   cache.Flush();
00363 
00364   return OK;
00365 }
00366 
00367 int WordDBCaches::WriteEntry(FILE* fp, WordDBCacheEntry& entry, unsigned char*& buffer, unsigned int& buffer_size)
00368 {
00369     if(entry.key_size + entry.data_size + 64 > buffer_size) {
00370       buffer_size = entry.key_size + entry.data_size + 64;
00371       buffer = (unsigned char*)realloc(buffer, buffer_size);
00372     }
00373 
00374     int p_size = buffer_size;
00375     unsigned char* p = buffer;
00376 
00377     int ber_len;
00378     if((ber_len = ber_value2buf(p, p_size, entry.key_size)) < 1) {
00379       fprintf(stderr, "WordDBCaches::WriteEntry: BER failed for key %d\n", entry.key_size);
00380       return NOTOK;
00381     }
00382     p += ber_len;
00383     memcpy(p, entry.key, entry.key_size);
00384     p += entry.key_size;
00385 
00386     p_size -= ber_len + entry.key_size;
00387 
00388     if((ber_len = ber_value2buf(p, p_size, entry.data_size)) < 1) {
00389       fprintf(stderr, "WordDBCaches::WriteEntry: BER failed for data %d\n", entry.data_size);
00390       return NOTOK;
00391     }
00392     p += ber_len;
00393     memcpy(p, entry.data, entry.data_size);
00394     p += entry.data_size;
00395     
00396     if(fwrite((void*)buffer, p - buffer, 1, fp) != 1) {
00397       perror("WordDBCaches::WriteEntry: cannot write entry ");
00398       return NOTOK;
00399     }
00400 
00401     return OK;
00402 }
00403 
00404 int WordDBCaches::ReadEntry(FILE* fp, WordDBCacheEntry& entry, unsigned char*& buffer, unsigned int& buffer_size)
00405 {
00406   if(ber_file2value(fp, entry.key_size) < 1) return NOTOK;
00407 
00408   if(entry.key_size > buffer_size) {
00409     buffer_size += entry.key_size;
00410     if(!(buffer = (unsigned char*)realloc(buffer, buffer_size))) return NOTOK;
00411   }
00412     
00413   if(fread((void*)buffer, entry.key_size, 1, fp) != 1) {
00414     perror("WordDBCaches::ReadEntry(): cannot read key entry ");
00415     return NOTOK;
00416   }
00417 
00418   if(ber_file2value(fp, entry.data_size) < 1) return NOTOK;
00419 
00420   if(entry.data_size > 0) {
00421     if(entry.data_size + entry.key_size > buffer_size) {
00422       buffer_size += entry.data_size;
00423       if(!(buffer = (unsigned char*)realloc(buffer, buffer_size))) return NOTOK;
00424     }
00425 
00426     if(fread((void*)(buffer + entry.key_size), entry.data_size, 1, fp) != 1) {
00427       perror("WordDBCaches::ReadEntry(): cannot read data entry ");
00428       return NOTOK;
00429     }
00430   }
00431 
00432   entry.key = (char*)buffer;
00433   entry.data = (char*)(buffer + entry.key_size);
00434 
00435   return OK;
00436 }

Generated on Sun Jun 8 10:56:40 2008 for GNUmifluz by  doxygen 1.5.5