gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r17662 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r17662 - gnunet/src/transport
Date: Fri, 21 Oct 2011 17:55:33 +0200

Author: wachs
Date: 2011-10-21 17:55:33 +0200 (Fri, 21 Oct 2011)
New Revision: 17662

Modified:
   gnunet/src/transport/plugin_transport_udp.c
Log:
transmitting flow control information between peers


Modified: gnunet/src/transport/plugin_transport_udp.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp.c 2011-10-21 15:54:30 UTC (rev 17661)
+++ gnunet/src/transport/plugin_transport_udp.c 2011-10-21 15:55:33 UTC (rev 17662)
@@ -94,6 +94,28 @@
 
 
 /**
+ * UDP ACK Message-Packet header (after defragmentation); the ACK payload follows.
+ */
+struct UDP_ACK_Message
+{
+  /**
+   * Message header; type is GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Desired delay for flow control, in network byte order (presumably ms).
+   */
+  uint32_t delay;
+
+  /**
+   * Identity of the peer that sends this ACK.
+   */
+  struct GNUNET_PeerIdentity sender;
+};
+
+
+/**
  * Network format for IPv4 addresses.
  */
 struct IPv4UdpAddress
@@ -174,6 +196,16 @@
   struct GNUNET_TIME_Absolute valid_until;
 
   GNUNET_SCHEDULER_TaskIdentifier invalidation_task;
+
+  /*
+   * Desired delay before the next transmission, as we report it to the other peer
+   */
+  struct GNUNET_TIME_Relative flow_delay_for_other_peer;
+
+  /*
+   * Earliest time for our next transmission, from the delay the other peer sent us
+   */
+  struct GNUNET_TIME_Absolute flow_delay_from_other_peer;
 };
 
 
@@ -210,6 +242,8 @@
    */
   size_t addr_len;
 
+  struct GNUNET_PeerIdentity id;
+
 };
 
 
@@ -363,8 +397,45 @@
   return psc.result;
 }
 
+/**
+ * Iterator matching inbound sessions against an address.
+ * @param cls the 'struct PeerSessionIteratorContext' with addr/addrlen to find
+ * @param key unused
+ * @param value the 'struct Session' to test
+ * @return GNUNET_NO once a match is stored in cls->result, else GNUNET_YES
+ */
+int inbound_session_by_addr_iterator (void *cls, const GNUNET_HashCode * key, void *value)
+{
+  struct PeerSessionIteratorContext *psc = cls;
+  struct Session *s = value;
+  /* the address is compared against the bytes directly following the session */
+  if ((s->addrlen == psc->addrlen) && (0 == memcmp (&s[1], psc->addr, s->addrlen)))
+    psc->result = s;
+  return (NULL != psc->result) ? GNUNET_NO : GNUNET_YES;
+}
 
 /**
+ * Lookup an inbound session just by its network address.
+ *
+ * @param plugin the plugin whose inbound sessions are searched
+ * @param addr address to compare the stored session addresses against
+ * @param addrlen number of bytes in addr
+ * @return matching session, NULL if we have no session for this address
+ */
+struct Session *
+find_inbound_session_by_addr (struct Plugin *plugin, const void * addr, size_t addrlen)
+{
+  struct PeerSessionIteratorContext psc;
+  psc.result = NULL;
+  psc.addrlen = addrlen;
+  psc.addr = addr;
+
+  GNUNET_CONTAINER_multihashmap_iterate (plugin->inbound_sessions, &inbound_session_by_addr_iterator, &psc);
+  return psc.result;
+}
+
+
+/**
  * Destroy a session, plugin is being unloaded.
  *
  * @param cls unused
@@ -633,6 +704,7 @@
   if ((force_address == GNUNET_SYSERR) && (session == NULL))
     return GNUNET_SYSERR;
 
+  s = NULL;
   /* safety check: comparing address to address stored in session */
   if ((session != NULL) && (addr != NULL) && (addrlen != 0))
   {
@@ -699,6 +771,22 @@
   udp->sender = *plugin->env->my_identity;
   memcpy (&udp[1], msgbuf, msgbuf_size);
 
+  if (s != NULL)
+  {
+    struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get();
+    if (s->flow_delay_from_other_peer.abs_value > now.abs_value)
+    {
+      struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_difference(now, s->flow_delay_from_other_peer);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+                  "We try to send to early! Should in %llu!\n", delta.rel_value);
+    }
+    else
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+                  "We can send!\n");
+  }
+  else
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+                "SENDING without session!\n");
   if (mlen <= UDP_MTU)
   {
     mlen = udp_send (plugin, peer_session->sock_addr, &udp->header);
@@ -763,6 +851,7 @@
   struct Plugin *plugin = cls;
   struct SourceInformation *si = client;
   struct GNUNET_ATS_Information distance;
+  struct GNUNET_TIME_Relative delay;
 
   /* setup ATS */
   distance.type = htonl (GNUNET_ATS_QUALITY_NET_DISTANCE);
@@ -770,8 +859,9 @@
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
                    "Giving Session %X %s  to transport\n", si->session, GNUNET_i2s(&si->session->target));
-  plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
+  delay = plugin->env->receive (plugin->env->cls, &si->sender, hdr, &distance, 1, si->session,
                         si->arg, si->args);
+  si->session->flow_delay_for_other_peer = delay;
 }
 
 static void
@@ -938,24 +1028,38 @@
 ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
 {
   struct ReceiveContext *rc = cls;
-  size_t msize = sizeof (struct UDPMessage) + ntohs (msg->size);
+
+  size_t msize = sizeof (struct UDP_ACK_Message) + ntohs (msg->size);
   char buf[msize];
-  struct UDPMessage *udp;
+  struct UDP_ACK_Message *udp_ack;
+  uint32_t delay = 0;
 
+  struct Session *s;
+  s = find_inbound_session_by_addr (rc->plugin, rc->src_addr, rc->addr_len);
+  if (s != NULL)
+  {
+    if (s->flow_delay_for_other_peer.rel_value <= UINT32_MAX)
+      delay = s->flow_delay_for_other_peer.rel_value;
+    else
+      delay = UINT32_MAX;
+  }
+
+
 #if DEBUG_UDP
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s'\n",
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending ACK to `%s' including delay of %u ms\n",
                    GNUNET_a2s (rc->src_addr,
                                (rc->src_addr->sa_family ==
                                 AF_INET) ? sizeof (struct sockaddr_in) :
-                               sizeof (struct sockaddr_in6)));
+                               sizeof (struct sockaddr_in6)),
+                               delay);
 #endif
-  udp = (struct UDPMessage *) buf;
-  udp->header.size = htons ((uint16_t) msize);
-  udp->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
-  udp->reserved = htonl (0);
-  udp->sender = *rc->plugin->env->my_identity;
-  memcpy (&udp[1], msg, ntohs (msg->size));
-  (void) udp_send (rc->plugin, rc->src_addr, &udp->header);
+  udp_ack = (struct UDP_ACK_Message *) buf;
+  udp_ack->header.size = htons ((uint16_t) msize);
+  udp_ack->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK);
+  udp_ack->delay = htonl (delay);
+  udp_ack->sender = *rc->plugin->env->my_identity;
+  memcpy (&udp_ack[1], msg, ntohs (msg->size));
+  (void) udp_send (rc->plugin, rc->src_addr, &udp_ack->header);
 }
 
 
@@ -978,6 +1082,8 @@
    * Number of bytes in 'addr'.
    */
   socklen_t addr_len;
+
+  struct Session * session;
 };
 
 
@@ -1024,10 +1130,12 @@
   const struct GNUNET_MessageHeader *msg;
   const struct GNUNET_MessageHeader *ack;
   struct Session *peer_session;
-  const struct UDPMessage *udp;
+  const struct UDP_ACK_Message *udp_ack;
   struct ReceiveContext *rc;
   struct GNUNET_TIME_Absolute now;
   struct FindReceiveContext frc;
+  struct Session * s = NULL;
+  struct GNUNET_TIME_Relative flow_delay;
 
   fromlen = sizeof (addr);
   memset (&addr, 0, sizeof (addr));
@@ -1062,20 +1170,26 @@
                          (const struct sockaddr *) addr, fromlen);
     return;
   case GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_ACK:
+
     if (ntohs (msg->size) <
-        sizeof (struct UDPMessage) + sizeof (struct GNUNET_MessageHeader))
+        sizeof (struct UDP_ACK_Message) + sizeof (struct GNUNET_MessageHeader))
     {
       GNUNET_break_op (0);
       return;
     }
-    udp = (const struct UDPMessage *) msg;
-    if (ntohl (udp->reserved) != 0)
+    udp_ack = (const struct UDP_ACK_Message *) msg;
+    s = find_inbound_session(plugin, &udp_ack->sender, addr, fromlen);
+    if (s != NULL)
     {
-      GNUNET_break_op (0);
-      return;
+      flow_delay.rel_value = (uint64_t) ntohl(udp_ack->delay);
+
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+                  "We received a sending delay of %llu\n", flow_delay.rel_value);
+
+      s->flow_delay_from_other_peer = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), flow_delay);
     }
-    ack = (const struct GNUNET_MessageHeader *) &udp[1];
-    if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDPMessage))
+    ack = (const struct GNUNET_MessageHeader *) &udp_ack[1];
+    if (ntohs (ack->size) != ntohs (msg->size) - sizeof (struct UDP_ACK_Message))
     {
       GNUNET_break_op (0);
       return;
@@ -1087,7 +1201,7 @@
                 GNUNET_a2s ((const struct sockaddr *) addr, fromlen));
 #endif
 
-    peer_session = find_session (plugin, &udp->sender);
+    peer_session = find_session (plugin, &udp_ack->sender);
     if (NULL == peer_session)
     {
 #if DEBUG_UDP
@@ -1100,13 +1214,13 @@
       return;
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONTAINER_multihashmap_remove (plugin->sessions,
-                                                         &udp->
+                                                         &udp_ack->
                                                          sender.hashPubKey,
                                                          peer_session));
     plugin->last_expected_delay =
         GNUNET_FRAGMENT_context_destroy (peer_session->frag);
     if (peer_session->cont != NULL)
-      peer_session->cont (peer_session->cont_cls, &udp->sender, GNUNET_OK);
+      peer_session->cont (peer_session->cont_cls, &udp_ack->sender, GNUNET_OK);
     GNUNET_free (peer_session);
     return;
   case GNUNET_MESSAGE_TYPE_FRAGMENT:
@@ -1717,27 +1831,6 @@
   return api;
 }
 
-/*
-
-static void invalidation_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct Session * s = cls;
-  struct Plugin * plugin = s->plugin;
-
-  s->invalidation_task = GNUNET_SCHEDULER_NO_TASK;
-
-  GNUNET_CONTAINER_multihashmap_remove (plugin->inbound_sessions, &s->target.hashPubKey, s);
-
-
-  plugin->env->session_end (plugin->env->cls, &s->target, s);
-  LOG (GNUNET_ERROR_TYPE_ERROR,
-              "Session %X is now invalid\n", s);
-  destroy_session(s, &s->target.hashPubKey, s);
-}
-*/
-
-
-
 /**
  * Shutdown the plugin.
  *




reply via email to

[Prev in Thread] Current Thread [Next in Thread]