gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: getting data structures in


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: getting data structures in place for gnunet-service-tng
Date: Thu, 15 Nov 2018 15:43:43 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 700359ed8 getting data structures in place for gnunet-service-tng
700359ed8 is described below

commit 700359ed8ce18fd6ddfc0940a5d0e5ba145c1fd1
Author: Christian Grothoff <address@hidden>
AuthorDate: Thu Nov 15 15:43:41 2018 +0100

    getting data structures in place for gnunet-service-tng
---
 src/transport/gnunet-service-tng.c | 518 +++++++++++++++++++++++++++++++++++--
 1 file changed, 491 insertions(+), 27 deletions(-)

diff --git a/src/transport/gnunet-service-tng.c 
b/src/transport/gnunet-service-tng.c
index 73b295442..1e638377a 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -75,6 +75,147 @@ enum ClientType
 /**
  * Client connected to the transport service.
  */
+struct TransportClient;
+
+
+/**
+ * A neighbour that at least one communicator is connected to.
+ */
+struct Neighbour;
+
+
+/**
+ * List of available queues for a particular neighbour.
+ */
+struct Queue
+{
+  /**
+   * Kept in a MDLL.
+   */
+  struct Queue *next_neighbour;
+
+  /**
+   * Kept in a MDLL.
+   */
+  struct Queue *prev_neighbour;
+
+  /**
+   * Kept in a MDLL.
+   */
+  struct Queue *prev_client;
+
+  /**
+   * Kept in a MDLL.
+   */
+  struct Queue *next_client;
+
+  /**
+   * Which neighbour is this queue for?
+   */
+  struct Neighbour *neighbour;
+
+  /**
+   * Which communicator offers this queue?
+   */
+  struct TransportClient *tc;
+
+  /**
+   * Unique identifier of this queue with the communicator.
+   */
+  uint32_t qid;
+
+  /**
+   * Network type offered by this queue.
+   */
+  enum GNUNET_ATS_Network_Type nt;
+
+  /**
+   * Address served by the queue.
+   */
+  const char *address;
+};
+
+
+/**
+ * A neighbour that at least one communicator is connected to.
+ */
+struct Neighbour
+{
+
+  /**
+   * Which peer is this about?
+   */
+  struct GNUNET_PeerIdentity pid;
+
+  /**
+   * Head of list of messages pending for this neighbour.
+   */
+  struct PendingMessage *pending_msg_head;
+
+  /**
+   * Tail of list of messages pending for this neighbour.
+   */
+  struct PendingMessage *pending_msg_tail;
+
+  /**
+   * Head of DLL of queues to this peer.
+   */
+  struct Queue *queue_head;
+
+  /**
+   * Tail of DLL of queues to this peer.
+   */
+  struct Queue *queue_tail;
+  
+};
+
+
+/**
+ * Transmission request from CORE that is awaiting delivery.
+ */
+struct PendingMessage
+{
+  /**
+   * Kept in a MDLL of messages for this @a target.
+   */
+  struct PendingMessage *next_neighbour;
+
+  /**
+   * Kept in a MDLL of messages for this @a target.
+   */
+  struct PendingMessage *prev_neighbour;
+
+  /**
+   * Kept in a MDLL of messages from this @a client.
+   */
+  struct PendingMessage *next_client;
+
+  /**
+   * Kept in a MDLL of messages from this @a client.
+   */
+  struct PendingMessage *prev_client;
+
+  /**
+   * Target of the request.
+   */
+  struct Neighbour *target;
+
+  /**
+   * Client that issued the transmission request.
+   */
+  struct TransportClient *client;
+
+  /**
+   * Size of the original message.
+   */
+  uint32_t bytes_msg;
+  
+};
+
+
+/**
+ * Client connected to the transport service.
+ */
 struct TransportClient
 {
 
@@ -107,17 +248,63 @@ struct TransportClient
   {
 
     /**
-     * Peer identity to monitor the addresses of.
-     * Zero to monitor all neighbours.  Valid if
-     * @e type is #CT_MONITOR.
+     * Information for @e type #CT_CORE.
      */
-    struct GNUNET_PeerIdentity monitor_peer;
+    struct {
+
+      /**
+       * Head of list of messages pending for this client.
+       */
+      struct PendingMessage *pending_msg_head;
+
+      /**
+       * Tail of list of messages pending for this client.
+       */
+      struct PendingMessage *pending_msg_tail;
+      
+    } core;
+
+    /**
+     * Information for @e type #CT_MONITOR.
+     */
+    struct {
+    
+      /**
+       * Peer identity to monitor the addresses of.
+       * Zero to monitor all neighbours.  Valid if
+       * @e type is #CT_MONITOR.
+       */
+      struct GNUNET_PeerIdentity peer;
+
+      /**
+       * Is this a one-shot monitor?
+       */
+      int one_shot;
+      
+    } monitor;
+    
 
     /**
-     * If @e type is #CT_COMMUNICATOR, this communicator
-     * supports communicating using these addresses.
+     * Information for @e type #CT_COMMUNICATOR.
      */
-    const char *address_prefix;
+    struct {    
+      /**
+       * If @e type is #CT_COMMUNICATOR, this communicator
+       * supports communicating using these addresses.
+       */
+      char *address_prefix;
+      
+      /**
+       * Head of DLL of queues offered by this communicator.
+       */
+      struct Queue *queue_head;
+      
+      /**
+       * Tail of DLL of queues offered by this communicator.
+       */
+      struct Queue *queue_tail;
+      
+    } communicator;
 
   } details;
 
@@ -154,6 +341,26 @@ struct GNUNET_PeerIdentity GST_my_identity;
  */
 struct GNUNET_CRYPTO_EddsaPrivateKey *GST_my_private_key;
 
+/**
+ * Map from PIDs to `struct Neighbour` entries.  A peer is
+ * a neighbour if we have an MQ to it from some communicator.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
+
+
+/**
+ * Lookup neighbour record for peer @a pid.
+ *
+ * @param pid neighbour to look for
+ * @return NULL if we do not have this peer as a neighbour
+ */
+static struct Neighbour *
+lookup_neighbour (const struct GNUNET_PeerIdentity *pid)
+{
+  return GNUNET_CONTAINER_multipeermap_get (neighbours,
+                                           pid);
+}
+
 
 /**
  * Called whenever a client connects.  Allocates our
@@ -210,10 +417,23 @@ client_disconnect_cb (void *cls,
   case CT_NONE:
     break;
   case CT_CORE:
+    {
+      struct PendingMessage *pm;
+
+      while (NULL != (pm = tc->details.core.pending_msg_head))
+      {
+       GNUNET_CONTAINER_MDLL_remove (client,
+                                     tc->details.core.pending_msg_head,
+                                     tc->details.core.pending_msg_tail,
+                                     pm);
+       pm->client = NULL;
+      }
+    }
     break;
   case CT_MONITOR:
     break;
   case CT_COMMUNICATOR:
+    GNUNET_free (tc->details.communicator.address_prefix);
     break;
   }
   GNUNET_free (tc);
@@ -268,10 +488,15 @@ static int
 check_client_send (void *cls,
                   const struct OutboundMessage *obm)
 {
+  struct TransportClient *tc = cls;
   uint16_t size;
   const struct GNUNET_MessageHeader *obmm;
-
-  (void) cls;
+  
+  if (CT_CORE != tc->type)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
   size = ntohs (obm->header.size) - sizeof (struct OutboundMessage);
   if (size < sizeof (struct GNUNET_MessageHeader))
   {
@@ -289,6 +514,51 @@ check_client_send (void *cls,
 
 
 /**
+ * Send a response to the @a pm that we have processed a 
+ * "send" request with status @a success. We
+ * transmitted @a bytes_physical on the actual wire.
+ * Sends a confirmation to the "core" client responsible
+ * for the original request and free's @a pm.
+ *
+ * @param pm handle to the original pending message
+ * @param success status code, #GNUNET_OK on success, #GNUNET_SYSERR
+ *          for transmission failure
+ * @param bytes_physical amount of bandwidth consumed
+ */
+static void
+client_send_response (struct PendingMessage *pm,
+                     int success,
+                     uint32_t bytes_physical)
+{
+  struct TransportClient *tc = pm->client;
+  struct Neighbour *target = pm->target;
+  struct GNUNET_MQ_Envelope *env;
+  struct SendOkMessage *som;
+
+  if (NULL != tc)
+  {
+    env = GNUNET_MQ_msg (som,
+                        GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+    som->success = htonl ((uint32_t) success);
+    som->bytes_msg = htonl (pm->bytes_msg);
+    som->bytes_physical = htonl (bytes_physical);
+    som->peer = target->pid;
+    GNUNET_MQ_send (tc->mq,
+                   env);
+    GNUNET_CONTAINER_MDLL_remove (client,
+                                 tc->details.core.pending_msg_head,
+                                 tc->details.core.pending_msg_tail,
+                                 pm);
+  }
+  GNUNET_CONTAINER_MDLL_remove (neighbour,
+                               target->pending_msg_head,
+                               target->pending_msg_tail,
+                               pm);
+  GNUNET_free (pm);
+}
+
+
+/**
  * Client asked for transmission to a peer.  Process the request.
  *
  * @param cls the client
@@ -299,9 +569,55 @@ handle_client_send (void *cls,
                    const struct OutboundMessage *obm)
 {
   struct TransportClient *tc = cls;
+  struct PendingMessage *pm;
   const struct GNUNET_MessageHeader *obmm;
+  struct Neighbour *target;
+  uint32_t bytes_msg;
 
+  GNUNET_assert (CT_CORE == tc->type);
   obmm = (const struct GNUNET_MessageHeader *) &obm[1];
+  bytes_msg = ntohs (obmm->size);
+  target = lookup_neighbour (&obm->peer);
+  if (NULL == target)
+  {
+    /* Failure: don't have this peer as a neighbour (anymore).
+       Might have gone down asynchronously, so this is NOT
+       a protocol violation by CORE. Still count the event,
+       as this should be rare. */
+    struct GNUNET_MQ_Envelope *env;
+    struct SendOkMessage *som;
+    
+    env = GNUNET_MQ_msg (som,
+                        GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
+    som->success = htonl (GNUNET_SYSERR);
+    som->bytes_msg = htonl (bytes_msg);
+    som->bytes_physical = htonl (0);
+    som->peer = obm->peer;
+    GNUNET_MQ_send (tc->mq,
+                   env);
+    GNUNET_SERVICE_client_continue (tc->client);
+    GNUNET_STATISTICS_update (GST_stats,
+                             "# messages dropped (neighbour unknown)",
+                             1,
+                             GNUNET_NO);
+    return;
+  }  
+  pm = GNUNET_new (struct PendingMessage);
+  pm->client = tc;
+  pm->target = target;
+  pm->bytes_msg = bytes_msg;
+  GNUNET_CONTAINER_MDLL_insert (neighbour,
+                               target->pending_msg_head,
+                               target->pending_msg_tail,
+                               pm);
+  GNUNET_CONTAINER_MDLL_insert (client,
+                               tc->details.core.pending_msg_head,
+                               tc->details.core.pending_msg_tail,
+                               pm);
+  // FIXME: do the work, continuation with:
+  client_send_response (pm,
+                       GNUNET_NO,
+                       0);
 }
 
 
@@ -315,10 +631,16 @@ static int
 check_communicator_available (void *cls,
                               const struct 
GNUNET_TRANSPORT_CommunicatorAvailableMessage *cam)
 {
+  struct TransportClient *tc = cls;
   const char *addr;
   uint16_t size;
 
-  (void) cls;
+  if (CT_NONE != tc->type)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  tc->type = CT_COMMUNICATOR;
   size = ntohs (cam->header.size) - sizeof (*cam);
   if (0 == size)
     return GNUNET_OK; /* receive-only communicator */
@@ -345,17 +667,10 @@ handle_communicator_available (void *cls,
   struct TransportClient *tc = cls;
   uint16_t size;
 
-  if (CT_NONE != tc->type)
-  {
-    GNUNET_break (0);
-    GNUNET_SERVICE_client_drop (tc->client);
-    return;
-  }
-  tc->type = CT_COMMUNICATOR;
   size = ntohs (cam->header.size) - sizeof (*cam);
   if (0 == size)
     return; /* receive-only communicator */
-  tc->details.address_prefix = GNUNET_strdup ((const char *) &cam[1]);
+  tc->details.communicator.address_prefix = GNUNET_strdup ((const char *) 
&cam[1]);
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -370,10 +685,15 @@ static int
 check_add_address (void *cls,
                    const struct GNUNET_TRANSPORT_AddAddressMessage *aam)
 {
+  struct TransportClient *tc = cls;
   const char *addr;
   uint16_t size;
 
-  (void) cls;
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
   size = ntohs (aam->header.size) - sizeof (*aam);
   if (0 == size)
   {
@@ -418,12 +738,19 @@ handle_del_address (void *cls,
 {
   struct TransportClient *tc = cls;
 
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
+
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
 
 /**
- * Client asked for transmission to a peer.  Process the request.
+ * Client notified us about transmission from a peer.  Process the request.
  *
  * @param cls the client
  * @param obm the send message that was sent
@@ -432,10 +759,15 @@ static int
 check_incoming_msg (void *cls,
                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
 {
+  struct TransportClient *tc = cls;
   uint16_t size;
   const struct GNUNET_MessageHeader *obmm;
 
-  (void) cls;
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
   size = ntohs (im->header.size) - sizeof (*im);
   if (size < sizeof (struct GNUNET_MessageHeader))
   {
@@ -478,10 +810,15 @@ static int
 check_add_queue_message (void *cls,
                          const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
 {
+  struct TransportClient *tc = cls;
   const char *addr;
   uint16_t size;
 
-  (void) cls;
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
   size = ntohs (aqm->header.size) - sizeof (*aqm);
   if (0 == size)
   {
@@ -509,12 +846,66 @@ handle_add_queue_message (void *cls,
                           const struct GNUNET_TRANSPORT_AddQueueMessage *aqm)
 {
   struct TransportClient *tc = cls;
+  struct Queue *queue;
+  struct Neighbour *neighbour;
+  const char *addr;
+  uint16_t addr_len;
 
+  neighbour = lookup_neighbour (&aqm->receiver);
+  if (NULL == neighbour)
+  {
+    neighbour = GNUNET_new (struct Neighbour);
+    neighbour->pid = aqm->receiver;
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CONTAINER_multipeermap_put (neighbours,
+                                                     &neighbour->pid,
+                                                     neighbour,
+                                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    // FIXME: notify cores/monitors!
+  }
+  addr_len = ntohs (aqm->header.size) - sizeof (*aqm);
+  addr = (const char *) &aqm[1];
+  
+  queue = GNUNET_malloc (sizeof (struct Queue) + addr_len);
+  queue->qid = aqm->qid;
+  queue->nt = (enum GNUNET_ATS_Network_Type) ntohl (aqm->nt);
+  queue->tc = tc;
+  queue->neighbour = neighbour;
+  queue->address = (const char *) &queue[1];
+  memcpy (&queue[1],
+         addr,
+         addr_len);
+  GNUNET_CONTAINER_MDLL_insert (neighbour,
+                               neighbour->queue_head,
+                               neighbour->queue_tail,
+                               queue);
+  GNUNET_CONTAINER_MDLL_insert (client,
+                               tc->details.communicator.queue_head,
+                               tc->details.communicator.queue_tail,
+                               queue);
+  // FIXME: possibly transmit queued messages?
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
 
 /**
+ * Release memory used by @a neighbour.
+ *
+ * @param neighbour neighbour entry to free
+ */
+static void
+free_neighbour (struct Neighbour *neighbour)
+{
+  GNUNET_assert (NULL == neighbour->queue_head);
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multipeermap_remove (neighbours,
+                                                      &neighbour->pid,
+                                                      neighbour));
+  GNUNET_free (neighbour);
+}
+
+
+/**
  * Queue to a peer went down.  Process the request.
  *
  * @param cls the client
@@ -526,7 +917,42 @@ handle_del_queue_message (void *cls,
 {
   struct TransportClient *tc = cls;
 
-  GNUNET_SERVICE_client_continue (tc->client);
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
+  for (struct Queue *queue = tc->details.communicator.queue_head;
+       NULL != queue;
+       queue = queue->next_client)
+  {
+    struct Neighbour *neighbour = queue->neighbour;
+
+    if ( (dqm->qid != queue->qid) ||
+        (0 != memcmp (&dqm->receiver,
+                      &neighbour->pid,
+                      sizeof (struct GNUNET_PeerIdentity))) )
+      continue;
+    GNUNET_CONTAINER_MDLL_remove (neighbour,
+                                 neighbour->queue_head,
+                                 neighbour->queue_tail,
+                                 queue);
+    GNUNET_CONTAINER_MDLL_remove (client,
+                                 tc->details.communicator.queue_head,
+                                 tc->details.communicator.queue_tail,
+                                 queue);
+    GNUNET_free (queue);
+    if (NULL == neighbour->queue_head)
+    {
+      // FIXME: notify cores/monitors!
+      free_neighbour (neighbour);
+    }
+    GNUNET_SERVICE_client_continue (tc->client);    
+    return;
+  }
+  GNUNET_break (0);
+  GNUNET_SERVICE_client_drop (tc->client);
 }
 
 
@@ -542,6 +968,12 @@ handle_send_message_ack (void *cls,
 {
   struct TransportClient *tc = cls;
 
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
@@ -565,20 +997,45 @@ handle_monitor_start (void *cls,
     return;
   }
   tc->type = CT_MONITOR;
-  tc->details.monitor_peer = start->peer;
-  // FIXME: remember also the one_shot flag!
+  tc->details.monitor.peer = start->peer;
+  tc->details.monitor.one_shot = ntohl (start->one_shot);
+  // FIXME: do work!
   GNUNET_SERVICE_client_continue (tc->client);
 }
 
 
 /**
+ * Free neighbour entry.
+ *
+ * @param cls NULL
+ * @param pid unused
+ * @param value a `struct Neighbour`
+ * @return #GNUNET_OK (always)
+ */
+static int
+free_neighbour_cb (void *cls,
+                  const struct GNUNET_PeerIdentity *pid,
+                  void *value)
+{
+  struct Neighbour *neighbour = value;
+
+  (void) cls;
+  (void) pid;  
+  GNUNET_break (0); // should this ever happen?
+  free_neighbour (neighbour);
+  
+  return GNUNET_OK;
+}
+
+
+/**
  * Function called when the service shuts down.  Unloads our plugins
  * and cancels pending validations.
  *
  * @param cls closure, unused
  */
 static void
-shutdown_task (void *cls)
+do_shutdown (void *cls)
 {
   (void) cls;
 
@@ -593,6 +1050,10 @@ shutdown_task (void *cls)
     GNUNET_free (GST_my_private_key);
     GST_my_private_key = NULL;
   }
+  GNUNET_CONTAINER_multipeermap_iterate (neighbours,
+                                        &free_neighbour_cb,
+                                        NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (neighbours);
 }
 
 
@@ -608,8 +1069,11 @@ run (void *cls,
      const struct GNUNET_CONFIGURATION_Handle *c,
      struct GNUNET_SERVICE_Handle *service)
 {
+  (void) cls;
   /* setup globals */
   GST_cfg = c;
+  neighbours = GNUNET_CONTAINER_multipeermap_create (1024,
+                                                    GNUNET_YES);
   GST_my_private_key = GNUNET_CRYPTO_eddsa_key_create_from_configuration 
(GST_cfg);
   if (NULL == GST_my_private_key)
   {
@@ -626,7 +1090,7 @@ run (void *cls,
 
   GST_stats = GNUNET_STATISTICS_create ("transport",
                                         GST_cfg);
-  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+  GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
                                 NULL);
   /* start subsystems */
 }

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]