gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33313 - in gnunet/src: include multicast psyc


From: gnunet
Subject: [GNUnet-SVN] r33313 - in gnunet/src: include multicast psyc
Date: Sat, 17 May 2014 12:16:15 +0200

Author: tg
Date: 2014-05-17 12:16:15 +0200 (Sat, 17 May 2014)
New Revision: 33313

Modified:
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/include/gnunet_psyc_service.h
   gnunet/src/include/gnunet_signatures.h
   gnunet/src/multicast/gnunet-service-multicast.c
   gnunet/src/multicast/multicast.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc.h
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/test_psyc.c
Log:
multicast, psyc: client connections, join requests

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/include/gnunet_protocols.h       2014-05-17 10:16:15 UTC (rev 33313)
@@ -2338,73 +2338,63 @@
 #define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750
 
 /**
- * C->S: Stop the origin.
- */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751
-
-/**
  * C->S: Join group as a member.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 751
 
 /**
- * C->S: Part the group.
- */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753
-
-/**
- * C<->S<->T: Multicast message from the origin to all members.
- */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754
-
-/**
- * C<->S<->T: Unicast request from a group member to the origin.
- */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755
-
-/**
  * C<--S<->T: A peer wants to join the group.
  *
  * Unicast message to the origin or another group member.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST
+#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST 752
 
 /**
  * C<->S<->T: Response to a join request.
  *
  * Unicast message from a group member to the peer wanting to join.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION
+#define GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION 753
 
 /**
  * A peer wants to part the group.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST
+#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST 754
 
 /**
  * Acknowledgement sent in response to a part request.
  *
  * Unicast message from a group member to the peer wanting to part.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK
+#define GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK 755
 
 /**
  * Group terminated.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_GROUP_END
+#define GNUNET_MESSAGE_TYPE_MULTICAST_GROUP_END 756
 
 /**
- *
+ * C<->S<->T: Multicast message from the origin to all members.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 757
 
 /**
- *
+ * C<->S<->T: Unicast request from a group member to the origin.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758
 
+/**
+ * C<->S<->T: Replay request from a group member to another member.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 759
 
+/**
+ * C<->S<->T: Cancellation of a replay request.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL 760
 
+
+
 /*******************************************************************************
  * SECRETSHARING message types
  ******************************************************************************/

Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h    2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/include/gnunet_psyc_service.h    2014-05-17 10:16:15 UTC (rev 33313)
@@ -620,9 +620,7 @@
  * @param message_cb Function to invoke on message parts received from the
  *        channel, typically at least contains method handlers for @e join and
  *        @e part.
- * @param join_cb function invoked once we have joined with the current
- *        message ID of the channel
- * @param slave_joined_cb Function to invoke when a peer wants to join.
+ * @param slave_joined_cb Function invoked once we have joined the channel.
  * @param cls Closure for @a message_cb and @a slave_joined_cb.
  * @param method_name Method name for the join request.
  * @param env Environment containing transient variables for the request, or NULL.
@@ -638,7 +636,6 @@
                         uint32_t relay_count,
                         const struct GNUNET_PeerIdentity *relays,
                         GNUNET_PSYC_MessageCallback message_cb,
-                        GNUNET_PSYC_JoinCallback join_cb,
                         GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
                         void *cls,
                         const char *method_name,

Modified: gnunet/src/include/gnunet_signatures.h
===================================================================
--- gnunet/src/include/gnunet_signatures.h      2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/include/gnunet_signatures.h      2014-05-17 10:16:15 UTC (rev 33313)
@@ -137,7 +137,7 @@
 #define GNUNET_SIGNATURE_PURPOSE_REGEX_ACCEPT 18
 
 /**
- * Signature of a multicast message.
+ * Signature of a multicast message sent by the origin.
  */
 #define GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE 19
 
@@ -166,7 +166,12 @@
  */
 #define GNUNET_SIGNATURE_PURPOSE_SECRETSHARING_DECRYPTION 23
 
+/**
+ * Signature of a multicast request sent by a member.
+ */
+#define GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST 24
 
+
 #if 0                           /* keep Emacsens' auto-indent happy */
 {
 #endif

Modified: gnunet/src/multicast/gnunet-service-multicast.c
===================================================================
--- gnunet/src/multicast/gnunet-service-multicast.c     2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/multicast/gnunet-service-multicast.c     2014-05-17 10:16:15 UTC (rev 33313)
@@ -46,22 +46,40 @@
 
 /**
  * All connected origins.
- * Group's pub_key_hash -> struct Group
+ * Group's pub_key_hash -> struct Origin
  */
 static struct GNUNET_CONTAINER_MultiHashMap *origins;
 
 /**
  * All connected members.
- * Group's pub_key_hash -> struct Group
+ * Group's pub_key_hash -> struct Member
  */
 static struct GNUNET_CONTAINER_MultiHashMap *members;
 
 /**
+ * Connected members per group.
+ * Group's pub_key_hash -> Member's pub_key -> struct Member
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *group_members;
+
+
+/**
+ * List of connected clients.
+ */
+struct ClientList
+{
+  struct ClientList *prev;
+  struct ClientList *next;
+  struct GNUNET_SERVER_Client *client;
+};
+
+/**
  * Common part of the client context for both an origin and member.
  */
 struct Group
 {
-  struct GNUNET_SERVER_Client *client;
+  struct ClientList *clients_head;
+  struct ClientList *clients_tail;
 
   /**
    * Public key of the group.
@@ -117,6 +135,29 @@
   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
 
   /**
+   * Public key of the member.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+
+  /**
+   * Hash of @a pub_key.
+   */
+  struct GNUNET_HashCode pub_key_hash;
+
+  /**
+   * Join request sent to the origin / members.
+   */
+  struct GNUNET_MULTICAST_JoinRequest *join_request;
+
+  /**
+   * Join decision sent in reply to our request.
+   *
+   * Only a positive decision is stored here, in case of a negative decision the
+   * client is disconnected.
+   */
+  struct MulticastJoinDecisionMessage *join_decision;
+
+  /**
    * Last request fragment ID sent to the origin.
    */
   uint64_t max_fragment_id;
@@ -135,23 +176,161 @@
   /* FIXME: do clean up here */
 }
 
+/**
+ * Clean up origin data structures after a client disconnected.
+ */
+static void
+cleanup_origin (struct Origin *orig)
+{
+  struct Group *grp = &orig->grp;
+  GNUNET_CONTAINER_multihashmap_remove (origins, &grp->pub_key_hash, orig);
+}
 
+
 /**
- * Iterator callback for sending a message to clients.
+ * Clean up member data structures after a client disconnected.
  */
+static void
+cleanup_member (struct Member *mem)
+{
+  struct Group *grp = &mem->grp;
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members,
+                                                 &grp->pub_key_hash);
+  GNUNET_assert (NULL != grp_mem);
+  GNUNET_CONTAINER_multihashmap_remove (grp_mem, &mem->pub_key_hash, mem);
+
+  if (0 == GNUNET_CONTAINER_multihashmap_size (grp_mem))
+  {
+    GNUNET_CONTAINER_multihashmap_remove (group_members, &grp->pub_key_hash,
+                                          grp_mem);
+    GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
+  }
+  GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
+}
+
+
+/**
+ * Clean up group data structures after a client disconnected.
+ */
+static void
+cleanup_group (struct Group *grp)
+{
+  (GNUNET_YES == grp->is_origin)
+    ? cleanup_origin ((struct Origin *) grp)
+    : cleanup_member ((struct Member *) grp);
+
+  GNUNET_free (grp);
+}
+
+
+/**
+ * Called whenever a client is disconnected.
+ *
+ * Frees our resources associated with that client.
+ *
+ * @param cls  Closure.
+ * @param client  Client handle.
+ */
+static void
+client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+  if (NULL == client)
+    return;
+
+  struct Group *grp
+    = GNUNET_SERVER_client_get_user_context (client, struct Group);
+
+  if (NULL == grp)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p User context is NULL in client_disconnect()\n", grp);
+    GNUNET_assert (0);
+    return;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Client (%s) disconnected from group %s\n",
+              grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
+              GNUNET_h2s (&grp->pub_key_hash));
+
+  struct ClientList *cl = grp->clients_head;
+  while (NULL != cl)
+  {
+    if (cl->client == client)
+    {
+      GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
+      GNUNET_free (cl);
+      break;
+    }
+    cl = cl->next;
+  }
+
+  if (NULL == grp->clients_head)
+  { /* Last client disconnected. */
+#if FIXME
+    if (NULL != grp->tmit_head)
+    { /* Send pending messages via CADET before cleanup. */
+      transmit_message (grp);
+    }
+    else
+#endif
+    {
+      cleanup_group (grp);
+    }
+  }
+}
+
+
+/**
+ * Send message to all clients connected to the group.
+ */
+static void
+message_to_clients (const struct Group *grp,
+                    const struct GNUNET_MessageHeader *msg)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "%p Sending message to clients.\n", grp);
+
+  struct ClientList *cl = grp->clients_head;
+  while (NULL != cl)
+  {
+    GNUNET_SERVER_notification_context_add (nc, cl->client);
+    GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
+    cl = cl->next;
+  }
+}
+
+
+/**
+ * Iterator callback for sending a message to origin clients.
+ */
 static int
-message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
-                  void *group)
+origin_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+                   void *origin)
 {
   const struct GNUNET_MessageHeader *msg = cls;
-  struct Group *grp = group;
+  struct Member *orig = origin;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "%p Sending message to client.\n", grp);
+  message_to_clients (&orig->grp, msg);
+  return GNUNET_YES;
+}
 
-  GNUNET_SERVER_notification_context_add (nc, grp->client);
-  GNUNET_SERVER_notification_context_unicast (nc, grp->client, msg, GNUNET_NO);
 
+/**
+ * Iterator callback for sending a message to member clients.
+ */
+static int
+member_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+                   void *member)
+{
+  const struct GNUNET_MessageHeader *msg = cls;
+  struct Member *mem = member;
+
+  if (NULL != mem->join_decision)
+  { /* Only send message to admitted members */
+    message_to_clients (&mem->grp, msg);
+  }
   return GNUNET_YES;
 }
 
@@ -167,10 +346,10 @@
 {
   if (origins != NULL)
     GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
-                                                message_callback, (void *) msg);
+                                                origin_message_cb, (void *) msg);
   if (members != NULL)
     GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
-                                                message_callback, (void *) msg);
+                                                member_message_cb, (void *) msg);
 }
 
 
@@ -185,7 +364,7 @@
 {
   if (origins != NULL)
     GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
-                                                message_callback, (void *) msg);
+                                                origin_message_cb, (void *) msg);
 }
 
 
@@ -199,38 +378,47 @@
   const struct MulticastOriginStartMessage *
     msg = (const struct MulticastOriginStartMessage *) m;
 
-  struct Origin *orig = GNUNET_new (struct Origin);
-  orig->priv_key = msg->group_key;
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+  struct GNUNET_HashCode pub_key_hash;
 
-  struct Group *grp = &orig->grp;
-  grp->is_origin = GNUNET_YES;
-  grp->client = client;
+  GNUNET_CRYPTO_eddsa_key_get_public (&msg->group_key, &pub_key);
+  GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
 
-  GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
-  GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
+  struct Origin *
+    orig = GNUNET_CONTAINER_multihashmap_get (origins, &pub_key_hash);
+  struct Group *grp;
 
+  if (NULL == orig)
+  {
+    orig = GNUNET_new (struct Origin);
+    orig->priv_key = msg->group_key;
+    grp = &orig->grp;
+    grp->is_origin = GNUNET_YES;
+    grp->pub_key = pub_key;
+    grp->pub_key_hash = pub_key_hash;
+
+    GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  else
+  {
+    grp = &orig->grp;
+  }
+
+  struct ClientList *cl = GNUNET_new (struct ClientList);
+  cl->client = client;
+  GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Client connected as origin to group %s.\n",
               orig, GNUNET_h2s (&grp->pub_key_hash));
 
   GNUNET_SERVER_client_set_user_context (client, grp);
-  GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
 /**
- * Handle a client stopping an origin.
- */
-static void
-handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
-                    const struct GNUNET_MessageHeader *msg)
-{
-}
-
-
-/**
  * Handle a connecting client joining a group.
  */
 static void
@@ -240,34 +428,113 @@
   struct MulticastMemberJoinMessage *
     msg = (struct MulticastMemberJoinMessage *) m;
 
-  struct Member *mem = GNUNET_new (struct Member);
-  mem->priv_key = msg->member_key;
+  struct GNUNET_CRYPTO_EddsaPublicKey mem_pub_key;
+  struct GNUNET_HashCode pub_key_hash, mem_pub_key_hash;
 
-  struct Group *grp = &mem->grp;
-  grp->is_origin = GNUNET_NO;
-  grp->client = client;
-  grp->pub_key = msg->group_key;
-  GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
+  GNUNET_CRYPTO_eddsa_key_get_public (&msg->member_key, &mem_pub_key);
+  GNUNET_CRYPTO_hash (&mem_pub_key, sizeof (mem_pub_key), &mem_pub_key_hash);
+  GNUNET_CRYPTO_hash (&msg->group_key, sizeof (msg->group_key), &pub_key_hash);
 
+  struct GNUNET_CONTAINER_MultiHashMap *
+    grp_mem = GNUNET_CONTAINER_multihashmap_get (group_members, &pub_key_hash);
+  struct Member *mem = NULL;
+  struct Group *grp;
+
+  if (NULL == grp_mem)
+  {
+    grp_mem = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+    GNUNET_CONTAINER_multihashmap_put (group_members, &pub_key_hash, grp_mem,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  }
+  else
+  {
+    mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &mem_pub_key_hash);
+  }
+
+  if (NULL == mem)
+  {
+    mem = GNUNET_new (struct Member);
+    mem->priv_key = msg->member_key;
+    mem->pub_key = mem_pub_key;
+    mem->pub_key_hash = mem_pub_key_hash;
+
+    grp = &mem->grp;
+    grp->is_origin = GNUNET_NO;
+    grp->pub_key = msg->group_key;
+    grp->pub_key_hash = pub_key_hash;
+
+    GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem_pub_key_hash, mem,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+    GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  }
+  else
+  {
+    grp = &mem->grp;
+  }
+
+  struct ClientList *cl = GNUNET_new (struct ClientList);
+  cl->client = client;
+  GNUNET_CONTAINER_DLL_insert (grp->clients_head, grp->clients_tail, cl);
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Client connected as member to group %s.\n",
               mem, GNUNET_h2s (&grp->pub_key_hash));
 
   GNUNET_SERVER_client_set_user_context (client, grp);
-  GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
 
+  if (NULL != mem->join_decision)
+  { /* Already got a join decision, send it to client. */
+    GNUNET_SERVER_notification_context_add (nc, client);
+    GNUNET_SERVER_notification_context_unicast (nc, client,
+                                                (struct GNUNET_MessageHeader *)
+                                                mem->join_decision,
+                                                GNUNET_NO);
+  }
+  else if (grp->clients_head == grp->clients_tail)
+  { /* First client, send join request. */
+    struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
+    uint32_t relay_count = ntohs (msg->relay_count);
+    struct GNUNET_MessageHeader *
+      join_req = ((struct GNUNET_MessageHeader *)
+                  ((char *) &msg[1]) + relay_count * sizeof (*relays));
+    uint16_t join_req_size = ntohs (join_req->size);
 
-/**
- * Handle a client parting a group.
- */
-static void
-handle_member_part (void *cls, struct GNUNET_SERVER_Client *client,
-                    const struct GNUNET_MessageHeader *msg)
-{
+    struct MulticastJoinRequestMessage *
+      req = GNUNET_malloc (sizeof (*req) + join_req_size);
+    req->header.size = htons (sizeof (*req) + join_req_size);
+    req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
+    req->group_key = grp->pub_key;
+    GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &req->member_key);
+    memcpy (&req[1], join_req, join_req_size);
 
+    req->purpose.size = htonl (sizeof (*req) + join_req_size
+                               - sizeof (req->header)
+                               - sizeof (req->signature));
+    req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
+
+    if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
+                                               &req->signature))
+    {
+      /* FIXME: handle error */
+      GNUNET_assert (0);
+    }
+
+    if (NULL != mem->join_request)
+      GNUNET_free (mem->join_request);
+    mem->join_request = req;
+
+    if (GNUNET_YES
+        == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash))
+    { /* Local origin */
+      message_to_origin (grp, (struct GNUNET_MessageHeader *) mem->join_request);
+    }
+    else
+    {
+      /* FIXME: send join request to remote origin / members */
+    }
+  }
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
@@ -296,7 +563,7 @@
                                              &msg->signature))
   {
     /* FIXME: handle error */
-    return;
+    GNUNET_assert (0);
   }
 
   /* FIXME: send to remote members */
@@ -327,18 +594,24 @@
                              - sizeof (req->header)
                              - sizeof (req->member_key)
                              - sizeof (req->signature));
-  req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
+  req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_REQUEST);
 
   if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->priv_key, &req->purpose,
                                              &req->signature))
   {
     /* FIXME: handle error */
-    return;
+    GNUNET_assert (0);
   }
 
-  /* FIXME: send to remote origin */
-
-  message_to_origin (grp, m);
+  if (GNUNET_YES
+      == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash))
+  { /* Local origin */
+    message_to_origin (grp, m);
+  }
+  else
+  {
+    /* FIXME: send to remote origin */
+  }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -357,15 +630,9 @@
     { &handle_origin_start, NULL,
       GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
 
-    { &handle_origin_stop, NULL,
-      GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP, 0 },
-
     { &handle_member_join, NULL,
       GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
 
-    { &handle_member_part, NULL,
-      GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART, 0 },
-
     { &handle_multicast_message, NULL,
       GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
 
@@ -379,9 +646,11 @@
   stats = GNUNET_STATISTICS_create ("multicast", cfg);
   origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
   nc = GNUNET_SERVER_notification_context_create (server, 1);
 
   GNUNET_SERVER_add_handlers (server, handlers);
+  GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task,
                                 NULL);
 }

Modified: gnunet/src/multicast/multicast.h
===================================================================
--- gnunet/src/multicast/multicast.h    2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/multicast/multicast.h    2014-05-17 10:16:15 UTC (rev 33313)
@@ -33,7 +33,7 @@
 /**
  * Header of a join request sent to the origin or another member.
  */
-struct GNUNET_MULTICAST_JoinRequest
+struct MulticastJoinRequestMessage
 {
   /**
    * Header for the join request.
@@ -67,7 +67,7 @@
    */
   struct GNUNET_PeerIdentity member_peer;
 
-  /* Followed by request body. */
+  /* Followed by struct GNUNET_MessageHeader join_request */
 };
 
 
@@ -97,9 +97,9 @@
    */
   uint32_t relay_count;
 
-  /* followed by 'relay_count' peer identities */
+  /* Followed by relay_count peer identities */
 
-  /* followed by the join response message */
+  /* Followed by the join response message */
 
 };
 

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/multicast/multicast_api.c        2014-05-17 10:16:15 UTC (rev 33313)
@@ -196,6 +196,17 @@
  */
 struct GNUNET_MULTICAST_JoinHandle
 {
+  struct GNUNET_MULTICAST_Group *group;
+
+  /**
+   * Public key of the joining member.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+
+  /**
+   * Peer identity of the joining member.
+   */
+  struct GNUNET_PeerIdentity member_peer;
 };
 
 
@@ -437,8 +448,7 @@
  * Iterator callback for calling message callbacks for all groups.
  */
 static int
-message_callback (void *cls, const struct GNUNET_HashCode *pub_key_hash,
-                  void *group)
+message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group)
 {
   const struct GNUNET_MessageHeader *msg = cls;
   struct GNUNET_MULTICAST_Group *grp = group;
@@ -456,32 +466,10 @@
 
 
 /**
- * Handle a multicast message from the service.
- *
- * Call message callbacks of all origins and members of the destination group.
- *
- * @param grp Destination group of the message.
- * @param msg The message.
- */
-static void
-handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
-                          const struct GNUNET_MULTICAST_MessageHeader *msg)
-{
-  if (origins != NULL)
-    GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
-                                                message_callback, (void *) msg);
-  if (members != NULL)
-    GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
-                                                message_callback, (void *) msg);
-}
-
-
-/**
  * Iterator callback for calling request callbacks of origins.
  */
 static int
-request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
-                  void *origin)
+request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin)
 {
   const struct GNUNET_MULTICAST_RequestHeader *req = cls;
   struct GNUNET_MULTICAST_Origin *orig = origin;
@@ -497,20 +485,26 @@
 
 
 /**
- * Handle a multicast request from the service.
- *
- * Call request callbacks of all origins of the destination group.
- *
- * @param grp Destination group of the message.
- * @param msg The message.
+ * Iterator callback for calling join request callbacks of origins.
  */
-static void
-handle_multicast_request (struct GNUNET_MULTICAST_Group *grp,
-                          const struct GNUNET_MULTICAST_RequestHeader *req)
+static int
+join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+                 void *group)
 {
-  if (NULL != origins)
-    GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
-                                                request_callback, (void *) req);
+  const struct MulticastJoinRequestMessage *req = cls;
+  struct GNUNET_MULTICAST_Group *grp = group;
+
+  struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+  jh->group = grp;
+  jh->member_key = req->member_key;
+  jh->member_peer = req->member_peer;
+
+  const struct GNUNET_MessageHeader *msg = NULL;
+  if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
+    msg =(const struct GNUNET_MessageHeader *) &req[1];
+
+  grp->join_cb (grp->cb_cls, &req->member_key, msg, jh);
+  return GNUNET_YES;
 }
 
 
@@ -551,22 +545,31 @@
     size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
     break;
 
+  case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
+    size_min = sizeof (struct MulticastJoinRequestMessage);
+    break;
+
   default:
     GNUNET_break_op (0);
-    return;
+    type = 0;
   }
 
   if (! ((0 < size_eq && size == size_eq)
          || (0 < size_min && size_min <= size)))
   {
     GNUNET_break_op (0);
-    return;
+    type = 0;
   }
 
   switch (type)
   {
   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
-    handle_multicast_message (grp, (struct GNUNET_MULTICAST_MessageHeader *) msg);
+    if (origins != NULL)
+      GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+                                                  message_cb, (void *) msg);
+    if (members != NULL)
+      GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
+                                                  message_cb, (void *) msg);
     break;
 
   case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
@@ -576,12 +579,19 @@
       break;
     }
 
-    handle_multicast_request (grp, (struct GNUNET_MULTICAST_RequestHeader *) msg);
+    if (NULL != origins)
+      GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+                                                  request_cb, (void *) msg);
     break;
 
-  default:
-    GNUNET_break_op (0);
-    return;
+  case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
+    if (NULL != origins)
+      GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
+                                                  join_request_cb, (void *) msg);
+    if (NULL != members)
+      GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
+                                                  join_request_cb, (void *) msg);
+    break;
   }
 
   if (NULL != grp->client)
@@ -621,6 +631,7 @@
                                 const struct GNUNET_PeerIdentity *relays,
                                 const struct GNUNET_MessageHeader *join_response)
 {
+  GNUNET_free (jh);
   return NULL;
 }
 

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2014-05-17 10:16:15 UTC (rev 33313)
@@ -59,17 +59,23 @@
 
 /**
  * All connected masters.
- * Channel's pub_key_hash -> struct Channel
+ * Channel's pub_key_hash -> struct Master
  */
 static struct GNUNET_CONTAINER_MultiHashMap *masters;
 
 /**
  * All connected slaves.
- * Channel's pub_key_hash -> struct Channel
+ * Channel's pub_key_hash -> struct Slave
  */
 static struct GNUNET_CONTAINER_MultiHashMap *slaves;
 
+/**
+ * Connected slaves per channel.
+ * Channel's pub_key_hash -> Slave's pub_key -> struct Slave
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *channel_slaves;
 
+
 /**
  * Message in the transmission queue.
  */
@@ -78,6 +84,8 @@
   struct TransmitMessage *prev;
   struct TransmitMessage *next;
 
+  struct GNUNET_SERVER_Client *client;
+
   /**
    * ID assigned to the message.
    */
@@ -164,11 +172,23 @@
 
 
 /**
+ * List of connected clients.
+ */
+struct ClientList
+{
+  struct ClientList *prev;
+  struct ClientList *next;
+  struct GNUNET_SERVER_Client *client;
+};
+
+
+/**
  * Common part of the client context for both a channel master and slave.
  */
 struct Channel
 {
-  struct GNUNET_SERVER_Client *client;
+  struct ClientList *clients_head;
+  struct ClientList *clients_tail;
 
   struct TransmitMessage *tmit_head;
   struct TransmitMessage *tmit_tail;
@@ -316,6 +336,16 @@
   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
 
   /**
+   * Public key of the slave.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+
+  /**
+   * Hash of @a pub_key.
+   */
+  struct GNUNET_HashCode pub_key_hash;
+
+  /**
    * Handle for the multicast member.
    */
   struct GNUNET_MULTICAST_Member *member;
@@ -378,30 +408,62 @@
 }
 
 
+/**
+ * Clean up master data structures after a client disconnected.
+ */
 static void
-client_cleanup (struct Channel *ch)
+cleanup_master (struct Master *mst)
 {
-  /* FIXME: fragment_cache_clear */
+  struct Channel *ch = &mst->channel;
 
-  if (ch->is_master)
+  if (NULL != mst->origin)
+    GNUNET_MULTICAST_origin_stop (mst->origin);
+  GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
+}
+
+
+/**
+ * Clean up slave data structures after a client disconnected.
+ */
+static void
+cleanup_slave (struct Slave *slv)
+{
+  struct Channel *ch = &slv->channel;
+  struct GNUNET_CONTAINER_MultiHashMap *
+    ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
+                                                &ch->pub_key_hash);
+  GNUNET_assert (NULL != ch_slv);
+  GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
+
+  if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
   {
-    struct Master *mst = (struct Master *) ch;
-    if (NULL != mst->origin)
-      GNUNET_MULTICAST_origin_stop (mst->origin);
-    GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
+    GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
+                                          ch_slv);
+    GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
   }
-  else
-  {
-    struct Slave *slv = (struct Slave *) ch;
-    if (NULL != slv->join_req)
-      GNUNET_free (slv->join_req);
-    if (NULL != slv->relays)
-      GNUNET_free (slv->relays);
-    if (NULL != slv->member)
-      GNUNET_MULTICAST_member_part (slv->member);
-    GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
-  }
+  GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
 
+  if (NULL != slv->join_req)
+    GNUNET_free (slv->join_req);
+  if (NULL != slv->relays)
+    GNUNET_free (slv->relays);
+  if (NULL != slv->member)
+    GNUNET_MULTICAST_member_part (slv->member);
+  GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
+}
+
+
+/**
+ * Clean up channel data structures after a client disconnected.
+ */
+static void
+cleanup_channel (struct Channel *ch)
+{
+  /* FIXME: fragment_cache_clear */
+
+  (GNUNET_YES == ch->is_master)
+    ? cleanup_master ((struct Master *) ch)
+    : cleanup_slave ((struct Slave *) ch);
   GNUNET_free (ch);
 }
 
@@ -421,7 +483,10 @@
 
   struct Channel *ch
     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Client (%s) disconnected from channel %s\n",
+              ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
+              GNUNET_h2s (&ch->pub_key_hash));
 
   if (NULL == ch)
   {
@@ -431,29 +496,112 @@
     return;
   }
 
-  ch->disconnected = GNUNET_YES;
+  struct ClientList *cl = ch->clients_head;
+  while (NULL != cl)
+  {
+    if (cl->client == client)
+    {
+      GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
+      GNUNET_free (cl);
+      break;
+    }
+    cl = cl->next;
+  }
 
-  /* Send pending messages to multicast before cleanup. */
-  if (NULL != ch->tmit_head)
+  if (NULL == ch->clients_head)
+  { /* Last client disconnected. */
+    if (NULL != ch->tmit_head)
+    { /* Send pending messages to multicast before cleanup. */
+      transmit_message (ch);
+    }
+    else
+    {
+      cleanup_channel (ch);
+    }
+  }
+}
+
+
+/**
+ * Send message to all clients connected to the channel.
+ */
+static void
+msg_to_clients (const struct Channel *ch,
+                const struct GNUNET_MessageHeader *msg)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "%p Sending message to clients.\n", ch);
+
+  struct ClientList *cl = ch->clients_head;
+  while (NULL != cl)
   {
-    transmit_message (ch);
+    GNUNET_SERVER_notification_context_add (nc, cl->client);
+    GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO);
+    cl = cl->next;
   }
+}
+
+
+/**
+ * Closure for join_mem_test_cb()
+ */
+struct JoinMemTestCls
+{
+  struct Channel *ch;
+  struct GNUNET_MULTICAST_JoinHandle *jh;
+  struct MasterJoinRequest *master_join_req;
+};
+
+
+/**
+ * Membership test result callback used for join requests.m
+ */
+static void
+join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
+{
+  struct JoinMemTestCls *jcls = cls;
+
+  if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
+  { /* Pass on join request to client if this is a master channel */
+    msg_to_clients (jcls->ch,
+                    (struct GNUNET_MessageHeader *) jcls->master_join_req);
+  }
   else
   {
-    client_cleanup (ch);
+    // FIXME: relays
+    GNUNET_MULTICAST_join_decision(jcls->jh, result, 0, NULL, NULL);
   }
+  GNUNET_free (jcls->master_join_req);
+  GNUNET_free (jcls);
 }
 
 
 /**
- * Master receives a join request from a slave.
+ * Incoming join request from multicast.
  */
 static void
 join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
          const struct GNUNET_MessageHeader *join_req,
          struct GNUNET_MULTICAST_JoinHandle *jh)
 {
+  struct Channel *ch = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
 
+  uint16_t join_req_size = (NULL != join_req) ? ntohs (join_req->size) : 0;
+  struct MasterJoinRequest *req = GNUNET_malloc (sizeof (*req) + join_req_size);
+  req->header.size = htons (sizeof (*req) + join_req_size);
+  req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST);
+  req->slave_key = *slave_key;
+  memcpy (&req[1], join_req, join_req_size);
+
+  struct JoinMemTestCls *jcls = GNUNET_malloc (sizeof (*jcls));
+  jcls->ch = ch;
+  jcls->jh = jh;
+  jcls->master_join_req = req;
+
+  GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
+                                    ch->max_message_id, 0,
+                                    &join_mem_test_cb, jcls);
 }
 
 
@@ -474,6 +622,7 @@
                     struct GNUNET_MULTICAST_ReplayHandle *rh)
 
 {
+
 }
 
 
@@ -497,35 +646,6 @@
 }
 
 
-static void
-message_to_client (struct Channel *ch,
-                   const struct GNUNET_MULTICAST_MessageHeader *mmsg)
-{
-  uint16_t size = ntohs (mmsg->header.size);
-  struct GNUNET_PSYC_MessageHeader *pmsg;
-  uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Sending message to client. "
-              "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
-              ch, GNUNET_ntohll (mmsg->fragment_id),
-              GNUNET_ntohll (mmsg->message_id));
-
-  pmsg = GNUNET_malloc (psize);
-  pmsg->header.size = htons (psize);
-  pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
-  pmsg->message_id = mmsg->message_id;
-
-  memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
-
-  GNUNET_SERVER_notification_context_add (nc, ch->client);
-  GNUNET_SERVER_notification_context_unicast (nc, ch->client,
-                                              (const struct GNUNET_MessageHeader *) pmsg,
-                                              GNUNET_NO);
-  GNUNET_free (pmsg);
-}
-
-
 /**
  * Convert an uint64_t in network byte order to a HashCode
  * that can be used as key in a MultiHashMap
@@ -564,6 +684,34 @@
 
 
 /**
+ * Send multicast message to all clients connected to the channel.
+ */
+static void
+mmsg_to_clients (struct Channel *ch,
+                 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+  uint16_t size = ntohs (mmsg->header.size);
+  struct GNUNET_PSYC_MessageHeader *pmsg;
+  uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Sending message to client. "
+              "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+              ch, GNUNET_ntohll (mmsg->fragment_id),
+              GNUNET_ntohll (mmsg->message_id));
+
+  pmsg = GNUNET_malloc (psize);
+  pmsg->header.size = htons (psize);
+  pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+  pmsg->message_id = mmsg->message_id;
+
+  memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+  msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
+  GNUNET_free (pmsg);
+}
+
+
+/**
  * Insert a multicast message fragment into the queue belonging to the message.
  *
  * @param ch           Channel.
@@ -752,7 +900,7 @@
     {
       if (GNUNET_NO == drop)
       {
-        message_to_client (ch, cache_entry->mmsg);
+        mmsg_to_clients (ch, cache_entry->mmsg);
       }
       if (cache_entry->ref_count <= 1)
       {
@@ -997,11 +1145,7 @@
     pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
 
     memcpy (&pmsg[1], &req[1], size - sizeof (*req));
-
-    GNUNET_SERVER_notification_context_add (nc, ch->client);
-    GNUNET_SERVER_notification_context_unicast (nc, ch->client,
-                                                (const struct GNUNET_MessageHeader *) pmsg,
-                                                GNUNET_NO);
+    msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
     GNUNET_free (pmsg);
     break;
   }
@@ -1025,11 +1169,11 @@
   struct Master *mst = cls;
   struct Channel *ch = &mst->channel;
 
-  struct CountersResult *res = GNUNET_malloc (sizeof (*res));
-  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
-  res->header.size = htons (sizeof (*res));
-  res->result_code = htonl (result);
-  res->max_message_id = GNUNET_htonll (max_message_id);
+  struct CountersResult res;
+  res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+  res.header.size = htons (sizeof (res));
+  res.result_code = htonl (result);
+  res.max_message_id = GNUNET_htonll (max_message_id);
 
   if (GNUNET_OK == result || GNUNET_NO == result)
   {
@@ -1053,10 +1197,7 @@
                 ch, result, GNUNET_h2s (&ch->pub_key_hash));
   }
 
-  GNUNET_SERVER_notification_context_add (nc, ch->client);
-  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
-                                              GNUNET_NO);
-  GNUNET_free (res);
+  msg_to_clients (ch, &res.header);
 }
 
 
@@ -1071,11 +1212,11 @@
   struct Slave *slv = cls;
   struct Channel *ch = &slv->channel;
 
-  struct CountersResult *res = GNUNET_malloc (sizeof (*res));
-  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
-  res->header.size = htons (sizeof (*res));
-  res->result_code = htonl (result);
-  res->max_message_id = GNUNET_htonll (max_message_id);
+  struct CountersResult res;
+  res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+  res.header.size = htons (sizeof (res));
+  res.result_code = htonl (result);
+  res.max_message_id = GNUNET_htonll (max_message_id);
 
   if (GNUNET_OK == result || GNUNET_NO == result)
   {
@@ -1099,10 +1240,7 @@
                 ch, result, GNUNET_h2s (&ch->pub_key_hash));
   }
 
-  GNUNET_SERVER_notification_context_add (nc, ch->client);
-  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
-                                              GNUNET_NO);
-  GNUNET_free (res);
+  msg_to_clients (ch, &res.header);
 }
 
 
@@ -1125,25 +1263,55 @@
   const struct MasterStartRequest *req
     = (const struct MasterStartRequest *) msg;
 
-  struct Master *mst = GNUNET_new (struct Master);
-  mst->policy = ntohl (req->policy);
-  mst->priv_key = req->channel_key;
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+  struct GNUNET_HashCode pub_key_hash;
 
-  struct Channel *ch = &mst->channel;
-  ch->client = client;
-  ch->is_master = GNUNET_YES;
-  GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
-  GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
-  channel_init (ch);
+  GNUNET_CRYPTO_eddsa_key_get_public (&req->channel_key, &pub_key);
+  GNUNET_CRYPTO_hash (&pub_key, sizeof (pub_key), &pub_key_hash);
 
+  struct Master *
+    mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
+  struct Channel *ch;
+
+  if (NULL == mst)
+  {
+    mst = GNUNET_new (struct Master);
+    mst->policy = ntohl (req->policy);
+    mst->priv_key = req->channel_key;
+
+    ch = &mst->channel;
+    ch->is_master = GNUNET_YES;
+    ch->pub_key = pub_key;
+    ch->pub_key_hash = pub_key_hash;
+    channel_init (ch);
+
+    GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+    GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
+  }
+  else
+  {
+    ch = &mst->channel;
+
+    struct CountersResult res;
+    res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+    res.header.size = htons (sizeof (res));
+    res.result_code = htonl (GNUNET_OK);
+    res.max_message_id = GNUNET_htonll (mst->max_message_id);
+
+    GNUNET_SERVER_notification_context_add (nc, client);
+    GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
+                                                GNUNET_NO);
+  }
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Master connected to channel %s.\n",
+              "%p Client connected as master to channel %s.\n",
               mst, GNUNET_h2s (&ch->pub_key_hash));
 
-  GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, master_counters_cb, mst);
+  struct ClientList *cl = GNUNET_new (struct ClientList);
+  cl->client = client;
+  GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
 
-  GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_SERVER_client_set_user_context (client, ch);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -1158,37 +1326,82 @@
 {
   const struct SlaveJoinRequest *req
     = (const struct SlaveJoinRequest *) msg;
-  struct Slave *slv = GNUNET_new (struct Slave);
-  slv->priv_key = req->slave_key;
-  slv->origin = req->origin;
-  slv->relay_count = ntohl (req->relay_count);
-  if (0 < slv->relay_count)
+
+  struct GNUNET_CRYPTO_EddsaPublicKey slv_pub_key;
+  struct GNUNET_HashCode pub_key_hash, slv_pub_key_hash;
+
+  GNUNET_CRYPTO_eddsa_key_get_public (&req->slave_key, &slv_pub_key);
+  GNUNET_CRYPTO_hash (&slv_pub_key, sizeof (slv_pub_key), &slv_pub_key_hash);
+  GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash);
+
+  struct GNUNET_CONTAINER_MultiHashMap *
+    ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
+  struct Slave *slv = NULL;
+  struct Channel *ch;
+
+  if (NULL == ch_slv)
   {
-    const struct GNUNET_PeerIdentity *relays
-      = (const struct GNUNET_PeerIdentity *) &req[1];
-    slv->relays
-      = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
-    uint32_t i;
-    for (i = 0; i < slv->relay_count; i++)
-      memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+    ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+    GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   }
+  else
+  {
+    slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
+  }
 
-  struct Channel *ch = &slv->channel;
-  ch->client = client;
-  ch->is_master = GNUNET_NO;
-  ch->pub_key = req->channel_key;
-  GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
-                      &ch->pub_key_hash);
-  channel_init (ch);
+  if (NULL == slv)
+  {
+    slv = GNUNET_new (struct Slave);
+    slv->priv_key = req->slave_key;
+    slv->origin = req->origin;
+    slv->relay_count = ntohl (req->relay_count);
+    if (0 < slv->relay_count)
+    {
+      const struct GNUNET_PeerIdentity *relays
+        = (const struct GNUNET_PeerIdentity *) &req[1];
+      slv->relays
+        = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
+      uint32_t i;
+      for (i = 0; i < slv->relay_count; i++)
+        memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+    }
 
+    ch = &slv->channel;
+    ch->is_master = GNUNET_NO;
+    ch->pub_key = req->channel_key;
+    ch->pub_key_hash = pub_key_hash;
+    channel_init (ch);
+
+    GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+    GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
+                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+    GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
+  }
+  else
+  {
+    ch = &slv->channel;
+
+    struct CountersResult res;
+    res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+    res.header.size = htons (sizeof (res));
+    res.result_code = htonl (GNUNET_OK);
+    res.max_message_id = GNUNET_htonll (ch->max_message_id);
+
+    GNUNET_SERVER_notification_context_add (nc, client);
+    GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
+                                                GNUNET_NO);
+  }
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Slave connected to channel %s.\n",
+              "%p Client connected as slave to channel %s.\n",
               slv, GNUNET_h2s (&ch->pub_key_hash));
 
-  GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, slave_counters_cb, slv);
+  struct ClientList *cl = GNUNET_new (struct ClientList);
+  cl->client = client;
+  GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
 
-  GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
-                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_SERVER_client_set_user_context (client, &slv->channel);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -1202,14 +1415,15 @@
  * @param ch The channel struct for the client.
  */
 static void
-send_message_ack (struct Channel *ch)
+send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
 {
   struct GNUNET_MessageHeader res;
   res.size = htons (sizeof (res));
   res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
 
-  GNUNET_SERVER_notification_context_add (nc, ch->client);
-  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res, GNUNET_NO);
+  /* FIXME */
+  GNUNET_SERVER_notification_context_add (nc, client);
+  GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
 }
 
 
@@ -1236,12 +1450,13 @@
   *data_size = tmit_msg->size;
   memcpy (data, &tmit_msg[1], *data_size);
 
+  int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+  if (NULL != tmit_msg->client)
+    send_message_ack (ch, tmit_msg->client);
+
   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
   GNUNET_free (tmit_msg);
 
-  int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
-  send_message_ack (ch);
-
   if (0 == ch->tmit_task)
   {
     if (NULL != ch->tmit_head)
@@ -1251,7 +1466,7 @@
     else if (ch->disconnected)
     {
       /* FIXME: handle partial message (when still in_transmit) */
-      client_cleanup (ch);
+      cleanup_channel (ch);
     }
   }
 
@@ -1394,12 +1609,15 @@
 
 
 static void
-queue_message (struct Channel *ch,  const struct GNUNET_MessageHeader *msg,
+queue_message (struct Channel *ch,
+               struct GNUNET_SERVER_Client *client,
+               const struct GNUNET_MessageHeader *msg,
                uint16_t first_ptype, uint16_t last_ptype)
 {
   uint16_t size = ntohs (msg->size) - sizeof (*msg);
   struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
   memcpy (&tmit_msg[1], &msg[1], size);
+  tmit_msg->client = client;
   tmit_msg->size = size;
   tmit_msg->state = ch->tmit_state;
 
@@ -1414,7 +1632,7 @@
 
 
 static void
-transmit_error (struct Channel *ch)
+transmit_error (struct Channel *ch, struct GNUNET_SERVER_Client *client)
 {
   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
 
@@ -1422,7 +1640,7 @@
   msg.size = ntohs (sizeof (msg));
   msg.type = ntohs (type);
 
-  queue_message (ch, &msg, type, type);
+  queue_message (ch, client, &msg, type, type);
   transmit_message (ch);
 
   /* FIXME: cleanup */
@@ -1458,7 +1676,7 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
     GNUNET_break (0);
-    transmit_error (ch);
+    transmit_error (ch, client);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
@@ -1472,12 +1690,12 @@
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "%p Received invalid message part from client.\n", ch);
     GNUNET_break (0);
-    transmit_error (ch);
+    transmit_error (ch, client);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
 
-  queue_message (ch, msg, first_ptype, last_ptype);
+  queue_message (ch, client, msg, first_ptype, last_ptype);
   transmit_message (ch);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1581,6 +1799,7 @@
   stats = GNUNET_STATISTICS_create ("psyc", cfg);
   masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
   recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   nc = GNUNET_SERVER_notification_context_create (server, 1);
   GNUNET_SERVER_add_handlers (server, handlers);

Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h      2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/psyc/psyc.h      2014-05-17 10:16:15 UTC (rev 33313)
@@ -227,6 +227,21 @@
 };
 
 
+struct MasterJoinRequest
+{
+  /**
+   * Types:
+   * - GNUNET_MESSAGE_TYPE_PSYC_MASTER_JOIN_REQUEST
+   */
+  struct GNUNET_MessageHeader header;
+  /**
+   * Public key of the joining slave.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
+
+  /* Followed by struct GNUNET_MessageHeader join_request */
+};
+
 GNUNET_NETWORK_STRUCT_END
 
 #endif

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/psyc/psyc_api.c  2014-05-17 10:16:15 UTC (rev 33313)
@@ -126,13 +126,8 @@
   GNUNET_PSYC_MessageCallback hist_message_cb;
 
   /**
-   * Join handler callback.
+   * Closure for @a message_cb.
    */
-  GNUNET_PSYC_JoinCallback join_cb;
-
-  /**
-   * Closure for @a message_cb and @a join_cb.
-   */
   void *cb_cls;
 
   /**
@@ -200,6 +195,11 @@
   struct GNUNET_PSYC_Channel ch;
 
   GNUNET_PSYC_MasterStartCallback start_cb;
+
+  /**
+   * Join handler callback.
+   */
+  GNUNET_PSYC_JoinCallback join_cb;
 };
 
 
@@ -908,6 +908,18 @@
 }
 
 
+static void
+handle_psyc_join_request (struct GNUNET_PSYC_Master *mst,
+                          const struct MasterJoinRequest *req)
+{
+  // FIXME: extract join message from req[1]
+  const char *method_name = "_fixme";
+  struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+  mst->join_cb (mst->ch.cb_cls, &req->slave_key, method_name,
+                0, NULL, NULL, 0, jh);
+}
+
+
 /**
  * Type of a function to call when we receive a message
  * from the service.
@@ -951,6 +963,9 @@
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
     size_eq = sizeof (struct GNUNET_MessageHeader);
     break;
+  case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
+    size_min = sizeof (struct MasterJoinRequest);
+    break;
   default:
     GNUNET_break_op (0);
     return;
@@ -988,6 +1003,11 @@
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
     handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
     break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
+    handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
+                              (const struct MasterJoinRequest *) msg);
+    break;
   }
 
   if (NULL != ch->client)
@@ -1186,8 +1206,8 @@
   req->policy = policy;
 
   mst->start_cb = master_started_cb;
+  mst->join_cb = join_cb;
   ch->message_cb = message_cb;
-  ch->join_cb = join_cb;
   ch->cb_cls = cls;
   ch->cfg = cfg;
   ch->is_master = GNUNET_YES;
@@ -1320,9 +1340,7 @@
  * @param message_cb Function to invoke on message parts received from the
  *        channel, typically at least contains method handlers for @e join and
  *        @e part.
- * @param join_cb function invoked once we have joined with the current
- *        message ID of the channel
- * @param slave_joined_cb Function to invoke when a peer wants to join.
+ * @param slave_joined_cb Function invoked once we have joined the channel.
  * @param cls Closure for @a message_cb and @a slave_joined_cb.
  * @param method_name Method name for the join request.
  * @param env Environment containing transient variables for the request, or NULL.
@@ -1339,7 +1357,6 @@
                         uint32_t relay_count,
                         const struct GNUNET_PeerIdentity *relays,
                         GNUNET_PSYC_MessageCallback message_cb,
-                        GNUNET_PSYC_JoinCallback join_cb,
                         GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
                         void *cls,
                         const char *method_name,
@@ -1362,7 +1379,6 @@
 
   slv->join_cb = slave_joined_cb;
   ch->message_cb = message_cb;
-  ch->join_cb = join_cb;
   ch->cb_cls = cls;
 
   ch->cfg = cfg;

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-05-17 04:47:33 UTC (rev 33312)
+++ gnunet/src/psyc/test_psyc.c 2014-05-17 10:16:15 UTC (rev 33313)
@@ -130,6 +130,7 @@
 {
   res = 1;
   cleanup ();
+  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test FAILED.\n");
 }
 
 
@@ -144,6 +145,7 @@
 {
   res = 0;
   cleanup ();
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test PASSED.\n");
 }
 
 
@@ -181,7 +183,7 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Master got message part of type %u and size %u "
-              "belonging to message ID %llu with flags %u\n",
+              "belonging to message ID %llu with flags %bu\n",
               type, size, message_id, flags);
 
   switch (test)
@@ -225,7 +227,7 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Slave got message part of type %u and size %u "
-              "belonging to message ID %llu with flags %u\n",
+              "belonging to message ID %llu with flags %bu\n",
               type, size, message_id, flags);
 
   switch (test)




reply via email to

[Prev in Thread] Current Thread [Next in Thread]