gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r36372 - in gnunet/src: include multicast util


From: gnunet
Subject: [GNUnet-SVN] r36372 - in gnunet/src: include multicast util
Date: Sat, 26 Sep 2015 19:09:57 +0200

Author: tg
Date: 2015-09-26 19:09:57 +0200 (Sat, 26 Sep 2015)
New Revision: 36372

Modified:
   gnunet/src/include/gnunet_multicast_service.h
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/multicast/gnunet-service-multicast.c
   gnunet/src/multicast/multicast.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/multicast/test_multicast.c
   gnunet/src/util/client_manager.c
Log:
multicast: replay

Modified: gnunet/src/include/gnunet_multicast_service.h
===================================================================
--- gnunet/src/include/gnunet_multicast_service.h       2015-09-26 17:09:48 UTC 
(rev 36371)
+++ gnunet/src/include/gnunet_multicast_service.h       2015-09-26 17:09:57 UTC 
(rev 36372)
@@ -750,7 +750,9 @@
 struct GNUNET_MULTICAST_MemberReplayHandle *
 GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member 
*member,
                                          uint64_t fragment_id,
-                                         uint64_t flags);
+                                         uint64_t flags,
+                                         GNUNET_MULTICAST_ResultCallback 
result_cb,
+                                         void *result_cb_cls);
 
 
 /**

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2015-09-26 17:09:48 UTC (rev 
36371)
+++ gnunet/src/include/gnunet_protocols.h       2015-09-26 17:09:57 UTC (rev 
36372)
@@ -2437,8 +2437,18 @@
  */
 #define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT 762
 
+/**
+ * C<->S<->T: Replay response from a group member to another member.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 763
 
+/**
+ * C<->S: End of replay response.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 764
 
+
+
 
/*******************************************************************************
  * SECRETSHARING message types
  
******************************************************************************/

Modified: gnunet/src/multicast/gnunet-service-multicast.c
===================================================================
--- gnunet/src/multicast/gnunet-service-multicast.c     2015-09-26 17:09:48 UTC 
(rev 36371)
+++ gnunet/src/multicast/gnunet-service-multicast.c     2015-09-26 17:09:57 UTC 
(rev 36372)
@@ -88,18 +88,33 @@
 static struct GNUNET_CONTAINER_MultiHashMap *group_members;
 
 /**
- * Incoming CADET channels.
+ * Incoming CADET channels with connected children in the tree.
  * Group's pub_key_hash -> struct Channel * (multi)
  */
 static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
 
 /**
- * Outgoing CADET channels.
+ * Outgoing CADET channels connecting to parents in the tree.
  * Group's pub_key_hash -> struct Channel * (multi)
  */
 static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
 
 /**
+ * Incoming replay requests from CADET.
+ * Group's pub_key_hash ->
+ *   H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
+
+/**
+ * Incoming replay requests from clients.
+ * Group's pub_key_hash ->
+ *   H(fragment_id, message_id, fragment_offset, flags) -> struct 
GNUNET_SERVER_Client *
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
+
+
+/**
  * Join status of a remote peer.
  */
 enum JoinStatus
@@ -294,6 +309,15 @@
 };
 
 
+struct ReplayRequestKey
+{
+  uint64_t fragment_id;
+  uint64_t message_id;
+  uint64_t fragment_offset;
+  uint64_t flags;
+};
+
+
 /**
  * Task run during shutdown.
  *
@@ -375,7 +399,96 @@
 }
 
 
+void
+replay_key_hash (uint64_t fragment_id, uint64_t message_id,
+                 uint64_t fragment_offset, uint64_t flags,
+                 struct GNUNET_HashCode *key_hash)
+{
+  struct ReplayRequestKey key = {
+    .fragment_id = fragment_id,
+    .message_id = message_id,
+    .fragment_offset = fragment_offset,
+    .flags = flags,
+  };
+  GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
+}
+
+
 /**
+ * Remove channel from replay request hashmap.
+ *
+ * @param chn
+ *        Channel to remove.
+ *
+ * @return #GNUNET_YES if there are more entries to process,
+ *         #GNUNET_NO when reached end of hashmap.
+ */
+static int
+replay_req_remove_cadet (struct Channel *chn)
+{
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                        
&chn->grp->pub_key_hash);
+  if (NULL == grp_replay_req)
+    return GNUNET_NO;
+
+  struct GNUNET_CONTAINER_MultiHashMapIterator *
+    it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
+  struct GNUNET_HashCode key;
+  const struct Channel *c;
+  while (GNUNET_YES
+         == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
+                                                         (const void **) &c))
+  {
+    if (c == chn)
+    {
+      GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
+      return GNUNET_YES;
+    }
+  }
+  GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
+  return GNUNET_NO;
+}
+
+
+/**
+ * Remove client from replay request hashmap.
+ *
+ * @param client
+ *        Client to remove.
+ *
+ * @return #GNUNET_YES if there are more entries to process,
+ *         #GNUNET_NO when reached end of hashmap.
+ */
+static int
+replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client 
*client)
+{
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
+                                                        &grp->pub_key_hash);
+  if (NULL == grp_replay_req)
+    return GNUNET_NO;
+
+  struct GNUNET_CONTAINER_MultiHashMapIterator *
+    it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
+  struct GNUNET_HashCode key;
+  const struct GNUNET_SERVER_Client *c;
+  while (GNUNET_YES
+         == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
+                                                         (const void **) &c))
+  {
+    if (c == client)
+    {
+      GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
+      return GNUNET_YES;
+    }
+  }
+  GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
+  return GNUNET_NO;
+}
+
+
+/**
  * Called whenever a client is disconnected.
  *
  * Frees our resources associated with that client.
@@ -417,6 +530,8 @@
     cl = cl->next;
   }
 
+  while (GNUNET_YES == replay_req_remove_client (grp, client));
+
   if (NULL == grp->clients_head)
   { /* Last client disconnected. */
 #if FIXME
@@ -434,14 +549,29 @@
 
 
 /**
+ * Send message to a client.
+ */
+static void
+client_send (struct GNUNET_SERVER_Client *client,
+             const struct GNUNET_MessageHeader *msg)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "%p Sending message to client.\n", client);
+
+  GNUNET_SERVER_notification_context_add (nc, client);
+  GNUNET_SERVER_notification_context_unicast (nc, client, msg, GNUNET_NO);
+}
+
+
+/**
  * Send message to all clients connected to the group.
  */
 static void
-client_send_msg (const struct Group *grp,
-                 const struct GNUNET_MessageHeader *msg)
+client_send_group (const struct Group *grp,
+                   const struct GNUNET_MessageHeader *msg)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "%p Sending message to clients.\n", grp);
+              "%p Sending message to all clients of the group.\n", grp);
 
   struct ClientList *cl = grp->clients_head;
   while (NULL != cl)
@@ -463,7 +593,7 @@
   const struct GNUNET_MessageHeader *msg = cls;
   struct Member *orig = origin;
 
-  client_send_msg (&orig->grp, msg);
+  client_send_group (&orig->grp, msg);
   return GNUNET_YES;
 }
 
@@ -480,7 +610,7 @@
 
   if (NULL != mem->join_dcsn)
   { /* Only send message to admitted members */
-    client_send_msg (&mem->grp, msg);
+    client_send_group (&mem->grp, msg);
   }
   return GNUNET_YES;
 }
@@ -497,19 +627,37 @@
                  const struct GNUNET_MessageHeader *msg)
 {
   int n = 0;
-  if (origins != NULL)
-    n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
-                                                     client_send_origin_cb,
-                                                     (void *) msg);
-  if (members != NULL)
-    n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
-                                                     client_send_member_cb,
-                                                     (void *) msg);
+  n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
+                                                   client_send_origin_cb,
+                                                   (void *) msg);
+  n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
+                                                   client_send_member_cb,
+                                                   (void *) msg);
   return n;
 }
 
 
 /**
+ * Send message to a random origin client or a random member client.
+ *
+ * @param grp  The group to send @a msg to.
+ * @param msg  Message to send.
+ */
+static int
+client_send_random (struct GNUNET_HashCode *pub_key_hash,
+                    const struct GNUNET_MessageHeader *msg)
+{
+  int n = 0;
+  n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
+                                                 (void *) msg);
+  if (n <= 0)
+    n = GNUNET_CONTAINER_multihashmap_get_random (members, 
client_send_member_cb,
+                                                   (void *) msg);
+  return n;
+}
+
+
+/**
  * Send message to all origin clients connected to the group.
  *
  * @param grp  The group to send @a msg to.
@@ -520,10 +668,9 @@
                     const struct GNUNET_MessageHeader *msg)
 {
   int n = 0;
-  if (origins != NULL)
-    n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
-                                                     client_send_origin_cb,
-                                                     (void *) msg);
+  n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
+                                                   client_send_origin_cb,
+                                                   (void *) msg);
   return n;
 }
 
@@ -554,7 +701,7 @@
  * @param msg  Message.
  */
 static void
-cadet_send_msg (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
+cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader 
*msg)
 {
   chn->tmit_handle
     = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
@@ -604,7 +751,7 @@
 cadet_send_join_request (struct Member *mem)
 {
   mem->origin_channel = cadet_channel_create (&mem->grp, &mem->origin);
-  cadet_send_msg (mem->origin_channel, &mem->join_req->header);
+  cadet_send_channel (mem->origin_channel, &mem->join_req->header);
 
   uint32_t i;
   for (i = 0; i < mem->relay_count; i++)
@@ -611,7 +758,7 @@
   {
     struct Channel *
       chn = cadet_channel_create (&mem->grp, &mem->relays[i]);
-    cadet_send_msg (chn, &mem->join_req->header);
+    cadet_send_channel (chn, &mem->join_req->header);
   }
 }
 
@@ -627,7 +774,7 @@
   if (0 == memcmp (&hdcsn->member_key, &chn->member_key, sizeof 
(chn->member_key))
       && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
   {
-    cadet_send_msg (chn, &hdcsn->header);
+    cadet_send_channel (chn, &hdcsn->header);
     return GNUNET_NO;
   }
   return GNUNET_YES;
@@ -651,30 +798,48 @@
  * Iterator callback for sending a message to origin clients.
  */
 static int
-cadet_send_members_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
-                       void *channel)
+cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+               void *channel)
 {
   const struct GNUNET_MessageHeader *msg = cls;
   struct Channel *chn = channel;
   if (JOIN_ADMITTED == chn->join_status)
-    cadet_send_msg (chn, msg);
+    cadet_send_channel (chn, msg);
   return GNUNET_YES;
 }
 
 
+/**
+ * Send message to all connected children.
+ */
 static int
-cadet_send_members (struct GNUNET_HashCode *pub_key_hash,
-                    const struct GNUNET_MessageHeader *msg)
+cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
+                     const struct GNUNET_MessageHeader *msg)
 {
   int n = 0;
   if (channels_in != NULL)
     n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
-                                                     cadet_send_members_cb,
-                                                     (void *) msg);
+                                                     cadet_send_cb, (void *) 
msg);
   return n;
 }
 
+
 /**
+ * Send message to all connected parents.
+ */
+static int
+cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
+                    const struct GNUNET_MessageHeader *msg)
+{
+  int n = 0;
+  if (channels_in != NULL)
+    n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, 
pub_key_hash,
+                                                     cadet_send_cb, (void *) 
msg);
+  return n;
+}
+
+
+/**
  * Handle a connecting client starting an origin.
  */
 static void
@@ -866,7 +1031,7 @@
 client_send_join_decision (struct Member *mem,
                            const struct MulticastJoinDecisionMessageHeader 
*hdcsn)
 {
-  client_send_msg (&mem->grp, &hdcsn->header);
+  client_send_group (&mem->grp, &hdcsn->header);
 
   const struct MulticastJoinDecisionMessage *
     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
@@ -959,9 +1124,9 @@
   }
   GNUNET_assert (GNUNET_YES == grp->is_origin);
   orig = (struct Origin *) grp;
+
   /* FIXME: yucky, should use separate message structs for P2P and CS! */
   out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (m);
-
   out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
   out->purpose.size = htonl (ntohs (out->header.size)
                              - sizeof (out->header)
@@ -976,7 +1141,7 @@
   }
 
   client_send_all (&grp->pub_key_hash, &out->header);
-  cadet_send_members (&grp->pub_key_hash, &out->header);
+  cadet_send_children (&grp->pub_key_hash, &out->header);
   GNUNET_free (out);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -993,7 +1158,6 @@
   struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct 
Group);
   struct Member *mem;
   struct GNUNET_MULTICAST_RequestHeader *out;
-
   if (NULL == grp)
   {
     GNUNET_break (0);
@@ -1002,9 +1166,9 @@
   }
   GNUNET_assert (GNUNET_NO == grp->is_origin);
   mem = (struct Member *) grp;
+
   /* FIXME: yucky, should use separate message structs for P2P and CS! */
   out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m);
-
   out->member_key = mem->pub_key;
   out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
   out->purpose.size = htonl (ntohs (out->header.size)
@@ -1023,7 +1187,7 @@
   { /* No local origins, send to remote origin */
     if (NULL != mem->origin_channel)
     {
-      cadet_send_msg (mem->origin_channel, &out->header);
+      cadet_send_channel (mem->origin_channel, &out->header);
     }
     else
     {
@@ -1039,6 +1203,207 @@
 
 
 /**
+ * Incoming replay request from a client.
+ */
+static void
+client_recv_replay_request (void *cls, struct GNUNET_SERVER_Client *client,
+                            const struct GNUNET_MessageHeader *m)
+{
+  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct 
Group);
+  struct Member *mem;
+  if (NULL == grp)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+  GNUNET_assert (GNUNET_NO == grp->is_origin);
+  mem = (struct Member *) grp;
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
+                                                        &grp->pub_key_hash);
+  if (NULL == grp_replay_req)
+  {
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+    GNUNET_CONTAINER_multihashmap_put (replay_req_client,
+                                       &grp->pub_key_hash, grp_replay_req,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  struct MulticastReplayRequestMessage *
+    rep = (struct MulticastReplayRequestMessage *) m;
+  struct GNUNET_HashCode key_hash;
+  replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
+                   rep->flags, &key_hash);
+  GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+  if (0 == client_send_origin (&grp->pub_key_hash, m))
+  { /* No local origin, replay from remote members / origin. */
+    if (NULL != mem->origin_channel)
+    {
+      cadet_send_channel (mem->origin_channel, m);
+    }
+    else
+    {
+      /* FIXME: not yet connected to origin */
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+      return;
+    }
+  }
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static int
+cadet_send_replay_response_cb (void *cls,
+                               const struct GNUNET_HashCode *key_hash,
+                               void *value)
+{
+  struct Channel *chn = value;
+  struct GNUNET_MessageHeader *msg = cls;
+
+  cadet_send_channel (chn, msg);
+  return GNUNET_OK;
+}
+
+
+static int
+client_send_replay_response_cb (void *cls,
+                                const struct GNUNET_HashCode *key_hash,
+                                void *value)
+{
+  struct GNUNET_SERVER_Client *client = value;
+  struct GNUNET_MessageHeader *msg = cls;
+
+  client_send (client, msg);
+  return GNUNET_OK;
+}
+
+
+/**
+ * End of replay response from a client.
+ */
+static void
+client_recv_replay_response_end (void *cls, struct GNUNET_SERVER_Client 
*client,
+                                 const struct GNUNET_MessageHeader *m)
+{
+  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct 
Group);
+  if (NULL == grp)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  struct MulticastReplayResponseMessage *
+    res = (struct MulticastReplayResponseMessage *) m;
+
+  struct GNUNET_HashCode key_hash;
+  replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
+                   res->flags, &key_hash);
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                                
&grp->pub_key_hash);
+  if (NULL != grp_replay_req_cadet)
+  {
+    GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
+  }
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get 
(replay_req_client,
+                                                               
&grp->pub_key_hash);
+  if (NULL != grp_replay_req_client)
+  {
+    GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, 
&key_hash);
+  }
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Incoming replay response from a client.
+ *
+ * Respond with a multicast message on success, or otherwise with an error 
code.
+ */
+static void
+client_recv_replay_response (void *cls, struct GNUNET_SERVER_Client *client,
+                             const struct GNUNET_MessageHeader *m)
+{
+  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct 
Group);
+  if (NULL == grp)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  struct MulticastReplayResponseMessage *
+    res = (struct MulticastReplayResponseMessage *) m;
+
+  const struct GNUNET_MessageHeader *msg = m;
+  if (GNUNET_MULTICAST_REC_OK == res->error_code)
+  {
+    msg = (struct GNUNET_MessageHeader *) &res[1];
+  }
+
+  struct GNUNET_HashCode key_hash;
+  replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
+                   res->flags, &key_hash);
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                              
&grp->pub_key_hash);
+  if (NULL != grp_replay_req_cadet)
+  {
+    GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, 
&key_hash,
+                                                cadet_send_replay_response_cb,
+                                                (void *) msg);
+  }
+  if (GNUNET_MULTICAST_REC_OK == res->error_code)
+  {
+    struct GNUNET_CONTAINER_MultiHashMap *
+      grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get 
(replay_req_client,
+                                                                 
&grp->pub_key_hash);
+    if (NULL != grp_replay_req_client)
+    {
+      GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, 
&key_hash,
+                                                  
client_send_replay_response_cb,
+                                                  (void *) msg);
+    }
+  }
+  else
+  {
+    client_recv_replay_response_end (cls, client, m);
+    return;
+  }
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Incoming replay request from a client.
+ */
+static void
+client_recv_replay_request_cancel (void *cls, struct GNUNET_SERVER_Client 
*client,
+                                   const struct GNUNET_MessageHeader *m)
+{
+  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct 
Group);
+}
+
+
+/**
+ * Incoming replay request from a client.
+ */
+static void
+client_recv_membership_test_result (void *cls, struct GNUNET_SERVER_Client 
*client,
+                                    const struct GNUNET_MessageHeader *m)
+{
+  struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct 
Group);
+}
+
+
+/**
  * A new client connected.
  */
 static void
@@ -1053,22 +1418,37 @@
  * Message handlers for the server.
  */
 static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-  { &client_recv_origin_start, NULL,
+  { client_recv_origin_start, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
 
-  { &client_recv_member_join, NULL,
+  { client_recv_member_join, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
 
-  { &client_recv_join_decision, NULL,
+  { client_recv_join_decision, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
 
-  { &client_recv_multicast_message, NULL,
+  { client_recv_multicast_message, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
 
-  { &client_recv_multicast_request, NULL,
+  { client_recv_multicast_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
 
-  {NULL, NULL, 0, 0}
+  { client_recv_replay_request, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
+
+  { client_recv_replay_request_cancel, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL, 0 },
+
+  { client_recv_replay_response, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
+
+  { client_recv_replay_response_end, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END, 0 },
+
+  { client_recv_membership_test_result, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT, 0 },
+
+  { NULL, NULL, 0, 0 }
 };
 
 
@@ -1107,6 +1487,9 @@
         mem->origin_channel = NULL;
     }
   }
+
+  while (GNUNET_YES == replay_req_remove_cadet (chn));
+
   GNUNET_free (chn);
 }
 
@@ -1320,12 +1703,99 @@
 
 
 /**
+ * Incoming multicast replay request from CADET.
+ */
+int
+cadet_recv_replay_request (void *cls,
+                           struct GNUNET_CADET_Channel *channel,
+                           void **ctx,
+                           const struct GNUNET_MessageHeader *m)
+{
+  struct MulticastReplayRequestMessage rep;
+  uint16_t size = ntohs (m->size);
+  if (size < sizeof (rep))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  struct Channel *chn = *ctx;
+
+  memcpy (&rep, m, sizeof (rep));
+  memcpy (&rep.member_key, &chn->member_key, sizeof (chn->member_key));
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+                                                        
&chn->grp->pub_key_hash);
+  if (NULL == grp_replay_req)
+  {
+    grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+    GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
+                                       &chn->grp->pub_key_hash, grp_replay_req,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  struct GNUNET_HashCode key_hash;
+  replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
+                   rep.flags, &key_hash);
+  GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+  client_send_random (&chn->group_key_hash, &rep.header);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast replay request cancellation from CADET.
+ */
+int
+cadet_recv_replay_request_cancel (void *cls,
+                                  struct GNUNET_CADET_Channel *channel,
+                                  void **ctx,
+                                  const struct GNUNET_MessageHeader *m)
+{
+
+}
+
+
+/**
+ * Incoming multicast replay response from CADET.
+ */
+int
+cadet_recv_replay_response (void *cls,
+                            struct GNUNET_CADET_Channel *channel,
+                            void **ctx,
+                            const struct GNUNET_MessageHeader *m)
+{
+  struct Channel *chn = *ctx;
+
+  /* @todo FIXME: got replay error response, send request to other members */
+
+  return GNUNET_OK;
+}
+
+
+/**
  * Message handlers for CADET.
  */
 static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
-  { &cadet_recv_join_request, GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
-  { &cadet_recv_message, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
-  { &cadet_recv_request, GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
+  { cadet_recv_join_request,
+    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
+
+  { cadet_recv_message,
+    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
+
+  { cadet_recv_request,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
+
+  { cadet_recv_replay_request,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
+
+  { cadet_recv_replay_request_cancel,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL, 0 },
+
+  { cadet_recv_replay_response,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
+
   { NULL, 0, 0 }
 };
 
@@ -1350,6 +1820,8 @@
   group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
   channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+  replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
 
   cadet = GNUNET_CADET_connect (cfg, NULL,
                                 &cadet_notify_channel_new,

Modified: gnunet/src/multicast/multicast.h
===================================================================
--- gnunet/src/multicast/multicast.h    2015-09-26 17:09:48 UTC (rev 36371)
+++ gnunet/src/multicast/multicast.h    2015-09-26 17:09:57 UTC (rev 36372)
@@ -27,6 +27,9 @@
 #ifndef MULTICAST_H
 #define MULTICAST_H
 
+#include "platform.h"
+#include "gnunet_multicast_service.h"
+
 GNUNET_NETWORK_STRUCT_BEGIN
 
 
@@ -157,49 +160,91 @@
 
 
 /**
- * Message sent from the client to the service to give the service
- * a replayed message.
+ * Message sent from the client to the service OR the service to the
+ * client asking for a message fragment to be replayed.
  */
-struct MulticastReplayResponseMessage
+struct MulticastReplayRequestMessage
 {
 
   /**
-   *
+   * The message type can be either
+   * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST or
+   * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL.
    */
   struct GNUNET_MessageHeader header;
 
   /**
-   * Unique ID that identifies the associated replay session.
+   * S->C: Public key of the member requesting replay.
+   * C->S: Unused.
    */
-  uint32_t uid;
+  struct GNUNET_CRYPTO_EcdsaPublicKey member_key;
 
   /**
-   * An `enum GNUNET_MULTICAST_ReplayErrorCode` identifying issues (in NBO).
+   * ID of the message that is being requested.
    */
-  int32_t error_code;
+  uint64_t fragment_id;
 
-  /* followed by replayed message */
+  /**
+   * ID of the message that is being requested.
+   */
+  uint64_t message_id;
 
+  /**
+   * Offset of the fragment that is being requested.
+   */
+  uint64_t fragment_offset;
+
+  /**
+   * Additional flags for the request.
+   */
+  uint64_t flags;
+
+  /**
+   * Replay request ID.
+   */
+  uint32_t uid;
 };
 
 
 /**
- * Message sent from the client to the service to notify the service
- * about the end of a replay session.
+ * Message sent from the client to the service to give the service
+ * a replayed message.
  */
-struct MulticastReplayEndMessage
+struct MulticastReplayResponseMessage
 {
 
   /**
-   *
+   * Type: GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE
+   *    or GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END
    */
   struct GNUNET_MessageHeader header;
 
   /**
-   * Unique ID that identifies the associated replay session.
+   * ID of the message that is being requested.
    */
-  uint32_t uid;
+  uint64_t fragment_id;
 
+  /**
+   * ID of the message that is being requested.
+   */
+  uint64_t message_id;
+
+  /**
+   * Offset of the fragment that is being requested.
+   */
+  uint64_t fragment_offset;
+
+  /**
+   * Additional flags for the request.
+   */
+  uint64_t flags;
+
+  /**
+   * An `enum GNUNET_MULTICAST_ReplayErrorCode` identifying issues (in NBO).
+   */
+  int32_t error_code;
+
+  /* followed by replayed message */
 };
 
 
@@ -253,6 +298,7 @@
   /* Followed by struct GNUNET_MessageHeader join_msg */
 };
 
+
 #if NOT_USED
 /**
  * Message sent from the client to the service to broadcast to all group
@@ -323,44 +369,6 @@
 
 
 /**
- * Message sent from the client to the service OR the service to the
- * client asking for a message fragment to be replayed.
- */
-struct MulticastReplayRequestMessage
-{
-
-  /**
-   * The message type can be either
-   * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST or
-   * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL.
-   */
-  struct GNUNET_MessageHeader header;
-
-  /**
-   * Replay request ID.
-   */
-  uint32_t uid;
-
-  /**
-   * ID of the message that is being requested.
-   */
-  uint64_t message_id;
-
-  /**
-   * Offset of the fragment that is being requested.
-   */
-  uint64_t fragment_offset;
-
-  /**
-   * Additional flags for the request.
-   */
-  uint64_t flags;
-
-};
-
-
-
-/**
  * Message sent from the client to the service to unicast to the group origin.
  */
 struct MulticastUnicastToOriginMessage

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2015-09-26 17:09:48 UTC (rev 
36371)
+++ gnunet/src/multicast/multicast_api.c        2015-09-26 17:09:57 UTC (rev 
36372)
@@ -137,6 +137,11 @@
 
   GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
 
+  /**
+   * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
+
   uint64_t next_fragment_id;
 };
 
@@ -176,6 +181,8 @@
  */
 struct GNUNET_MULTICAST_ReplayHandle
 {
+  struct GNUNET_MULTICAST_Group *grp;
+  struct MulticastReplayRequestMessage req;
 };
 
 
@@ -184,6 +191,9 @@
  */
 struct GNUNET_MULTICAST_MemberReplayHandle
 {
+
+  GNUNET_MULTICAST_ResultCallback result_cb;
+  void *result_cls;
 };
 
 
@@ -263,11 +273,14 @@
   struct GNUNET_MULTICAST_MessageHeader *
     mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
 
+  if (GNUNET_YES == grp->is_disconnecting)
+    return;
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Calling message callback with a message of size %u.\n",
               ntohs (mmsg->header.size));
 
-  if (GNUNET_YES != grp->is_disconnecting && NULL != grp->message_cb)
+  if (NULL != grp->message_cb)
     grp->message_cb (grp->cb_cls, mmsg);
 }
 
@@ -297,6 +310,73 @@
 
 
 /**
+ * Receive multicast replay request from service.
+ */
+static void
+group_recv_replay_request (void *cls,
+                           struct GNUNET_CLIENT_MANAGER_Connection *client,
+                           const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MULTICAST_Group *
+    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct MulticastReplayRequestMessage *
+    rep = (struct MulticastReplayRequestMessage *) msg;
+
+  if (GNUNET_YES == grp->is_disconnecting)
+    return;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
+
+  if (0 != rep->fragment_id)
+  {
+    if (NULL != grp->replay_frag_cb)
+    {
+      struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
+      rh->grp = grp;
+      rh->req = *rep;
+      grp->replay_frag_cb (grp->cb_cls, &rep->member_key,
+                           GNUNET_ntohll (rep->fragment_id),
+                           GNUNET_ntohll (rep->flags), rh);
+    }
+  }
+  else if (0 != rep->message_id)
+  {
+    if (NULL != grp->replay_msg_cb)
+    {
+      struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
+      rh->grp = grp;
+      rh->req = *rep;
+      grp->replay_msg_cb (grp->cb_cls, &rep->member_key,
+                          GNUNET_ntohll (rep->message_id),
+                          GNUNET_ntohll (rep->fragment_offset),
+                          GNUNET_ntohll (rep->flags), rh);
+    }
+  }
+}
+
+
+/**
+ * Receive multicast replay request from service.
+ */
+static void
+member_recv_replay_response (void *cls,
+                            struct GNUNET_CLIENT_MANAGER_Connection *client,
+                            const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MULTICAST_Group *grp;
+  struct GNUNET_MULTICAST_Member *
+    mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  grp = &mem->grp;
+  struct MulticastReplayResponseMessage *
+    res = (struct MulticastReplayResponseMessage *) msg;
+
+  if (GNUNET_YES == grp->is_disconnecting)
+    return;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
+}
+
+/**
  * Member receives join decision.
  */
 static void
@@ -369,20 +449,24 @@
  */
 static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
 {
-  { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
+  { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
 
-  { &group_recv_message, NULL,
+  { group_recv_message, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
     sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
 
-  { &origin_recv_request, NULL,
+  { origin_recv_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
     sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
 
-  { &group_recv_join_request, NULL,
+  { group_recv_join_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
 
+  { group_recv_replay_request, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
+
   { NULL, NULL, 0, 0, GNUNET_NO }
 };
 
@@ -392,20 +476,28 @@
  */
 static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
 {
-  { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
+  { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
 
-  { &group_recv_message, NULL,
+  { group_recv_message, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
     sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
 
-  { &group_recv_join_request, NULL,
+  { group_recv_join_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
 
-  { &member_recv_join_decision, NULL,
+  { member_recv_join_decision, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
     sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
 
+  { group_recv_replay_request, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
+
+  { member_recv_replay_response, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+    sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
+
   { NULL, NULL, 0, 0, GNUNET_NO }
 };
 
@@ -514,9 +606,13 @@
 /**
  * Replay a message fragment for the multicast group.
  *
- * @param rh Replay handle identifying which replay operation was requested.
- * @param msg Replayed message fragment, NULL if unknown/error.
- * @param ec Error code.
+ * @param rh
+ *        Replay handle identifying which replay operation was requested.
+ * @param msg
+ *        Replayed message fragment, NULL if not found / an error occurred.
+ * @param ec
+ *        Error code.  See enum GNUNET_MULTICAST_ReplayErrorCode
+ *        If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
  */
 void
 GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
@@ -523,6 +619,32 @@
                                   const struct GNUNET_MessageHeader *msg,
                                   enum GNUNET_MULTICAST_ReplayErrorCode ec)
 {
+  uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
+  struct MulticastReplayResponseMessage *
+    res = GNUNET_malloc (sizeof (*res) + msg_size);
+  *res = (struct MulticastReplayResponseMessage) {
+    .header = {
+      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE),
+      .size = htons (sizeof (*res) + msg_size),
+    },
+    .fragment_id = rh->req.fragment_id,
+    .message_id = rh->req.message_id,
+    .fragment_offset = rh->req.fragment_offset,
+    .flags = rh->req.flags,
+    .error_code = htonl (ec),
+  };
+
+  if (GNUNET_MULTICAST_REC_OK == ec)
+  {
+    GNUNET_assert (NULL != msg);
+    memcpy (&res[1], msg, msg_size);
+  }
+
+  GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header);
+  GNUNET_free (res);
+
+  if (GNUNET_MULTICAST_REC_OK != ec)
+    GNUNET_free (rh);
 }
 
 
@@ -536,6 +658,19 @@
 void
 GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
+  struct MulticastReplayResponseMessage end = {
+    .header = {
+      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END),
+      .size = htons (sizeof (end)),
+    },
+    .fragment_id = rh->req.fragment_id,
+    .message_id = rh->req.message_id,
+    .fragment_offset = rh->req.fragment_offset,
+    .flags = rh->req.flags,
+  };
+
+  GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header);
+  GNUNET_free (rh);
 }
 
 
@@ -827,6 +962,7 @@
   grp->join_req_cb = join_request_cb;
   grp->member_test_cb = member_test_cb;
   grp->replay_frag_cb = replay_frag_cb;
+  grp->replay_msg_cb = replay_msg_cb;
   grp->message_cb = message_cb;
   grp->cb_cls = cls;
 
@@ -864,6 +1000,27 @@
 }
 
 
+void
+member_replay_request (struct GNUNET_MULTICAST_Member *mem,
+                       uint64_t fragment_id,
+                       uint64_t message_id,
+                       uint64_t fragment_offset,
+                       uint64_t flags)
+{
+  struct MulticastReplayRequestMessage rep = {
+    .header = {
+      .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST),
+      .size = htons (sizeof (rep)),
+    },
+    .fragment_id = GNUNET_htonll (fragment_id),
+    .message_id = GNUNET_htonll (message_id),
+    .fragment_offset = GNUNET_htonll (fragment_offset),
+    .flags = GNUNET_htonll (flags),
+  };
+  GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
+}
+
+
 /**
  * Request a fragment to be replayed by fragment ID.
  *
@@ -870,20 +1027,28 @@
  * Useful if messages below the @e max_known_fragment_id given when joining are
  * needed and not known to the client.
  *
- * @param member Membership handle.
- * @param fragment_id ID of a message fragment that this client would like to
-          see replayed.
- * @param flags Additional flags for the replay request.  It is used and 
defined
- *        by the replay callback.  FIXME: which replay callback? FIXME: use 
enum?
- *        FIXME: why not pass reply cb here?
- * @return Replay request handle, NULL on error.
+ * @param member
+ *        Membership handle.
+ * @param fragment_id
+ *        ID of a message fragment that this client would like to see replayed.
+ * @param flags
+ *        Additional flags for the replay request.
+ *        It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
+ * @param result_cb
+ *        Function to call when the replayed message fragment arrives.
+ * @param result_cls
+ *        Closure for @a result_cb.
+ *
+ * @return Replay request handle.
  */
 struct GNUNET_MULTICAST_MemberReplayHandle *
-GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member 
*member,
+GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
                                          uint64_t fragment_id,
-                                         uint64_t flags)
+                                         uint64_t flags,
+                                         GNUNET_MULTICAST_ResultCallback 
result_cb,
+                                         void *result_cls)
 {
-  return NULL;
+  member_replay_request (mem, fragment_id, 0, 0, flags);
 }
 
 
@@ -893,24 +1058,31 @@
  * Useful if messages below the @e max_known_fragment_id given when joining are
  * needed and not known to the client.
  *
- * @param member Membership handle.
- * @param message_id ID of the message this client would like to see replayed.
- * @param fragment_offset Offset of the fragment within the message to replay.
- * @param flags Additional flags for the replay request.  It is used & defined
- *        by the replay callback.
- * @param result_cb Function to be called for the replayed message.
- * @param result_cb_cls Closure for @a result_cb.
+ * @param member
+ *        Membership handle.
+ * @param message_id
+ *        ID of the message this client would like to see replayed.
+ * @param fragment_offset
+ *        Offset of the fragment within the message to replay.
+ * @param flags
+ *        Additional flags for the replay request.
+ *        It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
+ * @param result_cb
+ *        Function to call for each replayed message fragment.
+ * @param result_cls
+ *        Closure for @a result_cb.
+ *
  * @return Replay request handle, NULL on error.
  */
 struct GNUNET_MULTICAST_MemberReplayHandle *
-GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member,
+GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
                                         uint64_t message_id,
                                         uint64_t fragment_offset,
                                         uint64_t flags,
                                         GNUNET_MULTICAST_ResultCallback 
result_cb,
-                                        void *result_cb_cls)
+                                        void *result_cls)
 {
-  return NULL;
+  member_replay_request (mem, 0, message_id, fragment_offset, flags);
 }
 
 

Modified: gnunet/src/multicast/test_multicast.c
===================================================================
--- gnunet/src/multicast/test_multicast.c       2015-09-26 17:09:48 UTC (rev 
36371)
+++ gnunet/src/multicast/test_multicast.c       2015-09-26 17:09:57 UTC (rev 
36372)
@@ -84,17 +84,21 @@
 
 enum
 {
-  TEST_NONE               = 0,
-  TEST_ORIGIN_START       = 1,
-  TEST_MEMBER_JOIN_REFUSE = 2,
-  TEST_MEMBER_JOIN_ADMIT  = 3,
-  TEST_ORIGIN_TO_ALL      = 4,
-  TEST_ORIGIN_TO_ALL_RECV = 5,
-  TEST_MEMBER_TO_ORIGIN   = 6,
-  TEST_MEMBER_PART        = 7,
-  TEST_ORIGIN_STOP        = 8,
+  TEST_NONE                = 0,
+  TEST_ORIGIN_START        = 1,
+  TEST_MEMBER_JOIN_REFUSE  = 2,
+  TEST_MEMBER_JOIN_ADMIT   = 3,
+  TEST_ORIGIN_TO_ALL       = 4,
+  TEST_ORIGIN_TO_ALL_RECV  = 5,
+  TEST_MEMBER_TO_ORIGIN    = 6,
+  TEST_MEMBER_REPLAY_ERROR = 7,
+  TEST_MEMBER_REPLAY_OK    = 8,
+  TEST_MEMBER_PART         = 9,
+  TEST_ORIGIN_STOP        = 10,
 } test;
 
+uint64_t replay_fragment_id;
+uint64_t replay_flags;
 
 static void
 member_join (int t);
@@ -230,56 +234,6 @@
 
 
 static void
-origin_recv_replay_msg (void *cls,
-                        const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key,
-                        uint64_t message_id,
-                        uint64_t fragment_offset,
-                        uint64_t flags,
-                        struct GNUNET_MULTICAST_ReplayHandle *rh)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Test #%u: origin_recv_replay_msg()\n", test);
-}
-
-
-static void
-member_recv_replay_msg (void *cls,
-                        const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key,
-                        uint64_t message_id,
-                        uint64_t fragment_offset,
-                        uint64_t flags,
-                        struct GNUNET_MULTICAST_ReplayHandle *rh)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Test #%u: member_recv_replay_msg()\n", test);
-}
-
-
-static void
-origin_recv_replay_frag (void *cls,
-                         const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key,
-                         uint64_t fragment_id,
-                         uint64_t flags,
-                         struct GNUNET_MULTICAST_ReplayHandle *rh)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Test #%u: origin_recv_replay_frag()\n", test);
-}
-
-
-static void
-member_recv_replay_frag (void *cls,
-                         const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key,
-                         uint64_t fragment_id,
-                         uint64_t flags,
-                         struct GNUNET_MULTICAST_ReplayHandle *rh)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Test #%u: member_recv_replay_frag()\n", test);
-}
-
-
-static void
 origin_recv_membership_test (void *cls,
                              const struct GNUNET_CRYPTO_EcdsaPublicKey 
*member_key,
                              uint64_t message_id,
@@ -339,6 +293,7 @@
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Test #%u: member_parted()\n", test);
+  member = NULL;
 
   switch (test)
   {
@@ -359,24 +314,139 @@
 
 
 static void
+schedule_member_part (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Test #%u: schedule_member_part()\n", test);
+  GNUNET_MULTICAST_member_part (member, member_parted, NULL);
+}
+
+
+static void
 member_part ()
 {
   test = TEST_MEMBER_PART;
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Test #%u: member_part()\n", test);
-  GNUNET_MULTICAST_member_part (member, member_parted, NULL);
-  member = NULL;
+  GNUNET_SCHEDULER_add_now (schedule_member_part, NULL);
 }
 
 
 static void
-schedule_member_part (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+member_replay_ok ()
 {
-  GNUNET_MULTICAST_member_part (member, member_parted, NULL);
+  test = TEST_MEMBER_REPLAY_OK;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Test #%u: member_replay_ok()\n", test);
+  replay_fragment_id = 1;
+  replay_flags = 1 | 1<<11;
+  GNUNET_MULTICAST_member_replay_fragment (member, replay_fragment_id,
+                                           replay_flags, NULL, NULL);
 }
 
 
 static void
+member_replay_error ()
+{
+  test = TEST_MEMBER_REPLAY_ERROR;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Test #%u: member_replay_error()\n", test);
+  replay_fragment_id = 1234;
+  replay_flags = 11 | 1<<11;
+  GNUNET_MULTICAST_member_replay_fragment (member, replay_fragment_id,
+                                           replay_flags, NULL, NULL);
+}
+
+
+static void
+origin_recv_replay_msg (void *cls,
+                        const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key,
+                        uint64_t message_id,
+                        uint64_t fragment_offset,
+                        uint64_t flags,
+                        struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Test #%u: origin_recv_replay_msg()\n", test);
+  GNUNET_assert (0);
+}
+
+
+static void
+member_recv_replay_msg (void *cls,
+                        const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key,
+                        uint64_t message_id,
+                        uint64_t fragment_offset,
+                        uint64_t flags,
+                        struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Test #%u: member_recv_replay_msg()\n", test);
+  GNUNET_assert (0);
+}
+
+
+static void
+origin_recv_replay_frag (void *cls,
+                         const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key,
+                         uint64_t fragment_id,
+                         uint64_t flags,
+                         struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Test #%u: origin_recv_replay_frag()"
+              " - fragment_id=%" PRIu64 " flags=%" PRIu64 "\n",
+              test, fragment_id, flags);
+  GNUNET_assert (replay_fragment_id == fragment_id && replay_flags == flags);
+  switch (test)
+  {
+  case TEST_MEMBER_REPLAY_ERROR:
+    GNUNET_MULTICAST_replay_response (rh, NULL, GNUNET_SYSERR);
+    member_replay_ok ();
+    break;
+
+  case TEST_MEMBER_REPLAY_OK:
+  {
+    struct GNUNET_MULTICAST_MessageHeader mmsg = {
+      .header = {
+        .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE),
+        .size = htons (sizeof (mmsg)),
+      },
+      .fragment_id = GNUNET_htonll (1),
+      .message_id = GNUNET_htonll (1),
+      .fragment_offset = 0,
+      .group_generation = GNUNET_htonll (1),
+      .flags = 0,
+    };
+    member_cls.n = 0;
+    member_cls.msgs_expected = 1;
+    GNUNET_MULTICAST_replay_response (rh, &mmsg.header, 
GNUNET_MULTICAST_REC_OK);
+    GNUNET_MULTICAST_replay_response_end (rh);
+    break;
+  }
+
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Invalid test #%d in origin_recv_replay_frag()\n", test);
+    GNUNET_assert (0);
+  }
+}
+
+
+static void
+member_recv_replay_frag (void *cls,
+                         const struct GNUNET_CRYPTO_EcdsaPublicKey *member_key,
+                         uint64_t fragment_id,
+                         uint64_t flags,
+                         struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Test #%u: member_recv_replay_frag()\n", test);
+  GNUNET_assert (0);
+}
+
+
+static void
 origin_recv_request (void *cls,
                      const struct GNUNET_MULTICAST_RequestHeader *req)
 {
@@ -385,7 +455,7 @@
               "Test #%u: origin_recv_request()\n", test);
   if (++ocls->n != ocls->msgs_expected)
     return;
-  
+
   GNUNET_assert (0 == memcmp (&req->member_key,
                               &member_pub_key, sizeof (member_pub_key)));
 
@@ -392,7 +462,7 @@
 
   // FIXME: check message content
 
-  member_part ();
+  member_replay_error ();
 }
 
 
@@ -437,6 +507,11 @@
     member_to_origin ();
     break;
 
+  case TEST_MEMBER_REPLAY_OK:
+    GNUNET_assert (replay_fragment_id == GNUNET_ntohll (msg->fragment_id));
+    member_part ();
+    break;
+
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Invalid test #%d in origin_recv_message()\n", test);
@@ -516,7 +591,7 @@
     GNUNET_assert (0 == relay_count);
     GNUNET_SCHEDULER_add_now (schedule_member_part, NULL);
     break;
-    
+
   case TEST_MEMBER_JOIN_ADMIT:
     GNUNET_assert (1 == relay_count);
     GNUNET_assert (0 == memcmp (relays, &this_peer, sizeof (this_peer)));
@@ -551,13 +626,13 @@
   join_resp->size = htons (sizeof (join_resp) + data_size);
   join_resp->type = htons (456);
   memcpy (&join_resp[1], data, data_size);
-  
+
   switch (test)
   {
   case TEST_MEMBER_JOIN_REFUSE:
     GNUNET_MULTICAST_join_decision (jh, GNUNET_NO, 0, NULL, join_resp);
     break;
-    
+
   case TEST_MEMBER_JOIN_ADMIT:
     GNUNET_MULTICAST_join_decision (jh, GNUNET_YES, 1, &this_peer, join_resp);
     break;

Modified: gnunet/src/util/client_manager.c
===================================================================
--- gnunet/src/util/client_manager.c    2015-09-26 17:09:48 UTC (rev 36371)
+++ gnunet/src/util/client_manager.c    2015-09-26 17:09:57 UTC (rev 36372)
@@ -491,7 +491,7 @@
                                 struct GNUNET_MessageHeader *msg)
 {
   struct MessageQueueItem *mqi = GNUNET_malloc (sizeof (*mqi));
-  mqi->msg = msg;
+  mqi->msg = GNUNET_copy_message (msg);
   GNUNET_CONTAINER_DLL_insert_tail (mgr->tmit_head, mgr->tmit_tail, mqi);
   transmit_next (mgr);
 }
@@ -511,7 +511,7 @@
                                     struct GNUNET_MessageHeader *msg)
 {
   struct MessageQueueItem *mqi = GNUNET_malloc (sizeof (*mqi));
-  mqi->msg = msg;
+  mqi->msg = GNUNET_copy_message (msg);
   GNUNET_CONTAINER_DLL_insert (mgr->tmit_head, mgr->tmit_tail, mqi);
   transmit_next (mgr);
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]