gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] 02/06: RPS api: Schedule callback


From: gnunet
Subject: [GNUnet-SVN] [gnunet] 02/06: RPS api: Schedule callback
Date: Fri, 23 Nov 2018 00:47:32 +0100

This is an automated email from the git hooks/post-receive script.

julius-buenger pushed a commit to branch master
in repository gnunet.

commit 101694a7120bc90c8c02024294b6f098b1bca9ff
Author: Julius Bünger <address@hidden>
AuthorDate: Thu Nov 22 13:07:47 2018 +0100

    RPS api: Schedule callback
---
 src/rps/rps_api.c | 84 +++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 69 insertions(+), 15 deletions(-)

diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index cfab06f17..420323c4b 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -11,7 +11,7 @@
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      Affero General Public License for more details.
-    
+
      You should have received a copy of the GNU Affero General Public License
      along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
@@ -52,6 +52,11 @@ struct GNUNET_RPS_StreamRequestHandle
   void *ready_cb_cls;
 
   /**
+   * @brief Scheduler task for scheduled callback
+   */
+  struct GNUNET_SCHEDULER_Task *callback_task;
+
+  /**
    * @brief Next element of the DLL
    */
   struct GNUNET_RPS_StreamRequestHandle *next;
@@ -172,6 +177,19 @@ struct cb_cls_pack
 
 
 /**
+ * @brief Peers received from the biased stream to be passed to all
+ * srh_handlers
+ */
+static struct GNUNET_PeerIdentity *srh_callback_peers;
+
+/**
+ * @brief Number of peers in the biased stream that are to be passed to all
+ * srh_handlers
+ */
+static uint64_t srh_callback_num_peers;
+
+
+/**
  * @brief Create a new handle for a stream request
  *
  * @param rps_handle The rps handle
@@ -213,6 +231,12 @@ remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
                        struct GNUNET_RPS_StreamRequestHandle *srh_head,
                        struct GNUNET_RPS_StreamRequestHandle *srh_tail)
 {
+  GNUNET_assert (NULL != srh);
+  if (NULL != srh->callback_task)
+  {
+    GNUNET_SCHEDULER_cancel (srh->callback_task);
+    srh->callback_task = NULL;
+  }
   GNUNET_CONTAINER_DLL_remove (srh_head,
                                srh_tail,
                                srh);
@@ -425,12 +449,10 @@ GNUNET_RPS_stream_cancel (struct GNUNET_RPS_StreamRequestHandle *srh)
 {
   struct GNUNET_RPS_Handle *rps_handle;
 
-  GNUNET_assert (NULL != srh);
   rps_handle = srh->rps_handle;
-  GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
-                               rps_handle->stream_requests_tail,
-                               srh);
-  GNUNET_free (srh);
+  remove_stream_request (srh,
+                         rps_handle->stream_requests_head,
+                         rps_handle->stream_requests_tail);
   if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
 }
 
@@ -463,6 +485,24 @@ check_stream_input (void *cls,
   return GNUNET_OK;
 }
 
+
+/**
+ * @brief Called by the scheduler to call the callbacks of the srh handlers
+ *
+ * @param cls Stream request handle
+ */
+static void
+srh_callback_scheduled (void *cls)
+{
+  struct GNUNET_RPS_StreamRequestHandle *srh = cls;
+
+  srh->callback_task = NULL;
+  srh->ready_cb (srh->ready_cb_cls,
+                 srh_callback_num_peers,
+                 srh_callback_peers);
+}
+
+
 /**
  * This function is called, when the service sends another peer from the biased
  * stream.
@@ -476,13 +516,20 @@ handle_stream_input (void *cls,
                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
 {
   struct GNUNET_RPS_Handle *h = cls;
-  const struct GNUNET_PeerIdentity *peers;
+  //const struct GNUNET_PeerIdentity *peers;
   uint64_t num_peers;
   struct GNUNET_RPS_StreamRequestHandle *srh_iter;
   struct GNUNET_RPS_StreamRequestHandle *srh_next;
 
-  peers = (struct GNUNET_PeerIdentity *) &msg[1];
+  //peers = (struct GNUNET_PeerIdentity *) &msg[1];
   num_peers = ntohl (msg->num_peers);
+  srh_callback_num_peers = num_peers;
+  if (NULL != srh_callback_peers) GNUNET_free (srh_callback_peers);
+  srh_callback_peers =
+    GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
+  GNUNET_memcpy (srh_callback_peers,
+                 &msg[1],
+                 num_peers * sizeof (struct GNUNET_PeerIdentity));
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Received %" PRIu64 " peer(s) from stream input.\n",
        num_peers);
@@ -492,9 +539,12 @@ handle_stream_input (void *cls,
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
     /* Store next pointer - srh might be removed/freed in callback */
     srh_next = srh_iter->next;
-    srh_iter->ready_cb (srh_iter->ready_cb_cls,
-                        num_peers,
-                        peers);
+    if (NULL != srh_iter->callback_task)
+    {
+      GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
+    }
+    srh_iter->callback_task =
+      GNUNET_SCHEDULER_add_now (srh_callback_scheduled, srh_iter);
     srh_iter = srh_next;
   }
 
@@ -855,10 +905,9 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh)
 
   h = rh->rps_handle;
   GNUNET_assert (NULL != rh);
-  GNUNET_assert (NULL != rh->srh);
-  remove_stream_request (rh->srh,
-                         h->stream_requests_head,
-                         h->stream_requests_tail);
+  GNUNET_assert (h == rh->srh->rps_handle);
+  GNUNET_RPS_stream_cancel (rh->srh);
+  rh->srh = NULL;
   if (NULL == h->stream_requests_head) cancel_stream(h);
   if (NULL != rh->sampler_rh)
   {
@@ -891,6 +940,11 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
       GNUNET_RPS_stream_cancel (srh_tmp);
     }
   }
+  if (NULL != srh_callback_peers)
+  {
+    GNUNET_free (srh_callback_peers);
+    srh_callback_peers = NULL;
+  }
   if (NULL != h->view_update_cb)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]