gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33563 - in gnunet/src: peerstore sensor


From: gnunet
Subject: [GNUnet-SVN] r33563 - in gnunet/src: peerstore sensor
Date: Fri, 6 Jun 2014 13:55:14 +0200

Author: otarabai
Date: 2014-06-06 13:55:14 +0200 (Fri, 06 Jun 2014)
New Revision: 33563

Modified:
   gnunet/src/peerstore/gnunet-service-peerstore.c
   gnunet/src/peerstore/peerstore_api.c
   gnunet/src/sensor/Makefile.am
   gnunet/src/sensor/gnunet-service-sensor.c
Log:
using PEERSTORE in SENSOR + fixes in PEERSTORE


Modified: gnunet/src/peerstore/gnunet-service-peerstore.c
===================================================================
--- gnunet/src/peerstore/gnunet-service-peerstore.c     2014-06-06 11:51:26 UTC 
(rev 33562)
+++ gnunet/src/peerstore/gnunet-service-peerstore.c     2014-06-06 11:55:14 UTC 
(rev 33563)
@@ -32,7 +32,7 @@
 /**
  * Interval for expired records cleanup (in seconds)
  */
-#define CLEANUP_INTERVAL 300 /* 5mins */
+#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */
 
 /**
  * Our configuration.
@@ -96,7 +96,7 @@
   deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get());
   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted);
   GNUNET_SCHEDULER_add_delayed(
-      GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 
CLEANUP_INTERVAL),
+      GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 
EXPIRED_RECORDS_CLEANUP_INTERVAL),
       &cleanup_expired_records, NULL);
 }
 

Modified: gnunet/src/peerstore/peerstore_api.c
===================================================================
--- gnunet/src/peerstore/peerstore_api.c        2014-06-06 11:51:26 UTC (rev 
33562)
+++ gnunet/src/peerstore/peerstore_api.c        2014-06-06 11:55:14 UTC (rev 
33563)
@@ -245,6 +245,27 @@
 reconnect (struct GNUNET_PEERSTORE_Handle *h);
 
 /**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
+ */
+void watch_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
+ */
+void iterate_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ */
+void store_request_sent (void *cls);
+
+/**
  * MQ message handlers
  */
 static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
@@ -268,6 +289,28 @@
 }
 
 /**
+ * Iterator over previous watches to resend them
+ */
+int rewatch_it(void *cls,
+    const struct GNUNET_HashCode *key,
+    void *value)
+{
+  struct GNUNET_PEERSTORE_Handle *h = cls;
+  struct GNUNET_PEERSTORE_WatchContext *wc = value;
+  struct StoreKeyHashMessage *hm;
+
+  if(GNUNET_YES == wc->request_sent)
+  { /* Envelope gone, create new one. */
+    wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+    hm->keyhash = wc->keyhash;
+    wc->request_sent = GNUNET_NO;
+  }
+  GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc);
+  GNUNET_MQ_send(h->mq, wc->ev);
+  return GNUNET_YES;
+}
+
+/**
  * Close the existing connection to PEERSTORE and reconnect.
  *
  * @param h handle to the service
@@ -275,6 +318,11 @@
 static void
 reconnect (struct GNUNET_PEERSTORE_Handle *h)
 {
+  struct GNUNET_PEERSTORE_IterateContext *ic;
+  GNUNET_PEERSTORE_Processor icb;
+  void *icb_cls;
+  struct GNUNET_PEERSTORE_StoreContext *sc;
+
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
   if (NULL != h->mq)
   {
@@ -287,13 +335,43 @@
     h->client = NULL;
   }
   h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
-  //FIXME: retry connecting if fails again (client == NULL)
+  GNUNET_assert(NULL != h->client);
   h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
       mq_handlers,
       &handle_client_error,
       h);
-  //FIXME: resend pending requests after reconnecting
-
+  LOG(GNUNET_ERROR_TYPE_DEBUG,
+      "Resending pending requests after reconnect.\n");
+  if (NULL != h->watches)
+  {
+    GNUNET_CONTAINER_multihashmap_iterate(h->watches,
+        &rewatch_it, h);
+  }
+  ic = h->iterate_head;
+  while (NULL != ic)
+  {
+    if (GNUNET_YES == ic->request_sent)
+    {
+      icb = ic->callback;
+      icb_cls = ic->callback_cls;
+      GNUNET_PEERSTORE_iterate_cancel(ic);
+      if(NULL != icb)
+        icb(icb_cls, NULL,_("Iteration canceled due to reconnection."));
+    }
+    else
+    {
+      GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic);
+      GNUNET_MQ_send(h->mq, ic->ev);
+    }
+    ic = ic->next;
+  }
+  sc = h->store_head;
+  while (NULL != sc)
+  {
+    GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc);
+    GNUNET_MQ_send(h->mq, sc->ev);
+    sc = sc->next;
+  }
 }
 
 /**
@@ -336,6 +414,7 @@
 void
 GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
 {
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
   if(NULL != h->watches)
   {
     GNUNET_CONTAINER_multihashmap_destroy(h->watches);
@@ -442,7 +521,7 @@
   sc->cont = cont;
   sc->cont_cls = cont_cls;
   sc->h = h;
-  GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
+  GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc);
   GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
   GNUNET_MQ_send(h->mq, ev);
   return sc;
@@ -604,7 +683,7 @@
   ic->ev = ev;
   ic->h = h;
   ic->request_sent = GNUNET_NO;
-  GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
+  GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic);
   LOG(GNUNET_ERROR_TYPE_DEBUG,
         "Sending an iterate request for sub system `%s'\n", sub_system);
   GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);

Modified: gnunet/src/sensor/Makefile.am
===================================================================
--- gnunet/src/sensor/Makefile.am       2014-06-06 11:51:26 UTC (rev 33562)
+++ gnunet/src/sensor/Makefile.am       2014-06-06 11:55:14 UTC (rev 33563)
@@ -36,6 +36,7 @@
 gnunet_service_sensor_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
+  $(top_builddir)/src/peerstore/libgnunetpeerstore.la \
   $(GN_LIBINTL)
 
 libgnunetsensor_la_SOURCES = \

Modified: gnunet/src/sensor/gnunet-service-sensor.c
===================================================================
--- gnunet/src/sensor/gnunet-service-sensor.c   2014-06-06 11:51:26 UTC (rev 
33562)
+++ gnunet/src/sensor/gnunet-service-sensor.c   2014-06-06 11:55:14 UTC (rev 
33563)
@@ -28,6 +28,7 @@
 #include "gnunet_util_lib.h"
 #include "sensor.h"
 #include "gnunet_statistics_service.h"
+#include "gnunet_peerstore_service.h"
 
 /**
  * Minimum sensor execution interval (in seconds)
@@ -204,6 +205,21 @@
 struct GNUNET_STATISTICS_Handle *statistics;
 
 /**
+ * Handle to peerstore service
+ */
+struct GNUNET_PEERSTORE_Handle *peerstore;
+
+/**
+ * Service name
+ */
+char *subsystem = "sensor";
+
+/**
+ * My peer id
+ */
+struct GNUNET_PeerIdentity peerid;
+
+/**
  * Remove sensor execution from scheduler
  *
  * @param cls unused
@@ -290,7 +306,15 @@
   GNUNET_CONTAINER_multihashmap_iterate(sensors, &destroy_sensor, NULL);
   GNUNET_CONTAINER_multihashmap_destroy(sensors);
   if(NULL != statistics)
+  {
     GNUNET_STATISTICS_destroy(statistics, GNUNET_YES);
+    statistics = NULL;
+  }
+  if(NULL != peerstore)
+  {
+    GNUNET_PEERSTORE_disconnect(peerstore);
+    peerstore = NULL;
+  }
   GNUNET_SCHEDULER_shutdown();
 }
 
@@ -816,8 +840,21 @@
     int is_persistent)
 {
   struct SensorInfo *sensorinfo = cls;
+  struct GNUNET_TIME_Absolute expiry;
 
   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %" 
PRIu64 "\n", sensorinfo->name, value);
+  //FIXME: store first line, last line or all ??
+  expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval);
+  GNUNET_PEERSTORE_store(peerstore,
+      subsystem,
+      &peerid,
+      sensorinfo->name,
+      &value,
+      sizeof(value),
+      expiry,
+      GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
+      NULL,
+      NULL);
   return GNUNET_OK;
 }
 
@@ -845,6 +882,7 @@
 void sensor_process_callback (void *cls, const char *line)
 {
   struct SensorInfo *sensorinfo = cls;
+  struct GNUNET_TIME_Absolute expiry;
 
   if(NULL == line) //end of output
   {
@@ -854,6 +892,18 @@
     return;
   }
   GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %s\n", 
sensorinfo->name, line);
+  //FIXME: store first line, last line or all ??
+  expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval);
+  GNUNET_PEERSTORE_store(peerstore,
+      subsystem,
+      &peerid,
+      sensorinfo->name,
+      line,
+      strlen(line) + 1,
+      expiry,
+      GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
+      NULL,
+      NULL);
 }
 
 /**
@@ -903,10 +953,6 @@
   GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Starting the execution of sensor 
`%s'\n", sensorinfo->name);
   if(sources[0] == sensorinfo->source) //gnunet-statistics
   {
-    if(NULL == statistics)
-    {
-      statistics = GNUNET_STATISTICS_create("sensor", cfg);
-    }
     sensorinfo->gnunet_stat_get_handle = GNUNET_STATISTICS_get(statistics,
         sensorinfo->gnunet_stat_service,
         sensorinfo->gnunet_stat_name,
@@ -1032,6 +1078,9 @@
   sensors = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
   reload_sensors();
   schedule_all_sensors();
+  statistics = GNUNET_STATISTICS_create("sensor", cfg);
+  GNUNET_CRYPTO_get_peer_identity(cfg, &peerid);
+  peerstore = GNUNET_PEERSTORE_connect(cfg);
   GNUNET_SERVER_add_handlers (server, handlers);
   GNUNET_SERVER_disconnect_notify (server, 
                                   &handle_client_disconnect,




reply via email to

[Prev in Thread] Current Thread [Next in Thread]