gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: Add possibility to send multiple peers to client


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: Add possibility to send multiple peers to client
Date: Tue, 18 Sep 2018 17:20:41 +0200

This is an automated email from the git hooks/post-receive script.

julius-buenger pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new bd6822783 Add possibility to send multiple peers to client
bd6822783 is described below

commit bd6822783a5daa6d03f1af13e0b4f05ba56df42a
Author: Julius Bünger <address@hidden>
AuthorDate: Tue Sep 18 13:59:37 2018 +0200

    Add possibility to send multiple peers to client
---
 src/include/gnunet_rps_service.h |  1 +
 src/rps/gnunet-rps.c             | 32 ++++++++++++++++----
 src/rps/gnunet-service-rps.c     | 64 ++++++++++++++++++++++++----------------
 src/rps/rps.h                    |  6 ++--
 src/rps/rps_api.c                | 43 +++++++++++++++++++++++----
 5 files changed, 108 insertions(+), 38 deletions(-)

diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h
index 252188c62..eda012076 100644
--- a/src/include/gnunet_rps_service.h
+++ b/src/include/gnunet_rps_service.h
@@ -79,6 +79,7 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls,
  * @param peer The received peer
  */
 typedef void (* GNUNET_RPS_StreamInputCB) (void *cls,
+    uint64_t num_peers,
     const struct GNUNET_PeerIdentity *peer);
 
 /**
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c
index d2c497fd4..d0f905f51 100644
--- a/src/rps/gnunet-rps.c
+++ b/src/rps/gnunet-rps.c
@@ -59,7 +59,7 @@ static int stream_input;
 static uint64_t num_view_updates;
 
 /**
- * @brief Number of updates we want to receive
+ * @brief Number of peers we want to receive from stream
  */
 static uint64_t num_stream_peers;
 
@@ -154,11 +154,33 @@ view_update_handle (void *cls,
  */
 static void
 stream_input_handle (void *cls,
-                     const struct GNUNET_PeerIdentity *recv_peer)
+                     uint64_t num_peers,
+                     const struct GNUNET_PeerIdentity *recv_peers)
 {
-  // TODO when source of peer is sent, also print source
-  FPRINTF (stdout, "%s\n",
-           GNUNET_i2s_full (recv_peer));
+  uint64_t i;
+  (void) cls;
+
+  if (0 == num_peers)
+  {
+    FPRINTF (stdout, "Empty view\n");
+  }
+  req_handle = NULL;
+  for (i = 0; i < num_peers; i++)
+  {
+    FPRINTF (stdout, "%s\n",
+             GNUNET_i2s_full (&recv_peers[i]));
+
+    if (1 == num_stream_peers)
+    {
+      ret = 0;
+      GNUNET_SCHEDULER_shutdown ();
+      break;
+    }
+    else if (1 < num_stream_peers)
+    {
+      num_stream_peers--;
+    }
+  }
 }
 
 
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 5b78bb4a8..4da73b09c 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -2208,8 +2208,8 @@ send_view (const struct ClientContext *cli_ctx,
   out_msg->num_peers = htonl (view_size);
 
   GNUNET_memcpy (&out_msg[1],
-          view_array,
-          view_size * sizeof (struct GNUNET_PeerIdentity));
+                 view_array,
+                 view_size * sizeof (struct GNUNET_PeerIdentity));
   GNUNET_MQ_send (cli_ctx->mq, ev);
 }
 
@@ -2217,25 +2217,30 @@ send_view (const struct ClientContext *cli_ctx,
 /**
  * @brief Send peer from biased stream to client.
  *
+ * TODO merge with send_view, parameterise
+ *
  * @param cli_ctx the context of the client
  * @param view_array the peerids of the view as array (can be empty)
  * @param view_size the size of the view array (can be 0)
  */
 void
 send_stream_peer (const struct ClientContext *cli_ctx,
-                  const struct GNUNET_PeerIdentity *peer)
+                  uint64_t num_peers,
+                  const struct GNUNET_PeerIdentity *peers)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
 
-  GNUNET_assert (NULL != peer);
+  GNUNET_assert (NULL != peers);
 
-  ev = GNUNET_MQ_msg (out_msg,
-                      GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+  ev = GNUNET_MQ_msg_extra (out_msg,
+                            num_peers * sizeof (struct GNUNET_PeerIdentity),
+                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+  out_msg->num_peers = htonl (num_peers);
 
-  GNUNET_memcpy (&out_msg->peer,
-          peer,
-          sizeof (struct GNUNET_PeerIdentity));
+  GNUNET_memcpy (&out_msg[1],
+                 peers,
+                 num_peers * sizeof (struct GNUNET_PeerIdentity));
   GNUNET_MQ_send (cli_ctx->mq, ev);
 }
 
@@ -2288,36 +2293,45 @@ clients_notify_view_update (void)
  * @brief sends updates to clients that are interested
  */
 static void
-clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
-                            //enum StreamPeerSource)
+clients_notify_stream_peer (uint64_t num_peers,
+                            const struct GNUNET_PeerIdentity *peers)
+                            // TODO enum StreamPeerSource)
 {
   struct ClientContext *cli_ctx_iter;
+  uint64_t num_peers_send;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Got peer (%s) from biased stream - update all clients\n",
-      GNUNET_i2s (peer));
+      GNUNET_i2s (peers));
 
-  /* check size of view is small enough */
   for (cli_ctx_iter = cli_ctx_head;
        NULL != cli_ctx_iter;
        cli_ctx_iter = cli_ctx_head->next)
   {
-    if (1 < cli_ctx_iter->stream_peers_left)
+    if (0 < cli_ctx_iter->stream_peers_left)
     {
       /* Client wants to receive limited amount of updates */
-      cli_ctx_iter->stream_peers_left -= 1;
-    } else if (1 == cli_ctx_iter->stream_peers_left)
+      if (num_peers > cli_ctx_iter->stream_peers_left)
+      {
+        num_peers_send = num_peers - cli_ctx_iter->stream_peers_left;
+        cli_ctx_iter->stream_peers_left = 0;
+      }
+      else
+      {
+        num_peers_send = cli_ctx_iter->stream_peers_left - num_peers;
+        cli_ctx_iter->stream_peers_left -= num_peers_send;
+      }
+    } else if (0 > cli_ctx_iter->stream_peers_left)
     {
-      /* Last update of view for client */
-      cli_ctx_iter->stream_peers_left = -1;
-    } else if (0 > cli_ctx_iter->stream_peers_left) {
       /* Client is not interested in updates */
       continue;
+    } else /* _updates_left == 0 - infinite amount of updates */
+    {
+      num_peers_send = num_peers;
     }
-    /* else _updates_left == 0 - infinite amount of updates */
 
     /* send view */
-    send_stream_peer (cli_ctx_iter, peer);
+    send_stream_peer (cli_ctx_iter, num_peers_send, peers);
   }
 }
 
@@ -2338,7 +2352,7 @@ hist_update (void *cls,
     inserted = insert_in_view (&ids[i]);
     if (GNUNET_OK == inserted)
     {
-      clients_notify_stream_peer (&ids[i]);
+      clients_notify_stream_peer (1, &ids[i]);
     }
     to_file (file_name_view_log,
              "+%s\t(hist)",
@@ -2549,7 +2563,7 @@ insert_in_view_op (void *cls,
   inserted = insert_in_view (peer);
   if (GNUNET_OK == inserted)
   {
-    clients_notify_stream_peer (peer);
+    clients_notify_stream_peer (1, peer);
   }
 }
 
@@ -3834,7 +3848,7 @@ do_round (void *cls)
                                                                   permut[i]));
       if (GNUNET_OK == inserted)
       {
-        clients_notify_stream_peer (
+        clients_notify_stream_peer (1,
             CustomPeerMap_get_peer_by_index (push_map, permut[i]));
       }
       to_file (file_name_view_log,
@@ -3855,7 +3869,7 @@ do_round (void *cls)
             permut[i - first_border]));
       if (GNUNET_OK == inserted)
       {
-        clients_notify_stream_peer (
+        clients_notify_stream_peer (1,
             CustomPeerMap_get_peer_by_index (push_map, permut[i]));
       }
       to_file (file_name_view_log,
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 66b2dd962..26615bfc5 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -250,11 +250,13 @@ struct GNUNET_RPS_CS_DEBUG_StreamReply
   uint32_t id GNUNET_PACKED;
 
   /**
-   * @brief The peer of the biased stream
+   * Number of peers
    */
-  struct GNUNET_PeerIdentity peer;
+  uint64_t num_peers GNUNET_PACKED;
 
   // TODO maybe source of peer (pull/push list, peerinfo, ...)
+
+  /* Followed by num_peers * GNUNET_PeerIdentity */
 };
 
 GNUNET_NETWORK_STRUCT_END
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index b7644540d..96660ded6 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -377,6 +377,34 @@ handle_view_update (void *cls,
  * It calls the callback the caller provided
  * and disconnects afterwards.
  *
+ * TODO merge with check_view_update
+ *
+ * @param msg the message
+ */
+static int
+check_stream_input (void *cls,
+                    const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
+{
+  uint16_t msize = ntohs (msg->header.size);
+  uint32_t num_peers = ntohl (msg->num_peers);
+  (void) cls;
+
+  msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
+  if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
+       (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+/**
+ * This function is called, when the service sends another peer from the biased
+ * stream.
+ * It calls the callback the caller provided
+ * and disconnects afterwards.
+ *
  * @param msg the message
  */
 static void
@@ -384,14 +412,17 @@ handle_stream_input (void *cls,
                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
 {
   struct GNUNET_RPS_Handle *h = cls;
+  const struct GNUNET_PeerIdentity *peers;
 
   /* Give the peers back */
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "New peer of biased input stream\n");
+       "New peer of %" PRIu64 " biased input stream\n",
+       ntohl (msg->num_peers));
 
+  peers = (struct GNUNET_PeerIdentity *) &msg[1];
   GNUNET_assert (NULL != h);
   GNUNET_assert (NULL != h->stream_input_cb);
-  h->stream_input_cb (h->stream_input_cb, &msg->peer);
+  h->stream_input_cb (h->stream_input_cb, ntohl (msg->num_peers), peers);
 }
 
 
@@ -444,10 +475,10 @@ reconnect (struct GNUNET_RPS_Handle *h)
                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
                            struct GNUNET_RPS_CS_DEBUG_ViewReply,
                            h),
-    GNUNET_MQ_hd_fixed_size (stream_input,
-                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
-                             struct GNUNET_RPS_CS_DEBUG_StreamReply,
-                             h),
+    GNUNET_MQ_hd_var_size (stream_input,
+                           GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
+                           struct GNUNET_RPS_CS_DEBUG_StreamReply,
+                           h),
     GNUNET_MQ_handler_end ()
   };
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]