gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28534 - in gnunet/src: include set


From: gnunet
Subject: [GNUnet-SVN] r28534 - in gnunet/src: include set
Date: Mon, 12 Aug 2013 16:34:16 +0200

Author: dold
Date: 2013-08-12 16:34:16 +0200 (Mon, 12 Aug 2013)
New Revision: 28534

Modified:
   gnunet/src/include/gnunet_set_service.h
   gnunet/src/set/gnunet-service-set.c
   gnunet/src/set/gnunet-service-set_union.c
   gnunet/src/set/set.h
   gnunet/src/set/set_api.c
Log:
- listener re-connects transparently
- bugs


Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h     2013-08-12 14:33:26 UTC (rev 
28533)
+++ gnunet/src/include/gnunet_set_service.h     2013-08-12 14:34:16 UTC (rev 
28534)
@@ -208,8 +208,7 @@
  * @param other_peer the other peer
  * @param context_msg message with application specific information from
  *        the other peer
- * @param request request from the other peer, use GNUNET_SET_accept
- *        Will be NULL if the listener failed.
+ * @param request request from the other peer (never NULL), use 
GNUNET_SET_accept
  *        to accept it, otherwise the request will be refused
  *        Note that we can't just return value from the listen callback,
  *        as it is also necessary to specify the set we want to do the
@@ -315,7 +314,9 @@
 
 
 /**
- * Wait for set operation requests for the given application id
+ * Wait for set operation requests for the given application ID.
+ * If the connection to the set service is lost, the listener is
+ * re-created transparently with exponential backoff.
  * 
  * @param cfg configuration to use for connecting to
  *            the set service
@@ -336,6 +337,8 @@
 
 /**
  * Cancel the given listen operation.
+ * After calling cancel, the listen callback for this listen handle
+ * will not be called again.
  *
  * @param lh handle for the listen operation
  */

Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-08-12 14:33:26 UTC (rev 28533)
+++ gnunet/src/set/gnunet-service-set.c 2013-08-12 14:34:16 UTC (rev 28534)
@@ -442,8 +442,8 @@
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, 
app %s)\n",
-              ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
-  listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id);
+              ntohl (msg->operation), GNUNET_h2s (&msg->app_id));
+  listener = listener_get_by_target (ntohl (msg->operation), &msg->app_id);
   if (NULL == listener)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -477,6 +477,7 @@
     return;
   }
 
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
   set->vt->iterate (set);
 }
 
@@ -557,21 +558,30 @@
   listener->client = client;
   listener->client_mq = GNUNET_MQ_queue_for_server_client (client);
   listener->app_id = msg->app_id;
-  listener->operation = ntohs (msg->operation);
+  listener->operation = ntohl (msg->operation);
   GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app 
%s)\n",
               listener->operation, GNUNET_h2s (&listener->app_id));
   for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
   {
-    if ( (NULL == incoming->spec) ||
-         (0 != incoming->suggest_id) )
+    if (NULL == incoming->spec)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request has no spec yet\n");
       continue;
+    }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considering (op: %u, app: %s, 
suggest: %u)\n",
+                incoming->spec->operation, GNUNET_h2s 
(&incoming->spec->app_id), incoming->suggest_id);
+
+    if (0 != incoming->suggest_id)
+      continue;
     if (listener->operation != incoming->spec->operation)
       continue;
     if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, 
&incoming->spec->app_id))
       continue;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "request suggested\n");
     incoming_suggest (incoming, listener);
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "considered all incoming requests\n");
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -942,8 +952,9 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message (type: %u)\n",
               ntohs (message->type));
+  /* FIXME: do this before or after the handler? */
+  GNUNET_MESH_receive_done (tunnel);
   ret = tc->vt->msg_handler (tc->op, message);
-  GNUNET_MESH_receive_done (tunnel);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled mesh message (type: %u)\n",
               ntohs (message->type));
   return ret;
@@ -1023,7 +1034,7 @@
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "set",
                             GNUNET_SERVICE_OPTION_NONE, &run, NULL);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
   return (GNUNET_OK == ret) ? 0 : 1;
 }
 

Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c   2013-08-12 14:33:26 UTC (rev 
28533)
+++ gnunet/src/set/gnunet-service-set_union.c   2013-08-12 14:34:16 UTC (rev 
28534)
@@ -147,6 +147,8 @@
 
   /**
    * Maps IBF-Keys (specific to the current salt) to elements.
+   * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
+   * Colliding IBF-Keys are linked.
    */
   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
 
@@ -493,7 +495,7 @@
     GNUNET_SERVER_client_disconnect (eo->spec->set->client);
     return;
   }
-  msg->operation = htons (GNUNET_SET_OPERATION_UNION);
+  msg->operation = htonl (GNUNET_SET_OPERATION_UNION);
   msg->app_id = eo->spec->app_id;
   msg->salt = htonl (eo->spec->salt);
   GNUNET_MQ_send (eo->mq, ev);
@@ -524,7 +526,7 @@
  *         GNUNET_NO if not.
  */
 static int
-insert_element_iterator (void *cls,
+op_register_element_iterator (void *cls,
                          uint32_t key,
                          void *value)
 {
@@ -549,12 +551,16 @@
 /**
  * Insert an element into the union operation's
  * key-to-element mapping. Takes ownership of 'ee'.
+ * Note that this does not insert the element in the set,
+ * only in the operation's key-element mapping.
+ * This is done to speed up re-tried operations, if some elements
+ * were transmitted, and then the IBF fails to decode.
  *
  * @param eo the union operation
  * @param ee the element entry
  */
 static void
-insert_element (struct OperationState *eo, struct ElementEntry *ee)
+op_register_element (struct OperationState *eo, struct ElementEntry *ee)
 {
   int ret;
   struct IBF_Key ibf_key;
@@ -566,14 +572,14 @@
   k->ibf_key = ibf_key;
   ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
                                                       (uint32_t) 
ibf_key.key_val,
-                                                      insert_element_iterator, 
k);
+                                                      
op_register_element_iterator, k);
 
   /* was the element inserted into a colliding bucket? */
   if (GNUNET_SYSERR == ret)
     return;
 
   GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) 
ibf_key.key_val, k,
-                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
 }
 
 
@@ -623,7 +629,7 @@
 
   e->remote = GNUNET_NO;
 
-  insert_element (eo, e);
+  op_register_element (eo, e);
   return GNUNET_YES;
 }
 
@@ -861,27 +867,32 @@
   ibf_destroy (eo->remote_ibf);
   eo->remote_ibf = NULL;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", 
diff_ibf->size);
+
   num_decoded = 0;
+  last_key.key_val = 0;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoding IBF (size=%u)\n", 
diff_ibf->size);
-
   while (1)
   {
     int res;
+    int cycle_detected = GNUNET_NO;
 
-    if (num_decoded > 0)
-      last_key = key;
+    last_key = key;
 
     res = ibf_decode (diff_ibf, &side, &key);
     if (res == GNUNET_OK)
+    {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "decoded ibf key %lx\n",
                   key.key_val);
-    num_decoded += 1;
-    if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val 
== key.key_val))
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded 
%u/%u)\n",
-                  num_decoded, diff_ibf->size);
-    if ((GNUNET_SYSERR == res) || (num_decoded > diff_ibf->size) ||
-        (num_decoded > 1 && last_key.key_val == key.key_val))
+      num_decoded += 1;
+      if (num_decoded > diff_ibf->size || (num_decoded > 1 && last_key.key_val 
== key.key_val))
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "detected cyclic ibf (decoded 
%u/%u)\n",
+                    num_decoded, diff_ibf->size);
+        cycle_detected = GNUNET_YES;
+      }
+    }
+    if ((GNUNET_SYSERR == res) || (GNUNET_YES == cycle_detected))
     {
       int next_order;
       next_order = 0;
@@ -922,6 +933,8 @@
 
       /* FIXME: before sending the request, check if we may just have the 
element */
       /* FIXME: merge multiple requests */
+      /* FIXME: remember somewhere that we already requested the element,
+       * so that we don't request it again with the next ibf if decoding fails 
*/
       ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
                                         
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
       
@@ -1089,7 +1102,9 @@
   ee->remote = GNUNET_YES;
   GNUNET_CRYPTO_hash (ee->element.data, ee->element.size, &ee->element_hash);
 
-  insert_element (eo, ee);
+  /* FIXME: see if the element has already been inserted! */
+
+  op_register_element (eo, ee);
   send_client_element (eo, &ee->element);
 }
 
@@ -1386,6 +1401,8 @@
 union_handle_p2p_message (struct OperationState *eo,
                           const struct GNUNET_MessageHeader *mh)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received p2p message (t: %u, s: %u)\n",
+              ntohs (mh->type), ntohs (mh->size));
   switch (ntohs (mh->type))
   {
     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
@@ -1490,6 +1507,8 @@
 {
   struct GNUNET_MQ_Envelope *ev;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "iterating union set with %u 
elements\n",
+              GNUNET_CONTAINER_multihashmap_size (set->state->elements));
   GNUNET_CONTAINER_multihashmap_iterate (set->state->elements, 
send_iter_element_iter, set);
   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_DONE);
   GNUNET_MQ_send (set->client_mq, ev);

Modified: gnunet/src/set/set.h
===================================================================
--- gnunet/src/set/set.h        2013-08-12 14:33:26 UTC (rev 28533)
+++ gnunet/src/set/set.h        2013-08-12 14:34:16 UTC (rev 28534)
@@ -59,7 +59,7 @@
   /**
    * Operation type, values of enum GNUNET_SET_OperationType
    */
-  uint16_t operation GNUNET_PACKED;
+  uint32_t operation GNUNET_PACKED;
 
   /**
    * application id

Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c    2013-08-12 14:33:26 UTC (rev 28533)
+++ gnunet/src/set/set_api.c    2013-08-12 14:34:16 UTC (rev 28534)
@@ -169,6 +169,13 @@
   struct GNUNET_MQ_Handle* mq;
 
   /**
+   * Configuration handle for the listener, stored
+   * here to be able to reconnect transparently on
+   * connection failure.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+  /**
    * Function to call on a new incoming request,
    * or on error.
    */
@@ -178,9 +185,30 @@
    * Closure for listen_cb.
    */
   void *listen_cls;
+
+  /**
+   * Operation we listen for.
+   */
+  enum GNUNET_SET_OperationType operation;
+
+  /**
+   * Application ID we listen for.
+   */
+  struct GNUNET_HashCode app_id;
+
+  /**
+   * Time to wait until we try to reconnect on failure.
+   */
+  struct GNUNET_TIME_Relative reconnect_backoff;
 };
 
 
+/* forward declaration */
+static void
+listen_connect (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
 /**
  * Handle element for iteration over the set.
  *
@@ -198,7 +226,8 @@
   if (NULL == set->iterator)
     return;
 
-  element.type = htons (mh->type);
+  element.size = ntohs (mh->size) - sizeof (struct 
GNUNET_SET_IterResponseMessage);
+  element.type = htons (msg->element_type);
   element.data = &msg[1];
   set->iterator (set->iterator_cls, &element);
 }
@@ -266,6 +295,7 @@
     oh->result_cb (oh->result_cls, &e, result_status);
 }
 
+
 /**
  * Handle request message for a listen operation
  *
@@ -297,9 +327,9 @@
     amsg->request_id = htonl (0);
     amsg->accept_reject_id = msg->accept_id;
     GNUNET_MQ_send (lh->mq, mqm);
-    GNUNET_free (req);
     LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n");
   }
+  GNUNET_free (req);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n");
 
@@ -313,8 +343,14 @@
 {
   struct GNUNET_SET_ListenHandle *lh = cls;
 
-  /* FIXME: why do you do this? */
-  lh->listen_cb (lh->listen_cls, NULL, NULL, NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "listener broke down, re-connecting\n");
+  GNUNET_CLIENT_disconnect (lh->client);
+  lh->client = NULL;
+  GNUNET_MQ_destroy (lh->mq);
+  lh->mq = NULL;
+
+  GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff, listen_connect, lh);
+  lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
 }
 
 
@@ -465,6 +501,7 @@
   set->client = NULL;
   GNUNET_MQ_destroy (set->mq);
   set->mq = NULL;
+  GNUNET_free (set);
 }
 
 
@@ -514,11 +551,49 @@
   return oh;
 }
 
+
 /**
+ * Connect to the set service in order to listen
+ * for request.
+ *
+ * @param cls the listen handle to connect
+ * @param tc task context if invoked as a task, NULL otherwise
+ */
+static void
+listen_connect (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SET_ListenMessage *msg;
+  struct GNUNET_SET_ListenHandle *lh = cls;
+  static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+    {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
+    GNUNET_MQ_HANDLERS_END
+  };
+
+  GNUNET_assert (NULL == lh->client);
+  lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
+  if (NULL == lh->client)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "could not connect to set (wrong configuration?), giving up 
listening\n");
+    return;
+  }
+  GNUNET_assert (NULL == lh->mq);
+  lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
+                                                  
handle_client_listener_error, lh);
+  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
+  msg->operation = htonl (lh->operation);
+  msg->app_id = lh->app_id;
+  GNUNET_MQ_send (lh->mq, mqm);
+}
+
+
+/**
  * Wait for set operation requests for the given application id
  * 
  * @param cfg configuration to use for connecting to
- *            the set service
+ *            the set service, needs to be valid for the lifetime of the 
listen handle
  * @param operation operation we want to listen for
  * @param app_id id of the application that handles set operation requests
  * @param listen_cb called for each incoming request matching the operation
@@ -534,25 +609,15 @@
                    void *listen_cls)
 {
   struct GNUNET_SET_ListenHandle *lh;
-  struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_SET_ListenMessage *msg;
-  static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
-    GNUNET_MQ_HANDLERS_END
-  };
 
   lh = GNUNET_new (struct GNUNET_SET_ListenHandle);
-  lh->client = GNUNET_CLIENT_connect ("set", cfg);
   lh->listen_cb = listen_cb;
   lh->listen_cls = listen_cls;
-  GNUNET_assert (NULL != lh->client);
-  lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
-                                                  
handle_client_listener_error, lh);
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
-  msg->operation = htons (operation);
-  msg->app_id = *app_id;
-  GNUNET_MQ_send (lh->mq, mqm);
-
+  lh->cfg = cfg;
+  lh->operation = operation;
+  lh->app_id = *app_id;
+  lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
+  listen_connect (lh, NULL);
   return lh;
 }
 
@@ -680,7 +745,6 @@
 }
 
 
-
 /**
  * Iterate over all elements in the given set.
  * Note that this operation involves transferring every element of the set




reply via email to

[Prev in Thread] Current Thread [Next in Thread]