gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33452 - gnunet/src/peerstore


From: gnunet
Subject: [GNUnet-SVN] r33452 - gnunet/src/peerstore
Date: Sat, 31 May 2014 23:48:01 +0200

Author: otarabai
Date: 2014-05-31 23:48:01 +0200 (Sat, 31 May 2014)
New Revision: 33452

Modified:
   gnunet/src/peerstore/gnunet-service-peerstore.c
   gnunet/src/peerstore/peerstore_api.c
   gnunet/src/peerstore/test_peerstore_api.c
Log:
peerstore: watch functionality


Modified: gnunet/src/peerstore/gnunet-service-peerstore.c
===================================================================
--- gnunet/src/peerstore/gnunet-service-peerstore.c     2014-05-31 21:46:05 UTC (rev 33451)
+++ gnunet/src/peerstore/gnunet-service-peerstore.c     2014-05-31 21:48:01 UTC (rev 33452)
@@ -73,6 +73,11 @@
 static struct GNUNET_CONTAINER_MultiHashMap *watchers;
 
 /**
+ * Our notification context.
+ */
+static struct GNUNET_SERVER_NotificationContext *nc;
+
+/**
  * Task run during shutdown.
  *
  * @param cls unused
@@ -88,8 +93,8 @@
     GNUNET_free (db_lib_name);
     db_lib_name = NULL;
   }
-  if(NULL != watchers)
-    GNUNET_CONTAINER_multihashmap_destroy(watchers);
+  GNUNET_SERVER_notification_context_destroy(nc);
+  GNUNET_CONTAINER_multihashmap_destroy(watchers);
   GNUNET_SCHEDULER_shutdown();
 }
 
@@ -154,6 +159,60 @@
 }
 
 /**
+ * Iterator over all watcher clients
+ * to notify them of a new record
+ *
+ * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
+ * @param key hash of record key
+ * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
+ * @return #GNUNET_YES to continue iterating
+ */
+int watch_notifier_it(void *cls,
+    const struct GNUNET_HashCode *key,
+    void *value)
+{
+  struct GNUNET_PEERSTORE_Record *record = cls;
+  struct GNUNET_SERVER_Client *client = value;
+  struct StoreRecordMessage *srm;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
+  if(NULL == value)
+  {
+    GNUNET_CONTAINER_multihashmap_remove(watchers, key, value);
+    return GNUNET_YES;
+  }
+  srm = PEERSTORE_create_record_message(record->sub_system,
+      record->peer,
+      record->key,
+      record->value,
+      record->value_size,
+      record->expiry,
+      GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
+  GNUNET_SERVER_notification_context_unicast(nc, client,
+      (const struct GNUNET_MessageHeader *)srm, GNUNET_YES);
+  return GNUNET_YES;
+}
+
+/**
+ * Given a new record, notifies watchers
+ *
+ * @cls closure, a 'struct GNUNET_PEERSTORE_Record *'
+ * @tc unused
+ */
+void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_PEERSTORE_Record *record = cls;
+  struct GNUNET_HashCode keyhash;
+
+  GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n");
+  PEERSTORE_hash_key(record->sub_system,
+      record->peer,
+      record->key,
+      &keyhash);
+  GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record);
+}
+
+/**
  * Handle a watch cancel request from client
  *
  * @param cls unused
@@ -167,13 +226,6 @@
   struct StoreKeyHashMessage *hm;
 
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n");
-  if(NULL == watchers)
-  {
-    GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
-        "Received a watch cancel request when we don't have any watchers.\n");
-    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
-    return;
-  }
   hm = (struct StoreKeyHashMessage *) message;
   GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client);
   GNUNET_SERVER_receive_done(client, GNUNET_OK);
@@ -195,8 +247,7 @@
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n");
   hm = (struct StoreKeyHashMessage *) message;
   GNUNET_SERVER_client_mark_monitor(client);
-  if(NULL == watchers)
-    watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
+  GNUNET_SERVER_notification_context_add(nc, client);
   GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash,
      client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_SERVER_receive_done(client, GNUNET_OK);
@@ -246,7 +297,7 @@
     GNUNET_free(tc);
     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
   }
-  GNUNET_free(record);
+  GNUNET_free(record); /* FIXME: destroy record */
 }
 
 /**
@@ -261,7 +312,6 @@
     const struct GNUNET_MessageHeader *message)
 {
   struct GNUNET_PEERSTORE_Record *record;
-  uint16_t response_type;
   struct GNUNET_SERVER_TransmitContext *tc;
 
   record = PEERSTORE_parse_record_message(message);
@@ -275,6 +325,7 @@
       || NULL == record->peer
       || NULL == record->key)
   {
+    /* FIXME: Destroy record */
     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n");
     GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
     return;
@@ -284,7 +335,7 @@
       record->sub_system,
       GNUNET_i2s (record->peer),
       record->key);
-  if(GNUNET_OK == db->store_record(db->cls,
+  if(GNUNET_OK != db->store_record(db->cls,
       record->sub_system,
       record->peer,
       record->key,
@@ -292,18 +343,15 @@
       record->value_size,
       *record->expiry))
   {
-    response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK;
-  }
-  else
-  {
+    /* FIXME: Destroy record */
     GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error.");
-    response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL;
+    GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+    return;
   }
-
   tc = GNUNET_SERVER_transmit_context_create (client);
-  GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type);
+  GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK);
   GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
-  //TODO: notify watchers, if a client is disconnected, remove its watch entry
+  GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1);
 }
 
 /**
@@ -343,6 +391,8 @@
          GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name);
   else
   {
+    nc = GNUNET_SERVER_notification_context_create (server, 16);
+    watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
     GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL);
     GNUNET_SERVER_add_handlers (server, handlers);
     GNUNET_SERVER_disconnect_notify (server,

Modified: gnunet/src/peerstore/peerstore_api.c
===================================================================
--- gnunet/src/peerstore/peerstore_api.c        2014-05-31 21:46:05 UTC (rev 33451)
+++ gnunet/src/peerstore/peerstore_api.c        2014-05-31 21:48:01 UTC (rev 33452)
@@ -263,7 +263,6 @@
  */
 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
     {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, sizeof(struct GNUNET_MessageHeader)},
-    {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, sizeof(struct GNUNET_MessageHeader)},
     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD, 0},
     {&handle_iterate_result, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END, sizeof(struct GNUNET_MessageHeader)},
     {&handle_watch_result, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD, 0},
@@ -386,7 +385,6 @@
 {
   struct GNUNET_PEERSTORE_Handle *h = cls;
   struct GNUNET_PEERSTORE_StoreContext *sc;
-  uint16_t msg_type;
   GNUNET_PEERSTORE_Continuation cont;
   void *cont_cls;
 
@@ -409,13 +407,7 @@
     return;
   }
   if(NULL != cont) /* Run continuation */
-  {
-    msg_type = ntohs(msg->type);
-    if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
-      cont(cont_cls, GNUNET_OK);
-    else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
-      cont(cont_cls, GNUNET_SYSERR);
-  }
+    cont(cont_cls, GNUNET_OK);
 
 }
 
@@ -681,54 +673,26 @@
  */
 void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
 {
-  /*struct GNUNET_PEERSTORE_Handle *h = cls;
+  struct GNUNET_PEERSTORE_Handle *h = cls;
+  struct GNUNET_PEERSTORE_Record *record;
+  struct GNUNET_HashCode keyhash;
   struct GNUNET_PEERSTORE_WatchContext *wc;
-  GNUNET_PEERSTORE_Processor callback;
-  void *callback_cls;
 
-
-
-  struct GNUNET_PEERSTORE_IterateContext *ic;
-  uint16_t msg_type;
-  struct GNUNET_PEERSTORE_Record *record;
-  int continue_iter;
-
-  ic = h->iterate_head;
-  if(NULL == ic)
+  if(NULL == msg)
   {
-    LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
+    LOG(GNUNET_ERROR_TYPE_ERROR,
+        "Problem receiving a watch response, no way to determine which request.\n");
     reconnect(h);
     return;
   }
-  callback = ic->callback;
-  callback_cls = ic->callback_cls;
-  if(NULL == msg) * Connection error *
-  {
-
-    if(NULL != callback)
-      callback(callback_cls, NULL,
-          _("Error communicating with `PEERSTORE' service."));
-    reconnect(h);
-    return;
-  }
-  msg_type = ntohs(msg->type);
-  if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
-  {
-    GNUNET_PEERSTORE_iterate_cancel(ic);
-    if(NULL != callback)
-      callback(callback_cls, NULL, NULL);
-    return;
-  }
-  if(NULL != callback)
-  {
-    record = PEERSTORE_parse_record_message(msg);
-    if(NULL == record)
-      continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
-    else
-      continue_iter = callback(callback_cls, record, NULL);
-    if(GNUNET_NO == continue_iter)
-      ic->callback = NULL;
-  }*/
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "Received a watch record from service.\n");
+  record = PEERSTORE_parse_record_message(msg);
+  PEERSTORE_hash_key(record->sub_system,
+      record->peer, record->key, &keyhash);
+  wc = GNUNET_CONTAINER_multihashmap_get(h->watches, &keyhash);
+  if(NULL != wc->callback)
+    wc->callback(wc->callback_cls, record, NULL);
+  /* TODO: destroy record */
 }
 
 /**
@@ -809,7 +773,7 @@
   if(NULL == h->watches)
     h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
   GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
-      wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+      wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   LOG(GNUNET_ERROR_TYPE_DEBUG,
       "Sending a watch request for sub system `%s'.\n", sub_system);
   GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);

Modified: gnunet/src/peerstore/test_peerstore_api.c
===================================================================
--- gnunet/src/peerstore/test_peerstore_api.c   2014-05-31 21:46:05 UTC (rev 33451)
+++ gnunet/src/peerstore/test_peerstore_api.c   2014-05-31 21:48:01 UTC (rev 33452)
@@ -127,7 +127,6 @@
       expiry,
       &store_cont,
       NULL);
-
 }
 
 int iterator (void *cls, const struct GNUNET_HashCode *key, void *value)




reply via email to

[Prev in Thread] Current Thread [Next in Thread]