gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: get FS test with CADET to finally pass again


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: get FS test with CADET to finally pass again
Date: Fri, 17 Feb 2017 14:30:36 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 181c039d1 get FS test with CADET to finally pass again
181c039d1 is described below

commit 181c039d12aa2aa99920d14070e7b64c018e8be7
Author: Christian Grothoff <address@hidden>
AuthorDate: Fri Feb 17 14:31:38 2017 +0100

    get FS test with CADET to finally pass again
---
 src/cadet/cadet_api_new.c               |   5 +-
 src/fs/Makefile.am                      |   2 +-
 src/fs/gnunet-service-fs_cadet_client.c | 358 +++++++++++++++-----------------
 src/fs/gnunet-service-fs_cadet_server.c |  10 +-
 4 files changed, 170 insertions(+), 205 deletions(-)

diff --git a/src/cadet/cadet_api_new.c b/src/cadet/cadet_api_new.c
index 8f482aa28..eb8bc2549 100644
--- a/src/cadet/cadet_api_new.c
+++ b/src/cadet/cadet_api_new.c
@@ -711,11 +711,10 @@ handle_local_data (void *cls,
   type = ntohs (payload->type);
   fwd = ntohl (ch->ccn.channel_of_client) <= GNUNET_CADET_LOCAL_CHANNEL_ID_CLI;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Got a %s data on channel %s [%X] of type %s (%u)\n",
-       GC_f2s (fwd),
+       "Got a %s data on channel %s [%X] of type %u\n",
+       fwd ? "FWD" : "BWD",
        GNUNET_i2s (&ch->peer),
        ntohl (message->ccn.channel_of_client),
-       GC_m2s (type),
        type);
   GNUNET_MQ_inject_message (ch->mq,
                             payload);
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am
index 344eb5a74..75451c7f6 100644
--- a/src/fs/Makefile.am
+++ b/src/fs/Makefile.am
@@ -202,7 +202,7 @@ gnunet_service_fs_LDADD =  \
  $(top_builddir)/src/block/libgnunetblock.la \
  $(top_builddir)/src/datastore/libgnunetdatastore.la \
  $(top_builddir)/src/statistics/libgnunetstatistics.la \
- $(top_builddir)/src/cadet/libgnunetcadet.la \
+ $(top_builddir)/src/cadet/libgnunetcadetnew.la \
  $(top_builddir)/src/ats/libgnunetats.la \
  $(top_builddir)/src/core/libgnunetcore.la \
  $(top_builddir)/src/util/libgnunetutil.la \
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index 193fe2263..55e0cbc24 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -77,7 +77,7 @@ struct GSF_CadetRequest
   GSF_CadetReplyProcessor proc;
 
   /**
-   * Closure for 'proc'
+   * Closure for @e proc
    */
   void *proc_cls;
 
@@ -126,11 +126,6 @@ struct CadetHandle
   struct GNUNET_CADET_Channel *channel;
 
   /**
-   * Handle for active write operation, or NULL.
-   */
-  struct GNUNET_CADET_TransmitHandle *wh;
-
-  /**
    * Which peer does this cadet go to?
    */
   struct GNUNET_PeerIdentity target;
@@ -140,14 +135,14 @@ struct CadetHandle
    * a few seconds to give the application a chance to give
    * us another query).
    */
-  struct GNUNET_SCHEDULER_Task * timeout_task;
+  struct GNUNET_SCHEDULER_Task *timeout_task;
 
   /**
    * Task to reset cadets that had errors (asynchronously,
    * as we may not be able to do it immediately during a
    * callback from the cadet API).
    */
-  struct GNUNET_SCHEDULER_Task * reset_task;
+  struct GNUNET_SCHEDULER_Task *reset_task;
 
 };
 
@@ -170,10 +165,10 @@ struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
 /**
  * Transmit pending requests via the cadet.
  *
- * @param mh cadet to process
+ * @param cls `struct CadetHandle` to process
  */
 static void
-transmit_pending (struct CadetHandle *mh);
+transmit_pending (void *cls);
 
 
 /**
@@ -206,65 +201,19 @@ move_to_pending (void *cls,
 
 
 /**
- * We had a serious error, tear down and re-create cadet from scratch.
- *
- * @param mh cadet to reset
- */
-static void
-reset_cadet (struct CadetHandle *mh)
-{
-  struct GNUNET_CADET_Channel *channel = mh->channel;
-  struct GNUNET_HashCode port;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Resetting cadet channel to %s\n",
-             GNUNET_i2s (&mh->target));
-  mh->channel = NULL;
-
-  if (NULL != channel)
-  {
-    /* Avoid loop */
-    if (NULL != mh->wh)
-    {
-      GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
-      mh->wh = NULL;
-    }
-    GNUNET_CADET_channel_destroy (channel);
-  }
-  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
-                                        &move_to_pending,
-                                        mh);
-  GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
-                      strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
-                      &port);
-  mh->channel = GNUNET_CADET_channel_create (cadet_handle,
-                                             mh,
-                                             &mh->target,
-                                             &port,
-                                             GNUNET_CADET_OPTION_RELIABLE);
-  transmit_pending (mh);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive cadet channel.
+ * Functions with this signature are called whenever a complete reply
+ * is received.
  *
- * @param cls the `struct CadetHandle` to tear down
+ * @param cls closure with the `struct CadetHandle`
+ * @param srm the actual message
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
  */
-static void
-cadet_timeout (void *cls)
+static int
+check_reply (void *cls,
+             const struct CadetReplyMessage *srm)
 {
-  struct CadetHandle *mh = cls;
-  struct GNUNET_CADET_Channel *tun;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Timeout on cadet channel to %s\n",
-             GNUNET_i2s (&mh->target));
-  mh->timeout_task = NULL;
-  tun = mh->channel;
-  mh->channel = NULL;
-  if(NULL != tun)
-       GNUNET_CADET_channel_destroy (tun);
+  /* We check later... */
+  return GNUNET_OK;
 }
 
 
@@ -274,13 +223,7 @@ cadet_timeout (void *cls)
  * @param cls the `struct CadetHandle` to tear down
  */
 static void
-reset_cadet_task (void *cls)
-{
-  struct CadetHandle *mh = cls;
-
-  mh->reset_task = NULL;
-  reset_cadet (mh);
-}
+reset_cadet_task (void *cls);
 
 
 /**
@@ -300,83 +243,6 @@ reset_cadet_async (struct CadetHandle *mh)
 
 
 /**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a cadet.
- *
- * @param cls the struct CadetHandle for which we did the write call
- * @param size the number of bytes that can be written to @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-transmit_sqm (void *cls,
-             size_t size,
-             void *buf)
-{
-  struct CadetHandle *mh = cls;
-  struct CadetQueryMessage sqm;
-  struct GSF_CadetRequest *sr;
-
-  mh->wh = NULL;
-  if (NULL == buf)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Cadet channel to %s failed during transmission attempt, rebuilding\n",
-               GNUNET_i2s (&mh->target));
-    reset_cadet_async (mh);
-    return 0;
-  }
-  sr = mh->pending_head;
-  if (NULL == sr)
-    return 0;
-  GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
-  GNUNET_CONTAINER_DLL_remove (mh->pending_head,
-                              mh->pending_tail,
-                              sr);
-  GNUNET_assert (GNUNET_OK ==
-                GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
-                                                   &sr->query,
-                                                   sr,
-                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  sr->was_transmitted = GNUNET_YES;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Sending query for %s via cadet to %s\n",
-             GNUNET_h2s (&sr->query),
-             GNUNET_i2s (&mh->target));
-  sqm.header.size = htons (sizeof (sqm));
-  sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
-  sqm.type = htonl (sr->type);
-  sqm.query = sr->query;
-  GNUNET_memcpy (buf, &sqm, sizeof (sqm));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Successfully transmitted %u bytes via cadet to %s\n",
-             (unsigned int) size,
-             GNUNET_i2s (&mh->target));
-  transmit_pending (mh);
-  return sizeof (sqm);
-}
-
-
-/**
- * Transmit pending requests via the cadet.
- *
- * @param mh cadet to process
- */
-static void
-transmit_pending (struct CadetHandle *mh)
-{
-  if (NULL == mh->channel)
-    return;
-  if (NULL != mh->wh)
-    return;
-  mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /* allow cork */,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             sizeof (struct CadetQueryMessage),
-                                             &transmit_sqm, mh);
-}
-
-
-/**
  * Closure for handle_reply().
  */
 struct HandleReplyClosure
@@ -393,7 +259,7 @@ struct HandleReplyClosure
   struct GNUNET_TIME_Absolute expiration;
 
   /**
-   * Number of bytes in 'data'.
+   * Number of bytes in @e data.
    */
   size_t data_size;
 
@@ -439,19 +305,24 @@ process_reply (void *cls,
 
 
 /**
- * Functions with this signature are called whenever a complete reply
- * is received.
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
  *
- * @param cls closure with the `struct CadetHandle`
- * @param srm the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ * @param cls the `struct CadetHandle`
+ * @param key the key of the entry in the map (the query)
+ * @param value the `struct GSF_CadetRequest` to clean up
+ * @return #GNUNET_YES (continue to iterate)
  */
 static int
-check_reply (void *cls,
-             const struct CadetReplyMessage *srm)
+free_waiting_entry (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
 {
-  /* We check later... */
-  return GNUNET_OK;
+  struct GSF_CadetRequest *sr = value;
+
+  GSF_cadet_query_cancel (sr);
+  return GNUNET_YES;
 }
 
 
@@ -517,28 +388,6 @@ handle_reply (void *cls,
 
 
 /**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to clean up
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
-                   const struct GNUNET_HashCode *key,
-                   void *value)
-{
-  struct GSF_CadetRequest *sr = value;
-
-  GSF_cadet_query_cancel (sr);
-  return GNUNET_YES;
-}
-
-
-/**
  * Function called by cadet when a client disconnects.
  * Cleans up our `struct CadetClient` of that channel.
  *
@@ -569,8 +418,6 @@ disconnect_cb (void *cls,
   GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
                                         &free_waiting_entry,
                                         mh);
-  if (NULL != mh->wh)
-    GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
   if (NULL != mh->timeout_task)
     GNUNET_SCHEDULER_cancel (mh->timeout_task);
   if (NULL != mh->reset_task)
@@ -602,6 +449,133 @@ window_change_cb (void *cls,
                   int window_size)
 {
   /* FIXME: for flow control, implement? */
+#if 0
+  /* Something like this instead of the GNUNET_MQ_notify_sent() in
+     transmit_pending() might be good (once the window change CB works...) */
+  if (0 < window_size) /* test needed? */
+    transmit_pending (mh);
+#endif
+}
+
+
+/**
+ * We had a serious error, tear down and re-create cadet from scratch.
+ *
+ * @param mh cadet to reset
+ */
+static void
+reset_cadet (struct CadetHandle *mh)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Resetting cadet channel to %s\n",
+             GNUNET_i2s (&mh->target));
+  GNUNET_CADET_channel_destroy (mh->channel);
+  mh->channel = NULL;
+  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+                                        &move_to_pending,
+                                        mh);
+  {
+    struct GNUNET_MQ_MessageHandler handlers[] = {
+      GNUNET_MQ_hd_var_size (reply,
+                             GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+                             struct CadetReplyMessage,
+                             mh),
+      GNUNET_MQ_handler_end ()
+    };
+    struct GNUNET_HashCode port;
+
+    GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+                        strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+                        &port);
+    mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
+                                               mh,
+                                               &mh->target,
+                                               &port,
+                                               GNUNET_CADET_OPTION_RELIABLE,
+                                               &window_change_cb,
+                                               &disconnect_cb,
+                                               handlers);
+  }
+  transmit_pending (mh);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive cadet channel.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+cadet_timeout (void *cls)
+{
+  struct CadetHandle *mh = cls;
+  struct GNUNET_CADET_Channel *tun;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Timeout on cadet channel to %s\n",
+             GNUNET_i2s (&mh->target));
+  mh->timeout_task = NULL;
+  tun = mh->channel;
+  mh->channel = NULL;
+  if (NULL != tun)
+    GNUNET_CADET_channel_destroy (tun);
+}
+
+
+/**
+ * Task called when it is time to reset an cadet.
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ */
+static void
+reset_cadet_task (void *cls)
+{
+  struct CadetHandle *mh = cls;
+
+  mh->reset_task = NULL;
+  reset_cadet (mh);
+}
+
+
+/**
+ * Transmit pending requests via the cadet.
+ *
+ * @param cls `struct CadetHandle` to process
+ */
+static void
+transmit_pending (void *cls)
+{
+  struct CadetHandle *mh = cls;
+  struct GNUNET_MQ_Handle *mq = GNUNET_CADET_get_mq (mh->channel);
+  struct GSF_CadetRequest *sr;
+  struct GNUNET_MQ_Envelope *env;
+  struct CadetQueryMessage *sqm;
+
+  if ( (0 != GNUNET_MQ_get_length (mq)) ||
+       (NULL == (sr = mh->pending_head)) )
+    return;
+  GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+                              mh->pending_tail,
+                              sr);
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
+                                                   &sr->query,
+                                                   sr,
+                                                   GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  sr->was_transmitted = GNUNET_YES;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending query for %s via cadet to %s\n",
+             GNUNET_h2s (&sr->query),
+             GNUNET_i2s (&mh->target));
+  env = GNUNET_MQ_msg (sqm,
+                       GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
+  sqm->type = htonl (sr->type);
+  sqm->query = sr->query;
+  GNUNET_MQ_notify_sent (env,
+                         &transmit_pending,
+                         mh);
+  GNUNET_MQ_send (mq,
+                  env);
 }
 
 
@@ -614,7 +588,6 @@ static struct CadetHandle *
 get_cadet (const struct GNUNET_PeerIdentity *target)
 {
   struct CadetHandle *mh;
-  struct GNUNET_HashCode port;
 
   mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
                                          target);
@@ -641,10 +614,6 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
                                                    &mh->target,
                                                    mh,
                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
-                      strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
-                      &port);
-
   {
     struct GNUNET_MQ_MessageHandler handlers[] = {
       GNUNET_MQ_hd_var_size (reply,
@@ -653,7 +622,11 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
                              mh),
       GNUNET_MQ_handler_end ()
     };
+    struct GNUNET_HashCode port;
 
+    GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
+                        strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
+                        &port);
     mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
                                                mh,
                                                &mh->target,
@@ -679,9 +652,10 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
  */
 struct GSF_CadetRequest *
 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
-               const struct GNUNET_HashCode *query,
-               enum GNUNET_BLOCK_Type type,
-               GSF_CadetReplyProcessor proc, void *proc_cls)
+                 const struct GNUNET_HashCode *query,
+                 enum GNUNET_BLOCK_Type type,
+                 GSF_CadetReplyProcessor proc,
+                 void *proc_cls)
 {
   struct CadetHandle *mh;
   struct GSF_CadetRequest *sr;
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
index 0a72a8279..adbce1154 100644
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012, 2013 GNUnet e.V.
+     Copyright (C) 2012, 2013, 2017 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -86,11 +86,6 @@ struct CadetClient
   struct GNUNET_CADET_Channel *channel;
 
   /**
-   * Handle for active write operation, or NULL.
-   */
-  struct GNUNET_CADET_TransmitHandle *wh;
-
-  /**
    * Head of write queue.
    */
   struct WriteQueueItem *wqi_head;
@@ -439,8 +434,6 @@ disconnect_cb (void *cls,
     GNUNET_SCHEDULER_cancel (sc->terminate_task);
   if (NULL != sc->timeout_task)
     GNUNET_SCHEDULER_cancel (sc->timeout_task);
-  if (NULL != sc->wh)
-    GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
   if (NULL != sc->qe)
     GNUNET_DATASTORE_cancel (sc->qe);
   while (NULL != (wqi = sc->wqi_head))
@@ -458,7 +451,6 @@ disconnect_cb (void *cls,
 }
 
 
-
 /**
  * Function called whenever an MQ-channel's transmission window size changes.
  *

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]