gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37460 - in gnunet/src: dv include


From: gnunet
Subject: [GNUnet-SVN] r37460 - in gnunet/src: dv include
Date: Tue, 5 Jul 2016 16:17:51 +0200

Author: grothoff
Date: 2016-07-05 16:17:50 +0200 (Tue, 05 Jul 2016)
New Revision: 37460

Modified:
   gnunet/src/dv/dv.h
   gnunet/src/dv/dv_api.c
   gnunet/src/dv/gnunet-service-dv.c
   gnunet/src/dv/plugin_transport_dv.c
   gnunet/src/include/gnunet_dv_service.h
Log:
-reworking DV to use new MQ API

Modified: gnunet/src/dv/dv.h
===================================================================
--- gnunet/src/dv/dv.h  2016-07-05 12:41:49 UTC (rev 37459)
+++ gnunet/src/dv/dv.h  2016-07-05 14:17:50 UTC (rev 37460)
@@ -126,9 +126,9 @@
   struct GNUNET_MessageHeader header;
 
   /**
-   * Unique ID for this message, for confirm callback, must never be zero.
+   * Reserved for alignment. 0.
    */
-  uint32_t uid GNUNET_PACKED;
+  uint32_t reserved GNUNET_PACKED;
 
   /**
    * The (actual) target of the message
@@ -139,31 +139,6 @@
 
 
 /**
- * Message from service to DV plugin, saying that a
- * SEND request was handled.
- */
-struct GNUNET_DV_AckMessage
-{
-  /**
-   * Type: #GNUNET_MESSAGE_TYPE_DV_SEND_ACK or
-   * #GNUNET_MESSAGE_TYPE_DV_SEND_NACK.
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Which message is being acknowledged?
-   */
-  uint32_t uid GNUNET_PACKED;
-
-  /**
-   * The (actual) target of the message
-   */
-  struct GNUNET_PeerIdentity target;
-
-};
-
-
-/**
  * Message from service to DV plugin, saying that our
  * distance to another peer changed.
  */

Modified: gnunet/src/dv/dv_api.c
===================================================================
--- gnunet/src/dv/dv_api.c      2016-07-05 12:41:49 UTC (rev 37459)
+++ gnunet/src/dv/dv_api.c      2016-07-05 14:17:50 UTC (rev 37460)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2009--2013 GNUnet e.V.
+     Copyright (C) 2009--2013, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -37,60 +37,6 @@
 /**
  * Information we track for each peer.
  */
-struct ConnectedPeer;
-
-
-/**
- * Handle for a send operation.
- */
-struct GNUNET_DV_TransmitHandle
-{
-  /**
-   * Kept in a DLL.
-   */
-  struct GNUNET_DV_TransmitHandle *next;
-
-  /**
-   * Kept in a DLL.
-   */
-  struct GNUNET_DV_TransmitHandle *prev;
-
-  /**
-   * Handle to the service.
-   */
-  struct GNUNET_DV_ServiceHandle *sh;
-
-  /**
-   * Function to call upon completion.
-   */
-  GNUNET_DV_MessageSentCallback cb;
-
-  /**
-   * Closure for @a cb.
-   */
-  void *cb_cls;
-
-  /**
-   * The actual message (allocated at the end of this struct).
-   */
-  const struct GNUNET_MessageHeader *msg;
-
-  /**
-   * Destination for the message.
-   */
-  struct ConnectedPeer *target;
-
-  /**
-   * UID of our message, if any.
-   */
-  uint32_t uid;
-
-};
-
-
-/**
- * Information we track for each peer.
- */
 struct ConnectedPeer
 {
 
@@ -99,22 +45,6 @@
    */
   struct GNUNET_PeerIdentity pid;
 
-  /**
-   * Head of DLL of transmission handles where we need
-   * to invoke a continuation when we are informed about
-   * successful transmission.  The respective request
-   * has already been sent to the DV service.
-   */
-  struct GNUNET_DV_TransmitHandle *head;
-
-  /**
-   * Tail of DLL of transmission handles where we need
-   * to invoke a continuation when we are informed about
-   * successful transmission.  The respective request
-   * has already been sent to the DV service.
-   */
-  struct GNUNET_DV_TransmitHandle *tail;
-
 };
 
 
@@ -127,14 +57,9 @@
   /**
    * Connection to DV service.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
-   * Active request for transmission to DV service.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
-
-  /**
    * Our configuration.
    */
   const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -165,26 +90,11 @@
   GNUNET_DV_MessageReceivedCallback message_cb;
 
   /**
-   * Head of messages to transmit.
-   */
-  struct GNUNET_DV_TransmitHandle *th_head;
-
-  /**
-   * Tail of messages to transmit.
-   */
-  struct GNUNET_DV_TransmitHandle *th_tail;
-
-  /**
    * Information tracked per connected peer.  Maps peer
    * identities to `struct ConnectedPeer` entries.
    */
   struct GNUNET_CONTAINER_MultiPeerMap *peers;
 
-  /**
-   * Current unique ID
-   */
-  uint32_t uid_gen;
-
 };
 
 
@@ -198,120 +108,160 @@
 
 
 /**
- * Start sending messages from our queue to the service.
+ * We got disconnected from the service and thus all of the
+ * connections need to be torn down.
  *
- * @param sh service handle
+ * @param cls the `struct GNUNET_DV_ServiceHandle`
+ * @param key a peer identity
+ * @param value a `struct ConnectedPeer` to clean up
+ * @return #GNUNET_OK (continue to iterate)
  */
-static void
-start_transmit (struct GNUNET_DV_ServiceHandle *sh);
+static int
+cleanup_send_cb (void *cls,
+                const struct GNUNET_PeerIdentity *key,
+                void *value)
+{
+  struct GNUNET_DV_ServiceHandle *sh = cls;
+  struct ConnectedPeer *peer = value;
 
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_remove (sh->peers,
+                                                      key,
+                                                      peer));
+  sh->disconnect_cb (sh->cls,
+                     key);
+  GNUNET_free (peer);
+  return GNUNET_OK;
+}
 
+
 /**
- * Gives a message from our queue to the DV service.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
  *
- * @param cls handle to the dv service (`struct GNUNET_DV_ServiceHandle`)
- * @param size how many bytes can we send
- * @param buf where to copy the message to send
- * @return how many bytes we copied to @a buf
+ * @param cls the handle to the DV API
+ * @param cm the message that was received
  */
-static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+static void
+handle_connect (void *cls,
+                const struct GNUNET_DV_ConnectMessage *cm)
 {
   struct GNUNET_DV_ServiceHandle *sh = cls;
-  char *cbuf = buf;
-  struct GNUNET_DV_TransmitHandle *th;
-  size_t ret;
-  size_t tsize;
+  struct ConnectedPeer *peer;
 
-  sh->th = NULL;
-  if (NULL == buf)
+  peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                            &cm->peer);
+  if (NULL != peer)
   {
+    GNUNET_break (0);
     reconnect (sh);
-    return 0;
+    return;
   }
-  ret = 0;
-  while ( (NULL != (th = sh->th_head)) &&
-         (size - ret >= (tsize = ntohs (th->msg->size)) ))
+  peer = GNUNET_new (struct ConnectedPeer);
+  peer->pid = cm->peer;
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multipeermap_put (sh->peers,
+                                                    &peer->pid,
+                                                    peer,
+                                                    
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+  sh->connect_cb (sh->cls,
+                  &cm->peer,
+                  ntohl (cm->distance),
+                  (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
+}
+
+
+/**
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
+ *
+ * @param cls the handle to the DV API
+ * @param dm the message that was received
+ */
+static void
+handle_disconnect (void *cls,
+                   const struct GNUNET_DV_DisconnectMessage *dm)
+{
+  struct GNUNET_DV_ServiceHandle *sh = cls;
+  struct ConnectedPeer *peer;
+
+  peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                            &dm->peer);
+  if (NULL == peer)
   {
-    GNUNET_CONTAINER_DLL_remove (sh->th_head,
-                                sh->th_tail,
-                                th);
-    memcpy (&cbuf[ret], th->msg, tsize);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Passing %u bytes of type %u to DV service\n",
-         tsize,
-         ntohs (th->msg->type));
-    th->msg = NULL;
-    ret += tsize;
-    if (NULL != th->cb)
-    {
-      GNUNET_CONTAINER_DLL_insert_tail (th->target->head,
-                                        th->target->tail,
-                                        th);
-    }
-    else
-    {
-      GNUNET_free (th);
-    }
+    GNUNET_break (0);
+    reconnect (sh);
+    return;
   }
-  if (NULL != sh->th_head)
-    start_transmit (sh);
-  return ret;
+  cleanup_send_cb (sh,
+                   &dm->peer,
+                   peer);
 }
 
 
 /**
- * Start sending messages from our queue to the service.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
  *
- * @param sh service handle
+ * @param cls the handle to the DV API
+ * @param msg the message that was received
  */
 static void
-start_transmit (struct GNUNET_DV_ServiceHandle *sh)
+handle_distance_update (void *cls,
+                        const struct GNUNET_DV_DistanceUpdateMessage *dum)
 {
-  if (NULL != sh->th)
+  struct GNUNET_DV_ServiceHandle *sh = cls;
+  struct ConnectedPeer *peer;
+
+  peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                            &dum->peer);
+  if (NULL == peer)
+  {
+    GNUNET_break (0);
+    reconnect (sh);
     return;
-  if (NULL == sh->th_head)
-    return;
-  sh->th =
-    GNUNET_CLIENT_notify_transmit_ready (sh->client,
-                                        ntohs (sh->th_head->msg->size),
-                                        GNUNET_TIME_UNIT_FOREVER_REL,
-                                        GNUNET_NO,
-                                        &transmit_pending, sh);
+  }
+  sh->distance_cb (sh->cls,
+                   &dum->peer,
+                   ntohl (dum->distance),
+                   (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
 }
 
 
 /**
- * We got disconnected from the service and thus all of the
- * pending send callbacks will never be confirmed.  Clean up.
+ * Handles a message sent from the DV service to us.
+ * Parse it out and give it to the plugin.
  *
- * @param cls the 'struct GNUNET_DV_ServiceHandle'
- * @param key a peer identity
- * @param value a `struct ConnectedPeer` to clean up
- * @return #GNUNET_OK (continue to iterate)
+ * @param cls the handle to the DV API
+ * @param rm the message that was received
  */
 static int
-cleanup_send_cb (void *cls,
-                const struct GNUNET_PeerIdentity *key,
-                void *value)
+check_received (void *cls,
+                const struct GNUNET_DV_ReceivedMessage *rm)
 {
   struct GNUNET_DV_ServiceHandle *sh = cls;
-  struct ConnectedPeer *peer = value;
-  struct GNUNET_DV_TransmitHandle *th;
+  const struct GNUNET_MessageHeader *payload;
 
-  GNUNET_assert (GNUNET_YES ==
-                GNUNET_CONTAINER_multipeermap_remove (sh->peers,
-                                                      key,
-                                                      peer));
-  sh->disconnect_cb (sh->cls,
-                     key);
-  while (NULL != (th = peer->head))
+  if (NULL ==
+      GNUNET_CONTAINER_multipeermap_get (sh->peers,
+                                         &rm->sender))
   {
-    GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, th);
-    th->cb (th->cb_cls);
-    GNUNET_free (th);
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
   }
-  GNUNET_free (peer);
+  if (ntohs (rm->header.size) - sizeof (struct GNUNET_DV_ReceivedMessage) <
+      sizeof (*payload))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  payload = (const struct GNUNET_MessageHeader *) &rm[1];
+  if (ntohs (rm->header.size) !=
+      sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (payload->size))
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
   return GNUNET_OK;
 }
 
@@ -321,211 +271,38 @@
  * Parse it out and give it to the plugin.
  *
  * @param cls the handle to the DV API
- * @param msg the message that was received
+ * @param rm the message that was received
  */
 static void
-handle_message_receipt (void *cls,
-                       const struct GNUNET_MessageHeader *msg)
+handle_received (void *cls,
+                 const struct GNUNET_DV_ReceivedMessage *rm)
 {
   struct GNUNET_DV_ServiceHandle *sh = cls;
-  const struct GNUNET_DV_ConnectMessage *cm;
-  const struct GNUNET_DV_DistanceUpdateMessage *dum;
-  const struct GNUNET_DV_DisconnectMessage *dm;
-  const struct GNUNET_DV_ReceivedMessage *rm;
   const struct GNUNET_MessageHeader *payload;
-  const struct GNUNET_DV_AckMessage *ack;
-  struct GNUNET_DV_TransmitHandle *th;
-  struct GNUNET_DV_TransmitHandle *tn;
-  struct ConnectedPeer *peer;
 
-  if (NULL == msg)
-  {
-    /* Connection closed */
-    reconnect (sh);
-    return;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received message of type %u with %u bytes from DV service\n",
-       (unsigned int) ntohs (msg->type),
-       (unsigned int) ntohs (msg->size));
-  switch (ntohs (msg->type))
-  {
-  case GNUNET_MESSAGE_TYPE_DV_CONNECT:
-    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ConnectMessage))
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    cm = (const struct GNUNET_DV_ConnectMessage *) msg;
-    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
-                                              &cm->peer);
-    if (NULL != peer)
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    peer = GNUNET_new (struct ConnectedPeer);
-    peer->pid = cm->peer;
-    GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CONTAINER_multipeermap_put (sh->peers,
-                                                      &peer->pid,
-                                                      peer,
-                                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-    sh->connect_cb (sh->cls,
-                   &cm->peer,
-                   ntohl (cm->distance),
-                    (enum GNUNET_ATS_Network_Type) ntohl (cm->network));
-    break;
-  case GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED:
-    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DistanceUpdateMessage))
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    dum = (const struct GNUNET_DV_DistanceUpdateMessage *) msg;
-    sh->distance_cb (sh->cls,
-                    &dum->peer,
-                    ntohl (dum->distance),
-                     (enum GNUNET_ATS_Network_Type) ntohl (dum->network));
-    break;
-  case GNUNET_MESSAGE_TYPE_DV_DISCONNECT:
-    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_DisconnectMessage))
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    dm = (const struct GNUNET_DV_DisconnectMessage *) msg;
-    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
-                                              &dm->peer);
-    if (NULL == peer)
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    tn = sh->th_head;
-    while (NULL != (th = tn))
-    {
-      tn = th->next;
-      if (peer == th->target)
-      {
-        GNUNET_CONTAINER_DLL_remove (sh->th_head,
-                                     sh->th_tail,
-                                     th);
-        th->cb (th->cb_cls);
-        GNUNET_free (th);
-      }
-    }
-    cleanup_send_cb (sh, &dm->peer, peer);
-    break;
-  case GNUNET_MESSAGE_TYPE_DV_RECV:
-    if (ntohs (msg->size) < sizeof (struct GNUNET_DV_ReceivedMessage) + sizeof 
(struct GNUNET_MessageHeader))
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    rm = (const struct GNUNET_DV_ReceivedMessage *) msg;
-    payload = (const struct GNUNET_MessageHeader *) &rm[1];
-    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs 
(payload->size))
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    if (NULL ==
-        GNUNET_CONTAINER_multipeermap_get (sh->peers,
-                                           &rm->sender))
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    sh->message_cb (sh->cls,
-                   &rm->sender,
-                   ntohl (rm->distance),
-                   payload);
-    break;
-  case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
-  case GNUNET_MESSAGE_TYPE_DV_SEND_NACK:
-    if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
-    {
-      GNUNET_break (0);
-      reconnect (sh);
-      return;
-    }
-    ack = (const struct GNUNET_DV_AckMessage *) msg;
-    peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
-                                              &ack->target);
-    if (NULL == peer)
-      break; /* this happens, just ignore */
-    for (th = peer->head; NULL != th; th = th->next)
-    {
-      if (th->uid != ntohl (ack->uid))
-        continue;
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Matched ACK for message to peer %s\n",
-           GNUNET_i2s (&ack->target));
-      GNUNET_CONTAINER_DLL_remove (peer->head,
-                                   peer->tail,
-                                   th);
-      th->cb (th->cb_cls);
-      GNUNET_free (th);
-      break;
-    }
-    break;
-  default:
-    reconnect (sh);
-    break;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received message, continuing receive loop for %p\n",
-       sh->client);
-  GNUNET_CLIENT_receive (sh->client,
-                        &handle_message_receipt, sh,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
+  payload = (const struct GNUNET_MessageHeader *) &rm[1];
+  sh->message_cb (sh->cls,
+                  &rm->sender,
+                  ntohl (rm->distance),
+                  payload);
 }
 
 
 /**
- * Transmit the start message to the DV service.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
  *
- * @param cls the `struct GNUNET_DV_ServiceHandle *`
- * @param size number of bytes available in buf
- * @param buf where to copy the message
- * @return number of bytes written to buf
+ * @param cls closure with the `struct GNUNET_DV_ServiceHandle *`
+ * @param error error code
  */
-static size_t
-transmit_start (void *cls,
-               size_t size,
-               void *buf)
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
 {
   struct GNUNET_DV_ServiceHandle *sh = cls;
-  struct GNUNET_MessageHeader start_message;
 
-  sh->th = NULL;
-  if (NULL == buf)
-  {
-    GNUNET_break (0);
-    reconnect (sh);
-    return 0;
-  }
-  GNUNET_assert (size >= sizeof (start_message));
-  start_message.size = htons (sizeof (struct GNUNET_MessageHeader));
-  start_message.type = htons (GNUNET_MESSAGE_TYPE_DV_START);
-  memcpy (buf, &start_message, sizeof (start_message));
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Transmitting START request, starting receive loop for %p\n",
-       sh->client);
-  GNUNET_CLIENT_receive (sh->client,
-                        &handle_message_receipt, sh,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
-  start_transmit (sh);
-  return sizeof (start_message);
+  reconnect (sh);
 }
 
 
@@ -537,36 +314,52 @@
 static void
 reconnect (struct GNUNET_DV_ServiceHandle *sh)
 {
-  if (NULL != sh->th)
+  GNUNET_MQ_hd_fixed_size (connect,
+                           GNUNET_MESSAGE_TYPE_DV_CONNECT,
+                           struct GNUNET_DV_ConnectMessage);
+  GNUNET_MQ_hd_fixed_size (disconnect,
+                           GNUNET_MESSAGE_TYPE_DV_DISCONNECT,
+                           struct GNUNET_DV_DisconnectMessage);
+  GNUNET_MQ_hd_fixed_size (distance_update,
+                           GNUNET_MESSAGE_TYPE_DV_DISTANCE_CHANGED,
+                           struct GNUNET_DV_DistanceUpdateMessage);
+  GNUNET_MQ_hd_var_size (received,
+                         GNUNET_MESSAGE_TYPE_DV_RECV,
+                         struct GNUNET_DV_ReceivedMessage);
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_connect_handler (sh),
+    make_disconnect_handler (sh),
+    make_distance_update_handler (sh),
+    make_received_handler (sh),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MessageHeader *sm;
+  struct GNUNET_MQ_Envelope *env;
+
+  if (NULL != sh->mq)
   {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
-    sh->th = NULL;
+    GNUNET_MQ_destroy (sh->mq);
+    sh->mq = NULL;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Disconnecting from DV service at %p\n",
-       sh->client);
-  if (NULL != sh->client)
-  {
-    GNUNET_CLIENT_disconnect (sh->client);
-    sh->client = NULL;
-  }
   GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
                                         &cleanup_send_cb,
                                         sh);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Connecting to DV service\n");
-  sh->client = GNUNET_CLIENT_connect ("dv", sh->cfg);
-  if (NULL == sh->client)
+  sh->mq = GNUNET_CLIENT_connecT (sh->cfg,
+                                  "dv",
+                                  handlers,
+                                  &mq_error_handler,
+                                  sh);
+  if (NULL == sh->mq)
   {
     GNUNET_break (0);
     return;
   }
-  sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client,
-                                               sizeof (struct 
GNUNET_MessageHeader),
-                                               GNUNET_TIME_UNIT_FOREVER_REL,
-                                               GNUNET_YES,
-                                               &transmit_start,
-                                               sh);
+  env = GNUNET_MQ_msg (sm,
+                       GNUNET_MESSAGE_TYPE_DV_START);
+  GNUNET_MQ_send (sh->mq,
+                  env);
 }
 
 
@@ -598,7 +391,8 @@
   sh->distance_cb = distance_cb;
   sh->disconnect_cb = disconnect_cb;
   sh->message_cb = message_cb;
-  sh->peers = GNUNET_CONTAINER_multipeermap_create (128, GNUNET_YES);
+  sh->peers = GNUNET_CONTAINER_multipeermap_create (128,
+                                                    GNUNET_YES);
   reconnect (sh);
   return sh;
 }
@@ -612,27 +406,13 @@
 void
 GNUNET_DV_service_disconnect (struct GNUNET_DV_ServiceHandle *sh)
 {
-  struct GNUNET_DV_TransmitHandle *pos;
-
   if (NULL == sh)
     return;
-  if (NULL != sh->th)
+  if (NULL != sh->mq)
   {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (sh->th);
-    sh->th = NULL;
+    GNUNET_MQ_destroy (sh->mq);
+    sh->mq = NULL;
   }
-  while (NULL != (pos = sh->th_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (sh->th_head,
-                                sh->th_tail,
-                                pos);
-    GNUNET_free (pos);
-  }
-  if (NULL != sh->client)
-  {
-    GNUNET_CLIENT_disconnect (sh->client);
-    sh->client = NULL;
-  }
   GNUNET_CONTAINER_multipeermap_iterate (sh->peers,
                                         &cleanup_send_cb,
                                         sh);
@@ -647,88 +427,41 @@
  * @param sh service handle
  * @param target intended recpient
  * @param msg message payload
- * @param cb function to invoke when done
- * @param cb_cls closure for @a cb
- * @return handle to cancel the operation
  */
-struct GNUNET_DV_TransmitHandle *
+void
 GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
                const struct GNUNET_PeerIdentity *target,
-               const struct GNUNET_MessageHeader *msg,
-               GNUNET_DV_MessageSentCallback cb,
-               void *cb_cls)
+               const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_DV_TransmitHandle *th;
   struct GNUNET_DV_SendMessage *sm;
   struct ConnectedPeer *peer;
+  struct GNUNET_MQ_Envelope *env;
 
-  if (ntohs (msg->size) + sizeof (struct GNUNET_DV_SendMessage) >= 
GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  if (ntohs (msg->size) + sizeof (*sm) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
   {
     GNUNET_break (0);
-    return NULL;
+    return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Asked to send %u bytes of type %u to %s via %p\n",
+       "Asked to send %u bytes of type %u to %s\n",
        (unsigned int) ntohs (msg->size),
        (unsigned int) ntohs (msg->type),
-       GNUNET_i2s (target),
-       sh->client);
+       GNUNET_i2s (target));
   peer = GNUNET_CONTAINER_multipeermap_get (sh->peers,
                                             target);
   if (NULL == peer)
   {
     GNUNET_break (0);
-    return NULL;
+    return;
   }
-  th = GNUNET_malloc (sizeof (struct GNUNET_DV_TransmitHandle) +
-                     sizeof (struct GNUNET_DV_SendMessage) +
-                     ntohs (msg->size));
-  th->sh = sh;
-  th->target = peer;
-  th->cb = cb;
-  th->cb_cls = cb_cls;
-  th->msg = (const struct GNUNET_MessageHeader *) &th[1];
-  sm = (struct GNUNET_DV_SendMessage *) &th[1];
-  sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
-  sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
-                          ntohs (msg->size));
-  if (0 == sh->uid_gen)
-    sh->uid_gen = 1;
-  th->uid = sh->uid_gen;
-  sm->uid = htonl (sh->uid_gen++);
-  /* use memcpy here as 'target' may not be sufficiently aligned */
-  memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
-  memcpy (&sm[1], msg, ntohs (msg->size));
-  GNUNET_CONTAINER_DLL_insert_tail (sh->th_head,
-                                    sh->th_tail,
-                                    th);
-  start_transmit (sh);
-  return th;
+  GNUNET_assert (NULL != sh->mq);
+  env = GNUNET_MQ_msg_nested_mh (sm,
+                                 GNUNET_MESSAGE_TYPE_DV_SEND,
+                                 msg);
+  sm->target = *target;
+  GNUNET_MQ_send (sh->mq,
+                  env);
 }
 
 
-/**
- * Abort send operation (naturally, the message may have
- * already been transmitted; this only stops the 'cb'
- * from being called again).
- *
- * @param th send operation to cancel
- */
-void
-GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th)
-{
-  struct GNUNET_DV_ServiceHandle *sh = th->sh;
-
-  if (NULL == th->msg)
-    GNUNET_CONTAINER_DLL_remove (th->target->head,
-                                th->target->tail,
-                                th);
-  else
-    GNUNET_CONTAINER_DLL_remove (sh->th_head,
-                                sh->th_tail,
-                                th);
-  GNUNET_free (th);
-}
-
-
 /* end of dv_api.c */

Modified: gnunet/src/dv/gnunet-service-dv.c
===================================================================
--- gnunet/src/dv/gnunet-service-dv.c   2016-07-05 12:41:49 UTC (rev 37459)
+++ gnunet/src/dv/gnunet-service-dv.c   2016-07-05 14:17:50 UTC (rev 37460)
@@ -149,11 +149,6 @@
    */
   struct GNUNET_PeerIdentity next_target;
 
-  /**
-   * Unique ID of the message.
-   */
-  uint32_t uid;
-
 };
 
 
@@ -480,33 +475,6 @@
 
 
 /**
- * Give an (N)ACK message to the plugin, we transmitted a message for it.
- *
- * @param target peer that received the message
- * @param uid plugin-chosen UID for the message
- * @param nack #GNUNET_NO to send ACK, #GNUNET_YES to send NACK
- */
-static void
-send_ack_to_plugin (const struct GNUNET_PeerIdentity *target,
-                   uint32_t uid,
-                   int nack)
-{
-  struct GNUNET_DV_AckMessage ack_msg;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Delivering ACK for message to peer `%s'\n",
-              GNUNET_i2s (target));
-  ack_msg.header.size = htons (sizeof (ack_msg));
-  ack_msg.header.type = htons ((GNUNET_YES == nack)
-                              ? GNUNET_MESSAGE_TYPE_DV_SEND_NACK
-                              : GNUNET_MESSAGE_TYPE_DV_SEND_ACK);
-  ack_msg.uid = htonl (uid);
-  ack_msg.target = *target;
-  send_control_to_plugin (&ack_msg.header);
-}
-
-
-/**
  * Send a DISTANCE_CHANGED message to the plugin.
  *
  * @param peer peer with a changed distance
@@ -613,16 +581,6 @@
                                 dn->pm_tail,
                                  pending);
     memcpy (&cbuf[off], pending->msg, msize);
-    if (0 != pending->uid)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Acking transmission of %u bytes to %s with plugin\n",
-                  (unsigned int) msize,
-                  GNUNET_i2s (&pending->next_target));
-      send_ack_to_plugin (&pending->next_target,
-                         pending->uid,
-                         GNUNET_NO);
-    }
     GNUNET_free (pending);
     off += msize;
   }
@@ -649,7 +607,6 @@
  *
  * @param target where to send the message
  * @param distance distance to the @a sender
- * @param uid unique ID for the message
  * @param sender original sender of the message
  * @param actual_target ultimate recipient for the message
  * @param payload payload of the message
@@ -657,7 +614,6 @@
 static void
 forward_payload (struct DirectNeighbor *target,
                 uint32_t distance,
-                uint32_t uid,
                 const struct GNUNET_PeerIdentity *sender,
                 const struct GNUNET_PeerIdentity *actual_target,
                 const struct GNUNET_MessageHeader *payload)
@@ -667,7 +623,6 @@
   size_t msize;
 
   if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
-       (0 == uid) &&
        (0 != memcmp (sender,
                     &my_identity,
                     sizeof (struct GNUNET_PeerIdentity))) )
@@ -686,7 +641,6 @@
   }
   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
   pm->next_target = target->peer;
-  pm->uid = uid;
   pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
   rm = (struct RouteMessage *) &pm[1];
   rm->header.size = htons ((uint16_t) msize);
@@ -1888,7 +1842,6 @@
              GNUNET_i2s (&neighbor->peer));
   forward_payload (neighbor,
                   distance + 1,
-                  0,
                   &rm->sender,
                   &rm->target,
                   payload);
@@ -1920,7 +1873,6 @@
     return;
   }
   msg = (const struct GNUNET_DV_SendMessage *) message;
-  GNUNET_break (0 != ntohl (msg->uid));
   payload = (const struct GNUNET_MessageHeader *) &msg[1];
   if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs 
(payload->size))
   {
@@ -1940,7 +1892,6 @@
     GNUNET_STATISTICS_update (stats,
                              "# local messages discarded (no route)",
                              1, GNUNET_NO);
-    send_ack_to_plugin (&msg->target, ntohl (msg->uid), GNUNET_YES);
     GNUNET_SERVER_receive_done (client, GNUNET_OK);
     return;
   }
@@ -1952,7 +1903,6 @@
 
   forward_payload (route->next_hop,
                   0 /* first hop, distance is zero */,
-                  htonl (msg->uid),
                   &my_identity,
                   &msg->target,
                   payload);

Modified: gnunet/src/dv/plugin_transport_dv.c
===================================================================
--- gnunet/src/dv/plugin_transport_dv.c 2016-07-05 12:41:49 UTC (rev 37459)
+++ gnunet/src/dv/plugin_transport_dv.c 2016-07-05 14:17:50 UTC (rev 37460)
@@ -45,51 +45,6 @@
 
 
 /**
- * An active request for transmission via DV.
- */
-struct PendingRequest
-{
-
-  /**
-   * This is a DLL.
-   */
-  struct PendingRequest *next;
-
-  /**
-   * This is a DLL.
-   */
-  struct PendingRequest *prev;
-
-  /**
-   * Continuation function to call once the transmission buffer
-   * has again space available.  NULL if there is no
-   * continuation to call.
-   */
-  GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
-
-  /**
-   * Closure for @e transmit_cont.
-   */
-  void *transmit_cont_cls;
-
-  /**
-   * Transmission handle from DV client library.
-   */
-  struct GNUNET_DV_TransmitHandle *th;
-
-  /**
-   * Session of this request.
-   */
-  struct GNUNET_ATS_Session *session;
-
-  /**
-   * Number of bytes to transmit.
-   */
-  size_t size;
-};
-
-
-/**
  * Session handle for connections.
  */
 struct GNUNET_ATS_Session
@@ -100,16 +55,6 @@
   struct Plugin *plugin;
 
   /**
-   * Head of pending requests.
-   */
-  struct PendingRequest *pr_head;
-
-  /**
-   * Tail of pending requests.
-   */
-  struct PendingRequest *pr_tail;
-
-  /**
    * Address we use for the other peer.
    */
   struct GNUNET_HELLO_Address *address;
@@ -449,7 +394,6 @@
 free_session (struct GNUNET_ATS_Session *session)
 {
   struct Plugin *plugin = session->plugin;
-  struct PendingRequest *pr;
 
   GNUNET_assert (GNUNET_YES ==
                 GNUNET_CONTAINER_multipeermap_remove (plugin->sessions,
@@ -470,20 +414,6 @@
                              session);
     session->active = GNUNET_NO;
   }
-  while (NULL != (pr = session->pr_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (session->pr_head,
-                                session->pr_tail,
-                                pr);
-    GNUNET_DV_send_cancel (pr->th);
-    pr->th = NULL;
-    if (NULL != pr->transmit_cont)
-      pr->transmit_cont (pr->transmit_cont_cls,
-                        &session->sender,
-                        GNUNET_SYSERR,
-                         pr->size, 0);
-    GNUNET_free (pr);
-  }
   GNUNET_HELLO_address_free (session->address);
   GNUNET_free (session);
 }
@@ -515,31 +445,6 @@
 
 
 /**
- * Function called once the delivery of a message has been successful.
- * Clean up the pending request, and call continuations.
- *
- * @param cls closure
- */
-static void
-send_finished (void *cls)
-{
-  struct PendingRequest *pr = cls;
-  struct GNUNET_ATS_Session *session = pr->session;
-
-  pr->th = NULL;
-  GNUNET_CONTAINER_DLL_remove (session->pr_head,
-                              session->pr_tail,
-                              pr);
-  if (NULL != pr->transmit_cont)
-    pr->transmit_cont (pr->transmit_cont_cls,
-                      &session->sender,
-                      GNUNET_OK,
-                       pr->size, 0);
-  GNUNET_free (pr);
-}
-
-
-/**
  * Function that can be used by the transport service to transmit
  * a message using the plugin.
  *
@@ -565,10 +470,10 @@
                 size_t msgbuf_size,
                 unsigned int priority,
                 struct GNUNET_TIME_Relative timeout,
-                GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
+                GNUNET_TRANSPORT_TransmitContinuation cont,
+                void *cont_cls)
 {
   struct Plugin *plugin = cls;
-  struct PendingRequest *pr;
   const struct GNUNET_MessageHeader *msg;
   struct GNUNET_MessageHeader *box;
 
@@ -585,20 +490,13 @@
     memcpy (&box[1], msgbuf, msgbuf_size);
     msg = box;
   }
-  pr = GNUNET_new (struct PendingRequest);
-  pr->transmit_cont = cont;
-  pr->transmit_cont_cls = cont_cls;
-  pr->session = session;
-  pr->size = msgbuf_size;
-  GNUNET_CONTAINER_DLL_insert_tail (session->pr_head,
-                                   session->pr_tail,
-                                   pr);
-
-  pr->th = GNUNET_DV_send (plugin->dvh,
-                          &session->sender,
-                          msg,
-                          &send_finished,
-                          pr);
+  GNUNET_DV_send (plugin->dvh,
+                  &session->sender,
+                  msg);
+  cont (cont_cls,
+        &session->sender,
+        GNUNET_OK,
+        msgbuf_size, 0);
   GNUNET_free_non_null (box);
   return 0; /* DV */
 }
@@ -618,26 +516,11 @@
 {
   struct Plugin *plugin = cls;
   struct GNUNET_ATS_Session *session;
-  struct PendingRequest *pr;
 
   session = GNUNET_CONTAINER_multipeermap_get (plugin->sessions,
                                               target);
   if (NULL == session)
     return; /* nothing to do */
-  while (NULL != (pr = session->pr_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (session->pr_head,
-                                session->pr_tail,
-                                pr);
-    GNUNET_DV_send_cancel (pr->th);
-    pr->th = NULL;
-    if (NULL != pr->transmit_cont)
-      pr->transmit_cont (pr->transmit_cont_cls,
-                        &session->sender,
-                        GNUNET_SYSERR,
-                         pr->size, 0);
-    GNUNET_free (pr);
-  }
   session->active = GNUNET_NO;
 }
 
@@ -655,22 +538,6 @@
 dv_plugin_disconnect_session (void *cls,
                               struct GNUNET_ATS_Session *session)
 {
-  struct PendingRequest *pr;
-
-  while (NULL != (pr = session->pr_head))
-  {
-    GNUNET_CONTAINER_DLL_remove (session->pr_head,
-                                session->pr_tail,
-                                pr);
-    GNUNET_DV_send_cancel (pr->th);
-    pr->th = NULL;
-    if (NULL != pr->transmit_cont)
-      pr->transmit_cont (pr->transmit_cont_cls,
-                        &session->sender,
-                        GNUNET_SYSERR,
-                         pr->size, 0);
-    GNUNET_free (pr);
-  }
   session->active = GNUNET_NO;
   return GNUNET_OK;
 }
@@ -691,9 +558,11 @@
  * @param asc_cls closure for @a asc
  */
 static void
-dv_plugin_address_pretty_printer (void *cls, const char *type,
+dv_plugin_address_pretty_printer (void *cls,
+                                  const char *type,
                                   const void *addr,
-                                  size_t addrlen, int numeric,
+                                  size_t addrlen,
+                                  int numeric,
                                   struct GNUNET_TIME_Relative timeout,
                                   GNUNET_TRANSPORT_AddressStringCallback asc,
                                   void *asc_cls)

Modified: gnunet/src/include/gnunet_dv_service.h
===================================================================
--- gnunet/src/include/gnunet_dv_service.h      2016-07-05 12:41:49 UTC (rev 
37459)
+++ gnunet/src/include/gnunet_dv_service.h      2016-07-05 14:17:50 UTC (rev 
37460)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2013 GNUnet e.V.
+     Copyright (C) 2013, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -153,29 +153,14 @@
  * @param sh service handle
  * @param target intended recpient
  * @param msg message payload
- * @param cb function to invoke when done
- * @param cb_cls closure for 'cb'
  * @return handle to cancel the operation
  */
-struct GNUNET_DV_TransmitHandle *
+void
 GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
                const struct GNUNET_PeerIdentity *target,
-               const struct GNUNET_MessageHeader *msg,
-               GNUNET_DV_MessageSentCallback cb,
-               void *cb_cls);
+               const struct GNUNET_MessageHeader *msg);
 
 
-/**
- * Abort send operation (naturally, the message may have
- * already been transmitted; this only stops the 'cb'
- * from being called again).
- *
- * @param th send operation to cancel
- */
-void
-GNUNET_DV_send_cancel (struct GNUNET_DV_TransmitHandle *th);
-
-
 #endif
 
 /** @} */  /* end of group */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]