[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r25361 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r25361 - gnunet/src/fs |
Date: |
Mon, 10 Dec 2012 15:34:16 +0100 |
Author: grothoff
Date: 2012-12-10 15:34:16 +0100 (Mon, 10 Dec 2012)
New Revision: 25361
Modified:
gnunet/src/fs/gnunet-service-fs_stream.c
Log:
do not queue more than one write at a time to stream, -- hopefully fixing #2672
Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c 2012-12-10 14:25:53 UTC (rev
25360)
+++ gnunet/src/fs/gnunet-service-fs_stream.c 2012-12-10 14:34:16 UTC (rev
25361)
@@ -40,6 +40,28 @@
/**
+ * A message in the queue to be written to the stream.
+ */
+struct WriteQueueItem
+{
+ /**
+ * Next item in the write queue (DLL anchored at wqi_head / wqi_tail).
+ */
+ struct WriteQueueItem *next;
+
+ /**
+ * Previous item in the write queue (DLL anchored at wqi_head / wqi_tail).
+ */
+ struct WriteQueueItem *prev;
+
+ /**
+ * Number of bytes of payload, stored right after this struct (at &wqi[1]).
+ */
+ size_t msize;
+};
+
+
+/**
* Information we keep around for each active streaming client.
*/
struct StreamClient
@@ -68,6 +90,16 @@
* Handle for active write operation, or NULL.
*/
struct GNUNET_STREAM_IOWriteHandle *wh;
+
+ /**
+ * Head of write queue.
+ */
+ struct WriteQueueItem *wqi_head;
+
+ /**
+ * Tail of write queue.
+ */
+ struct WriteQueueItem *wqi_tail;
/**
* Tokenizer for requests.
@@ -890,6 +922,16 @@
GNUNET_DATASTORE_cancel (sc->qe);
GNUNET_SERVER_mst_destroy (sc->mst);
GNUNET_STREAM_close (sc->socket);
+ struct WriteQueueItem *wqi;
+ while (NULL != (wqi = sc->wqi_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ GNUNET_free (wqi);
+ }
+
+
GNUNET_CONTAINER_DLL_remove (sc_head,
sc_tail,
sc);
@@ -1062,6 +1104,15 @@
/**
+ * Transmit the next entry from the write queue.
+ * (Forward declaration: a caller appears before the definition below.)
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct StreamClient *sc);
+
+
+/**
* Sending a reply was completed, continue processing.
*
* @param cls closure with the struct StreamClient which sent the query
@@ -1085,7 +1136,10 @@
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# Blocks transferred via stream"),
1,
GNUNET_NO);
- continue_reading (sc);
+ if (NULL != sc->wqi_head)
+ continue_writing (sc);
+ else
+ continue_reading (sc);
}
else
{
@@ -1097,6 +1151,37 @@
/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct StreamClient *sc)
+{
+  struct WriteQueueItem *wqi;
+
+  if (NULL != sc->wh)
+    return; /* write already pending */
+  if (NULL == (wqi = sc->wqi_head))
+  {
+    continue_reading (sc); /* queue is empty: resume reading instead */
+    return; /* must not fall through: wqi is NULL, nothing to dequeue */
+  }
+  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                               sc->wqi_tail,
+                               wqi);
+  sc->wh = GNUNET_STREAM_write (sc->socket,
+                                &wqi[1], wqi->msize,
+                                GNUNET_TIME_UNIT_FOREVER_REL,
+                                &write_continuation,
+                                sc);
+  GNUNET_free (wqi); /* NOTE(review): assumes the write copied the payload -- confirm */
+  if (NULL == sc->wh)
+    terminate_stream (sc); /* starting the write failed: give up on this client */
+}
+
+
+/**
* Process a datum that was stored in the datastore.
*
* @param cls closure with the struct StreamClient which sent the query
@@ -1122,8 +1207,8 @@
{
struct StreamClient *sc = cls;
size_t msize = size + sizeof (struct StreamReplyMessage);
- char buf[msize] GNUNET_ALIGN;
- struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
+ struct WriteQueueItem *wqi;
+ struct StreamReplyMessage *srm;
sc->qe = NULL;
if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
@@ -1149,22 +1234,18 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting transmission of %u byte reply via stream\n",
(unsigned int) size);
+ wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
+ srm = (struct StreamReplyMessage *) &wqi[1];
srm->header.size = htons ((uint16_t) msize);
srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
srm->type = htonl (type);
srm->expiration = GNUNET_TIME_absolute_hton (expiration);
memcpy (&srm[1], data, size);
sc->reply_size = msize;
- sc->wh = GNUNET_STREAM_write (sc->socket,
- buf, msize,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &write_continuation,
- sc);
- if (NULL == sc->wh)
- {
- terminate_stream (sc);
- return;
- }
+ GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ continue_writing (sc);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r25361 - gnunet/src/fs,
gnunet <=