gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r19544 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r19544 - gnunet/src/transport
Date: Mon, 30 Jan 2012 18:41:58 +0100

Author: wachs
Date: 2012-01-30 18:41:58 +0100 (Mon, 30 Jan 2012)
New Revision: 19544

Modified:
   gnunet/src/transport/plugin_transport_udp_new.c
   gnunet/src/transport/plugin_transport_udp_new.h
Log:
- latest changes


Modified: gnunet/src/transport/plugin_transport_udp_new.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp_new.c     2012-01-30 16:38:34 UTC 
(rev 19543)
+++ gnunet/src/transport/plugin_transport_udp_new.c     2012-01-30 17:41:58 UTC 
(rev 19544)
@@ -44,6 +44,27 @@
 
 
 /**
+ * Number of messages we can defragment in parallel.  We only really
+ * defragment 1 message at a time, but if messages get re-ordered, we
+ * may want to keep knowledge about the previous message to avoid
+ * discarding the current message in favor of a single fragment of a
+ * previous message.  3 should be good since we don't expect massive
+ * message reorderings with UDP.
+ */
+#define UDP_MAX_MESSAGES_IN_DEFRAG 3
+
+/**
+ * We keep a defragmentation queue per sender address.  How many
+ * sender addresses do we support at the same time? Memory consumption
+ * is roughly a factor of 32k * UDP_MAX_MESSAGES_IN_DEFRAG times this
+ * value. (So 128 corresponds to 12 MB and should suffice for
+ * connecting to roughly 128 peers via UDP).
+ */
+#define UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG 128
+
+
+
+/**
  * Closure for 'append_port'.
  */
 struct PrettyPrinterContext
@@ -82,6 +103,9 @@
    * Desired delay for next sending we received from other peer
    */
   struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
+
+  struct FragmentationContext * head;
+  struct FragmentationContext * tail;
 };
 
 
@@ -117,6 +141,142 @@
 
 
 /**
+ * Closure for 'find_receive_context'.
+ */
+struct FindReceiveContext
+{
+  /**
+   * Where to store the result.
+   */
+  struct ReceiveContext *rc;
+
+  /**
+   * Address to find.
+   */
+  const struct sockaddr *addr;
+
+  /**
+   * Number of bytes in 'addr'.
+   */
+  socklen_t addr_len;
+
+  struct Session *session;
+};
+
+
+
+/**
+ * Data structure to track defragmentation contexts based
+ * on the source of the UDP traffic.
+ */
+struct ReceiveContext
+{
+
+  /**
+   * Defragmentation context.
+   */
+  struct GNUNET_DEFRAGMENT_Context *defrag;
+
+  /**
+   * Source address this receive context is for (allocated at the
+   * end of the struct).
+   */
+  const struct sockaddr *src_addr;
+
+  /**
+   * Reference to master plugin struct.
+   */
+  struct Plugin *plugin;
+
+  /**
+   * Node in the defrag heap.
+   */
+  struct GNUNET_CONTAINER_HeapNode *hnode;
+
+  /**
+   * Length of 'src_addr'
+   */
+  size_t addr_len;
+
+  struct GNUNET_PeerIdentity id;
+
+};
+
+
+
+/**
+ * Closure for 'process_inbound_tokenized_messages'
+ */
+struct FragmentationContext
+{
+  struct FragmentationContext * next;
+  struct FragmentationContext * prev;
+
+  struct Plugin * plugin;
+  struct GNUNET_FRAGMENT_Context * frag;
+  struct Session * session;
+
+  /**
+   * Function to call upon completion of the transmission.
+   */
+  GNUNET_TRANSPORT_TransmitContinuation cont;
+
+  /**
+   * Closure for 'cont'.
+   */
+  void *cont_cls;
+
+  size_t bytes_to_send;
+};
+
+
+struct UDPMessageWrapper
+{
+  struct Session *session;
+  struct UDPMessageWrapper *prev;
+  struct UDPMessageWrapper *next;
+  struct UDPMessage *udp;
+  size_t msg_size;
+  /**
+   * Function to call upon completion of the transmission.
+   */
+  GNUNET_TRANSPORT_TransmitContinuation cont;
+
+  /**
+   * Closure for 'cont'.
+   */
+  void *cont_cls;
+
+  struct FragmentationContext *frag;
+
+};
+
+
+/**
+ * UDP ACK Message-Packet header (after defragmentation).
+ */
+struct UDP_ACK_Message
+{
+  /**
+   * Message header.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Desired delay for flow control
+   */
+  uint32_t delay;
+
+  /**
+   * What is the identity of the sender
+   */
+  struct GNUNET_PeerIdentity sender;
+
+};
+
+
+
+/**
  * Function called for a quick conversion of the binary address to
  * a numeric address.  Note that the caller must not free the
  * address and that the next call to this function is allowed
@@ -364,6 +524,14 @@
          GNUNET_i2s (&s->target),
          GNUNET_a2s (s->sock_addr, s->addrlen));
 #endif
+  struct FragmentationContext *fctx = s->head;
+  while (fctx != NULL)
+  {
+    GNUNET_FRAGMENT_context_destroy(fctx->frag);
+    GNUNET_CONTAINER_DLL_remove(s->head, s->tail, fctx);
+    GNUNET_free (fctx);
+    fctx = s->head;
+  }
 
   plugin->env->session_end (plugin->env->cls, &s->target, s);
 
@@ -576,7 +744,44 @@
   return s;
 }
 
+/**
+ * Function that is called with messages created by the fragmentation
+ * module.  In the case of the 'proc' callback of the
+ * GNUNET_FRAGMENT_context_create function, this function must
+ * eventually call 'GNUNET_FRAGMENT_context_transmission_done'.
+ *
+ * @param cls closure, the 'struct FragmentationContext'
+ * @param msg the message that was created
+ */
+static void
+enqueue_fragment (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  struct FragmentationContext *frag_ctx = cls;
+  struct Plugin *plugin = frag_ctx->plugin;
+  struct UDPMessageWrapper * udpw;
 
+  size_t msg_len = ntohs (msg->size);
+
+#if DEBUG_UDP
+#endif
+  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Enqueueing fragment with %u bytes 
%u\n", msg_len , sizeof (struct UDPMessageWrapper));
+
+
+  udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + msg_len);
+  udpw->session = frag_ctx->session;
+  udpw->udp = (struct UDPMessage *) &udpw[1];
+
+  udpw->msg_size = msg_len;
+  udpw->cont = frag_ctx->cont;
+  udpw->cont_cls = frag_ctx->cont_cls;
+  udpw->frag = frag_ctx;
+
+  memcpy (udpw->udp, msg, msg_len);
+
+  GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
+}
+
+
 /**
  * Function that can be used by the transport service to transmit
  * a message using the plugin.   Note that in the case of a
@@ -613,12 +818,12 @@
                   GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
 {
   struct Plugin *plugin = cls;
-  size_t mlen = msgbuf_size + sizeof (struct UDPMessage);;
+  size_t mlen = msgbuf_size + sizeof (struct UDPMessage);
 
   struct GNUNET_TIME_Relative delta;
   struct UDPMessageWrapper * udpw;
   struct UDPMessage *udp;
-
+  char mbuf[mlen];
   GNUNET_assert (plugin != NULL);
   GNUNET_assert (s != NULL);
 
@@ -628,34 +833,63 @@
     return GNUNET_SYSERR;
   }
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "UDP transmits %u-byte message to `%s' using address `%s'\n",
-         msgbuf_size,
-         GNUNET_i2s (&s->target),
-         GNUNET_a2s(s->sock_addr, s->addrlen));
-
   if (GNUNET_YES != 
GNUNET_CONTAINER_multihashmap_contains_value(plugin->sessions, 
&s->target.hashPubKey, s))
   {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
 
-  udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + sizeof (struct 
UDPMessage) + msgbuf_size);
-  udpw->session = s;
-  udp = (struct UDPMessage *) &udpw[1];
-  udpw->udp = udp;
-  udpw->msg_size = mlen;
-  udpw->cont = cont;
-  udpw->cont_cls = cont_cls;
+  LOG (GNUNET_ERROR_TYPE_ERROR,
+       "UDP transmits %u-byte message to `%s' using address `%s'\n",
+         msgbuf_size,
+         GNUNET_i2s (&s->target),
+         GNUNET_a2s(s->sock_addr, s->addrlen));
 
+  /* Message */
+  udp = (struct UDPMessage *) mbuf;
   udp->header.size = htons (mlen);
   udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE);
   udp->reserved = htonl (0);
   udp->sender = *plugin->env->my_identity;
-  memcpy (&udp[1], msgbuf, msgbuf_size);
 
-  GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
+  if (mlen <= UDP_MTU)
+  {
+    udpw = GNUNET_malloc (sizeof (struct UDPMessageWrapper) + mlen);
+    udpw->session = s;
+    udpw->udp = (struct UDPMessage *) &udpw[1];
+    udpw->msg_size = mlen;
+    udpw->cont = cont;
+    udpw->cont_cls = cont_cls;
+    udpw->frag = NULL;
+    memcpy (udpw->udp, udp, sizeof (struct UDPMessage));
+    memcpy (&udpw->udp[1], msgbuf, msgbuf_size);
 
+    GNUNET_CONTAINER_DLL_insert(plugin->msg_head, plugin->msg_tail, udpw);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "UDP has to fragment message \n");
+    memcpy (&udp[1], msgbuf, msgbuf_size);
+    struct FragmentationContext * frag_ctx = GNUNET_malloc(sizeof (struct 
FragmentationContext));
+
+    frag_ctx->plugin = plugin;
+    frag_ctx->session = s;
+    frag_ctx->cont = cont;
+    frag_ctx->cont_cls = cont_cls;
+    frag_ctx->bytes_to_send = mlen;
+    frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
+              UDP_MTU,
+              &plugin->tracker,
+              plugin->last_expected_delay,
+              &udp->header,
+              &enqueue_fragment,
+              frag_ctx);
+
+    GNUNET_CONTAINER_DLL_insert(s->head, s->tail, frag_ctx);
+
+  }
+
   delta = GNUNET_TIME_absolute_get_remaining (s->flow_delay_from_other_peer);
   return mlen;
 }
@@ -847,6 +1081,108 @@
 
 
 /**
+ * Scan the heap for a receive context with the given address.
+ *
+ * @param cls the 'struct FindReceiveContext'
+ * @param node internal node of the heap
+ * @param element value stored at the node (a 'struct ReceiveContext')
+ * @param cost cost associated with the node
+ * @return GNUNET_YES if we should continue to iterate,
+ *         GNUNET_NO if not.
+ */
+static int
+find_receive_context (void *cls, struct GNUNET_CONTAINER_HeapNode *node,
+                      void *element, GNUNET_CONTAINER_HeapCostType cost)
+{
+  struct FindReceiveContext *frc = cls;
+  struct ReceiveContext *e = element;
+
+  if ((frc->addr_len == e->addr_len) &&
+      (0 == memcmp (frc->addr, e->src_addr, frc->addr_len)))
+  {
+    frc->rc = e;
+    return GNUNET_NO;
+  }
+  return GNUNET_YES;
+}
+
+
+/**
+ * Process a defragmented message.
+ *
+ * @param cls the 'struct ReceiveContext'
+ * @param msg the message
+ */
+static void
+fragment_msg_proc (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  struct ReceiveContext *rc = cls;
+
+  if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  if (ntohs (msg->size) < sizeof (struct UDPMessage))
+  {
+    GNUNET_break (0);
+    return;
+  }
+  process_udp_message (rc->plugin, (const struct UDPMessage *) msg,
+                       rc->src_addr, rc->addr_len);
+}
+
+/**
+ * Transmit an acknowledgement.
+ *
+ * @param cls the 'struct ReceiveContext'
+ * @param id message ID (unused)
+ * @param msg ack to transmit
+ */
+static void
+ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
+{
+#if 0
+  struct ReceiveContext *rc = cls;
+
+  size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
+  char buf[msize];
+  struct UDP_ACK_Message *udp_ack;
+  uint32_t delay = 0;
+
+  struct Session *s;
+
+  s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len);
+  if (s != NULL)
+  {
+    if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+      delay = s->flow_delay_for_other_peer.rel_value;
+    else
+      delay = UINT32_MAX;
+  }
+
+
+#if DEBUG_UDP
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Sending ACK to `%s' including delay of %u ms\n",
+       GNUNET_a2s (rc->src_addr,
+                   (rc->src_addr->sa_family ==
+                    AF_INET) ? sizeof (struct sockaddr_in) : sizeof (struct
+                                                                     
sockaddr_in6)),
+       delay);
+#endif
+  udp_ack = (struct UDP_ACK_Message *) buf;
+  udp_ack->header.size = htons ((uint16_t) msize);
+  udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
+  udp_ack->delay = htonl (delay);
+  udp_ack->sender = *rc->plugin->env->my_identity;
+  memcpy (&udp_ack[1], msg, ntohs (msg->size));
+  (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header);
+#endif
+}
+
+
+/**
  * Read and process a message from the given socket.
  *
  * @param plugin the overall plugin
@@ -863,9 +1199,9 @@
   //const struct GNUNET_MessageHeader *ack;
   //struct Session *peer_session;
   //const struct UDP_ACK_Message *udp_ack;
-  //struct ReceiveContext *rc;
-  //struct GNUNET_TIME_Absolute now;
-  //struct FindReceiveContext frc;
+  struct ReceiveContext *rc;
+  struct GNUNET_TIME_Absolute now;
+  struct FindReceiveContext frc;
   //struct Session *s = NULL;
   //struct GNUNET_TIME_Relative flow_delay;
   //struct GNUNET_ATS_Information ats;
@@ -903,10 +1239,68 @@
       GNUNET_break_op (0);
       return;
     }
-
     process_udp_message (plugin, (const struct UDPMessage *) msg,
                          (const struct sockaddr *) addr, fromlen);
+    return;
+  case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
+    if (ntohs (msg->size) <
+        sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+    /* TODO */
+    GNUNET_break_op (0);
+    return;
+  case GNUNET_MESSAGE_TYPE_FRAGMENT:
+    frc.rc = NULL;
+    frc.addr = (const struct sockaddr *) addr;
+    frc.addr_len = fromlen;
+    GNUNET_CONTAINER_heap_iterate (plugin->defrags,
+                                   &find_receive_context,
+                                   &frc);
+    now = GNUNET_TIME_absolute_get ();
+    rc = frc.rc;
+    if (rc == NULL)
+    {
+      /* need to create a new RC */
+      rc = GNUNET_malloc (sizeof (struct ReceiveContext) + fromlen);
+      memcpy (&rc[1], addr, fromlen);
+      rc->src_addr = (const struct sockaddr *) &rc[1];
+      rc->addr_len = fromlen;
+      rc->plugin = plugin;
+      rc->defrag =
+          GNUNET_DEFRAGMENT_context_create (plugin->env->stats, UDP_MTU,
+                                            UDP_MAX_MESSAGES_IN_DEFRAG, rc,
+                                            &fragment_msg_proc, &ack_proc);
+      rc->hnode =
+          GNUNET_CONTAINER_heap_insert (plugin->defrags, rc,
+                                        (GNUNET_CONTAINER_HeapCostType)
+                                        now.abs_value);
+    }
+#if DEBUG_UDP
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "UDP processes %u-byte fragment from `%s'\n",
+         (unsigned int) ntohs (msg->size),
+         GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
+#endif
 
+    if (GNUNET_OK == GNUNET_DEFRAGMENT_process_fragment (rc->defrag, msg))
+    {
+      /* keep this 'rc' from expiring */
+      GNUNET_CONTAINER_heap_update_cost (plugin->defrags, rc->hnode,
+                                         (GNUNET_CONTAINER_HeapCostType)
+                                         now.abs_value);
+    }
+    if (GNUNET_CONTAINER_heap_get_size (plugin->defrags) >
+        UDP_MAX_SENDER_ADDRESSES_WITH_DEFRAG)
+    {
+      /* remove 'rc' that was inactive the longest */
+      rc = GNUNET_CONTAINER_heap_remove_root (plugin->defrags);
+      GNUNET_assert (NULL != rc);
+      GNUNET_DEFRAGMENT_context_destroy (rc->defrag);
+      GNUNET_free (rc);
+    }
+
     return;
   default:
     GNUNET_break_op (0);
@@ -958,8 +1352,17 @@
        (unsigned int) ntohs (udpw->msg_size), GNUNET_a2s (sa, slen), (int) 
sent,
        (sent < 0) ? STRERROR (errno) : "ok");
 
-  if (udpw->cont != NULL)
-    udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK);
+  /* This was just a message fragment */
+  if (udpw->frag != NULL)
+  {
+    GNUNET_FRAGMENT_context_transmission_done (udpw->frag->frag);
+  }
+  /* This was a complete message*/
+  else
+  {
+    if (udpw->cont != NULL)
+      udpw->cont (udpw->cont_cls, &udpw->session->target, GNUNET_OK);
+  }
 
   GNUNET_CONTAINER_DLL_remove(plugin->msg_head, plugin->msg_tail, udpw);
   GNUNET_free (udpw);
@@ -1273,6 +1676,7 @@
 
 
   plugin->sessions = GNUNET_CONTAINER_multihashmap_create (10);
+  plugin->defrags = GNUNET_CONTAINER_heap_create 
(GNUNET_CONTAINER_HEAP_ORDER_MIN);
   plugin->mst = GNUNET_SERVER_mst_create (&process_inbound_tokenized_messages, 
plugin);
   plugin->port = port;
   plugin->aport = aport;
@@ -1347,6 +1751,11 @@
   GNUNET_NETWORK_fdset_destroy (plugin->ws);
   GNUNET_NAT_unregister (plugin->nat);
 
+  if (plugin->defrags != NULL)
+  {
+    GNUNET_CONTAINER_heap_destroy(plugin->defrags);
+    plugin->defrags = NULL;
+  }
   if (plugin->mst != NULL)
   {
     GNUNET_SERVER_mst_destroy(plugin->mst);

Modified: gnunet/src/transport/plugin_transport_udp_new.h
===================================================================
--- gnunet/src/transport/plugin_transport_udp_new.h     2012-01-30 16:38:34 UTC 
(rev 19543)
+++ gnunet/src/transport/plugin_transport_udp_new.h     2012-01-30 17:41:58 UTC 
(rev 19544)
@@ -110,24 +110,7 @@
 
 };
 
-struct UDPMessageWrapper
-{
-  struct Session *session;
-  struct UDPMessageWrapper *prev;
-  struct UDPMessageWrapper *next;
-  struct UDPMessage *udp;
-  size_t msg_size;
-  /**
-   * Function to call upon completion of the transmission.
-   */
-  GNUNET_TRANSPORT_TransmitContinuation cont;
 
-  /**
-   * Closure for 'cont'.
-   */
-  void *cont_cls;
-};
-
 /**
  * Encapsulation of all of the state of the plugin.
  */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]