[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33314 - in gnunet/src: include peerstore
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33314 - in gnunet/src: include peerstore |
Date: |
Sat, 17 May 2014 18:57:49 +0200 |
Author: otarabai
Date: 2014-05-17 18:57:49 +0200 (Sat, 17 May 2014)
New Revision: 33314
Modified:
gnunet/src/include/gnunet_peerstore_service.h
gnunet/src/include/gnunet_protocols.h
gnunet/src/peerstore/gnunet-service-peerstore.c
gnunet/src/peerstore/peerstore_api.c
gnunet/src/peerstore/peerstore_common.c
gnunet/src/peerstore/peerstore_common.h
gnunet/src/peerstore/plugin_peerstore_sqlite.c
gnunet/src/peerstore/test_peerstore_api.c
Log:
peestore: towards iterate functionality
Modified: gnunet/src/include/gnunet_peerstore_service.h
===================================================================
--- gnunet/src/include/gnunet_peerstore_service.h 2014-05-17 10:16:15 UTC
(rev 33313)
+++ gnunet/src/include/gnunet_peerstore_service.h 2014-05-17 16:57:49 UTC
(rev 33314)
@@ -97,19 +97,13 @@
* Function called by for each matching record.
*
* @param cls closure
- * @param peer peer identity
- * @param sub_system name of the GNUnet sub system responsible
- * @param value stored value
- * @param size size of stored value
+ * @param record peerstore record information
+ * @param emsg error message, or NULL if no errors
* @return #GNUNET_YES to continue iterating, #GNUNET_NO to stop
*/
typedef int (*GNUNET_PEERSTORE_Processor) (void *cls,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- const void *value,
- size_t size,
- struct GNUNET_TIME_Absolute expiry);
+ struct GNUNET_PEERSTORE_Record *record,
+ char *emsg);
/**
* Connect to the PEERSTORE service.
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2014-05-17 10:16:15 UTC (rev
33313)
+++ gnunet/src/include/gnunet_protocols.h 2014-05-17 16:57:49 UTC (rev
33314)
@@ -2467,7 +2467,13 @@
*/
#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE 823
+/**
+ * Iteration response messages
+ */
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD 824
+#define GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END 825
+
/**
* Next available: 840
*/
Modified: gnunet/src/peerstore/gnunet-service-peerstore.c
===================================================================
--- gnunet/src/peerstore/gnunet-service-peerstore.c 2014-05-17 10:16:15 UTC
(rev 33313)
+++ gnunet/src/peerstore/gnunet-service-peerstore.c 2014-05-17 16:57:49 UTC
(rev 33314)
@@ -87,28 +87,24 @@
* @param sub_system name of the GNUnet sub system responsible
* @param value stored value
* @param size size of stored value
- *
+ */
int record_iterator(void *cls,
- const char *sub_system,
- const struct GNUNET_PeerIdentity *peer,
- const char *key,
- const void *value,
- size_t size,
- struct GNUNET_TIME_Absolute expiry)
+ struct GNUNET_PEERSTORE_Record *record,
+ char *emsg)
{
struct GNUNET_SERVER_TransmitContext *tc = cls;
struct StoreRecordMessage *srm;
- srm = PEERSTORE_create_record_message(sub_system,
- peer,
- key,
- value,
- size,
- expiry,
- GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
+ srm = PEERSTORE_create_record_message(record->sub_system,
+ record->peer,
+ record->key,
+ record->value,
+ record->value_size,
+ record->expiry,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD);
GNUNET_SERVER_transmit_context_append_message(tc, (const struct
GNUNET_MessageHeader *)srm);
return GNUNET_YES;
-}*/
+}
/**
* Handle an iterate request from client
@@ -116,7 +112,7 @@
* @param cls unused
* @param client identification of the client
* @param message the actual message
- *
+ */
void handle_iterate (void *cls,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
@@ -145,9 +141,16 @@
&record_iterator,
tc))
{
-
+ GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0,
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END);
+ GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
}
-}*/
+ else
+ {
+ GNUNET_free(tc);
+ GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+ }
+ GNUNET_free(record);
+}
/**
* Handle a store request from client
@@ -220,7 +223,7 @@
{
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
{&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0},
-// {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
+ {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
{NULL, NULL, 0, 0}
};
char *database;
Modified: gnunet/src/peerstore/peerstore_api.c
===================================================================
--- gnunet/src/peerstore/peerstore_api.c 2014-05-17 10:16:15 UTC (rev
33313)
+++ gnunet/src/peerstore/peerstore_api.c 2014-05-17 16:57:49 UTC (rev
33314)
@@ -65,6 +65,16 @@
*/
struct GNUNET_PEERSTORE_StoreContext *store_tail;
+ /**
+ * Head of active ITERATE requests.
+ */
+ struct GNUNET_PEERSTORE_IterateContext *iterate_head;
+
+ /**
+ * Tail of active ITERATE requests.
+ */
+ struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
+
};
/**
@@ -110,6 +120,49 @@
};
+/**
+ * Context for a iterate request
+ */
+struct GNUNET_PEERSTORE_IterateContext
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_PEERSTORE_IterateContext *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_PEERSTORE_IterateContext *prev;
+
+ /**
+ * Handle to the PEERSTORE service.
+ */
+ struct GNUNET_PEERSTORE_Handle *h;
+
+ /**
+ * MQ Envelope with iterate request message
+ */
+ struct GNUNET_MQ_Envelope *ev;
+
+ /**
+ * Callback with each matching record
+ */
+ GNUNET_PEERSTORE_Processor callback;
+
+ /**
+ * Closure for 'callback'
+ */
+ void *callback_cls;
+
+ /**
+ * #GNUNET_YES / #GNUNET_NO
+ * if sent, cannot be canceled
+ */
+ int request_sent;
+
+};
+
/******************************************************************************/
/******************* DECLARATIONS
*********************/
/******************************************************************************/
@@ -122,6 +175,19 @@
*/
void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
+/**
+ * When a response for iterate request is received
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
+ * @param msg message received, NULL on timeout or fatal error
+ */
+void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg);
+
+/**
+ * Close the existing connection to PEERSTORE and reconnect.
+ *
+ * @param h handle to the service
+ */
static void
reconnect (struct GNUNET_PEERSTORE_Handle *h);
@@ -131,6 +197,8 @@
static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
{&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK,
sizeof(struct GNUNET_MessageHeader)},
{&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL,
sizeof(struct GNUNET_MessageHeader)},
+ {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
+ {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END,
sizeof(struct GNUNET_MessageHeader)},
GNUNET_MQ_HANDLERS_END
};
@@ -154,12 +222,18 @@
cleanup_handle(struct GNUNET_PEERSTORE_Handle *h)
{
struct GNUNET_PEERSTORE_StoreContext *sc;
+ struct GNUNET_PEERSTORE_IterateContext *ic;
while (NULL != (sc = h->store_head))
{
GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
GNUNET_free(sc);
}
+ while (NULL != (ic = h->iterate_head))
+ {
+ GNUNET_CONTAINER_DLL_remove(h->iterate_head, h->iterate_tail, ic);
+ GNUNET_free(ic);
+ }
}
/**
@@ -254,7 +328,7 @@
/**
* When a response for store request is received
*
- * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
* @param msg message received, NULL on timeout or fatal error
*/
void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
@@ -304,6 +378,7 @@
struct GNUNET_PEERSTORE_StoreContext *sc = cls;
sc->request_sent = GNUNET_YES;
+ sc->ev = NULL;
}
/**
@@ -319,7 +394,10 @@
if(GNUNET_NO == sc->request_sent)
{
if(NULL != sc->ev)
- GNUNET_MQ_discard(sc->ev);
+ {
+ //GNUNET_MQ_discard(sc->ev); //FIXME: this should be
GNUNET_MQ_send_cancel
+ sc->ev = NULL;
+ }
GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
GNUNET_free(sc);
}
@@ -365,10 +443,8 @@
key,
value,
size,
- expiry,
+ &expiry,
GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
- GNUNET_MQ_send(h->mq, ev);
- GNUNET_MQ_notify_sent(ev, &store_request_sent, ev);
sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
sc->ev = ev;
sc->cont = cont;
@@ -376,8 +452,144 @@
sc->h = h;
sc->request_sent = GNUNET_NO;
GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
+ GNUNET_MQ_notify_sent(ev, &store_request_sent, ev);
+ GNUNET_MQ_send(h->mq, ev);
return sc;
}
+/******************************************************************************/
+/******************* ITERATE FUNCTIONS
*********************/
+/******************************************************************************/
+
+/**
+ * When a response for iterate request is received
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_Handle *'
+ * @param msg message received, NULL on timeout or fatal error
+ */
+void handle_iterate_result (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PEERSTORE_Handle *h = cls;
+ struct GNUNET_PEERSTORE_IterateContext *ic;
+ GNUNET_PEERSTORE_Processor callback;
+ void *callback_cls;
+ uint16_t msg_type;
+ struct GNUNET_PEERSTORE_Record *record;
+
+ ic = h->iterate_head;
+ if(NULL == ic)
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this
should not happen.\n");
+ reconnect(h);
+ return;
+ }
+ callback = ic->callback;
+ callback_cls = ic->callback_cls;
+ if(NULL == msg) /* Connection error */
+ {
+
+ if(NULL != callback)
+ callback(callback_cls, NULL,
+ _("Error communicating with `PEERSTORE' service."));
+ reconnect(h);
+ return;
+ }
+ msg_type = ntohs(msg->type);
+ if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
+ {
+ if(NULL != callback)
+ callback(callback_cls, NULL, NULL);
+ GNUNET_CONTAINER_DLL_remove(ic->h->iterate_head, ic->h->iterate_tail, ic);
+ GNUNET_free(ic);
+ return;
+ }
+ if(NULL != callback)
+ {
+ record = PEERSTORE_parse_record_message(msg);
+ if(NULL == record)
+ callback(callback_cls, record, _("Received a malformed response from
service."));
+ else
+ callback(callback_cls, record, NULL);
+ }
+
+}
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
+ */
+void iterate_request_sent (void *cls)
+{
+ struct GNUNET_PEERSTORE_IterateContext *ic = cls;
+
+ ic->request_sent = GNUNET_YES;
+ ic->ev = NULL;
+}
+
+/**
+ * Cancel an iterate request
+ * Please do not call after the iterate request is done
+ *
+ * @param ic Iterate request context as returned by GNUNET_PEERSTORE_iterate()
+ */
+void
+GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
+{
+ if(GNUNET_NO == ic->request_sent)
+ {
+ if(NULL != ic->ev)
+ {
+ //GNUNET_MQ_discard(ic->ev); //FIXME: this should be
GNUNET_MQ_send_cancel
+ ic->ev = NULL;
+ }
+ GNUNET_CONTAINER_DLL_remove(ic->h->iterate_head, ic->h->iterate_tail, ic);
+ GNUNET_free(ic);
+ }
+ else
+ ic->callback = NULL;
+}
+
+/**
+ * Iterate over records matching supplied key information
+ *
+ * @param h handle to the PEERSTORE service
+ * @param sub_system name of sub system
+ * @param peer Peer identity (can be NULL)
+ * @param key entry key string (can be NULL)
+ * @param timeout time after which the iterate request is canceled
+ * @param callback function called with each matching record, all NULL's on end
+ * @param callback_cls closure for @a callback
+ */
+struct GNUNET_PEERSTORE_IterateContext *
+GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
+ char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ struct GNUNET_TIME_Relative timeout, //FIXME: handle timeout
+ GNUNET_PEERSTORE_Processor callback, void *callback_cls)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_PEERSTORE_IterateContext *ic;
+
+ ev = PEERSTORE_create_record_mq_envelope(sub_system,
+ peer,
+ key,
+ NULL,
+ 0,
+ NULL,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
+ ic = GNUNET_new(struct GNUNET_PEERSTORE_IterateContext);
+ ic->callback = callback;
+ ic->callback_cls = callback_cls;
+ ic->ev = ev;
+ ic->h = h;
+ ic->request_sent = GNUNET_NO;
+ GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
+ GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ev);
+ GNUNET_MQ_send(h->mq, ev);
+ return ic;
+}
+
/* end of peerstore_api.c */
Modified: gnunet/src/peerstore/peerstore_common.c
===================================================================
--- gnunet/src/peerstore/peerstore_common.c 2014-05-17 10:16:15 UTC (rev
33313)
+++ gnunet/src/peerstore/peerstore_common.c 2014-05-17 16:57:49 UTC (rev
33314)
@@ -36,14 +36,14 @@
* @param expiry absolute time after which the record expires
* @param msg_type message type to be set in header
* @return pointer to record message struct
- *
+ */
struct StoreRecordMessage *
PEERSTORE_create_record_message(const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
const void *value,
size_t value_size,
- struct GNUNET_TIME_Absolute expiry,
+ struct GNUNET_TIME_Absolute *expiry,
uint16_t msg_type)
{
struct StoreRecordMessage *srm;
@@ -65,7 +65,8 @@
srm->header.size = htons(request_size);
srm->header.type = htons(msg_type);
srm->key_size = htons(key_size);
- srm->expiry = expiry;
+ if(NULL != expiry)
+ srm->expiry = *expiry;
if(NULL == peer)
srm->peer_set = htons(GNUNET_NO);
else
@@ -83,7 +84,7 @@
memcpy(dummy, value, value_size);
return srm;
-}*/
+}
/**
* Creates a MQ envelope for a single record
@@ -103,7 +104,7 @@
const char *key,
const void *value,
size_t value_size,
- struct GNUNET_TIME_Absolute expiry,
+ struct GNUNET_TIME_Absolute *expiry,
uint16_t msg_type)
{
struct StoreRecordMessage *srm;
@@ -124,7 +125,8 @@
value_size;
ev = GNUNET_MQ_msg_extra(srm, msg_size, msg_type);
srm->key_size = htons(key_size);
- srm->expiry = expiry;
+ if(NULL != expiry)
+ srm->expiry = *expiry;
if(NULL == peer)
srm->peer_set = htons(GNUNET_NO);
else
Modified: gnunet/src/peerstore/peerstore_common.h
===================================================================
--- gnunet/src/peerstore/peerstore_common.h 2014-05-17 10:16:15 UTC (rev
33313)
+++ gnunet/src/peerstore/peerstore_common.h 2014-05-17 16:57:49 UTC (rev
33314)
@@ -27,6 +27,27 @@
#include "peerstore.h"
/**
+ * Creates a record message ready to be sent
+ *
+ * @param sub_system sub system string
+ * @param peer Peer identity (can be NULL)
+ * @param key record key string (can be NULL)
+ * @param value record value BLOB (can be NULL)
+ * @param value_size record value size in bytes (set to 0 if value is NULL)
+ * @param expiry absolute time after which the record expires
+ * @param msg_type message type to be set in header
+ * @return pointer to record message struct
+ */
+struct StoreRecordMessage *
+PEERSTORE_create_record_message(const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ const void *value,
+ size_t value_size,
+ struct GNUNET_TIME_Absolute *expiry,
+ uint16_t msg_type);
+
+/**
* Creates a MQ envelope for a single record
*
* @param sub_system sub system string
@@ -44,7 +65,7 @@
const char *key,
const void *value,
size_t value_size,
- struct GNUNET_TIME_Absolute expiry,
+ struct GNUNET_TIME_Absolute *expiry,
uint16_t msg_type);
/**
Modified: gnunet/src/peerstore/plugin_peerstore_sqlite.c
===================================================================
--- gnunet/src/peerstore/plugin_peerstore_sqlite.c 2014-05-17 10:16:15 UTC
(rev 33313)
+++ gnunet/src/peerstore/plugin_peerstore_sqlite.c 2014-05-17 16:57:49 UTC
(rev 33314)
@@ -122,12 +122,7 @@
sqlite3_stmt *stmt;
int err = 0;
int sret;
- const char *ret_sub_system;
- const struct GNUNET_PeerIdentity *ret_peer;
- const char *ret_key;
- const void *ret_value;
- size_t ret_value_size;
- struct GNUNET_TIME_Absolute ret_expiry;
+ struct GNUNET_PEERSTORE_Record *ret;
if(NULL == peer && NULL == key)
{
@@ -166,20 +161,18 @@
}
while (SQLITE_ROW == (sret = sqlite3_step (stmt)))
{
- ret_sub_system = (const char *)sqlite3_column_text(stmt, 0);
- ret_peer = sqlite3_column_blob(stmt, 1);
- ret_key = (const char *)sqlite3_column_text(stmt, 2);
- ret_value = sqlite3_column_blob(stmt, 3);
- ret_value_size = sqlite3_column_bytes(stmt, 3);
- ret_expiry.abs_value_us = (uint64_t)sqlite3_column_int64(stmt, 4);
+ ret = GNUNET_new(struct GNUNET_PEERSTORE_Record);
+ ret->sub_system = (char *)sqlite3_column_text(stmt, 0);
+ ret->peer = (struct GNUNET_PeerIdentity *)sqlite3_column_blob(stmt, 1);
+ ret->key = (char *)sqlite3_column_text(stmt, 2);
+ ret->value = (void *)sqlite3_column_blob(stmt, 3);
+ ret->value_size = sqlite3_column_bytes(stmt, 3);
+ ret->expiry = GNUNET_new(struct GNUNET_TIME_Absolute);
+ ret->expiry->abs_value_us = (uint64_t)sqlite3_column_int64(stmt, 4);
if (NULL != iter)
iter (iter_cls,
- ret_sub_system,
- ret_peer,
- ret_key,
- ret_value,
- ret_value_size,
- ret_expiry);
+ ret,
+ NULL);
}
if (SQLITE_DONE != sret)
{
Modified: gnunet/src/peerstore/test_peerstore_api.c
===================================================================
--- gnunet/src/peerstore/test_peerstore_api.c 2014-05-17 10:16:15 UTC (rev
33313)
+++ gnunet/src/peerstore/test_peerstore_api.c 2014-05-17 16:57:49 UTC (rev
33314)
@@ -25,19 +25,53 @@
#include "gnunet_util_lib.h"
#include "gnunet_testing_lib.h"
#include "gnunet_peerstore_service.h"
+#include <inttypes.h>
static int ok = 1;
+static int counter = 0;
+
struct GNUNET_PEERSTORE_Handle *h;
+int iterate_cb (void *cls,
+ struct GNUNET_PEERSTORE_Record *record,
+ char *emsg)
+{
+ if(NULL != emsg)
+ {
+ printf("Error received: %s.\n", emsg);
+ return GNUNET_YES;
+ }
+ printf("Record:\n");
+ if(NULL == record)
+ {
+ counter = 0;
+ printf("END\n");
+ GNUNET_PEERSTORE_disconnect(h);
+ return GNUNET_YES;
+ }
+ printf("Sub system: %s\n", record->sub_system);
+ printf("Peer: %s\n", GNUNET_i2s (record->peer));
+ printf("Key: %s\n", record->key);
+ printf("Value: %.*s\n", record->value);
+ printf("Expiry: %" PRIu64 "\n", record->expiry->abs_value_us);
+
+ return GNUNET_YES;
+}
+
void store_cont(void *cls, int success)
{
if(GNUNET_OK == success)
ok = 0;
else
ok = 1;
- printf("Success: %d\n", success);
- GNUNET_PEERSTORE_disconnect(h);
+ printf("Store success: %d\n", success);
+ GNUNET_PEERSTORE_iterate(h, "peerstore-test-value",
+ NULL,
+ NULL,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &iterate_cb,
+ NULL);
}
static void
@@ -48,6 +82,7 @@
struct GNUNET_PeerIdentity pid;
char *val = "peerstore-test-value";
size_t val_size = strlen(val);
+ struct GNUNET_PEERSTORE_StoreContext *sc;
ok = 0;
memset (&pid, 32, sizeof (pid));
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33314 - in gnunet/src: include peerstore,
gnunet <=