[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r32625 - in gnunet/src: include multicast psyc
From: gnunet
Subject: [GNUnet-SVN] r32625 - in gnunet/src: include multicast psyc
Date: Wed, 12 Mar 2014 17:39:42 +0100
Author: tg
Date: 2014-03-12 17:39:41 +0100 (Wed, 12 Mar 2014)
New Revision: 32625
Modified:
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/multicast/multicast_api.c
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc.h
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/psyc_common.c
gnunet/src/psyc/test_psyc.c
Log:
PSYC: in-order delivery of fragments; tests for large messages
Cache message fragments received from multicast and deliver them
to clients in the correct order.
Test messages with large modifier and data payloads.
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/include/gnunet_psyc_service.h 2014-03-12 16:39:41 UTC (rev 32625)
@@ -444,10 +444,14 @@
* contain: "name\0value". If the whole value does not fit, subsequent
* calls to this function should write continuations of the value to
* @a data.
- * @param oper Where to write the operator of the modifier. Only needed during
- * the first call to this callback at the beginning of the modifier.
- * In case of subsequent calls asking for value continuations @a oper is
- * set to #NULL.
+ * @param[out] oper Where to write the operator of the modifier.
+ * Only needed during the first call to this callback at the beginning
+ * of the modifier. In case of subsequent calls asking for value
+ * continuations @a oper is set to #NULL.
+ * @param[out] value_size Where to write the full size of the value.
+ * Only needed during the first call to this callback at the beginning
+ * of the modifier. In case of subsequent calls asking for value
+ * continuations @a value_size is set to #NULL.
* @return #GNUNET_SYSERR on error (fatal, aborts transmission)
* #GNUNET_NO on success, if more data is to be transmitted later.
* Should be used if @a data_size was not big enough to take all the
@@ -461,7 +465,8 @@
(*GNUNET_PSYC_TransmitNotifyModifier) (void *cls,
uint16_t *data_size,
void *data,
- uint8_t *oper);
+ uint8_t *oper,
+ uint32_t *value_size);
/**
* Flags for transmitting messages to a channel by the master.
@@ -659,7 +664,7 @@
* @param th Handle of the request that is being resumed.
*/
void
-GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle
*th);
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th);
/**
Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c 2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/multicast/multicast_api.c 2014-03-12 16:39:41 UTC (rev 32625)
@@ -178,17 +178,21 @@
const struct GNUNET_MessageHeader *msg = cls;
struct GNUNET_MULTICAST_Group *grp = group;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling message callback for a message of type %u and size
%u.\n",
- ntohs (msg->type), ntohs (msg->size));
-
if (GNUNET_YES == grp->is_origin)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling origin's message callback "
+ "for a message of type %u and size %u.\n",
+ ntohs (msg->type), ntohs (msg->size));
struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *)
grp;
orig->message_cb (orig->cls, msg);
}
else
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Calling slave's message callback "
+ "for a message of type %u and size %u.\n",
+ ntohs (msg->type), ntohs (msg->size));
struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *)
grp;
mem->message_cb (mem->cls, msg);
}
@@ -449,8 +453,8 @@
struct GNUNET_MULTICAST_Origin *orig = cls;
struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
- size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
- char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+ size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+ char buf[GNUNET_MULTICAST_FRAGMENT_MAX_SIZE] = "";
struct GNUNET_MULTICAST_MessageHeader *msg
= (struct GNUNET_MULTICAST_MessageHeader *) buf;
int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
@@ -495,9 +499,9 @@
handle_multicast_message (&orig->grp, msg);
if (GNUNET_NO == ret)
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 1),
- schedule_origin_to_all, orig);
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
+ schedule_origin_to_all, orig);
}
@@ -526,10 +530,10 @@
mh->notify = notify;
mh->notify_cls = notify_cls;
- /* FIXME: remove delay, it's there only for testing */
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 1),
- schedule_origin_to_all, origin);
+ /* add some delay for testing */
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1),
+ schedule_origin_to_all, origin);
return &origin->msg_handle;
}
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2014-03-12 16:39:41 UTC (rev 32625)
@@ -24,6 +24,8 @@
* @author Gabor X Toth
*/
+#include <inttypes.h>
+
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_constants.h"
@@ -77,7 +79,46 @@
uint8_t state;
};
+
/**
+ * Cache for received message fragments.
+ * Message fragments are only sent to clients after all modifiers arrived.
+ *
+ * chan_key -> MultiHashMap chan_msgs
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *recv_cache;
+
+
+/**
+ * Entry in the chan_msgs hashmap of @a recv_cache:
+ * fragment_id -> FragmentEntry
+ */
+struct FragmentEntry
+{
+ struct GNUNET_MULTICAST_MessageHeader *mmsg;
+ uint16_t ref_count;
+};
+
+
+/**
+ * Entry in the @a recv_msgs hash map of a @a Channel.
+ * message_id -> FragmentCache
+ */
+struct FragmentCache
+{
+ /**
+ * Total size of header fragments (METHOD & MODIFIERs)
+ */
+ uint64_t header_size;
+
+ /**
+ * Fragment IDs stored in @a recv_cache.
+ */
+ struct GNUNET_CONTAINER_Heap *fragments;
+};
+
+
+/**
* Common part of the client context for both a master and slave channel.
*/
struct Channel
@@ -87,6 +128,12 @@
struct TransmitMessage *tmit_head;
struct TransmitMessage *tmit_tail;
+ /**
+ * Received fragments not yet sent to the client.
+ * message_id -> FragmentCache
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
+
GNUNET_SCHEDULER_TaskIdentifier tmit_task;
/**
@@ -213,6 +260,8 @@
static void
client_cleanup (struct Channel *ch)
{
+ /* FIXME: fragment_cache_clear */
+
if (ch->is_master)
{
struct Master *mst = (struct Master *) ch;
@@ -323,7 +372,190 @@
}
+static void
+message_to_client (struct Channel *ch,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+ uint16_t size = ntohs (mmsg->header.size);
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending message to client. "
+ "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+ ch, GNUNET_ntohll (mmsg->fragment_id),
+ GNUNET_ntohll (mmsg->message_id));
+
+ pmsg = GNUNET_malloc (psize);
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = mmsg->message_id;
+
+ memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+
+ GNUNET_SERVER_notification_context_add (nc, ch->client);
+ GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+ (const struct
GNUNET_MessageHeader *) pmsg,
+ GNUNET_NO);
+ GNUNET_free (pmsg);
+}
+
+
/**
+ * Convert an uint64_t in network byte order to a HashCode
+ * that can be used as key in a MultiHashMap
+ */
+static inline void
+hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
+{
+ /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
+
+ n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) &
0x00FF00FF00FF00FFULL);
+ n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) &
0x0000FFFF0000FFFFULL);
+
+ *key = (struct GNUNET_HashCode) {{ 0 }};
+ *((uint64_t *) key)
+ = (n << 32) | (n >> 32);
+}
+
+
+/**
+ * Convert an uint64_t in host byte order to a HashCode
+ * that can be used as key in a MultiHashMap
+ */
+static inline void
+hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n)
+{
+#if __BYTE_ORDER == __BIG_ENDIAN
+ hash_key_from_nll (key, n);
+#elif __BYTE_ORDER == __LITTLE_ENDIAN
+ *key = (struct GNUNET_HashCode) {{ 0 }};
+ *((uint64_t *) key) = n;
+#else
+ #error byteorder undefined
+#endif
+}
+
+
+static void
+fragment_cache_insert (struct Channel *ch,
+ const struct GNUNET_HashCode *chan_key_hash,
+ const struct GNUNET_HashCode *msg_id,
+ struct FragmentCache *frag_cache,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ uint16_t last_part_type)
+{
+ uint16_t size = ntohs (mmsg->header.size);
+ struct GNUNET_CONTAINER_MultiHashMap
+ *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash);
+
+ if (NULL == frag_cache)
+ {
+ frag_cache = GNUNET_new (struct FragmentCache);
+ frag_cache->fragments
+ = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+
+ if (NULL == ch->recv_msgs)
+ {
+ ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ }
+ GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+
+ if (NULL == chan_msgs)
+ {
+ chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ GNUNET_CONTAINER_multihashmap_put (recv_cache, chan_key_hash, chan_msgs,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+ }
+
+ struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
+ hash_key_from_nll (frag_id, mmsg->fragment_id);
+ struct FragmentEntry
+ *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
+ if (NULL == frag_entry)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Adding message fragment to cache. "
+ "fragment_id: %" PRIu64 ", "
+ "header_size: %" PRIu64 " + %" PRIu64 ").\n",
+ ch, GNUNET_ntohll (mmsg->fragment_id),
+ frag_cache->header_size, size);
+ frag_entry = GNUNET_new (struct FragmentEntry);
+ frag_entry->ref_count = 1;
+ frag_entry->mmsg = GNUNET_malloc (size);
+ memcpy (frag_entry->mmsg, mmsg, size);
+ GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
+
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+ else
+ {
+ frag_entry->ref_count++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Message fragment already in cache. "
+ "fragment_id: %" PRIu64 ", ref_count: %u\n",
+ ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
+ }
+
+ switch (last_part_type)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ frag_cache->header_size += size;
+ }
+ GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
+ GNUNET_ntohll (mmsg->fragment_id));
+}
+
+
+static void
+fragment_cache_clear (struct Channel *ch,
+ const struct GNUNET_HashCode *chan_key_hash,
+ const struct GNUNET_HashCode *msg_id,
+ struct FragmentCache *frag_cache,
+ uint8_t send_to_client)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Clearing message fragment cache.\n", ch);
+
+ struct GNUNET_CONTAINER_MultiHashMap
+ *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, chan_key_hash);
+ GNUNET_assert (NULL != chan_msgs);
+ struct GNUNET_HashCode *frag_id;
+
+ while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
+ {
+ struct FragmentEntry
+ *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
+ if (frag_entry != NULL)
+ {
+ if (GNUNET_YES == send_to_client)
+ {
+ message_to_client (ch, frag_entry->mmsg);
+ }
+ if (1 == frag_entry->ref_count)
+ {
+ GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
+ GNUNET_free (frag_entry->mmsg);
+ GNUNET_free (frag_entry);
+ }
+ else
+ {
+ frag_entry->ref_count--;
+ }
+ }
+ GNUNET_free (frag_id);
+ }
+
+ GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
+ GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
+ GNUNET_free (frag_cache);
+}
+
+
+/**
* Incoming message fragment from multicast.
*
* Store it using PSYCstore and send it to the client of the channel.
@@ -358,11 +590,15 @@
rcb, rcb_cls);
#endif
- const struct GNUNET_MULTICAST_MessageHeader *mmsg
- = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
+ const struct GNUNET_MULTICAST_MessageHeader
+ *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
- if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
- (const char *)
&mmsg[1]))
+ uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
+ (const char *) &mmsg[1]);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Last message part type %u\n", ptype);
+
+ if (GNUNET_NO == ptype)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Received message with invalid parts from multicast. "
@@ -371,20 +607,55 @@
break;
}
- struct GNUNET_PSYC_MessageHeader *pmsg;
- uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
- pmsg = GNUNET_malloc (psize);
- pmsg->header.size = htons (psize);
- pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = mmsg->message_id;
+ struct GNUNET_HashCode msg_id;
+ hash_key_from_nll (&msg_id, mmsg->message_id);
- memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+ struct FragmentCache *frag_cache = NULL;
+ if (NULL != ch->recv_msgs)
+ frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
- GNUNET_SERVER_notification_context_add (nc, ch->client);
- GNUNET_SERVER_notification_context_unicast (nc, ch->client,
- (const struct
GNUNET_MessageHeader *) pmsg,
- GNUNET_NO);
- GNUNET_free (pmsg);
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ /* FIXME: check state flag / max_state_message_id */
+ if (NULL == frag_cache)
+ {
+ message_to_client (ch, mmsg);
+ break;
+ }
+ else
+ {
+ if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
+ { /* first data fragment after the header, send cached fragments */
+ fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache,
GNUNET_YES);
+ message_to_client (ch, mmsg);
+ break;
+ }
+ else
+ { /* still missing fragments from the header, cache data fragment */
+ /* fall thru */
+ }
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ /* not all modifiers arrived yet, cache fragment */
+ fragment_cache_insert (ch, chan_key_hash, &msg_id, frag_cache, mmsg,
ptype);
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ if (NULL != frag_cache)
+ { /* fragments not yet sent to client, remove from cache */
+ fragment_cache_clear (ch, chan_key_hash, &msg_id, frag_cache,
GNUNET_NO);
+ }
+ else
+ {
+ message_to_client (ch, mmsg);
+ }
+ break;
+ }
break;
}
default:
@@ -457,8 +728,9 @@
const struct GNUNET_MULTICAST_RequestHeader *req
= (const struct GNUNET_MULTICAST_RequestHeader *) msg;
- if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req),
- (const char *) &req[1]))
+ /* FIXME: see message_cb() */
+ if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
+ (const char *) &req[1]))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Dropping message with invalid parts "
@@ -826,7 +1098,7 @@
if (GNUNET_YES != ch->ready)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Ignoring message from client, channel is not ready yet.\n",
+ "%p Dropping message from client, channel is not ready yet.\n",
ch);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
@@ -912,11 +1184,12 @@
store = GNUNET_PSYCSTORE_connect (cfg);
stats = GNUNET_STATISTICS_create ("psyc", cfg);
clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
nc = GNUNET_SERVER_notification_context_create (server, 1);
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
- NULL);
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ &shutdown_task, NULL);
}
Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h 2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/psyc.h 2014-03-12 16:39:41 UTC (rev 32625)
@@ -31,8 +31,8 @@
#include "gnunet_psyc_service.h"
-int
-GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data);
+uint16_t
+GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data);
void
GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/psyc_api.c 2014-03-12 16:39:41 UTC (rev 32625)
@@ -336,7 +336,7 @@
/**
- * Queue an incoming message part for transmission to the PSYC service.
+ * Queue a message part for transmission to the PSYC service.
*
* The message part is added to the current message buffer.
* When this buffer is full, it is added to the transmission queue.
@@ -390,7 +390,7 @@
op->msg->size = sizeof (*op->msg) + size;
memcpy (&op->msg[1], msg, size);
}
-
+
if (NULL != op
&& (GNUNET_YES == end
|| (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
@@ -433,12 +433,12 @@
max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
- notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
- &data_size, &mod[1], &mod->oper);
+ notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
+ &mod->oper, &mod->value_size);
mod->name_size = strnlen ((char *) &mod[1], data_size);
if (mod->name_size < data_size)
{
- mod->value_size = htons (data_size - 1 - mod->name_size);
+ mod->value_size = htonl (mod->value_size);
mod->name_size = htons (mod->name_size);
}
else if (0 < data_size)
@@ -451,10 +451,10 @@
case MSG_STATE_MOD_CONT:
{
max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
- msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
msg->size = sizeof (struct GNUNET_MessageHeader);
notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
- &data_size, &msg[1], NULL);
+ &data_size, &msg[1], NULL, NULL);
break;
}
default:
@@ -669,6 +669,8 @@
ch->recv_message_id = GNUNET_ntohll (msg->message_id);
ch->recv_flags = flags;
ch->recv_slave_key = msg->slave_key;
+ ch->recv_mod_value_size = 0;
+ ch->recv_mod_value_size_expected = 0;
}
else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
{
@@ -703,7 +705,7 @@
if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Discarding message of type %u with invalid size %u.\n",
+ "Dropping message of type %u with invalid size %u.\n",
ptype, psize);
recv_error (ch);
return;
@@ -753,7 +755,8 @@
if (MSG_STATE_START != ch->recv_state)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "Discarding out of order message method.\n");
+ "Dropping out of order message method (%u).\n",
+ ch->recv_state);
/* It is normal to receive an incomplete message right after connecting,
* but should not happen later.
* FIXME: add a check for this condition.
@@ -766,7 +769,7 @@
if ('\0' != *((char *) meth + psize - 1))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "Discarding message with malformed method. "
+ "Dropping message with malformed method. "
"Message ID: %" PRIu64 "\n", ch->recv_message_id);
GNUNET_break_op (0);
recv_error (ch);
@@ -782,7 +785,8 @@
|| MSG_STATE_MOD_CONT == ch->recv_state))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "Discarding out of order message modifier.\n");
+ "Dropping out of order message modifier (%u).\n",
+ ch->recv_state);
GNUNET_break_op (0);
recv_error (ch);
return;
@@ -792,14 +796,14 @@
= (struct GNUNET_PSYC_MessageModifier *) pmsg;
uint16_t name_size = ntohs (mod->name_size);
- ch->recv_mod_value_size_expected = ntohs (mod->value_size);
+ ch->recv_mod_value_size_expected = ntohl (mod->value_size);
ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
if (psize < sizeof (*mod) + name_size + 1
|| '\0' != *((char *) &mod[1] + name_size)
|| ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
{
- LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
+ LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
GNUNET_break_op (0);
recv_error (ch);
return;
@@ -816,7 +820,11 @@
|| ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "Discarding out of order message modifier continuation.\n");
+ "Dropping out of order message modifier continuation "
+ "!(%u == %u || %u == %u) || %lu < %lu.\n",
+ MSG_STATE_MODIFIER, ch->recv_state,
+ MSG_STATE_MOD_CONT, ch->recv_state,
+ ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
GNUNET_break_op (0);
recv_error (ch);
return;
@@ -829,7 +837,11 @@
|| ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "Discarding out of order message data fragment.\n");
+ "Dropping out of order message data fragment "
+ "(%u < %u || %lu != %lu).\n",
+ ch->recv_state, MSG_STATE_METHOD,
+ ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
+
GNUNET_break_op (0);
recv_error (ch);
return;
@@ -1412,7 +1424,7 @@
* @param th Handle of the request that is being resumed.
*/
void
-GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
{
channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
}
Modified: gnunet/src/psyc/psyc_common.c
===================================================================
--- gnunet/src/psyc/psyc_common.c 2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/psyc_common.c 2014-03-12 16:39:41 UTC (rev 32625)
@@ -33,28 +33,33 @@
* @param data_size Size of @a data.
* @param data Data.
*
- * @return GNUNET_YES or GNUNET_NO
+ * @return Message type number
+ * or GNUNET_NO if the message contains invalid or no parts.
*/
-int
-GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data)
+uint16_t
+GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data)
{
const struct GNUNET_MessageHeader *pmsg;
+ uint16_t ptype = GNUNET_NO;
uint16_t psize = 0;
uint16_t pos = 0;
- for (pos = 0; data_size + pos < data_size; pos += psize)
+ for (pos = 0; pos < data_size; pos += psize)
{
pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
psize = ntohs (pmsg->size);
- if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size)
+ ptype = ntohs (pmsg->type);
+ if (psize < sizeof (*pmsg) || pos + psize > data_size
+ || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
+ || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Invalid message part of type %u and size %u.",
- ntohs (pmsg->type), psize);
+ "Invalid message part of type %u and size %u.\n",
+ ptype, psize);
return GNUNET_NO;
}
}
- return GNUNET_YES;
+ return ptype;
}
@@ -89,7 +94,8 @@
uint16_t name_size = ntohs (mod->name_size);
char oper = ' ' < mod->oper ? mod->oper : ' ';
GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
- ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1);
+ size - sizeof (*mod) - name_size - 1,
+ ((char *) &mod[1]) + name_size + 1);
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-03-11 16:27:07 UTC (rev 32624)
+++ gnunet/src/psyc/test_psyc.c 2014-03-12 16:39:41 UTC (rev 32625)
@@ -35,7 +35,7 @@
#include "gnunet_env_lib.h"
#include "gnunet_psyc_service.h"
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
#define DEBUG_SERVICE 1
@@ -72,6 +72,7 @@
char *data[16];
const char *mod_value;
size_t mod_value_size;
+ uint8_t data_delay[16];
uint8_t data_count;
uint8_t paused;
uint8_t n;
@@ -259,13 +260,16 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
struct TransmitClosure *tmit = cls;
- tmit->paused = GNUNET_NO;
- GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
+ if (NULL != tmit->mst_tmit)
+ GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
+ else
+ GNUNET_PSYC_slave_transmit_resume (tmit->slv_tmit);
}
static int
-tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
+tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
+ uint32_t *full_value_size)
{
struct TransmitClosure *tmit = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -288,6 +292,8 @@
return GNUNET_YES;
}
+ GNUNET_assert (value_size < UINT32_MAX);
+ *full_value_size = value_size;
*oper = op;
name_size = strlen (name);
@@ -351,7 +357,7 @@
uint16_t size = strlen (tmit->data[tmit->n]);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmit notify data: %lu bytes available, "
+ "Transmit notify data: %u bytes available, "
"processing fragment %u/%u (size %u).\n",
*data_size, tmit->n + 1, tmit->data_count, size);
if (*data_size < size)
@@ -361,17 +367,18 @@
return GNUNET_SYSERR;
}
- if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1)
+ if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
{
- /* Send last fragment later. */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
tmit->paused = GNUNET_YES;
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 3),
- &transmit_resume, tmit);
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ tmit->data_delay[tmit->n]),
+ &transmit_resume, tmit);
*data_size = 0;
return GNUNET_NO;
}
+ tmit->paused = GNUNET_NO;
*data_size = size;
memcpy (data, tmit->data[tmit->n], size);
@@ -416,8 +423,9 @@
GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
"_foo_bar", "foo bar baz", 11);
slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
- 16, relays, &slave_message, &join_request,
&slave_joined,
- NULL, "_request_join", env, "some data", 9);
+ 16, relays, &slave_message, &join_request,
+ &slave_joined, NULL, "_request_join", env,
+ "some data", 9);
GNUNET_ENV_environment_destroy (env);
}
@@ -427,17 +435,45 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
test = TEST_MASTER_TRANSMIT;
+ uint32_t i, j;
+ char *name_max = "_test_max";
+ uint8_t name_max_size = sizeof ("_test_max");
+ char *val_max = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD);
+ for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; i++)
+ val_max[i] = (0 == i % 10000) ? '0' + i / 10000 : '.';
+
+ char *name_cont = "_test_cont";
+ uint8_t name_cont_size = sizeof ("_test_cont");
+ char *val_cont = GNUNET_malloc (GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
+ + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
+ for (i = 0; i < GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size; i++)
+ val_cont[i] = (0 == i % 10000) ? '0' + i / 10000 : ':';
+ for (j = 0; j < GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; j++, i++)
+ val_cont[i] = (0 == j % 10000) ? '0' + j / 10000 : '!';
+
tmit = GNUNET_new (struct TransmitClosure);
tmit->env = GNUNET_ENV_environment_create ();
GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
"_foo", "bar baz", 7);
GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ name_max, val_max,
+ GNUNET_PSYC_MODIFIER_MAX_PAYLOAD
+ - name_max_size);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
"_foo_bar", "foo bar baz", 11);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ name_cont, val_cont,
+ GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
+ + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
tmit->data[0] = "foo";
- tmit->data[1] = "foo bar";
- tmit->data[2] = "foo bar baz";
- tmit->data_count = 3;
+ tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
+ for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)
+ tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
+ tmit->data[2] = "foo bar";
+ tmit->data[3] = "foo bar baz";
+ tmit->data_delay[1] = 3;
+ tmit->data_count = 4;
tmit->mst_tmit
= GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
tmit_notify_data, tmit,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r32625 - in gnunet/src: include multicast psyc,
gnunet <=