gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r31792 - in gnunet: contrib src/include src/multicast src/psyc


From: gnunet
Subject: [GNUnet-SVN] r31792 - in gnunet: contrib src/include src/multicast src/psyc
Date: Mon, 6 Jan 2014 01:09:43 +0100

Author: tg
Date: 2014-01-06 01:09:43 +0100 (Mon, 06 Jan 2014)
New Revision: 31792

Modified:
   gnunet/contrib/logread.pl
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/include/gnunet_psyc_service.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/test_psyc.c
   gnunet/src/psyc/test_psyc.conf
Log:
psyc: ipc messages, notify callback for modifiers, tests

Modified: gnunet/contrib/logread.pl
===================================================================
--- gnunet/contrib/logread.pl   2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/contrib/logread.pl   2014-01-06 00:09:43 UTC (rev 31792)
@@ -98,7 +98,7 @@
     s/\b(multicast|psyc|psycstore|social)\b/BLUE $1/ex;
 
     # Add message type names
-    s/(message(?:\s+of)?\s+type\s+)(\d+)/
+    s/(message(?:\s+part)?(?:\s+of)?\s+type\s+)(\d+)/
       $1 . BRIGHT_CYAN (exists $msgtypes{$2} ? $msgtypes{$2} : 'UNKNOWN') .
       CYAN " ($2)"/e;
 

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/include/gnunet_protocols.h       2014-01-06 00:09:43 UTC (rev 31792)
@@ -2130,7 +2130,7 @@
 
 #define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL 697
 
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 698
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 698
 
 
 #define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701

Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h    2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/include/gnunet_psyc_service.h    2014-01-06 00:09:43 UTC (rev 31792)
@@ -426,6 +426,11 @@
                                      uint16_t *data_size,
                                      void *data);
 
+typedef int
+(*GNUNET_PSYC_MasterTransmitNotifyModifier) (void *cls,
+                                             uint16_t *data_size,
+                                             void *data,
+                                             uint8_t *oper);
 
 /**
  * Flags for transmitting messages to a channel by the master.
@@ -472,7 +477,7 @@
 struct GNUNET_PSYC_MasterTransmitHandle *
 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
                              const char *method_name,
-                             GNUNET_PSYC_MasterTransmitNotify notify_mod,
+                             GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod,
                              GNUNET_PSYC_MasterTransmitNotify notify_data,
                              void *notify_cls,
                              enum GNUNET_PSYC_MasterTransmitFlags flags);

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/multicast/multicast_api.c        2014-01-06 00:09:43 UTC (rev 31792)
@@ -362,8 +362,9 @@
   struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
+  char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
   struct GNUNET_MULTICAST_MessageHeader *msg
-    = GNUNET_malloc (buf_size);
+    = (struct GNUNET_MULTICAST_MessageHeader *) buf;
   int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
@@ -380,12 +381,12 @@
 
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
   msg->header.size = htons (sizeof (*msg) + buf_size);
-  msg->message_id = mh->message_id;
+  msg->message_id = GNUNET_htonll (mh->message_id);
   msg->group_generation = mh->group_generation;
 
   /* FIXME: add fragment ID and signature in the service instead of here */
-  msg->fragment_id = orig->next_fragment_id++;
-  msg->fragment_offset = mh->fragment_offset;
+  msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++);
+  msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset);
   mh->fragment_offset += sizeof (*msg) + buf_size;
   msg->purpose.size = htonl (sizeof (*msg) + buf_size
                              - sizeof (msg->header)

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2014-01-06 00:09:43 UTC (rev 31792)
@@ -171,8 +171,8 @@
 };
 
 
-static void
-transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay);
+static inline void
+transmit_message (struct Channel *ch);
 
 
 /**
@@ -205,6 +205,7 @@
     struct Master *mst = (struct Master *) ch;
     if (NULL != mst->origin)
       GNUNET_MULTICAST_origin_stop (mst->origin);
+    GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst);
   }
   else
   {
@@ -251,7 +252,7 @@
   /* Send pending messages to multicast before cleanup. */
   if (NULL != ch->tmit_head)
   {
-    transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
+    transmit_message (ch);
   }
   else
   {
@@ -321,6 +322,10 @@
   const struct GNUNET_MessageHeader *msg = cls;
   struct Channel *ch = chan;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending message of type %u and size %u to client 0x%zx.\n",
+              ntohs (msg->type), ntohs (msg->size), ch->client);
+
   GNUNET_SERVER_notification_context_add (nc, ch->client);
   GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
 
@@ -363,36 +368,38 @@
                                       GNUNET_MULTICAST_MessageHeader *) msg,
                                      0, NULL, NULL);
 
+#if TODO
+    /* FIXME: apply modifiers to state in PSYCstore */
+    GNUNET_PSYCSTORE_state_modify (store, chan_key,
+                                   GNUNET_ntohll (mmsg->message_id),
+                                   meth->mod_count, mods,
+                                   rcb, rcb_cls);
+#endif
+
+    const struct GNUNET_MULTICAST_MessageHeader *mmsg
+      = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
+    struct GNUNET_PSYC_MessageHeader *pmsg;
+
     uint16_t size = ntohs (msg->size);
     uint16_t psize = 0;
     uint16_t pos = 0;
 
-    for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
+    for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize)
     {
       const struct GNUNET_MessageHeader *pmsg
-        = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
-      uint16_t psize = ntohs (pmsg->size);
-      if (sizeof (*msg) + pos + psize > size)
+        = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos);
+      psize = ntohs (pmsg->size);
+      if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size)
       {
         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Message received from multicast contains invalid PSYC "
-                    "message. Not sending to clients.\n");
+                    "Received invalid message part of type %u and size %u "
+                    "from multicast. Not sending to clients.\n",
+                    ntohs (pmsg->type), psize);
+        GNUNET_break_op (0);
         return;
       }
     }
 
-#if TODO
-    /* FIXME: apply modifiers to state in PSYCstore */
-    GNUNET_PSYCSTORE_state_modify (store, chan_key,
-                                   GNUNET_ntohll (mmsg->message_id),
-                                   meth->mod_count, mods,
-                                   rcb, rcb_cls);
-#endif
-
-    const struct GNUNET_MULTICAST_MessageHeader *mmsg
-      = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
-    struct GNUNET_PSYC_MessageHeader *pmsg;
-
     psize = sizeof (*pmsg) + size - sizeof (*mmsg);
     pmsg = GNUNET_malloc (psize);
     pmsg->header.size = htons (psize);
@@ -572,19 +579,18 @@
 
 
 /**
- * Send transmission acknowledgement to a client.
+ * Send acknowledgement to a client.
  *
- * Sent after the last GNUNET_PSYC_MessageModifier and after each
- * GNUNET_PSYC_MessageData.
+ * Sent after a message fragment has been passed on to multicast.
  *
  * @param ch The channel struct for the client.
  */
 static void
-send_transmit_ack (struct Channel *ch)
+send_message_ack (struct Channel *ch)
 {
   struct GNUNET_MessageHeader res;
   res.size = htons (sizeof (res));
-  res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
+  res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
 
   GNUNET_SERVER_notification_context_add (nc, ch->client);
   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
@@ -599,9 +605,9 @@
 transmit_notify (void *cls, size_t *data_size, void *data)
 {
   struct Channel *ch = cls;
-  struct TransmitMessage *msg = ch->tmit_head;
+  struct TransmitMessage *tmit_msg = ch->tmit_head;
 
-  if (NULL == msg || *data_size < msg->size)
+  if (NULL == tmit_msg || *data_size < tmit_msg->size)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to send.\n");
     *data_size = 0;
@@ -609,21 +615,22 @@
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "transmit_notify: sending %u bytes.\n", msg->size);
+              "transmit_notify: sending %u bytes.\n", tmit_msg->size);
 
-  *data_size = msg->size;
-  memcpy (data, msg->buf, *data_size);
+  *data_size = tmit_msg->size;
+  memcpy (data, tmit_msg->buf, *data_size);
 
-  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
-  GNUNET_free (msg);
+  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
+  GNUNET_free (tmit_msg);
 
   int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+  send_message_ack (ch);
 
   if (0 == ch->tmit_task)
   {
     if (NULL != ch->tmit_head)
     {
-      transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
+      transmit_message (ch);
     }
     else if (ch->disconnected)
     {
@@ -640,11 +647,9 @@
  * Transmit a message from a channel master to the multicast group.
  */
 static void
-master_transmit_message (void *cls,
-                         const struct GNUNET_SCHEDULER_TaskContext *tc)
+master_transmit_message (struct Master *mst)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
-  struct Master *mst = cls;
   mst->channel.tmit_task = 0;
   if (NULL == mst->tmit_handle)
   {
@@ -664,10 +669,8 @@
  * Transmit a message from a channel slave to the multicast group.
  */
 static void
-slave_transmit_message (void *cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+slave_transmit_message (struct Slave *slv)
 {
-  struct Slave *slv = cls;
   slv->channel.tmit_task = 0;
   if (NULL == slv->tmit_handle)
   {
@@ -682,215 +685,86 @@
 }
 
 
-/**
- * Schedule message transmission from a channel to the multicast group.
- *
- * @param ch The channel.
- * @param delay Transmission delay.
- */
-static void
-transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
+static inline void
+transmit_message (struct Channel *ch)
 {
-  if (0 != ch->tmit_task)
-    GNUNET_SCHEDULER_cancel (ch->tmit_task);
-
-  ch->tmit_task
-    = ch->is_master
-    ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch)
-    : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
+  ch->is_master
+    ? master_transmit_message ((struct Master *) ch)
+    : slave_transmit_message ((struct Slave *) ch);
 }
 
 
-/**
- * Queue incoming message parts from a client for transmission, and send them to
- * the multicast group when the buffer is full or reached the end of message.
- *
- * @param ch Channel struct for the client.
- * @param msg Message from the client.
- *
- * @return #GNUNET_OK on success, else #GNUNET_SYSERR.
- */
-static int
-queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
-{
-  uint16_t size = ntohs (msg->size);
-  struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
-  struct TransmitMessage *tmit_msg = ch->tmit_tail;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Queueing message of type %u and size %u "
-              "for transmission to multicast.\n",
-              ntohs (msg->type), size);
-
-  if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size)
-    return GNUNET_SYSERR;
-
-  if (NULL == tmit_msg
-      || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Appending message to new buffer.\n");
-    /* Start filling up new buffer */
-    tmit_msg = GNUNET_new (struct TransmitMessage);
-    tmit_msg->buf = GNUNET_malloc (size);
-    memcpy (tmit_msg->buf, msg, size);
-    tmit_msg->size = size;
-    tmit_msg->state = ch->tmit_state;
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-  }
-  else
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Appending message to existing buffer.\n");
-    /* Append to existing buffer */
-    tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
-    memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
-    tmit_msg->size += size;
-    tmit_msg->state = ch->tmit_state;
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
-
-  /* Wait a bit for the remaining message parts from the client
-     if there's still some space left in the buffer. */
-  if (tmit_msg->state < MSG_STATE_END
-      && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader)
-          < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD))
-  {
-    tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
-  }
-  else
-  {
-    send_transmit_ack (ch);
-  }
-
-  transmit_message (ch, tmit_delay);
-
-  return GNUNET_OK;
-}
-
-
 static void
 transmit_error (struct Channel *ch)
 {
-  struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg));
+  struct GNUNET_MessageHeader *msg;
+  struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
+                                                    + sizeof (*msg));
+  msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
   msg->size = ntohs (sizeof (*msg));
   msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
-  queue_message (ch, msg);
 
+  tmit_msg->buf = (char *) &tmit_msg[1];
+  tmit_msg->size = sizeof (*msg);
+  tmit_msg->state = ch->tmit_state;
+  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+  transmit_message (ch);
+
+  /* FIXME: cleanup */
   GNUNET_SERVER_client_disconnect (ch->client);
 }
 
-/**
- * Incoming method from a client.
- */
-static void
-handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
-                        const struct GNUNET_MessageHeader *msg)
-{
-  /* const struct GNUNET_PSYC_MessageMethod *meth
-     = (const struct GNUNET_PSYC_MessageMethod *) msg; */
-  struct Channel *ch
-    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != ch);
 
-  if (MSG_STATE_START != ch->tmit_state)
-  {
-    transmit_error (ch);
-    return;
-  }
-  ch->tmit_state = MSG_STATE_METHOD;
-
-  queue_message (ch, msg);
-  send_transmit_ack (ch);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
-
-
 /**
- * Incoming modifier from a client.
+ * Incoming message from a client.
  */
 static void
-handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
-                          const struct GNUNET_MessageHeader *msg)
+handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
+                         const struct GNUNET_MessageHeader *msg)
 {
-  const struct GNUNET_PSYC_MessageModifier *mod
-    = (const struct GNUNET_PSYC_MessageModifier *) msg;
-
   struct Channel *ch
     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
   GNUNET_assert (NULL != ch);
 
-  if (MSG_STATE_METHOD != ch->tmit_state
-      || MSG_STATE_MODIFIER != ch->tmit_state
-      || MSG_STATE_MOD_CONT != ch->tmit_state
-      || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
-  {
-    transmit_error (ch);
-    return;
-  }
-  ch->tmit_mod_value_size_expected = ntohl (mod->value_size);
-  ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1;
+  uint16_t size = ntohs (msg->size);
+  uint16_t psize = 0, pos = 0;
 
-  queue_message (ch, msg);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
-
-
-/**
- * Incoming modifier from a client.
- */
-static void
-handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client,
-                          const struct GNUNET_MessageHeader *msg)
-{
-  struct Channel *ch
-    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != ch);
-
-  ch->tmit_mod_value_size += ntohs (msg->size);
-
-  if (MSG_STATE_MODIFIER != ch->tmit_state
-      || MSG_STATE_MOD_CONT != ch->tmit_state
-      || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size)
+  if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n");
+    GNUNET_break (0);
     transmit_error (ch);
     return;
   }
-  ch->tmit_state = MSG_STATE_MOD_CONT;
 
-  queue_message (ch, msg);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
-
-
-/**
- * Incoming data from a client.
- */
-static void
-handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
-                      const struct GNUNET_MessageHeader *msg)
-{
-  struct Channel *ch
-    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != ch);
-
-  if (MSG_STATE_METHOD != ch->tmit_state
-      || MSG_STATE_MODIFIER != ch->tmit_state
-      || MSG_STATE_MOD_CONT != ch->tmit_state
-      || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
+  for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
   {
-    transmit_error (ch);
-    return;
+    const struct GNUNET_MessageHeader *pmsg
+      = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+    psize = ntohs (pmsg->size);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received message part of type %u and size %u "
+                "from client.\n", ntohs (pmsg->type), psize);
+    if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Received invalid message part of type %u and size %u "
+                  "from client.\n", ntohs (pmsg->type), psize);
+      GNUNET_break (0);
+      transmit_error (ch);
+      return;
+    }
   }
-  ch->tmit_state = MSG_STATE_DATA;
 
-  queue_message (ch, msg);
-  send_transmit_ack (ch);
+  size -= sizeof (*msg);
+  struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
+  tmit_msg->buf = (char *) &tmit_msg[1];
+  memcpy (tmit_msg->buf, &msg[1], size);
+  tmit_msg->size = size;
+  tmit_msg->state = ch->tmit_state;
+  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+  transmit_message (ch);
 
-  if (MSG_STATE_END <= ch->tmit_state)
-    ch->tmit_state = MSG_STATE_START;
-
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 };
 
@@ -912,22 +786,9 @@
 
     { &handle_slave_join, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
-#if TODO
+
     { &handle_psyc_message, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
-#endif
-    { &handle_transmit_method, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
-
-    { &handle_transmit_modifier, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
-
-    { &handle_transmit_mod_cont, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 },
-
-    { &handle_transmit_data, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
-    { NULL, NULL, 0, 0 }
   };
 
   cfg = c;

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/psyc/psyc_api.c  2014-01-06 00:09:43 UTC (rev 31792)
@@ -45,7 +45,7 @@
 {
   struct OperationHandle *prev;
   struct OperationHandle *next;
-  const struct GNUNET_MessageHeader *msg;
+  struct GNUNET_MessageHeader *msg;
 };
 
 /**
@@ -79,6 +79,11 @@
   struct OperationHandle *tmit_tail;
 
   /**
+   * Message being transmitted to the PSYC service.
+   */
+  struct OperationHandle *tmit_msg;
+
+  /**
    * Message to send on reconnect.
    */
   struct GNUNET_MessageHeader *reconnect_msg;
@@ -139,11 +144,6 @@
   uint32_t recv_mod_value_size;
 
   /**
-   * Buffer space available for transmitting the next data fragment.
-   */
-  uint16_t tmit_size; // FIXME
-
-  /**
    * Is transmission paused?
    */
   uint8_t tmit_paused;
@@ -151,7 +151,7 @@
   /**
    * Are we still waiting for a PSYC_TRANSMIT_ACK?
    */
-  uint8_t tmit_ack_pending; // FIXME
+  uint8_t tmit_ack_pending;
 
   /**
    * Are we polling for incoming messages right now?
@@ -176,7 +176,7 @@
 struct GNUNET_PSYC_MasterTransmitHandle
 {
   struct GNUNET_PSYC_Master *master;
-  GNUNET_PSYC_MasterTransmitNotify notify_mod;
+  GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
   GNUNET_PSYC_MasterTransmitNotify notify_data;
   void *notify_cls;
   enum MessageState state;
@@ -246,16 +246,14 @@
 };
 
 
-/**
- * Try again to connect to the PSYC service.
- *
- * @param cls Handle to the PSYC service.
- * @param tc Scheduler context
- */
 static void
 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
+static void
+master_transmit_data (struct GNUNET_PSYC_Master *mst);
+
+
 /**
  * Reschedule a connect attempt to the service.
  *
@@ -323,7 +321,80 @@
     message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
 }
 
+
 /**
+ * Queue an incoming message part for transmission to the PSYC service.
+ *
+ * The message part is added to the current message buffer.
+ * When this buffer is full, it is added to the transmission queue.
+ *
+ * @param ch Channel struct for the client.
+ * @param msg Modifier message part, or NULL when there's no more modifiers.
+ * @param end End of message.
+ */
+static void
+queue_message (struct GNUNET_PSYC_Channel *ch,
+               const struct GNUNET_MessageHeader *msg,
+               uint8_t end)
+{
+  uint16_t size = msg ? ntohs (msg->size) : 0;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Queueing message of type %u and size %u (end: %u)).\n",
+       ntohs (msg->type), size, end);
+
+  struct OperationHandle *op = ch->tmit_msg;
+  if (NULL != op)
+  {
+    if (NULL == msg
+        || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size)
+    {
+      /* End of message or buffer is full, add it to transmission queue
+       * and start with empty buffer */
+      op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+      op->msg->size = htons (op->msg->size);
+      GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+      ch->tmit_msg = op = NULL;
+      ch->tmit_ack_pending++;
+    }
+    else
+    {
+      /* Message fits in current buffer, append */
+      ch->tmit_msg = op
+        = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size);
+      op->msg = (struct GNUNET_MessageHeader *) &op[1];
+      memcpy ((char *) op->msg + op->msg->size, msg, size);
+      op->msg->size += size;
+    }
+  }
+
+  if (NULL == op && NULL != msg)
+  {
+    /* Empty buffer, copy over message. */
+    ch->tmit_msg = op
+      = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size);
+    op->msg = (struct GNUNET_MessageHeader *) &op[1];
+    op->msg->size = sizeof (*op->msg) + size;
+    memcpy (&op->msg[1], msg, size);
+  }
+ 
+  if (NULL != op
+      && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
+                  < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
+  {
+    /* End of message or buffer is full, add it to transmission queue. */
+    op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+    op->msg->size = htons (op->msg->size);
+    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    ch->tmit_msg = op = NULL;
+    ch->tmit_ack_pending++;
+  }
+
+  transmit_next (ch);
+}
+
+
+/**
  * Request a modifier from a client to transmit.
  *
  * @param mst Master handle.
@@ -332,32 +403,71 @@
 master_transmit_mod (struct GNUNET_PSYC_Master *mst)
 {
   struct GNUNET_PSYC_Channel *ch = &mst->ch;
-  uint16_t max_data_size
-    = ch->tmit_size > sizeof (struct GNUNET_MessageHeader)
-    ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size
-    : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size;
-  uint16_t data_size = max_data_size;
+  uint16_t max_data_size, data_size;
+  char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+  struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+  int notify_ret;
 
-  struct GNUNET_MessageHeader *msg;
-  struct OperationHandle *op
-    = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
-  op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
-  msg->type
-    = MSG_STATE_MODIFIER == mst->tmit->state
-    ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER)
-    : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+  switch (mst->tmit->state)
+  {
+  case MSG_STATE_MODIFIER:
+  {
+    struct GNUNET_PSYC_MessageModifier *mod
+      = (struct GNUNET_PSYC_MessageModifier *) msg;
+    max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
+    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+    msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
+    notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
+                                        &data_size, &mod[1], &mod->oper);
+    mod->name_size = strnlen ((char *) &mod[1], data_size);
+    if (mod->name_size < data_size)
+    {
+      mod->oper = htons (mod->oper);
+      mod->value_size = htons (data_size - 1 - mod->name_size);
+      mod->name_size = htons (mod->name_size);
+    }
+    else if (0 < data_size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
+      notify_ret = GNUNET_SYSERR;
+    }
+    break;
+  }
+  case MSG_STATE_MOD_CONT:
+  {
+    max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
+    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);    
+    msg->size = sizeof (struct GNUNET_MessageHeader);
+    notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
+                                        &data_size, &msg[1], NULL);
+    break;
+  }
+  default:
+    GNUNET_assert (0);
+  }
 
-  int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
-                                           &data_size, &msg[1]);
   switch (notify_ret)
   {
   case GNUNET_NO:
-    if (0 != data_size)
-      mst->tmit->state = MSG_STATE_MOD_CONT;
+    if (0 == data_size)
+    { /* Transmission paused, nothing to send. */
+      ch->tmit_paused = GNUNET_YES;
+      return;
+    }
+    mst->tmit->state = MSG_STATE_MOD_CONT;
     break;
 
   case GNUNET_YES:
-    mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER;
+    if (0 == data_size)
+    {
+      /* End of modifiers. */
+      mst->tmit->state = MSG_STATE_DATA;
+      if (0 == ch->tmit_ack_pending)
+        master_transmit_data (mst);
+
+      return;
+    }
+    mst->tmit->state = MSG_STATE_MODIFIER;
     break;
 
   default:
@@ -368,36 +478,18 @@
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
 
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-    transmit_next (ch);
+    queue_message (ch, msg, GNUNET_YES);
     return;
   }
 
-  if ((GNUNET_NO == notify_ret && 0 == data_size))
-  {
-    /* Transmission paused, nothing to send. */
-    ch->tmit_paused = GNUNET_YES;
-    GNUNET_free (op);
-  }
-
   if (0 < data_size)
   {
-    GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
-    msg->size = htons (sizeof (*msg) + data_size);
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    GNUNET_assert (data_size <= max_data_size);
+    msg->size = htons (msg->size + data_size);
+    queue_message (ch, msg, GNUNET_NO);
   }
 
-  /* End of message. */
-  if (GNUNET_YES == notify_ret)
-  {
-    op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
-    op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
-    msg->size = htons (sizeof (*msg));
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-  }
-
-  transmit_next (ch);
+  master_transmit_mod (mst);
 }
 
 
@@ -410,11 +502,10 @@
 master_transmit_data (struct GNUNET_PSYC_Master *mst)
 {
   struct GNUNET_PSYC_Channel *ch = &mst->ch;
-  struct GNUNET_MessageHeader *msg;
   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
-  struct OperationHandle *op
-    = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
-  op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
+  char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+  struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+
   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
 
   int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
@@ -426,7 +517,7 @@
     {
       /* Transmission paused, nothing to send. */
       ch->tmit_paused = GNUNET_YES;
-      GNUNET_free (op);
+      return;
     }
     break;
 
@@ -441,9 +532,7 @@
     mst->tmit->state = MSG_STATE_START;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
-
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-    transmit_next (ch);
+    queue_message (ch, msg, GNUNET_YES);
     return;
   }
 
@@ -451,20 +540,16 @@
   {
     GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
     msg->size = htons (sizeof (*msg) + data_size);
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    queue_message (ch, msg, !notify_ret);
   }
 
   /* End of message. */
   if (GNUNET_YES == notify_ret)
   {
-    op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
-    op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
     msg->size = htons (sizeof (*msg));
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    queue_message (ch, msg, GNUNET_YES);
   }
-
-  transmit_next (ch);
 }
 
 
@@ -476,57 +561,55 @@
  */
 static void
 handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
-                     const struct GNUNET_PSYC_MessageHeader *pmsg)
+                     const struct GNUNET_PSYC_MessageHeader *msg)
 {
-  const struct GNUNET_MessageHeader *msg;
-  uint16_t msize = ntohs (pmsg->header.size);
-  uint16_t pos = 0;
-  uint16_t size = 0;
-  uint16_t type, size_eq, size_min;
+  uint16_t size = ntohs (msg->header.size);
 
   if (MSG_STATE_START == ch->recv_state)
   {
-    ch->recv_message_id = GNUNET_ntohll (pmsg->message_id);
-    ch->recv_flags = ntohl (pmsg->flags);
+    ch->recv_message_id = GNUNET_ntohll (msg->message_id);
+    ch->recv_flags = ntohl (msg->flags);
   }
-  else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id)
+  else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
-         GNUNET_ntohll (pmsg->message_id), ch->recv_message_id);
+         GNUNET_ntohll (msg->message_id), ch->recv_message_id);
     GNUNET_break_op (0);
     recv_error (ch);
   }
-  else if (ntohl (pmsg->flags) != ch->recv_flags)
+  else if (ntohl (msg->flags) != ch->recv_flags)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Unexpected message flags. Got: %lu, expected: %lu\n",
-         ntohl (pmsg->flags), ch->recv_flags);
+         ntohl (msg->flags), ch->recv_flags);
     GNUNET_break_op (0);
     recv_error (ch);
   }
 
-  for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size)
+  uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
+
+  for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
   {
-    msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
-    size = ntohs (msg->size);
-    type = ntohs (msg->type);
+    const struct GNUNET_MessageHeader *pmsg
+      = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+    psize = ntohs (pmsg->size);
+    ptype = ntohs (pmsg->type);
     size_eq = size_min = 0;
 
-    if (msize < sizeof (*pmsg) + pos + size)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received message part of type %u and size %u from PSYC.\n",
+                ptype, psize);
+
+    if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Discarding message of type %u with invalid size. "
-                  "(%u < %u + %u + %u)\n", ntohs (msg->type),
-                  msize, sizeof (*msg), pos, size);
+                  "Discarding message of type %u with invalid size %u.\n",
+                  ptype, psize);
       break;
     }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received message part of type %u and size %u from PSYC.\n",
-                ntohs (msg->type), size);
 
-
-    switch (type)
+    switch (ptype)
     {
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
       size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
@@ -534,6 +617,7 @@
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
       size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
       break;
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
       size_min = sizeof (struct GNUNET_MessageHeader);
       break;
@@ -543,22 +627,22 @@
       break;
     }
 
-    if (! ((0 < size_eq && size == size_eq)
-           || (0 < size_min && size_min <= size)))
+    if (! ((0 < size_eq && psize == size_eq)
+           || (0 < size_min && size_min <= psize)))
     {
       GNUNET_break (0);
       reschedule_connect (ch);
       return;
     }
 
-    switch (type)
+    switch (ptype)
     {
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
     {
       struct GNUNET_PSYC_MessageMethod *meth
-        = (struct GNUNET_PSYC_MessageMethod *) msg;
+        = (struct GNUNET_PSYC_MessageMethod *) pmsg;
 
-      if (MSG_STATE_HEADER != ch->recv_state)
+      if (MSG_STATE_START != ch->recv_state)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
              "Discarding out of order message method.\n");
@@ -568,89 +652,66 @@
          */
         GNUNET_break_op (0);
         recv_error (ch);
-        break;
+        return;
       }
 
-      if ('\0' != (char *) meth + msg->size - 1)
+      if ('\0' != *((char *) meth + psize - 1))
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
              "Discarding message with malformed method. "
              "Message ID: %" PRIu64 "\n", ch->recv_message_id);
         GNUNET_break_op (0);
         recv_error (ch);
-        break;
+        return;
       }
-      GNUNET_PSYC_MessageCallback message_cb
-        = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
-        ? ch->hist_message_cb
-        : ch->message_cb;
-
-      if (NULL != message_cb)
-        message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
-
       ch->recv_state = MSG_STATE_METHOD;
       break;
     }
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
     {
-      if (MSG_STATE_MODIFIER != ch->recv_state)
+      if (!(MSG_STATE_METHOD == ch->recv_state
+            || MSG_STATE_MODIFIER == ch->recv_state
+            || MSG_STATE_MOD_CONT == ch->recv_state))
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
              "Discarding out of order message modifier.\n");
         GNUNET_break_op (0);
         recv_error (ch);
-        break;
+        return;
       }
 
       struct GNUNET_PSYC_MessageModifier *mod
-        = (struct GNUNET_PSYC_MessageModifier *) msg;
+        = (struct GNUNET_PSYC_MessageModifier *) pmsg;
 
       uint16_t name_size = ntohs (mod->name_size);
       ch->recv_mod_value_size_expected = ntohs (mod->value_size);
-      ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1;
+      ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
 
-      if (size < sizeof (*mod) + name_size + 1
-          || '\0' != (char *) &mod[1] + mod->name_size
+      if (psize < sizeof (*mod) + name_size + 1
+          || '\0' != *((char *) &mod[1] + name_size)
           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
         GNUNET_break_op (0);
-        break;
+        return;
       }
-
       ch->recv_state = MSG_STATE_MODIFIER;
-
-      GNUNET_PSYC_MessageCallback message_cb
-        = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
-        ? ch->hist_message_cb
-        : ch->message_cb;
-
-      if (NULL != message_cb)
-        message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
-
       break;
     }
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
     {
-      ch->recv_mod_value_size += size - sizeof (*msg);
+      ch->recv_mod_value_size += psize - sizeof (*pmsg);
 
-      if (MSG_STATE_MODIFIER != ch->recv_state
+      if (!(MSG_STATE_MODIFIER == ch->recv_state
+            || MSG_STATE_MOD_CONT == ch->recv_state)
           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
              "Discarding out of order message modifier continuation.\n");
         GNUNET_break_op (0);
         recv_reset (ch);
-        break;
+        return;
       }
-
-      GNUNET_PSYC_MessageCallback message_cb
-        = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
-        ? ch->hist_message_cb
-        : ch->message_cb;
-
-      if (NULL != message_cb)
-        message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
       break;
     }
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
@@ -662,12 +723,23 @@
              "Discarding out of order message data fragment.\n");
         GNUNET_break_op (0);
         recv_reset (ch);
-        break;
+        return;
       }
-
       ch->recv_state = MSG_STATE_DATA;
       break;
     }
+    }
+
+    GNUNET_PSYC_MessageCallback message_cb
+      = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
+      ? ch->hist_message_cb
+      : ch->message_cb;
+
+    if (NULL != message_cb)
+      message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
+
+    switch (ptype)
+    {
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
       recv_reset (ch);
@@ -717,18 +789,7 @@
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
     size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
     break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
-    size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
-    size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
-    size_min = sizeof (struct GNUNET_MessageHeader);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
-  case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
     size_eq = sizeof (struct GNUNET_MessageHeader);
     break;
   }
@@ -761,9 +822,15 @@
 #endif
     break;
   }
-  case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
   {
-    ch->tmit_ack_pending = GNUNET_NO;
+    if (0 == ch->tmit_ack_pending)
+    {
+      LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
+      GNUNET_break (0);
+      break;
+    }
+    ch->tmit_ack_pending--;
 
     if (ch->is_master)
     {
@@ -771,10 +838,6 @@
       switch (mst->tmit->state)
       {
       case MSG_STATE_MODIFIER:
-        if (GNUNET_NO == ch->tmit_paused)
-          master_transmit_mod (mst);
-        break;
-
       case MSG_STATE_MOD_CONT:
         if (GNUNET_NO == ch->tmit_paused)
           master_transmit_mod (mst);
@@ -795,12 +858,13 @@
         else
         {
           LOG (GNUNET_ERROR_TYPE_WARNING,
-               "Ignoring transmit ack, there's no transmission going on.\n");
+               "Ignoring message ACK, there's no transmission going on.\n");
+          GNUNET_break (0);
         }
         break;
       default:
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Ignoring unexpected transmit ack.\n");
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Ignoring message ACK in state %u.\n", mst->tmit->state);
       }
     }
     else
@@ -811,12 +875,15 @@
   }
 
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
-    handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
+    handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
     break;
   }
 
-  GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
+  if (NULL != ch->client)
+  {
+    GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
+                           GNUNET_TIME_UNIT_FOREVER_REL);
+  }
 }
 
 
@@ -1029,6 +1096,8 @@
 GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
 {
   disconnect (master);
+  if (NULL != master->tmit)
+    GNUNET_free (master->tmit);
   GNUNET_free (master);
 }
 
@@ -1069,30 +1138,6 @@
 }
 
 
-/* FIXME: split up value into <64K chunks and transmit the continuations in
- *        MOD_CONT msgs */
-static int
-send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
-{
-  struct GNUNET_PSYC_Channel *ch = cls;
-  size_t name_size = strlen (mod->name) + 1;
-  struct GNUNET_PSYC_MessageModifier *pmod;
-  struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod)
-                                              + name_size + mod->value_size);
-  pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
-  op->msg = (struct GNUNET_MessageHeader *) pmod;
-
-  pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
-  pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
-  pmod->name_size = htons (name_size);
-  memcpy (&pmod[1], mod->name, name_size);
-  memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
-
-  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-  return GNUNET_YES;
-}
-
-
 /**
  * Send a message to call a method to all members in the PSYC channel.
  *
@@ -1107,7 +1152,7 @@
 struct GNUNET_PSYC_MasterTransmitHandle *
 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
                              const char *method_name,
-                             GNUNET_PSYC_MasterTransmitNotify notify_mod,
+                             GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod,
                              GNUNET_PSYC_MasterTransmitNotify notify_data,
                              void *notify_cls,
                              enum GNUNET_PSYC_MasterTransmitFlags flags)
@@ -1120,25 +1165,27 @@
 
   size_t size = strlen (method_name) + 1;
   struct GNUNET_PSYC_MessageMethod *pmeth;
-  struct OperationHandle *op
-    = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
-  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
-  op->msg = (struct GNUNET_MessageHeader *) pmeth;
+  struct OperationHandle *op;
 
+  ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
+                                     + sizeof (*pmeth) + size);
+  op->msg = (struct GNUNET_MessageHeader *) &op[1];
+  op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
+
+  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
   pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
   pmeth->header.size = htons (sizeof (*pmeth) + size);
   pmeth->flags = htonl (flags);
   memcpy (&pmeth[1], method_name, size);
 
-  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
-  transmit_next (ch);
-
   master->tmit = GNUNET_malloc (sizeof (*master->tmit));
   master->tmit->master = master;
   master->tmit->notify_mod = notify_mod;
   master->tmit->notify_data = notify_data;
   master->tmit->notify_cls = notify_cls;
-  master->tmit->state = MSG_STATE_START; // FIXME
+  master->tmit->state = MSG_STATE_MODIFIER;
+
+  master_transmit_mod (master);
   return master->tmit;
 }
 
@@ -1152,7 +1199,7 @@
 GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
 {
   struct GNUNET_PSYC_Channel *ch = &th->master->ch;
-  if (GNUNET_NO == ch->tmit_ack_pending)
+  if (0 == ch->tmit_ack_pending)
   {
     ch->tmit_paused = GNUNET_NO;
     master_transmit_data (th->master);

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/psyc/test_psyc.c 2014-01-06 00:09:43 UTC (rev 31792)
@@ -25,6 +25,8 @@
  * @author Christian Grothoff
  */
 
+#include <inttypes.h>
+
 #include "platform.h"
 #include "gnunet_crypto_lib.h"
 #include "gnunet_common.h"
@@ -35,7 +37,7 @@
 
 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
 
-#define DEBUG_SERVICE 1
+#define DEBUG_SERVICE 0
 
 
 /**
@@ -62,17 +64,37 @@
 
 struct GNUNET_PSYC_MasterTransmitHandle *mth;
 
+struct TransmitClosure
+{
+  struct GNUNET_PSYC_MasterTransmitHandle *handle;
+  struct GNUNET_ENV_Environment *env;
+  char *data[16];
+  const char *mod_value;
+  size_t mod_value_size;
+  uint8_t data_count;
+  uint8_t paused;
+  uint8_t n;
+};
+
+struct TransmitClosure *tmit;
+
 /**
  * Clean up all resources used.
  */
 static void
 cleanup ()
 {
-  if (mst != NULL)
+  if (NULL != mst)
   {
     GNUNET_PSYC_master_stop (mst);
     mst = NULL;
   }
+  if (NULL != tmit)
+  {
+    GNUNET_ENV_environment_destroy (tmit->env);
+    GNUNET_free (tmit);
+    tmit = NULL;
+  }
   GNUNET_SCHEDULER_shutdown ();
 }
 
@@ -121,46 +143,42 @@
 }
 
 
-static int
-method (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
-        uint64_t message_id, const char *name,
-        size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers,
-        uint64_t data_offset, const void *data, size_t data_size,
-        enum GNUNET_PSYC_MessageFlags flags)
+static void
+message (void *cls, uint64_t message_id, uint32_t flags,
+         const struct GNUNET_MessageHeader *msg)
 {
+  if (NULL == msg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Error while receiving message %llu\n", message_id);
+    return;
+  }
+
+  uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
+
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "Method: %s, modifiers: %lu, flags: %u\n%.*s\n",
-              name, modifier_count, flags, data_size, data);
-  return GNUNET_OK;
+              "Got message part of type %u and size %u "
+              "belonging to message ID %llu with flags %u\n",
+              type, size, message_id, flags);
+
+  if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
+    end ();
 }
 
 
-static int
-join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
-      const char *method_name,
-      size_t variable_count, const struct GNUNET_ENV_Modifier *variables,
-      const void *data, size_t data_size, struct GNUNET_PSYC_JoinHandle *jh)
+static void
+join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+              const char *method_name,
+              size_t variable_count, const struct GNUNET_ENV_Modifier *variables,
+              const void *data, size_t data_size,
+              struct GNUNET_PSYC_JoinHandle *jh)
 {
-  return GNUNET_OK;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Got join request.");
 }
 
 
-struct TransmitClosure
-{
-  struct GNUNET_PSYC_MasterTransmitHandle *handle;
-
-  char *mod_names[16];
-  char *mod_values[16];
-  char *data[16];
-
-  uint8_t mod_count;
-  uint8_t data_count;
-
-  uint8_t paused;
-  uint8_t n;
-};
-
-
 static void
 transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
@@ -172,45 +190,95 @@
 
 
 static int
-tmit_notify_mod (void *cls, size_t *data_size, void *data)
+tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
 {
   struct TransmitClosure *tmit = cls;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Transmit notify modifier: %lu bytes available, "
-              "processing modifier %u/%u.\n",
-              *data_size, tmit->n + 1, tmit->fragment_count);
-  /* FIXME: continuation */
-  uint16_t name_size = strlen (tmit->mod_names[tmit->n]);
-  uint16_t value_size = strlen (tmit->mod_values[tmit->n]);
-  if (name_size + 1 + value_size <= *data_size)
-    return GNUNET_NO;
+              "%u modifiers left to process.\n",
+              *data_size, GNUNET_ENV_environment_get_count (tmit->env));
 
-  *data_size = name_size + 1 + value_size;
-  memcpy (data, tmit->fragments[tmit->n], *data_size);
+  enum GNUNET_ENV_Operator op = 0;
+  const char *name = NULL;
+  const char *value = NULL;
+  uint16_t name_size = 0;
+  size_t value_size = 0;
 
-  if (++tmit->n < tmit->mod_count)
-  {
-    return GNUNET_NO;
+  if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
+  { /* Modifier continuation */
+    value = tmit->mod_value;
+    if (tmit->mod_value_size <= *data_size)
+    {
+      value_size = tmit->mod_value_size;
+      tmit->mod_value = NULL;
+    }
+    else
+    {
+      value_size = *data_size;
+      tmit->mod_value += value_size;
+    }
+    tmit->mod_value_size -= value_size;
+
+    if (*data_size < value_size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "value larger than buffer: %u < %zu\n",
+                  *data_size, value_size);
+      *data_size = 0;
+      return GNUNET_NO;
+    }
+
+    *data_size = value_size;
+    memcpy (data, value, value_size);
   }
-  else
+  else if (NULL != oper)
   {
-    tmit->n = 0;
-    return GNUNET_YES;
+    if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
+                                                   (void *) &value, &value_size))
+    { /* No more modifiers, continue with data */
+      *data_size = 0;
+      return GNUNET_YES;
+    }
+
+    *oper = op;
+    name_size = strlen (name);
+
+    if (name_size + 1 + value_size <= *data_size)
+    {
+      *data_size = name_size + 1 + value_size;
+    }
+    else
+    {
+      tmit->mod_value_size = value_size;
+      value_size = *data_size - name_size - 1;
+      tmit->mod_value_size -= value_size;
+      tmit->mod_value = value + value_size;
+    }
+
+    memcpy (data, name, name_size);
+    ((char *)data)[name_size] = '\0';
+    memcpy ((char *)data + name_size + 1, value, value_size);
   }
+
+  return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
 }
 
 
 static int
-tmit_notify_data (void *cls, size_t *data_size, void *data)
+tmit_notify_data (void *cls, uint16_t *data_size, void *data)
 {
   struct TransmitClosure *tmit = cls;
+  uint16_t size = strlen (tmit->data[tmit->n]);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Transmit notify data: %lu bytes available, "
-              "processing fragment %u/%u.\n",
-              *data_size, tmit->n + 1, tmit->fragment_count);
-  uint16_t size = strlen (tmit->data[tmit->n]);
-  if (size <= *data_size)
-    return GNUNET_NO;
+              "processing fragment %u/%u (size %u).\n",
+              *data_size, tmit->n + 1, tmit->data_count, size);
+  if (*data_size < size)
+  {
+    *data_size = 0;
+    GNUNET_assert (0);
+    return GNUNET_SYSERR;
+  }
 
   if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1)
   {
@@ -231,19 +299,18 @@
 }
 
 
-void
+static void
 master_started (void *cls, uint64_t max_message_id)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Master started: %lu\n", max_message_id);
+              "Master started: %" PRIu64 "\n", max_message_id);
 
-  struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
-  GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
-                                  "_foo", "bar baz", 7);
-  GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
-                                  "_foo_bar", "foo bar baz", 11);
-
-  struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
+  tmit = GNUNET_new (struct TransmitClosure);
+  tmit->env = GNUNET_ENV_environment_create ();
+  GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+                              "_foo", "bar baz", 7);
+  GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+                              "_foo_bar", "foo bar baz", 11);
   tmit->data[0] = "foo";
   tmit->data[1] = "foo bar";
   tmit->data[2] = "foo bar baz";
@@ -255,7 +322,7 @@
 }
 
 
-void
+static void
 slave_joined (void *cls, uint64_t max_message_id)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id);
@@ -288,19 +355,19 @@
   GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
   GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key);
 
-  mst = GNUNET_PSYC_master_start (cfg, channel_key,
-                                  GNUNET_PSYC_CHANNEL_PRIVATE,
-                                  &method, &join, &master_started, NULL);
-  return;
+  mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
+                                  &message, &join_request, &master_started, NULL);
+  return; /* FIXME: test slave */
+
   struct GNUNET_PeerIdentity origin;
   struct GNUNET_PeerIdentity relays[16];
   struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
-  GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
-                                  "_foo", "bar baz", 7);
-  GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
-                                  "_foo_bar", "foo bar baz", 11);
+  GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
+                              "_foo", "bar baz", 7);
+  GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
+                              "_foo_bar", "foo bar baz", 11);
   slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
-                                16, relays, &method, &join, &slave_joined,
+                                16, relays, &message, &join_request, &slave_joined,
                                 NULL, "_request_join", env, "some data", 9);
   GNUNET_ENV_environment_destroy (env);
 }
@@ -319,8 +386,7 @@
                                        opts, &run, NULL))
     return 1;
 #else
-  if (0 != GNUNET_TESTING_service_run ("test-psyc", "psyc",
-                                       "test_psyc.conf", &run, NULL))
+  if (0 != GNUNET_TESTING_peer_run ("test-psyc", "test_psyc.conf", &run, NULL))
     return 1;
 #endif
   return res;

Modified: gnunet/src/psyc/test_psyc.conf
===================================================================
--- gnunet/src/psyc/test_psyc.conf      2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/psyc/test_psyc.conf      2014-01-06 00:09:43 UTC (rev 31792)
@@ -8,3 +8,10 @@
 UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock
 UNIX_MATCH_UID = NO
 UNIX_MATCH_GID = YES
+
+[psycstore]
+AUTOSTART = YES
+BINARY = gnunet-service-psycstore
+UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psycstore.sock
+UNIX_MATCH_UID = NO
+UNIX_MATCH_GID = YES




reply via email to

[Prev in Thread] Current Thread [Next in Thread]