[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r31792 - in gnunet: contrib src/include src/multicast src/p
From: |
gnunet |
Subject: |
[GNUnet-SVN] r31792 - in gnunet: contrib src/include src/multicast src/psyc |
Date: |
Mon, 6 Jan 2014 01:09:43 +0100 |
Author: tg
Date: 2014-01-06 01:09:43 +0100 (Mon, 06 Jan 2014)
New Revision: 31792
Modified:
gnunet/contrib/logread.pl
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/multicast/multicast_api.c
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/test_psyc.c
gnunet/src/psyc/test_psyc.conf
Log:
psyc: ipc messages, notify callback for modifiers, tests
Modified: gnunet/contrib/logread.pl
===================================================================
--- gnunet/contrib/logread.pl 2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/contrib/logread.pl 2014-01-06 00:09:43 UTC (rev 31792)
@@ -98,7 +98,7 @@
s/\b(multicast|psyc|psycstore|social)\b/BLUE $1/ex;
# Add message type names
- s/(message(?:\s+of)?\s+type\s+)(\d+)/
+ s/(message(?:\s+part)?(?:\s+of)?\s+type\s+)(\d+)/
$1 . BRIGHT_CYAN (exists $msgtypes{$2} ? $msgtypes{$2} : 'UNKNOWN') .
CYAN " ($2)"/e;
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2014-01-06 00:09:40 UTC (rev
31791)
+++ gnunet/src/include/gnunet_protocols.h 2014-01-06 00:09:43 UTC (rev
31792)
@@ -2130,7 +2130,7 @@
#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL 697
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 698
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 698
#define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2014-01-06 00:09:40 UTC (rev
31791)
+++ gnunet/src/include/gnunet_psyc_service.h 2014-01-06 00:09:43 UTC (rev
31792)
@@ -426,6 +426,11 @@
uint16_t *data_size,
void *data);
+typedef int
+(*GNUNET_PSYC_MasterTransmitNotifyModifier) (void *cls,
+ uint16_t *data_size,
+ void *data,
+ uint8_t *oper);
/**
* Flags for transmitting messages to a channel by the master.
@@ -472,7 +477,7 @@
struct GNUNET_PSYC_MasterTransmitHandle *
GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
const char *method_name,
- GNUNET_PSYC_MasterTransmitNotify notify_mod,
+ GNUNET_PSYC_MasterTransmitNotifyModifier
notify_mod,
GNUNET_PSYC_MasterTransmitNotify notify_data,
void *notify_cls,
enum GNUNET_PSYC_MasterTransmitFlags flags);
Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c 2014-01-06 00:09:40 UTC (rev
31791)
+++ gnunet/src/multicast/multicast_api.c 2014-01-06 00:09:43 UTC (rev
31792)
@@ -362,8 +362,9 @@
struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
+ char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
struct GNUNET_MULTICAST_MessageHeader *msg
- = GNUNET_malloc (buf_size);
+ = (struct GNUNET_MULTICAST_MessageHeader *) buf;
int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
@@ -380,12 +381,12 @@
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
msg->header.size = htons (sizeof (*msg) + buf_size);
- msg->message_id = mh->message_id;
+ msg->message_id = GNUNET_htonll (mh->message_id);
msg->group_generation = mh->group_generation;
/* FIXME: add fragment ID and signature in the service instead of here */
- msg->fragment_id = orig->next_fragment_id++;
- msg->fragment_offset = mh->fragment_offset;
+ msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++);
+ msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset);
mh->fragment_offset += sizeof (*msg) + buf_size;
msg->purpose.size = htonl (sizeof (*msg) + buf_size
- sizeof (msg->header)
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2014-01-06 00:09:40 UTC (rev
31791)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2014-01-06 00:09:43 UTC (rev
31792)
@@ -171,8 +171,8 @@
};
-static void
-transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay);
+static inline void
+transmit_message (struct Channel *ch);
/**
@@ -205,6 +205,7 @@
struct Master *mst = (struct Master *) ch;
if (NULL != mst->origin)
GNUNET_MULTICAST_origin_stop (mst->origin);
+ GNUNET_CONTAINER_multihashmap_remove (clients, &mst->pub_key_hash, mst);
}
else
{
@@ -251,7 +252,7 @@
/* Send pending messages to multicast before cleanup. */
if (NULL != ch->tmit_head)
{
- transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
+ transmit_message (ch);
}
else
{
@@ -321,6 +322,10 @@
const struct GNUNET_MessageHeader *msg = cls;
struct Channel *ch = chan;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending message of type %u and size %u to client 0x%zx.\n",
+ ntohs (msg->type), ntohs (msg->size), ch->client);
+
GNUNET_SERVER_notification_context_add (nc, ch->client);
GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
@@ -363,36 +368,38 @@
GNUNET_MULTICAST_MessageHeader *) msg,
0, NULL, NULL);
+#if TODO
+ /* FIXME: apply modifiers to state in PSYCstore */
+ GNUNET_PSYCSTORE_state_modify (store, chan_key,
+ GNUNET_ntohll (mmsg->message_id),
+ meth->mod_count, mods,
+ rcb, rcb_cls);
+#endif
+
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg
+ = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+
uint16_t size = ntohs (msg->size);
uint16_t psize = 0;
uint16_t pos = 0;
- for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
+ for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize)
{
const struct GNUNET_MessageHeader *pmsg
- = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
- uint16_t psize = ntohs (pmsg->size);
- if (sizeof (*msg) + pos + psize > size)
+ = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos);
+ psize = ntohs (pmsg->size);
+ if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Message received from multicast contains invalid PSYC "
- "message. Not sending to clients.\n");
+ "Received invalid message part of type %u and size %u "
+ "from multicast. Not sending to clients.\n",
+ ntohs (pmsg->type), psize);
+ GNUNET_break_op (0);
return;
}
}
-#if TODO
- /* FIXME: apply modifiers to state in PSYCstore */
- GNUNET_PSYCSTORE_state_modify (store, chan_key,
- GNUNET_ntohll (mmsg->message_id),
- meth->mod_count, mods,
- rcb, rcb_cls);
-#endif
-
- const struct GNUNET_MULTICAST_MessageHeader *mmsg
- = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
- struct GNUNET_PSYC_MessageHeader *pmsg;
-
psize = sizeof (*pmsg) + size - sizeof (*mmsg);
pmsg = GNUNET_malloc (psize);
pmsg->header.size = htons (psize);
@@ -572,19 +579,18 @@
/**
- * Send transmission acknowledgement to a client.
+ * Send acknowledgement to a client.
*
- * Sent after the last GNUNET_PSYC_MessageModifier and after each
- * GNUNET_PSYC_MessageData.
+ * Sent after a message fragment has been passed on to multicast.
*
* @param ch The channel struct for the client.
*/
static void
-send_transmit_ack (struct Channel *ch)
+send_message_ack (struct Channel *ch)
{
struct GNUNET_MessageHeader res;
res.size = htons (sizeof (res));
- res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
+ res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
GNUNET_SERVER_notification_context_add (nc, ch->client);
GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res,
@@ -599,9 +605,9 @@
transmit_notify (void *cls, size_t *data_size, void *data)
{
struct Channel *ch = cls;
- struct TransmitMessage *msg = ch->tmit_head;
+ struct TransmitMessage *tmit_msg = ch->tmit_head;
- if (NULL == msg || *data_size < msg->size)
+ if (NULL == tmit_msg || *data_size < tmit_msg->size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to
send.\n");
*data_size = 0;
@@ -609,21 +615,22 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "transmit_notify: sending %u bytes.\n", msg->size);
+ "transmit_notify: sending %u bytes.\n", tmit_msg->size);
- *data_size = msg->size;
- memcpy (data, msg->buf, *data_size);
+ *data_size = tmit_msg->size;
+ memcpy (data, tmit_msg->buf, *data_size);
- GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
- GNUNET_free (msg);
+ GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ GNUNET_free (tmit_msg);
int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+ send_message_ack (ch);
if (0 == ch->tmit_task)
{
if (NULL != ch->tmit_head)
{
- transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
+ transmit_message (ch);
}
else if (ch->disconnected)
{
@@ -640,11 +647,9 @@
* Transmit a message from a channel master to the multicast group.
*/
static void
-master_transmit_message (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+master_transmit_message (struct Master *mst)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
- struct Master *mst = cls;
mst->channel.tmit_task = 0;
if (NULL == mst->tmit_handle)
{
@@ -664,10 +669,8 @@
* Transmit a message from a channel slave to the multicast group.
*/
static void
-slave_transmit_message (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+slave_transmit_message (struct Slave *slv)
{
- struct Slave *slv = cls;
slv->channel.tmit_task = 0;
if (NULL == slv->tmit_handle)
{
@@ -682,215 +685,86 @@
}
-/**
- * Schedule message transmission from a channel to the multicast group.
- *
- * @param ch The channel.
- * @param delay Transmission delay.
- */
-static void
-transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
+static inline void
+transmit_message (struct Channel *ch)
{
- if (0 != ch->tmit_task)
- GNUNET_SCHEDULER_cancel (ch->tmit_task);
-
- ch->tmit_task
- = ch->is_master
- ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch)
- : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
+ ch->is_master
+ ? master_transmit_message ((struct Master *) ch)
+ : slave_transmit_message ((struct Slave *) ch);
}
-/**
- * Queue incoming message parts from a client for transmission, and send them
to
- * the multicast group when the buffer is full or reached the end of message.
- *
- * @param ch Channel struct for the client.
- * @param msg Message from the client.
- *
- * @return #GNUNET_OK on success, else #GNUNET_SYSERR.
- */
-static int
-queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
-{
- uint16_t size = ntohs (msg->size);
- struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
- struct TransmitMessage *tmit_msg = ch->tmit_tail;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing message of type %u and size %u "
- "for transmission to multicast.\n",
- ntohs (msg->type), size);
-
- if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size)
- return GNUNET_SYSERR;
-
- if (NULL == tmit_msg
- || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit_msg->size + size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Appending message to new buffer.\n");
- /* Start filling up new buffer */
- tmit_msg = GNUNET_new (struct TransmitMessage);
- tmit_msg->buf = GNUNET_malloc (size);
- memcpy (tmit_msg->buf, msg, size);
- tmit_msg->size = size;
- tmit_msg->state = ch->tmit_state;
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Appending message to existing buffer.\n");
- /* Append to existing buffer */
- tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
- memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
- tmit_msg->size += size;
- tmit_msg->state = ch->tmit_state;
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
-
- /* Wait a bit for the remaining message parts from the client
- if there's still some space left in the buffer. */
- if (tmit_msg->state < MSG_STATE_END
- && (tmit_msg->size + sizeof (struct GNUNET_MessageHeader)
- < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD))
- {
- tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
- }
- else
- {
- send_transmit_ack (ch);
- }
-
- transmit_message (ch, tmit_delay);
-
- return GNUNET_OK;
-}
-
-
static void
transmit_error (struct Channel *ch)
{
- struct GNUNET_MessageHeader *msg = GNUNET_malloc (sizeof (*msg));
+ struct GNUNET_MessageHeader *msg;
+ struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
+ + sizeof (*msg));
+ msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
msg->size = ntohs (sizeof (*msg));
msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
- queue_message (ch, msg);
+ tmit_msg->buf = (char *) &tmit_msg[1];
+ tmit_msg->size = sizeof (*msg);
+ tmit_msg->state = ch->tmit_state;
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ transmit_message (ch);
+
+ /* FIXME: cleanup */
GNUNET_SERVER_client_disconnect (ch->client);
}
-/**
- * Incoming method from a client.
- */
-static void
-handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
-{
- /* const struct GNUNET_PSYC_MessageMethod *meth
- = (const struct GNUNET_PSYC_MessageMethod *) msg; */
- struct Channel *ch
- = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != ch);
- if (MSG_STATE_START != ch->tmit_state)
- {
- transmit_error (ch);
- return;
- }
- ch->tmit_state = MSG_STATE_METHOD;
-
- queue_message (ch, msg);
- send_transmit_ack (ch);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
-
-
/**
- * Incoming modifier from a client.
+ * Incoming message from a client.
*/
static void
-handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct GNUNET_PSYC_MessageModifier *mod
- = (const struct GNUNET_PSYC_MessageModifier *) msg;
-
struct Channel *ch
= GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != ch);
- if (MSG_STATE_METHOD != ch->tmit_state
- || MSG_STATE_MODIFIER != ch->tmit_state
- || MSG_STATE_MOD_CONT != ch->tmit_state
- || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
- {
- transmit_error (ch);
- return;
- }
- ch->tmit_mod_value_size_expected = ntohl (mod->value_size);
- ch->tmit_mod_value_size = ntohs (msg->size) - ntohs(mod->name_size) - 1;
+ uint16_t size = ntohs (msg->size);
+ uint16_t psize = 0, pos = 0;
- queue_message (ch, msg);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
-
-
-/**
- * Incoming modifier from a client.
- */
-static void
-handle_transmit_mod_cont (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
-{
- struct Channel *ch
- = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != ch);
-
- ch->tmit_mod_value_size += ntohs (msg->size);
-
- if (MSG_STATE_MODIFIER != ch->tmit_state
- || MSG_STATE_MOD_CONT != ch->tmit_state
- || ch->tmit_mod_value_size_expected < ch->tmit_mod_value_size)
+ if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
{
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n");
+ GNUNET_break (0);
transmit_error (ch);
return;
}
- ch->tmit_state = MSG_STATE_MOD_CONT;
- queue_message (ch, msg);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-};
-
-
-/**
- * Incoming data from a client.
- */
-static void
-handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
-{
- struct Channel *ch
- = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != ch);
-
- if (MSG_STATE_METHOD != ch->tmit_state
- || MSG_STATE_MODIFIER != ch->tmit_state
- || MSG_STATE_MOD_CONT != ch->tmit_state
- || ch->tmit_mod_value_size_expected != ch->tmit_mod_value_size)
+ for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
{
- transmit_error (ch);
- return;
+ const struct GNUNET_MessageHeader *pmsg
+ = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+ psize = ntohs (pmsg->size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message part of type %u and size %u "
+ "from client.\n", ntohs (pmsg->type), psize);
+ if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Received invalid message part of type %u and size %u "
+ "from client.\n", ntohs (pmsg->type), psize);
+ GNUNET_break (0);
+ transmit_error (ch);
+ return;
+ }
}
- ch->tmit_state = MSG_STATE_DATA;
- queue_message (ch, msg);
- send_transmit_ack (ch);
+ size -= sizeof (*msg);
+ struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
+ tmit_msg->buf = (char *) &tmit_msg[1];
+ memcpy (tmit_msg->buf, &msg[1], size);
+ tmit_msg->size = size;
+ tmit_msg->state = ch->tmit_state;
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ transmit_message (ch);
- if (MSG_STATE_END <= ch->tmit_state)
- ch->tmit_state = MSG_STATE_START;
-
GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
@@ -912,22 +786,9 @@
{ &handle_slave_join, NULL,
GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
-#if TODO
+
{ &handle_psyc_message, NULL,
GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
-#endif
- { &handle_transmit_method, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
-
- { &handle_transmit_modifier, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
-
- { &handle_transmit_mod_cont, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT, 0 },
-
- { &handle_transmit_data, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
- { NULL, NULL, 0, 0 }
};
cfg = c;
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/psyc/psyc_api.c 2014-01-06 00:09:43 UTC (rev 31792)
@@ -45,7 +45,7 @@
{
struct OperationHandle *prev;
struct OperationHandle *next;
- const struct GNUNET_MessageHeader *msg;
+ struct GNUNET_MessageHeader *msg;
};
/**
@@ -79,6 +79,11 @@
struct OperationHandle *tmit_tail;
/**
+ * Message being transmitted to the PSYC service.
+ */
+ struct OperationHandle *tmit_msg;
+
+ /**
* Message to send on reconnect.
*/
struct GNUNET_MessageHeader *reconnect_msg;
@@ -139,11 +144,6 @@
uint32_t recv_mod_value_size;
/**
- * Buffer space available for transmitting the next data fragment.
- */
- uint16_t tmit_size; // FIXME
-
- /**
* Is transmission paused?
*/
uint8_t tmit_paused;
@@ -151,7 +151,7 @@
/**
* Are we still waiting for a PSYC_TRANSMIT_ACK?
*/
- uint8_t tmit_ack_pending; // FIXME
+ uint8_t tmit_ack_pending;
/**
* Are we polling for incoming messages right now?
@@ -176,7 +176,7 @@
struct GNUNET_PSYC_MasterTransmitHandle
{
struct GNUNET_PSYC_Master *master;
- GNUNET_PSYC_MasterTransmitNotify notify_mod;
+ GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
GNUNET_PSYC_MasterTransmitNotify notify_data;
void *notify_cls;
enum MessageState state;
@@ -246,16 +246,14 @@
};
-/**
- * Try again to connect to the PSYC service.
- *
- * @param cls Handle to the PSYC service.
- * @param tc Scheduler context
- */
static void
reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+static void
+master_transmit_data (struct GNUNET_PSYC_Master *mst);
+
+
/**
* Reschedule a connect attempt to the service.
*
@@ -323,7 +321,80 @@
message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
}
+
/**
+ * Queue an incoming message part for transmission to the PSYC service.
+ *
+ * The message part is added to the current message buffer.
+ * When this buffer is full, it is added to the transmission queue.
+ *
+ * @param ch Channel struct for the client.
+ * @param msg Modifier message part, or NULL when there's no more modifiers.
+ * @param end End of message.
+ */
+static void
+queue_message (struct GNUNET_PSYC_Channel *ch,
+ const struct GNUNET_MessageHeader *msg,
+ uint8_t end)
+{
+ uint16_t size = msg ? ntohs (msg->size) : 0;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing message of type %u and size %u (end: %u)).\n",
+ ntohs (msg->type), size, end);
+
+ struct OperationHandle *op = ch->tmit_msg;
+ if (NULL != op)
+ {
+ if (NULL == msg
+ || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < op->msg->size + size)
+ {
+ /* End of message or buffer is full, add it to transmission queue
+ * and start with empty buffer */
+ op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ op->msg->size = htons (op->msg->size);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+ ch->tmit_msg = op = NULL;
+ ch->tmit_ack_pending++;
+ }
+ else
+ {
+ /* Message fits in current buffer, append */
+ ch->tmit_msg = op
+ = GNUNET_realloc (op, sizeof (*op) + op->msg->size + size);
+ op->msg = (struct GNUNET_MessageHeader *) &op[1];
+ memcpy ((char *) op->msg + op->msg->size, msg, size);
+ op->msg->size += size;
+ }
+ }
+
+ if (NULL == op && NULL != msg)
+ {
+ /* Empty buffer, copy over message. */
+ ch->tmit_msg = op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg) + size);
+ op->msg = (struct GNUNET_MessageHeader *) &op[1];
+ op->msg->size = sizeof (*op->msg) + size;
+ memcpy (&op->msg[1], msg, size);
+ }
+
+ if (NULL != op
+ && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
+ < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
+ {
+ /* End of message or buffer is full, add it to transmission queue. */
+ op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ op->msg->size = htons (op->msg->size);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+ ch->tmit_msg = op = NULL;
+ ch->tmit_ack_pending++;
+ }
+
+ transmit_next (ch);
+}
+
+
+/**
* Request a modifier from a client to transmit.
*
* @param mst Master handle.
@@ -332,32 +403,71 @@
master_transmit_mod (struct GNUNET_PSYC_Master *mst)
{
struct GNUNET_PSYC_Channel *ch = &mst->ch;
- uint16_t max_data_size
- = ch->tmit_size > sizeof (struct GNUNET_MessageHeader)
- ? GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - ch->tmit_size
- : GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD - ch->tmit_size;
- uint16_t data_size = max_data_size;
+ uint16_t max_data_size, data_size;
+ char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+ struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+ int notify_ret;
- struct GNUNET_MessageHeader *msg;
- struct OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
- op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
- msg->type
- = MSG_STATE_MODIFIER == mst->tmit->state
- ? htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER)
- : htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+ switch (mst->tmit->state)
+ {
+ case MSG_STATE_MODIFIER:
+ {
+ struct GNUNET_PSYC_MessageModifier *mod
+ = (struct GNUNET_PSYC_MessageModifier *) msg;
+ max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+ msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
+ notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
+ &data_size, &mod[1], &mod->oper);
+ mod->name_size = strnlen ((char *) &mod[1], data_size);
+ if (mod->name_size < data_size)
+ {
+ mod->oper = htons (mod->oper);
+ mod->value_size = htons (data_size - 1 - mod->name_size);
+ mod->name_size = htons (mod->name_size);
+ }
+ else if (0 < data_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
+ notify_ret = GNUNET_SYSERR;
+ }
+ break;
+ }
+ case MSG_STATE_MOD_CONT:
+ {
+ max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+ msg->size = sizeof (struct GNUNET_MessageHeader);
+ notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
+ &data_size, &msg[1], NULL);
+ break;
+ }
+ default:
+ GNUNET_assert (0);
+ }
- int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
- &data_size, &msg[1]);
switch (notify_ret)
{
case GNUNET_NO:
- if (0 != data_size)
- mst->tmit->state = MSG_STATE_MOD_CONT;
+ if (0 == data_size)
+ { /* Transmission paused, nothing to send. */
+ ch->tmit_paused = GNUNET_YES;
+ return;
+ }
+ mst->tmit->state = MSG_STATE_MOD_CONT;
break;
case GNUNET_YES:
- mst->tmit->state = (0 == data_size) ? MSG_STATE_DATA : MSG_STATE_MODIFIER;
+ if (0 == data_size)
+ {
+ /* End of modifiers. */
+ mst->tmit->state = MSG_STATE_DATA;
+ if (0 == ch->tmit_ack_pending)
+ master_transmit_data (mst);
+
+ return;
+ }
+ mst->tmit->state = MSG_STATE_MODIFIER;
break;
default:
@@ -368,36 +478,18 @@
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg->size = htons (sizeof (*msg));
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
- transmit_next (ch);
+ queue_message (ch, msg, GNUNET_YES);
return;
}
- if ((GNUNET_NO == notify_ret && 0 == data_size))
- {
- /* Transmission paused, nothing to send. */
- ch->tmit_paused = GNUNET_YES;
- GNUNET_free (op);
- }
-
if (0 < data_size)
{
- GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
- msg->size = htons (sizeof (*msg) + data_size);
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+ GNUNET_assert (data_size <= max_data_size);
+ msg->size = htons (msg->size + data_size);
+ queue_message (ch, msg, GNUNET_NO);
}
- /* End of message. */
- if (GNUNET_YES == notify_ret)
- {
- op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
- op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
- msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
- msg->size = htons (sizeof (*msg));
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
- }
-
- transmit_next (ch);
+ master_transmit_mod (mst);
}
@@ -410,11 +502,10 @@
master_transmit_data (struct GNUNET_PSYC_Master *mst)
{
struct GNUNET_PSYC_Channel *ch = &mst->ch;
- struct GNUNET_MessageHeader *msg;
uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
- struct OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*msg) + data_size);
- op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
+ char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+ struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
@@ -426,7 +517,7 @@
{
/* Transmission paused, nothing to send. */
ch->tmit_paused = GNUNET_YES;
- GNUNET_free (op);
+ return;
}
break;
@@ -441,9 +532,7 @@
mst->tmit->state = MSG_STATE_START;
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
msg->size = htons (sizeof (*msg));
-
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
- transmit_next (ch);
+ queue_message (ch, msg, GNUNET_YES);
return;
}
@@ -451,20 +540,16 @@
{
GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
msg->size = htons (sizeof (*msg) + data_size);
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+ queue_message (ch, msg, !notify_ret);
}
/* End of message. */
if (GNUNET_YES == notify_ret)
{
- op = GNUNET_malloc (sizeof *(op) + sizeof (*msg));
- op->msg = msg = (struct GNUNET_MessageHeader *) &op[1];
msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
msg->size = htons (sizeof (*msg));
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+ queue_message (ch, msg, GNUNET_YES);
}
-
- transmit_next (ch);
}
@@ -476,57 +561,55 @@
*/
static void
handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
- const struct GNUNET_PSYC_MessageHeader *pmsg)
+ const struct GNUNET_PSYC_MessageHeader *msg)
{
- const struct GNUNET_MessageHeader *msg;
- uint16_t msize = ntohs (pmsg->header.size);
- uint16_t pos = 0;
- uint16_t size = 0;
- uint16_t type, size_eq, size_min;
+ uint16_t size = ntohs (msg->header.size);
if (MSG_STATE_START == ch->recv_state)
{
- ch->recv_message_id = GNUNET_ntohll (pmsg->message_id);
- ch->recv_flags = ntohl (pmsg->flags);
+ ch->recv_message_id = GNUNET_ntohll (msg->message_id);
+ ch->recv_flags = ntohl (msg->flags);
}
- else if (GNUNET_ntohll (pmsg->message_id) != ch->recv_message_id)
+ else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
- GNUNET_ntohll (pmsg->message_id), ch->recv_message_id);
+ GNUNET_ntohll (msg->message_id), ch->recv_message_id);
GNUNET_break_op (0);
recv_error (ch);
}
- else if (ntohl (pmsg->flags) != ch->recv_flags)
+ else if (ntohl (msg->flags) != ch->recv_flags)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Unexpected message flags. Got: %lu, expected: %lu\n",
- ntohl (pmsg->flags), ch->recv_flags);
+ ntohl (msg->flags), ch->recv_flags);
GNUNET_break_op (0);
recv_error (ch);
}
- for (pos = 0; sizeof (*pmsg) + pos < msize; pos += size)
+ uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
+
+ for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
{
- msg = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
- size = ntohs (msg->size);
- type = ntohs (msg->type);
+ const struct GNUNET_MessageHeader *pmsg
+ = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+ psize = ntohs (pmsg->size);
+ ptype = ntohs (pmsg->type);
size_eq = size_min = 0;
- if (msize < sizeof (*pmsg) + pos + size)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message part of type %u and size %u from PSYC.\n",
+ ptype, psize);
+
+ if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Discarding message of type %u with invalid size. "
- "(%u < %u + %u + %u)\n", ntohs (msg->type),
- msize, sizeof (*msg), pos, size);
+ "Discarding message of type %u with invalid size %u.\n",
+ ptype, psize);
break;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message part of type %u and size %u from PSYC.\n",
- ntohs (msg->type), size);
-
- switch (type)
+ switch (ptype)
{
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
@@ -534,6 +617,7 @@
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
break;
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
size_min = sizeof (struct GNUNET_MessageHeader);
break;
@@ -543,22 +627,22 @@
break;
}
- if (! ((0 < size_eq && size == size_eq)
- || (0 < size_min && size_min <= size)))
+ if (! ((0 < size_eq && psize == size_eq)
+ || (0 < size_min && size_min <= psize)))
{
GNUNET_break (0);
reschedule_connect (ch);
return;
}
- switch (type)
+ switch (ptype)
{
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
{
struct GNUNET_PSYC_MessageMethod *meth
- = (struct GNUNET_PSYC_MessageMethod *) msg;
+ = (struct GNUNET_PSYC_MessageMethod *) pmsg;
- if (MSG_STATE_HEADER != ch->recv_state)
+ if (MSG_STATE_START != ch->recv_state)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Discarding out of order message method.\n");
@@ -568,89 +652,66 @@
*/
GNUNET_break_op (0);
recv_error (ch);
- break;
+ return;
}
- if ('\0' != (char *) meth + msg->size - 1)
+ if ('\0' != *((char *) meth + psize - 1))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Discarding message with malformed method. "
"Message ID: %" PRIu64 "\n", ch->recv_message_id);
GNUNET_break_op (0);
recv_error (ch);
- break;
+ return;
}
- GNUNET_PSYC_MessageCallback message_cb
- = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
- ? ch->hist_message_cb
- : ch->message_cb;
-
- if (NULL != message_cb)
- message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
-
ch->recv_state = MSG_STATE_METHOD;
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
{
- if (MSG_STATE_MODIFIER != ch->recv_state)
+ if (!(MSG_STATE_METHOD == ch->recv_state
+ || MSG_STATE_MODIFIER == ch->recv_state
+ || MSG_STATE_MOD_CONT == ch->recv_state))
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Discarding out of order message modifier.\n");
GNUNET_break_op (0);
recv_error (ch);
- break;
+ return;
}
struct GNUNET_PSYC_MessageModifier *mod
- = (struct GNUNET_PSYC_MessageModifier *) msg;
+ = (struct GNUNET_PSYC_MessageModifier *) pmsg;
uint16_t name_size = ntohs (mod->name_size);
ch->recv_mod_value_size_expected = ntohs (mod->value_size);
- ch->recv_mod_value_size = size - sizeof (*mod) - name_size - 1;
+ ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
- if (size < sizeof (*mod) + name_size + 1
- || '\0' != (char *) &mod[1] + mod->name_size
+ if (psize < sizeof (*mod) + name_size + 1
+ || '\0' != *((char *) &mod[1] + name_size)
|| ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
{
LOG (GNUNET_ERROR_TYPE_WARNING, "Discarding malformed modifier.\n");
GNUNET_break_op (0);
- break;
+ return;
}
-
ch->recv_state = MSG_STATE_MODIFIER;
-
- GNUNET_PSYC_MessageCallback message_cb
- = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
- ? ch->hist_message_cb
- : ch->message_cb;
-
- if (NULL != message_cb)
- message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
-
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
{
- ch->recv_mod_value_size += size - sizeof (*msg);
+ ch->recv_mod_value_size += psize - sizeof (*pmsg);
- if (MSG_STATE_MODIFIER != ch->recv_state
+ if (!(MSG_STATE_MODIFIER == ch->recv_state
+ || MSG_STATE_MOD_CONT == ch->recv_state)
|| ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Discarding out of order message modifier continuation.\n");
GNUNET_break_op (0);
recv_reset (ch);
- break;
+ return;
}
-
- GNUNET_PSYC_MessageCallback message_cb
- = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
- ? ch->hist_message_cb
- : ch->message_cb;
-
- if (NULL != message_cb)
- message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, msg);
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
@@ -662,12 +723,23 @@
"Discarding out of order message data fragment.\n");
GNUNET_break_op (0);
recv_reset (ch);
- break;
+ return;
}
-
ch->recv_state = MSG_STATE_DATA;
break;
}
+ }
+
+ GNUNET_PSYC_MessageCallback message_cb
+ = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
+ ? ch->hist_message_cb
+ : ch->message_cb;
+
+ if (NULL != message_cb)
+ message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
+
+ switch (ptype)
+ {
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
recv_reset (ch);
@@ -717,18 +789,7 @@
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
- size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
- size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
- size_min = sizeof (struct GNUNET_MessageHeader);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
- case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
size_eq = sizeof (struct GNUNET_MessageHeader);
break;
}
@@ -761,9 +822,15 @@
#endif
break;
}
- case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
{
- ch->tmit_ack_pending = GNUNET_NO;
+ if (0 == ch->tmit_ack_pending)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
+ GNUNET_break (0);
+ break;
+ }
+ ch->tmit_ack_pending--;
if (ch->is_master)
{
@@ -771,10 +838,6 @@
switch (mst->tmit->state)
{
case MSG_STATE_MODIFIER:
- if (GNUNET_NO == ch->tmit_paused)
- master_transmit_mod (mst);
- break;
-
case MSG_STATE_MOD_CONT:
if (GNUNET_NO == ch->tmit_paused)
master_transmit_mod (mst);
@@ -795,12 +858,13 @@
else
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "Ignoring transmit ack, there's no transmission going on.\n");
+ "Ignoring message ACK, there's no transmission going on.\n");
+ GNUNET_break (0);
}
break;
default:
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Ignoring unexpected transmit ack.\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring message ACK in state %u.\n", mst->tmit->state);
}
}
else
@@ -811,12 +875,15 @@
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
- handle_psyc_message(ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
+ handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
break;
}
- GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
- GNUNET_TIME_UNIT_FOREVER_REL);
+ if (NULL != ch->client)
+ {
+ GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ }
}
@@ -1029,6 +1096,8 @@
GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
{
disconnect (master);
+ if (NULL != master->tmit)
+ GNUNET_free (master->tmit);
GNUNET_free (master);
}
@@ -1069,30 +1138,6 @@
}
-/* FIXME: split up value into <64K chunks and transmit the continuations in
- * MOD_CONT msgs */
-static int
-send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
-{
- struct GNUNET_PSYC_Channel *ch = cls;
- size_t name_size = strlen (mod->name) + 1;
- struct GNUNET_PSYC_MessageModifier *pmod;
- struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmod)
- + name_size + mod->value_size);
- pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
- op->msg = (struct GNUNET_MessageHeader *) pmod;
-
- pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
- pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
- pmod->name_size = htons (name_size);
- memcpy (&pmod[1], mod->name, name_size);
- memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
-
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
- return GNUNET_YES;
-}
-
-
/**
* Send a message to call a method to all members in the PSYC channel.
*
@@ -1107,7 +1152,7 @@
struct GNUNET_PSYC_MasterTransmitHandle *
GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
const char *method_name,
- GNUNET_PSYC_MasterTransmitNotify notify_mod,
+ GNUNET_PSYC_MasterTransmitNotifyModifier
notify_mod,
GNUNET_PSYC_MasterTransmitNotify notify_data,
void *notify_cls,
enum GNUNET_PSYC_MasterTransmitFlags flags)
@@ -1120,25 +1165,27 @@
size_t size = strlen (method_name) + 1;
struct GNUNET_PSYC_MessageMethod *pmeth;
- struct OperationHandle *op
- = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
- pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
- op->msg = (struct GNUNET_MessageHeader *) pmeth;
+ struct OperationHandle *op;
+ ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
+ + sizeof (*pmeth) + size);
+ op->msg = (struct GNUNET_MessageHeader *) &op[1];
+ op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
+
+ pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
pmeth->header.size = htons (sizeof (*pmeth) + size);
pmeth->flags = htonl (flags);
memcpy (&pmeth[1], method_name, size);
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
- transmit_next (ch);
-
master->tmit = GNUNET_malloc (sizeof (*master->tmit));
master->tmit->master = master;
master->tmit->notify_mod = notify_mod;
master->tmit->notify_data = notify_data;
master->tmit->notify_cls = notify_cls;
- master->tmit->state = MSG_STATE_START; // FIXME
+ master->tmit->state = MSG_STATE_MODIFIER;
+
+ master_transmit_mod (master);
return master->tmit;
}
@@ -1152,7 +1199,7 @@
GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle
*th)
{
struct GNUNET_PSYC_Channel *ch = &th->master->ch;
- if (GNUNET_NO == ch->tmit_ack_pending)
+ if (0 == ch->tmit_ack_pending)
{
ch->tmit_paused = GNUNET_NO;
master_transmit_data (th->master);
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/psyc/test_psyc.c 2014-01-06 00:09:43 UTC (rev 31792)
@@ -25,6 +25,8 @@
* @author Christian Grothoff
*/
+#include <inttypes.h>
+
#include "platform.h"
#include "gnunet_crypto_lib.h"
#include "gnunet_common.h"
@@ -35,7 +37,7 @@
#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
-#define DEBUG_SERVICE 1
+#define DEBUG_SERVICE 0
/**
@@ -62,17 +64,37 @@
struct GNUNET_PSYC_MasterTransmitHandle *mth;
+struct TransmitClosure
+{
+ struct GNUNET_PSYC_MasterTransmitHandle *handle;
+ struct GNUNET_ENV_Environment *env;
+ char *data[16];
+ const char *mod_value;
+ size_t mod_value_size;
+ uint8_t data_count;
+ uint8_t paused;
+ uint8_t n;
+};
+
+struct TransmitClosure *tmit;
+
/**
* Clean up all resources used.
*/
static void
cleanup ()
{
- if (mst != NULL)
+ if (NULL != mst)
{
GNUNET_PSYC_master_stop (mst);
mst = NULL;
}
+ if (NULL != tmit)
+ {
+ GNUNET_ENV_environment_destroy (tmit->env);
+ GNUNET_free (tmit);
+ tmit = NULL;
+ }
GNUNET_SCHEDULER_shutdown ();
}
@@ -121,46 +143,42 @@
}
-static int
-method (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t message_id, const char *name,
- size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers,
- uint64_t data_offset, const void *data, size_t data_size,
- enum GNUNET_PSYC_MessageFlags flags)
+static void
+message (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
{
+ if (NULL == msg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error while receiving message %llu\n", message_id);
+ return;
+ }
+
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Method: %s, modifiers: %lu, flags: %u\n%.*s\n",
- name, modifier_count, flags, data_size, data);
- return GNUNET_OK;
+ "Got message part of type %u and size %u "
+ "belonging to message ID %llu with flags %u\n",
+ type, size, message_id, flags);
+
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
+ end ();
}
-static int
-join (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- const char *method_name,
- size_t variable_count, const struct GNUNET_ENV_Modifier *variables,
- const void *data, size_t data_size, struct GNUNET_PSYC_JoinHandle *jh)
+static void
+join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ const char *method_name,
+ size_t variable_count, const struct GNUNET_ENV_Modifier
*variables,
+ const void *data, size_t data_size,
+ struct GNUNET_PSYC_JoinHandle *jh)
{
- return GNUNET_OK;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Got join request.");
}
-struct TransmitClosure
-{
- struct GNUNET_PSYC_MasterTransmitHandle *handle;
-
- char *mod_names[16];
- char *mod_values[16];
- char *data[16];
-
- uint8_t mod_count;
- uint8_t data_count;
-
- uint8_t paused;
- uint8_t n;
-};
-
-
static void
transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
@@ -172,45 +190,95 @@
static int
-tmit_notify_mod (void *cls, size_t *data_size, void *data)
+tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper)
{
struct TransmitClosure *tmit = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmit notify modifier: %lu bytes available, "
- "processing modifier %u/%u.\n",
- *data_size, tmit->n + 1, tmit->fragment_count);
- /* FIXME: continuation */
- uint16_t name_size = strlen (tmit->mod_names[tmit->n]);
- uint16_t value_size = strlen (tmit->mod_values[tmit->n]);
- if (name_size + 1 + value_size <= *data_size)
- return GNUNET_NO;
+ "%u modifiers left to process.\n",
+ *data_size, GNUNET_ENV_environment_get_count (tmit->env));
- *data_size = name_size + 1 + value_size;
- memcpy (data, tmit->fragments[tmit->n], *data_size);
+ enum GNUNET_ENV_Operator op = 0;
+ const char *name = NULL;
+ const char *value = NULL;
+ uint16_t name_size = 0;
+ size_t value_size = 0;
- if (++tmit->n < tmit->mod_count)
- {
- return GNUNET_NO;
+ if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
+ { /* Modifier continuation */
+ value = tmit->mod_value;
+ if (tmit->mod_value_size <= *data_size)
+ {
+ value_size = tmit->mod_value_size;
+ tmit->mod_value = NULL;
+ }
+ else
+ {
+ value_size = *data_size;
+ tmit->mod_value += value_size;
+ }
+ tmit->mod_value_size -= value_size;
+
+ if (*data_size < value_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "value larger than buffer: %u < %zu\n",
+ *data_size, value_size);
+ *data_size = 0;
+ return GNUNET_NO;
+ }
+
+ *data_size = value_size;
+ memcpy (data, value, value_size);
}
- else
+ else if (NULL != oper)
{
- tmit->n = 0;
- return GNUNET_YES;
+ if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
+ (void *) &value,
&value_size))
+ { /* No more modifiers, continue with data */
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
+ *oper = op;
+ name_size = strlen (name);
+
+ if (name_size + 1 + value_size <= *data_size)
+ {
+ *data_size = name_size + 1 + value_size;
+ }
+ else
+ {
+ tmit->mod_value_size = value_size;
+ value_size = *data_size - name_size - 1;
+ tmit->mod_value_size -= value_size;
+ tmit->mod_value = value + value_size;
+ }
+
+ memcpy (data, name, name_size);
+ ((char *)data)[name_size] = '\0';
+ memcpy ((char *)data + name_size + 1, value, value_size);
}
+
+ return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
}
static int
-tmit_notify_data (void *cls, size_t *data_size, void *data)
+tmit_notify_data (void *cls, uint16_t *data_size, void *data)
{
struct TransmitClosure *tmit = cls;
+ uint16_t size = strlen (tmit->data[tmit->n]);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmit notify data: %lu bytes available, "
- "processing fragment %u/%u.\n",
- *data_size, tmit->n + 1, tmit->fragment_count);
- uint16_t size = strlen (tmit->data[tmit->n]);
- if (size <= *data_size)
- return GNUNET_NO;
+ "processing fragment %u/%u (size %u).\n",
+ *data_size, tmit->n + 1, tmit->data_count, size);
+ if (*data_size < size)
+ {
+ *data_size = 0;
+ GNUNET_assert (0);
+ return GNUNET_SYSERR;
+ }
if (GNUNET_YES == tmit->paused && tmit->n == tmit->data_count - 1)
{
@@ -231,19 +299,18 @@
}
-void
+static void
master_started (void *cls, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Master started: %lu\n", max_message_id);
+ "Master started: %" PRIu64 "\n", max_message_id);
- struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
- GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
- "_foo", "bar baz", 7);
- GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
- "_foo_bar", "foo bar baz", 11);
-
- struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
+ tmit = GNUNET_new (struct TransmitClosure);
+ tmit->env = GNUNET_ENV_environment_create ();
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_foo", "bar baz", 7);
+ GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+ "_foo_bar", "foo bar baz", 11);
tmit->data[0] = "foo";
tmit->data[1] = "foo bar";
tmit->data[2] = "foo bar baz";
@@ -255,7 +322,7 @@
}
-void
+static void
slave_joined (void *cls, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n",
max_message_id);
@@ -288,19 +355,19 @@
GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key);
- mst = GNUNET_PSYC_master_start (cfg, channel_key,
- GNUNET_PSYC_CHANNEL_PRIVATE,
- &method, &join, &master_started, NULL);
- return;
+ mst = GNUNET_PSYC_master_start (cfg, channel_key,
GNUNET_PSYC_CHANNEL_PRIVATE,
+ &message, &join_request, &master_started,
NULL);
+ return; /* FIXME: test slave */
+
struct GNUNET_PeerIdentity origin;
struct GNUNET_PeerIdentity relays[16];
struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
- GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
- "_foo", "bar baz", 7);
- GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
- "_foo_bar", "foo bar baz", 11);
+ GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo", "bar baz", 7);
+ GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo_bar", "foo bar baz", 11);
slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
- 16, relays, &method, &join, &slave_joined,
+ 16, relays, &message, &join_request,
&slave_joined,
NULL, "_request_join", env, "some data", 9);
GNUNET_ENV_environment_destroy (env);
}
@@ -319,8 +386,7 @@
opts, &run, NULL))
return 1;
#else
- if (0 != GNUNET_TESTING_service_run ("test-psyc", "psyc",
- "test_psyc.conf", &run, NULL))
+ if (0 != GNUNET_TESTING_peer_run ("test-psyc", "test_psyc.conf", &run, NULL))
return 1;
#endif
return res;
Modified: gnunet/src/psyc/test_psyc.conf
===================================================================
--- gnunet/src/psyc/test_psyc.conf 2014-01-06 00:09:40 UTC (rev 31791)
+++ gnunet/src/psyc/test_psyc.conf 2014-01-06 00:09:43 UTC (rev 31792)
@@ -8,3 +8,10 @@
UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psyc.sock
UNIX_MATCH_UID = NO
UNIX_MATCH_GID = YES
+
+[psycstore]
+AUTOSTART = YES
+BINARY = gnunet-service-psycstore
+UNIXPATH = $GNUNET_RUNTIME_DIR/test-gnunet-service-psycstore.sock
+UNIX_MATCH_UID = NO
+UNIX_MATCH_GID = YES
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r31792 - in gnunet: contrib src/include src/multicast src/psyc,
gnunet <=