[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r4122 - in GNUnet/src/applications: dht/tools dstore
From: |
grothoff |
Subject: |
[GNUnet-SVN] r4122 - in GNUnet/src/applications: dht/tools dstore |
Date: |
Fri, 29 Dec 2006 23:13:50 -0800 (PST) |
Author: grothoff
Date: 2006-12-29 23:13:48 -0800 (Fri, 29 Dec 2006)
New Revision: 4122
Modified:
GNUnet/src/applications/dht/tools/dht-query.c
GNUnet/src/applications/dht/tools/dht_api.c
GNUnet/src/applications/dstore/dstore.c
Log:
fixing dstore
Modified: GNUnet/src/applications/dht/tools/dht-query.c
===================================================================
--- GNUnet/src/applications/dht/tools/dht-query.c 2006-12-30 06:05:16 UTC
(rev 4121)
+++ GNUnet/src/applications/dht/tools/dht-query.c 2006-12-30 07:13:48 UTC
(rev 4122)
@@ -33,6 +33,8 @@
#include "gnunet_util_boot.h"
#include "gnunet_util_network_client.h"
+#define DEBUG_DHT_QUERY NO
+
/**
* How long should a "GET" run (or how long should
* content last on the network).
@@ -81,11 +83,13 @@
hash(key,
strlen(key),
&hc);
+#if DEBUG_DHT_QUERY
GE_LOG(ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
"Issuing '%s(%s)' command.\n",
"get",
key);
+#endif
if (timeout == 0)
timeout = 30 * cronSECONDS;
ret = DHT_LIB_get(cfg,
@@ -115,12 +119,14 @@
memcpy(&dc[1],
value,
strlen(value));
+#if DEBUG_DHT_QUERY
GE_LOG(ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
_("Issuing '%s(%s,%s)' command.\n"),
"put",
key,
value);
+#endif
if (timeout == 0)
timeout = 30 * cronMINUTES;
if (OK == DHT_LIB_put(cfg,
Modified: GNUnet/src/applications/dht/tools/dht_api.c
===================================================================
--- GNUnet/src/applications/dht/tools/dht_api.c 2006-12-30 06:05:16 UTC (rev
4121)
+++ GNUnet/src/applications/dht/tools/dht_api.c 2006-12-30 07:13:48 UTC (rev
4122)
@@ -30,6 +30,8 @@
#include "gnunet_dht_lib.h"
#include "gnunet_util_network_client.h"
+#define DEBUG_DHT_API NO
+
/**
* Data exchanged between main thread and GET thread.
*/
@@ -216,11 +218,13 @@
GE_BREAK(ectx, 0); /* content already expired!? */
return SYSERR;
}
+#if DEBUG_DHT_API
GE_LOG(ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
"DHT_LIB_put called with value '%.*s'\n",
ntohl(value->size),
&value[1]);
+#endif
sock = client_connection_create(ectx,
cfg);
if (sock == NULL)
Modified: GNUnet/src/applications/dstore/dstore.c
===================================================================
--- GNUnet/src/applications/dstore/dstore.c 2006-12-30 06:05:16 UTC (rev
4121)
+++ GNUnet/src/applications/dstore/dstore.c 2006-12-30 07:13:48 UTC (rev
4122)
@@ -70,6 +70,11 @@
static unsigned int stat_dstore_size;
/**
+ * Estimate of the per-entry overhead (including indices).
+ */
+#define OVERHEAD ((4+4+8+8*2+sizeof(HashCode512)*2+32))
+
+/**
* @brief Prepare a SQL statement
*/
static int sq_prepare(sqlite3 * dbh,
@@ -83,62 +88,196 @@
(const char**) &dummy);
}
-static void db_reset() {
- int fd;
+#define SQLITE3_EXEC(db, cmd) do { if (SQLITE_OK != sqlite3_exec(db, cmd,
NULL, NULL, &emsg)) { GE_LOG(coreAPI->ectx, GE_ERROR | GE_ADMIN | GE_BULK,
_("`%s' failed at %s:%d with error: %s\n"), "sqlite3_exec", __FILE__, __LINE__,
emsg); sqlite3_free(emsg); } } while(0);
- UNLINK(fn);
- FREE(fn);
- fn = STRDUP("/tmp/dstoreXXXXXX");
- fd = mkstemp(fn);
- if (fd != -1)
- CLOSE(fd);
-}
+static void db_init(sqlite3 * dbh) {
+ char * emsg;
-static void db_init(sqlite3 * dbh) {
- sqlite3_exec(dbh,
- "PRAGMA temp_store=MEMORY",
- NULL,
- NULL,
- NULL);
- sqlite3_exec(dbh,
- "PRAGMA synchronous=OFF",
- NULL,
- NULL,
- NULL);
- sqlite3_exec(dbh,
- "PRAGMA count_changes=OFF",
- NULL,
- NULL,
- NULL);
- sqlite3_exec(dbh,
- "PRAGMA page_size=4092",
- NULL,
- NULL,
- NULL);
- sqlite3_exec(dbh,
+ SQLITE3_EXEC(dbh,
+ "PRAGMA temp_store=MEMORY");
+ SQLITE3_EXEC(dbh,
+ "PRAGMA synchronous=OFF");
+ SQLITE3_EXEC(dbh,
+ "PRAGMA count_changes=OFF");
+ SQLITE3_EXEC(dbh,
+ "PRAGMA page_size=4092");
+ SQLITE3_EXEC(dbh,
"CREATE TABLE ds071 ("
" size INTEGER NOT NULL DEFAULT 0,"
" type INTEGER NOT NULL DEFAULT 0,"
" puttime INTEGER NOT NULL DEFAULT 0,"
" expire INTEGER NOT NULL DEFAULT 0,"
" key TEXT NOT NULL DEFAULT '',"
- " value BLOB NOT NULL DEFAULT '')",
- NULL,
- NULL,
- NULL);
- sqlite3_exec(dbh,
- "CREATE INDEX idx_key ON ds071 (key)",
- NULL,
- NULL,
- NULL);
- sqlite3_exec(dbh,
- "CREATE INDEX idx_puttime ON ds071 (puttime)",
- NULL,
- NULL,
- NULL);
+ " value BLOB NOT NULL DEFAULT '')");
+ SQLITE3_EXEC(dbh,
+ "CREATE INDEX idx_key ON ds071 (key)");
+ SQLITE3_EXEC(dbh,
+ "CREATE INDEX idx_puttime ON ds071 (puttime)");
}
+static int db_reset() {
+ int fd;
+ sqlite3 * dbh;
+
+ if (fn != NULL) {
+ UNLINK(fn);
+ FREE(fn);
+ }
+ fn = STRDUP("/tmp/dstoreXXXXXX");
+ fd = mkstemp(fn);
+ if (fd == -1) {
+ GE_BREAK(NULL, 0);
+ FREE(fn);
+ fn = NULL;
+ return SYSERR;
+ }
+ CLOSE(fd);
+ if (SQLITE_OK != sqlite3_open(fn,
+ &dbh))
+ return SYSERR;
+ db_init(dbh);
+ sqlite3_close(dbh);
+ return OK;
+}
+
/**
+ * Check that we are within quota.
+ * @return OK if we are.
+ */
+static int checkQuota(sqlite3 * dbh) {
+ HashCode512 dkey;
+ unsigned int dsize;
+ unsigned int dtype;
+ cron_t dputtime;
+ cron_t dexpire;
+ char * dcontent;
+ sqlite3_stmt * stmt;
+ sqlite3_stmt * dstmt;
+ int err;
+
+ if (payload * 10 <= quota * 9)
+ return OK; /* we seem to be about 10% off */
+#if DEBUG_DSTORE
+ GE_LOG(coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
+ "DStore above qutoa (have %llu, allowed %llu), will delete some
data.\n",
+ payload,
+ quota);
+#endif
+ stmt = NULL;
+ dstmt = NULL;
+ if ( (sq_prepare(dbh,
+ "SELECT size, type, puttime, expire, key, value FROM ds071
ORDER BY puttime ASC",
+ &stmt) != SQLITE_OK) ||
+ (sq_prepare(dbh,
+ "DELETE FROM ds071 "
+ "WHERE size = ? AND type = ? AND puttime = ? AND expire = ?
AND key = ? AND value = ?",
+ &dstmt) != SQLITE_OK) ) {
+ GE_LOG(coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_BULK,
+ _("`%s' failed at %s:%d with error: %s\n"),
+ "sq_prepare",
+ __FILE__,
+ __LINE__,
+ sqlite3_errmsg(dbh));
+ GE_BREAK(NULL, 0);
+ if (dstmt != NULL)
+ sqlite3_finalize(dstmt);
+ if (stmt != NULL)
+ sqlite3_finalize(stmt);
+ return SYSERR;
+ }
+ dcontent = MALLOC(MAX_CONTENT_SIZE);
+ while ( (payload * 10 > quota * 9) && /* we seem to be about 10% off */
+ ((err = sqlite3_step(stmt)) == SQLITE_ROW) ) {
+ dsize = sqlite3_column_int(stmt, 0);
+ dtype = sqlite3_column_int(stmt, 1);
+ dputtime = sqlite3_column_int64(stmt, 2);
+ dexpire = sqlite3_column_int64(stmt, 3);
+ GE_BREAK(NULL,
+ sqlite3_column_bytes(stmt, 4) == sizeof(HashCode512));
+ GE_BREAK(NULL,
+ dsize == sqlite3_column_bytes(stmt, 5));
+ memcpy(&dkey,
+ sqlite3_column_blob(stmt, 4),
+ sizeof(HashCode512));
+ if (dsize >= MAX_CONTENT_SIZE) {
+ GE_BREAK(NULL, 0);
+ dsize = MAX_CONTENT_SIZE;
+ }
+ memcpy(dcontent,
+ sqlite3_column_blob(stmt, 5),
+ dsize);
+ sqlite3_reset(stmt);
+ sqlite3_bind_int(dstmt,
+ 1,
+ dsize);
+ sqlite3_bind_int(dstmt,
+ 2,
+ dtype);
+ sqlite3_bind_int64(dstmt,
+ 3,
+ dputtime);
+ sqlite3_bind_int64(dstmt,
+ 4,
+ dexpire);
+ sqlite3_bind_blob(dstmt,
+ 5,
+ &dkey,
+ sizeof(HashCode512),
+ SQLITE_TRANSIENT);
+ sqlite3_bind_blob(dstmt,
+ 6,
+ dcontent,
+ dsize,
+ SQLITE_TRANSIENT);
+ if ((err = sqlite3_step(dstmt)) != SQLITE_DONE) {
+ GE_LOG(coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_BULK,
+ _("`%s' failed at %s:%d with error: %s\n"),
+ "sqlite3_step",
+ __FILE__,
+ __LINE__,
+ sqlite3_errmsg(dbh));
+ sqlite3_reset(dstmt);
+ GE_BREAK(NULL, 0); /* should delete but cannot!? */
+ break;
+ }
+ payload -= (dsize + OVERHEAD);
+#if DEBUG_DSTORE
+ GE_LOG(coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
+ "Deleting %u bytes decreases DStore payload to %llu out of %llu\n",
+ dsize,
+ payload,
+ quota);
+#endif
+ sqlite3_reset(dstmt);
+ }
+ if (err != SQLITE_DONE) {
+ GE_LOG(coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_BULK,
+ _("`%s' failed at %s:%d with error: %s\n"),
+ "sqlite3_step",
+ __FILE__,
+ __LINE__,
+ sqlite3_errmsg(dbh));
+ }
+ FREE(dcontent);
+ sqlite3_finalize(dstmt);
+ sqlite3_finalize(stmt);
+ if (payload * 10 > quota * 9) {
+ GE_LOG(coreAPI->ectx,
+ GE_ERROR | GE_BULK | GE_DEVELOPER,
+ "Failed to delete content to drop below quota (bug?).\n",
+ payload,
+ quota);
+ return SYSERR; /* we seem to be about 10% off */
+ }
+ return OK;
+}
+
+/**
* Store an item in the datastore.
*
* @return OK on success, SYSERR on error
@@ -150,13 +289,13 @@
const char * data) {
sqlite3 * dbh;
sqlite3_stmt * stmt;
- sqlite3_stmt * dstmt;
if (size > MAX_CONTENT_SIZE)
return SYSERR;
MUTEX_LOCK(lock);
- if (SQLITE_OK != sqlite3_open(fn,
- &dbh)) {
+ if ( (fn == NULL) ||
+ (SQLITE_OK != sqlite3_open(fn,
+ &dbh)) ) {
db_reset(dbh);
MUTEX_UNLOCK(lock);
return SYSERR;
@@ -168,12 +307,23 @@
size,
data);
#endif
- db_init(dbh);
+ if (OK != checkQuota(dbh)) {
+ sqlite3_close(dbh);
+ MUTEX_UNLOCK(lock);
+ return SYSERR;
+ }
if (sq_prepare(dbh,
"INSERT INTO ds071 "
"(size, type, puttime, expire, key, value) "
"VALUES (?, ?, ?, ?, ?, ?)",
&stmt) != SQLITE_OK) {
+ GE_LOG(coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_BULK,
+ _("`%s' failed at %s:%d with error: %s\n"),
+ "sq_prepare",
+ __FILE__,
+ __LINE__,
+ sqlite3_errmsg(dbh));
sqlite3_close(dbh);
MUTEX_UNLOCK(lock);
return SYSERR;
@@ -202,97 +352,16 @@
SQLITE_TRANSIENT);
sqlite3_step(stmt);
sqlite3_finalize(stmt);
- stmt = NULL;
- dstmt = NULL;
- payload += size;
+ payload += size + OVERHEAD;
+#if DEBUG_DSTORE
GE_LOG(coreAPI->ectx,
GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"Storing %u bytes increases DStore payload to %llu out of %llu\n",
size,
payload,
quota);
-
- if (payload > quota) {
- GE_LOG(coreAPI->ectx,
- GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
- "DStore above qutoa (have %llu, allowed %llu), will delete some
data.\n",
- payload,
- quota);
- if ( (sq_prepare(dbh,
- "SELECT size, type, puttime, expire, key, value FROM ds071
ORDER BY puttime ASC",
- &stmt) == SQLITE_OK) &&
- (sq_prepare(dbh,
- "DELETE FROM ds071 "
- "WHERE size = ? AND type = ? AND puttime = ? AND expire =
? AND key = ? AND value = ?",
- &dstmt) == SQLITE_OK) ) {
- HashCode512 dkey;
- unsigned int dsize;
- unsigned int dtype;
- cron_t dputtime;
- cron_t dexpire;
- char * dcontent;
-
- dcontent = MALLOC(MAX_CONTENT_SIZE);
- while ( (payload > quota) &&
- (sqlite3_step(stmt) == SQLITE_ROW) ) {
- dsize = sqlite3_column_int(stmt, 0);
- dtype = sqlite3_column_int(stmt, 1);
- dputtime = sqlite3_column_int64(stmt, 2);
- dexpire = sqlite3_column_int64(stmt, 3);
- GE_BREAK(NULL,
- sqlite3_column_bytes(stmt, 4) == sizeof(HashCode512));
- GE_BREAK(NULL,
- dsize == sqlite3_column_bytes(stmt, 5));
- memcpy(&dkey,
- sqlite3_column_blob(stmt, 4),
- sizeof(HashCode512));
- if (dsize >= MAX_CONTENT_SIZE) {
- GE_BREAK(NULL, 0);
- dsize = MAX_CONTENT_SIZE;
- }
- memcpy(dcontent,
- sqlite3_column_blob(stmt, 5),
- dsize);
- sqlite3_bind_int(dstmt,
- 1,
- dsize);
- sqlite3_bind_int(dstmt,
- 2,
- dtype);
- sqlite3_bind_int64(dstmt,
- 3,
- dputtime);
- sqlite3_bind_int64(dstmt,
- 4,
- dexpire);
- sqlite3_bind_blob(dstmt,
- 5,
- &dkey,
- sizeof(HashCode512),
- SQLITE_TRANSIENT);
- sqlite3_bind_blob(dstmt,
- 6,
- dcontent,
- dsize,
- SQLITE_TRANSIENT);
- if (sqlite3_step(dstmt) != SQLITE_ROW) {
- sqlite3_reset(dstmt);
- GE_BREAK(NULL, 0); /* should delete but cannot!? */
- break;
- }
- sqlite3_reset(dstmt);
- }
- FREE(dcontent);
- sqlite3_finalize(dstmt);
- sqlite3_finalize(stmt);
- } else {
- GE_BREAK(NULL, 0);
- if (dstmt != NULL)
- sqlite3_finalize(dstmt);
- if (stmt != NULL)
- sqlite3_finalize(stmt);
- }
- }
+#endif
+ checkQuota(dbh);
sqlite3_close(dbh);
MUTEX_UNLOCK(lock);
if (stats != NULL)
@@ -322,8 +391,9 @@
unsigned int cnt;
MUTEX_LOCK(lock);
- if (SQLITE_OK != sqlite3_open(fn,
- &dbh)) {
+ if ( (fn == NULL) ||
+ (SQLITE_OK != sqlite3_open(fn,
+ &dbh)) ) {
db_reset(dbh);
MUTEX_UNLOCK(lock);
return SYSERR;
@@ -333,11 +403,17 @@
GE_DEBUG | GE_REQUEST | GE_DEVELOPER,
"dstore processes get\n");
#endif
- db_init(dbh);
now = get_time();
if (sq_prepare(dbh,
"SELECT size, value FROM ds071 WHERE key=? AND type=? AND
expire >= ?",
&stmt) != SQLITE_OK) {
+ GE_LOG(coreAPI->ectx,
+ GE_ERROR | GE_ADMIN | GE_BULK,
+ _("`%s' failed at %s:%d with error: %s\n"),
+ "sq_prepare",
+ __FILE__,
+ __LINE__,
+ sqlite3_errmsg(dbh));
sqlite3_close(dbh);
MUTEX_UNLOCK(lock);
return SYSERR;
@@ -377,20 +453,14 @@
Dstore_ServiceAPI *
provide_module_dstore(CoreAPIForApplication * capi) {
static Dstore_ServiceAPI api;
- int fd;
#if DEBUG_SQLITE
GE_LOG(capi->ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
"SQLite Dstore: initializing database\n");
#endif
- fn = STRDUP("/tmp/dstoreXXXXXX");
- fd = mkstemp(fn);
- if (fd == -1) {
- FREE(fn);
+ if (OK != db_reset())
return NULL;
- }
- CLOSE(fd);
lock = MUTEX_CREATE(NO);
coreAPI = capi;
api.get = &d_get;
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r4122 - in GNUnet/src/applications: dht/tools dstore,
grothoff <=