gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r32625 - in gnunet/src: include multicast psyc


From: gnunet
Subject: [GNUnet-SVN] r32625 - in gnunet/src: include multicast psyc
Date: Wed, 12 Mar 2014 17:39:42 +0100

Author: tg
Date: 2014-03-12 17:39:41 +0100 (Wed, 12 Mar 2014)
New Revision: 32625

Modified:
   gnunet/src/include/gnunet_psyc_service.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc.h
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/psyc_common.c
   gnunet/src/psyc/test_psyc.c
Log:
PSYC: in-order delivery of fragments; tests for large messages

Cache message received fragments from multicast and deliver them
in the correct order to clients.

Test messages with large modifier and data payloads.

Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h    2014-03-11 16:27:07 UTC (rev 
32624)
+++ gnunet/src/include/gnunet_psyc_service.h    2014-03-12 16:39:41 UTC (rev 
32625)
@@ -444,10 +444,14 @@
  *         contain: "name\0value".  If the whole value does not fit, subsequent
  *         calls to this function should write continuations of the value to
  *         @a data.
- * @param oper  Where to write the operator of the modifier.  Only needed 
during
- *         the first call to this callback at the beginning of the modifier.
- *         In case of subsequent calls asking for value continuations @a oper 
is
- *         set to #NULL.
+ * @param[out] oper  Where to write the operator of the modifier.
+ *         Only needed during the first call to this callback at the beginning
+ *         of the modifier.  In case of subsequent calls asking for value
+ *         continuations @a oper is set to #NULL.
+ * @param[out] value_size  Where to write the full size of the value.
+ *         Only needed during the first call to this callback at the beginning
+ *         of the modifier.  In case of subsequent calls asking for value
+ *         continuations @a value_size is set to #NULL.
  * @return #GNUNET_SYSERR on error (fatal, aborts transmission)
  *         #GNUNET_NO on success, if more data is to be transmitted later.
  *         Should be used if @a data_size was not big enough to take all the
@@ -461,7 +465,8 @@
 (*GNUNET_PSYC_TransmitNotifyModifier) (void *cls,
                                        uint16_t *data_size,
                                        void *data,
-                                       uint8_t *oper);
+                                       uint8_t *oper,
+                                       uint32_t *value_size);
 
 /**
  * Flags for transmitting messages to a channel by the master.
@@ -659,7 +664,7 @@
  * @param th Handle of the request that is being resumed.
  */
 void
-GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle 
*th);
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th);
 
 
 /**

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2014-03-11 16:27:07 UTC (rev 
32624)
+++ gnunet/src/multicast/multicast_api.c        2014-03-12 16:39:41 UTC (rev 
32625)
@@ -178,17 +178,21 @@
   const struct GNUNET_MessageHeader *msg = cls;
   struct GNUNET_MULTICAST_Group *grp = group;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Calling message callback for a message of type %u and size 
%u.\n",
-              ntohs (msg->type), ntohs (msg->size));
-
   if (GNUNET_YES == grp->is_origin)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Calling origin's message callback "
+                "for a message of type %u and size %u.\n",
+              ntohs (msg->type), ntohs (msg->size));
     struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) 
grp;
     orig->message_cb (orig->cls, msg);
   }
   else
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Calling slave's message callback "
+                "for a message of type %u and size %u.\n",
+                ntohs (msg->type), ntohs (msg->size));
     struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) 
grp;
     mem->message_cb (mem->cls, msg);
   }
@@ -449,8 +453,8 @@
   struct GNUNET_MULTICAST_Origin *orig = cls;
   struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
 
-  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
-  char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+  char buf[GNUNET_MULTICAST_FRAGMENT_MAX_SIZE] = "";
   struct GNUNET_MULTICAST_MessageHeader *msg
     = (struct GNUNET_MULTICAST_MessageHeader *) buf;
   int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
@@ -495,9 +499,9 @@
   handle_multicast_message (&orig->grp, msg);
 
   if (GNUNET_NO == ret)
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                  (GNUNET_TIME_UNIT_SECONDS, 1),
-                                  schedule_origin_to_all, orig);
+    GNUNET_SCHEDULER_add_delayed (
+      GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
+      schedule_origin_to_all, orig);
 }
 
 
@@ -526,10 +530,10 @@
   mh->notify = notify;
   mh->notify_cls = notify_cls;
 
-  /* FIXME: remove delay, it's there only for testing */
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                (GNUNET_TIME_UNIT_SECONDS, 1),
-                                schedule_origin_to_all, origin);
+  /* add some delay for testing */
+  GNUNET_SCHEDULER_add_delayed (
+    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
+    schedule_origin_to_all, origin);
   return &origin->msg_handle;
 }
 

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2014-03-11 16:27:07 UTC (rev 
32624)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2014-03-12 16:39:41 UTC (rev 
32625)
@@ -24,6 +24,8 @@
  * @author Gabor X Toth
  */
 
+#include <inttypes.h>
+
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_constants.h"
@@ -77,7 +79,46 @@
   uint8_t state;
 };
 
+
 /**
+ * Cache for received message fragments.
+ * Message fragments are only sent to clients after all modifiers arrived.
+ *
+ * chan_key -> MultiHashMap chan_msgs
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
+
+
+/**
+ * Entry in the chan_msgs hashmap of @a recv_cache:
+ * fragment_id -> FragmentEntry
+ */
+struct FragmentEntry
+{
+  struct GNUNET_MULTICAST_MessageHeader *mmsg;
+  uint16_t ref_count;
+};
+
+
+/**
+ * Entry in the @a recv_msgs hash map of a @a Channel.
+ * message_id -> FragmentCache
+ */
+struct FragmentCache
+{
+  /**
+   * Total size of header fragments (METHOD & MODIFIERs)
+   */
+  uint64_t header_size;
+
+  /**
+   * Fragment IDs stored in @a recv_cache.
+   */
+  struct GNUNET_CONTAINER_Heap *fragments;
+};
+
+
+/**
  * Common part of the client context for both a master and slave channel.
  */
 struct Channel
@@ -87,6 +128,12 @@
   struct TransmitMessage *tmit_head;
   struct TransmitMessage *tmit_tail;
 
+  /**
+   * Received fragments not yet sent to the client.
+   * message_id -> FragmentCache
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
+
   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
 
   /**
@@ -213,6 +260,8 @@
 static void
 client_cleanup (struct Channel *ch)
 {
+  /* FIXME: fragment_cache_clear */
+
   if (ch->is_master)
   {
     struct Master *mst = (struct Master *) ch;
@@ -323,7 +372,190 @@
 }
 
 
+static void
+message_to_client (struct Channel *ch,
+                   const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+  uint16_t size = ntohs (mmsg->header.size);
+  struct GNUNET_PSYC_MessageHeader *pmsg;
+  uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Sending message to client. "
+              "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+              ch, GNUNET_ntohll (mmsg->fragment_id),
+              GNUNET_ntohll (mmsg->message_id));
+
+  pmsg = GNUNET_malloc (psize);
+  pmsg->header.size = htons (psize);
+  pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+  pmsg->message_id = mmsg->message_id;
+
+  memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+
+  GNUNET_SERVER_notification_context_add (nc, ch->client);
+  GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+                                              (const struct 
GNUNET_MessageHeader *) pmsg,
+                                              GNUNET_NO);
+  GNUNET_free (pmsg);
+}
+
+
 /**
+ * Convert an uint64_t in network byte order to a HashCode
+ * that can be used as key in a MultiHashMap
+ */
+static inline void
+hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
+{
+  /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int 
*/
+
+  n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 
0x00FF00FF00FF00FFULL);
+  n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 
0x0000FFFF0000FFFFULL);
+
+  *key = (struct GNUNET_HashCode) {{ 0 }};
+  *((uint64_t *) key)
+    = (n << 32) | (n >> 32);
+}
+
+
+/**
+ * Convert an uint64_t in host byte order to a HashCode
+ * that can be used as key in a MultiHashMap
+ */
+static inline void
+hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
+{
+#if __BYTE_ORDER == __BIG_ENDIAN
+  hash_key_from_nll (key, n);
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+  *key = (struct GNUNET_HashCode) {{ 0 }};
+  *((uint64_t *) key) = n;
+#else
+  #error byteorder undefined
+#endif
+}
+
+
+static void
+fragment_cache_insert (struct Channel *ch,
+                       const struct GNUNET_HashCode *chan_key_hash,
+                       const struct GNUNET_HashCode *msg_id,
+                       struct FragmentCache *frag_cache,
+                       const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+                       uint16_t last_part_type)
+{
+  uint16_t size = ntohs (mmsg->header.size);
+  struct GNUNET_CONTAINER_MultiHashMap
+    *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash);
+
+  if (NULL == frag_cache)
+  {
+    frag_cache = GNUNET_new (struct FragmentCache);
+    frag_cache->fragments
+      = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+
+    if (NULL == ch->recv_msgs)
+    {
+      ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+    }
+    GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+
+    if (NULL == chan_msgs)
+    {
+      chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+      GNUNET_CONTAINER_multihashmap_put (recv_cache, chan_key_hash, chan_msgs,
+                                         
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+    }
+  }
+
+  struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
+  hash_key_from_nll (frag_id, mmsg->fragment_id);
+  struct FragmentEntry
+    *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
+  if (NULL == frag_entry)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Adding message fragment to cache. "
+                "fragment_id: %" PRIu64 ", "
+                "header_size: %" PRIu64 " + %" PRIu64 ").\n",
+                ch, GNUNET_ntohll (mmsg->fragment_id),
+                frag_cache->header_size, size);
+    frag_entry = GNUNET_new (struct FragmentEntry);
+    frag_entry->ref_count = 1;
+    frag_entry->mmsg = GNUNET_malloc (size);
+    memcpy (frag_entry->mmsg, mmsg, size);
+    GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+  else
+  {
+    frag_entry->ref_count++;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Message fragment already in cache. "
+                "fragment_id: %" PRIu64 ", ref_count: %u\n",
+                ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
+  }
+
+  switch (last_part_type)
+  {
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+    frag_cache->header_size += size;
+  }
+  GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
+                                GNUNET_ntohll (mmsg->fragment_id));
+}
+
+
+static void
+fragment_cache_clear (struct Channel *ch,
+                      const struct GNUNET_HashCode *chan_key_hash,
+                      const struct GNUNET_HashCode *msg_id,
+                      struct FragmentCache *frag_cache,
+                      uint8_t send_to_client)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Clearing message fragment cache.\n", ch);
+
+  struct GNUNET_CONTAINER_MultiHashMap
+    *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash);
+  GNUNET_assert (NULL != chan_msgs);
+  struct GNUNET_HashCode *frag_id;
+
+  while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
+  {
+    struct FragmentEntry
+      *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
+    if (frag_entry != NULL)
+    {
+      if (GNUNET_YES == send_to_client)
+      {
+        message_to_client (ch, frag_entry->mmsg);
+      }
+      if (1 == frag_entry->ref_count)
+      {
+        GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
+        GNUNET_free (frag_entry->mmsg);
+        GNUNET_free (frag_entry);
+      }
+      else
+      {
+        frag_entry->ref_count--;
+      }
+    }
+    GNUNET_free (frag_id);
+  }
+
+  GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
+  GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
+  GNUNET_free (frag_cache);
+}
+
+
+/**
  * Incoming message fragment from multicast.
  *
  * Store it using PSYCstore and send it to the client of the channel.
@@ -358,11 +590,15 @@
                                    rcb, rcb_cls);
 #endif
 
-    const struct GNUNET_MULTICAST_MessageHeader *mmsg
-      = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
+    const struct GNUNET_MULTICAST_MessageHeader
+      *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
 
-    if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
-                                                       (const char *) 
&mmsg[1]))
+    uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
+                                                    (const char *) &mmsg[1]);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Last message part type %u\n", ptype);
+
+    if (GNUNET_NO == ptype)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Received message with invalid parts from multicast. "
@@ -371,20 +607,55 @@
       break;
     }
 
-    struct GNUNET_PSYC_MessageHeader *pmsg;
-    uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
-    pmsg = GNUNET_malloc (psize);
-    pmsg->header.size = htons (psize);
-    pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
-    pmsg->message_id = mmsg->message_id;
+    struct GNUNET_HashCode msg_id;
+    hash_key_from_nll (&msg_id, mmsg->message_id);
 
-    memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+    struct FragmentCache *frag_cache = NULL;
+    if (NULL != ch->recv_msgs)
+      frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
 
-    GNUNET_SERVER_notification_context_add (nc, ch->client);
-    GNUNET_SERVER_notification_context_unicast (nc, ch->client,
-                                                (const struct 
GNUNET_MessageHeader *) pmsg,
-                                                GNUNET_NO);
-    GNUNET_free (pmsg);
+    switch (ptype)
+    {
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+      /* FIXME: check state flag / max_state_message_id */
+      if (NULL == frag_cache)
+      {
+        message_to_client (ch, mmsg);
+        break;
+      }
+      else
+      {
+        if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
+        { /* first data fragment after the header, send cached fragments */
+          fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache, 
GNUNET_YES);
+          message_to_client (ch, mmsg);
+          break;
+        }
+        else
+        { /* still missing fragments from the header, cache data fragment */
+          /* fall thru */
+        }
+      }
+
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+      /* not all modifiers arrived yet, cache fragment */
+      fragment_cache_insert (ch, chan_key_hash, &msg_id, frag_cache, mmsg, 
ptype);
+      break;
+
+    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+      if (NULL != frag_cache)
+      { /* fragments not yet sent to client, remove from cache */
+        fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache, 
GNUNET_NO);
+      }
+      else
+      {
+        message_to_client (ch, mmsg);
+      }
+      break;
+    }
     break;
   }
   default:
@@ -457,8 +728,9 @@
     const struct GNUNET_MULTICAST_RequestHeader *req
       = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
 
-    if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req),
-                                                       (const char *) &req[1]))
+    /* FIXME: see message_cb() */
+    if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
+                                                    (const char *) &req[1]))
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Dropping message with invalid parts "
@@ -826,7 +1098,7 @@
   if (GNUNET_YES != ch->ready)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "%p Ignoring message from client, channel is not ready yet.\n",
+                "%p Dropping message from client, channel is not ready yet.\n",
                 ch);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
@@ -912,11 +1184,12 @@
   store = GNUNET_PSYCSTORE_connect (cfg);
   stats = GNUNET_STATISTICS_create ("psyc", cfg);
   clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   nc = GNUNET_SERVER_notification_context_create (server, 1);
   GNUNET_SERVER_add_handlers (server, handlers);
   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
-                                NULL);
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+                                &shutdown_task, NULL);
 }
 
 

Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h      2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/psyc.h      2014-03-12 16:39:41 UTC (rev 32625)
@@ -31,8 +31,8 @@
 #include "gnunet_psyc_service.h"
 
 
-int
-GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data);
+uint16_t
+GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data);
 
 void
 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/psyc_api.c  2014-03-12 16:39:41 UTC (rev 32625)
@@ -336,7 +336,7 @@
 
 
 /**
- * Queue an incoming message part for transmission to the PSYC service.
+ * Queue a message part for transmission to the PSYC service.
  *
  * The message part is added to the current message buffer.
  * When this buffer is full, it is added to the transmission queue.
@@ -390,7 +390,7 @@
     op->msg->size = sizeof (*op->msg) + size;
     memcpy (&op->msg[1], msg, size);
   }
- 
+
   if (NULL != op
       && (GNUNET_YES == end
           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
@@ -433,12 +433,12 @@
     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
-    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
-                                      &data_size, &mod[1], &mod->oper);
+    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
+                                      &mod->oper, &mod->value_size);
     mod->name_size = strnlen ((char *) &mod[1], data_size);
     if (mod->name_size < data_size)
     {
-      mod->value_size = htons (data_size - 1 - mod->name_size);
+      mod->value_size = htonl (mod->value_size);
       mod->name_size = htons (mod->name_size);
     }
     else if (0 < data_size)
@@ -451,10 +451,10 @@
   case MSG_STATE_MOD_CONT:
   {
     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);    
+    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
     msg->size = sizeof (struct GNUNET_MessageHeader);
     notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
-                                      &data_size, &msg[1], NULL);
+                                      &data_size, &msg[1], NULL, NULL);
     break;
   }
   default:
@@ -669,6 +669,8 @@
     ch->recv_message_id = GNUNET_ntohll (msg->message_id);
     ch->recv_flags = flags;
     ch->recv_slave_key = msg->slave_key;
+    ch->recv_mod_value_size = 0;
+    ch->recv_mod_value_size_expected = 0;
   }
   else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
   {
@@ -703,7 +705,7 @@
     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Discarding message of type %u with invalid size %u.\n",
+                  "Dropping message of type %u with invalid size %u.\n",
                   ptype, psize);
       recv_error (ch);
       return;
@@ -753,7 +755,8 @@
       if (MSG_STATE_START != ch->recv_state)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message method.\n");
+             "Dropping out of order message method (%u).\n",
+             ch->recv_state);
         /* It is normal to receive an incomplete message right after 
connecting,
          * but should not happen later.
          * FIXME: add a check for this condition.
@@ -766,7 +769,7 @@
       if ('\0' != *((char *) meth + psize - 1))
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding message with malformed method. "
+             "Dropping message with malformed method. "
              "Message ID: %" PRIu64 "\n", ch->recv_message_id);
         GNUNET_break_op (0);
         recv_error (ch);
@@ -782,7 +785,8 @@
             || MSG_STATE_MOD_CONT == ch->recv_state))
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message modifier.\n");
+             "Dropping out of order message modifier (%u).\n",
+             ch->recv_state);
         GNUNET_break_op (0);
         recv_error (ch);
         return;
@@ -792,14 +796,14 @@
         = (struct GNUNET_PSYC_MessageModifier *) pmsg;
 
       uint16_t name_size = ntohs (mod->name_size);
-      ch->recv_mod_value_size_expected = ntohs (mod->value_size);
+      ch->recv_mod_value_size_expected = ntohl (mod->value_size);
       ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
 
       if (psize < sizeof (*mod) + name_size + 1
           || '\0' != *((char *) &mod[1] + name_size)
           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
       {
-        LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
+        LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
         GNUNET_break_op (0);
         recv_error (ch);
         return;
@@ -816,7 +820,11 @@
           || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message modifier continuation.\n");
+             "Dropping out of order message modifier continuation "
+             "!(%u == %u || %u == %u) || %lu < %lu.\n",
+             MSG_STATE_MODIFIER, ch->recv_state,
+             MSG_STATE_MOD_CONT, ch->recv_state,
+             ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
         GNUNET_break_op (0);
         recv_error (ch);
         return;
@@ -829,7 +837,11 @@
           || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Discarding out of order message data fragment.\n");
+             "Dropping out of order message data fragment "
+             "(%u < %u || %lu != %lu).\n",
+             ch->recv_state, MSG_STATE_METHOD,
+             ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
+
         GNUNET_break_op (0);
         recv_error (ch);
         return;
@@ -1412,7 +1424,7 @@
  * @param th Handle of the request that is being resumed.
  */
 void
-GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
 {
   channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }

Modified: gnunet/src/psyc/psyc_common.c
===================================================================
--- gnunet/src/psyc/psyc_common.c       2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/psyc_common.c       2014-03-12 16:39:41 UTC (rev 32625)
@@ -33,28 +33,33 @@
  * @param data_size  Size of @a data.
  * @param data       Data.
  *
- * @return GNUNET_YES or GNUNET_NO
+ * @return Message type number
+ *         or GNUNET_NO if the message contains invalid or no parts.
  */
-int
-GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data)
+uint16_t
+GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data)
 {
   const struct GNUNET_MessageHeader *pmsg;
+  uint16_t ptype = GNUNET_NO;
   uint16_t psize = 0;
   uint16_t pos = 0;
 
-  for (pos = 0; data_size + pos < data_size; pos += psize)
+  for (pos = 0; pos < data_size; pos += psize)
   {
     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
     psize = ntohs (pmsg->size);
-    if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size)
+    ptype = ntohs (pmsg->type);
+    if (psize < sizeof (*pmsg) || pos + psize > data_size
+        || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
+        || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Invalid message part of type %u and size %u.",
-                  ntohs (pmsg->type), psize);
+                  "Invalid message part of type %u and size %u.\n",
+                  ptype, psize);
       return GNUNET_NO;
     }
   }
-  return GNUNET_YES;
+  return ptype;
 }
 
 
@@ -89,7 +94,8 @@
     uint16_t name_size = ntohs (mod->name_size);
     char oper = ' ' < mod->oper ? mod->oper : ' ';
     GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
-                ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1);
+                size - sizeof (*mod) - name_size - 1,
+                ((char *) &mod[1]) + name_size + 1);
     break;
   }
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/test_psyc.c 2014-03-12 16:39:41 UTC (rev 32625)
@@ -35,7 +35,7 @@
 #include "gnunet_env_lib.h"
 #include "gnunet_psyc_service.h"
 
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
 
 #define DEBUG_SERVICE 1
 
@@ -72,6 +72,7 @@
   char *data[16];
   const char *mod_value;
   size_t mod_value_size;
+  uint8_t data_delay[16];
   uint8_t data_count;
   uint8_t paused;
   uint8_t n;
@@ -259,13 +260,16 @@
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
   struct TransmitClosure *tmit = cls;
-  tmit->paused = GNUNET_NO;
-  GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
+  if (NULL != tmit->mst_tmit)
+    GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
+  else
+    GNUNET_PSYC_slave_transmit_resume (tmit->slv_tmit);
 }
 
 
 static int
-tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
+tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
+                 uint32_t *full_value_size)
 {
   struct TransmitClosure *tmit = cls;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -288,6 +292,8 @@
       return GNUNET_YES;
     }
 
+    GNUNET_assert (value_size < UINT32_MAX);
+    *full_value_size = value_size;
     *oper = op;
     name_size = strlen (name);
 
@@ -351,7 +357,7 @@
 
   uint16_t size = strlen (tmit->data[tmit->n]);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmit notify data: %lu bytes available, "
+              "Transmit notify data: %u bytes available, "
               "processing fragment %u/%u (size %u).\n",
               *data_size, tmit->n + 1, tmit->data_count, size);
   if (*data_size < size)
@@ -361,17 +367,18 @@
     return GNUNET_SYSERR;
   }
 
-  if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1)
+  if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
   {
-    /* Send last fragment later. */
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
     tmit->paused = GNUNET_YES;
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
-                                  (GNUNET_TIME_UNIT_SECONDS, 3),
-                                  &transmit_resume, tmit);
+    GNUNET_SCHEDULER_add_delayed (
+      GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                     tmit->data_delay[tmit->n]),
+      &transmit_resume, tmit);
     *data_size = 0;
     return GNUNET_NO;
   }
+  tmit->paused = GNUNET_NO;
 
   *data_size = size;
   memcpy (data, tmit->data[tmit->n], size);
@@ -416,8 +423,9 @@
   GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
                               "_foo_bar", "foo bar baz", 11);
   slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
-                                16, relays, &slave_message, &join_request, 
&slave_joined,
-                                NULL, "_request_join", env, "some data", 9);
+                                16, relays, &slave_message, &join_request,
+                                &slave_joined, NULL, "_request_join", env,
+                                "some data", 9);
   GNUNET_ENV_environment_destroy (env);
 }
 
@@ -427,17 +435,45 @@
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
   test = TEST_MASTER_TRANSMIT;
+  uint32_t i, j;
 
+  char *name_max = "_test_max";
+  uint8_t name_max_size = sizeof ("_test_max");
+  char *val_max = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD);
+  for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; i++)
+    val_max[i] = (0 == i % 10000) ? '0' + i / 10000 : '.';
+
+  char *name_cont = "_test_cont";
+  uint8_t name_cont_size = sizeof ("_test_cont");
+  char *val_cont = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
+                                  + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
+  for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size; i++)
+    val_cont[i] = (0 == i % 10000) ? '0' + i / 10000 : ':';
+  for (j = 0; j < GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; j++, i++)
+    val_cont[i] = (0 == j % 10000) ? '0' + j / 10000 : '!';
+
   tmit = GNUNET_new (struct TransmitClosure);
   tmit->env = GNUNET_ENV_environment_create ();
   GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
                               "_foo", "bar baz", 7);
   GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+                              name_max, val_max,
+                              GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
+                              - name_max_size);
+  GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
                               "_foo_bar", "foo bar baz", 11);
+  GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+                              name_cont, val_cont,
+                              GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
+                              + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
   tmit->data[0] = "foo";
-  tmit->data[1] = "foo bar";
-  tmit->data[2] = "foo bar baz";
-  tmit->data_count = 3;
+  tmit->data[1] =  GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
+  for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)
+    tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
+  tmit->data[2] = "foo bar";
+  tmit->data[3] = "foo bar baz";
+  tmit->data_delay[1] = 3;
+  tmit->data_count = 4;
   tmit->mst_tmit
     = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
                                    tmit_notify_data, tmit,




reply via email to

[Prev in Thread] Current Thread [Next in Thread]