gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25361 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r25361 - gnunet/src/fs
Date: Mon, 10 Dec 2012 15:34:16 +0100

Author: grothoff
Date: 2012-12-10 15:34:16 +0100 (Mon, 10 Dec 2012)
New Revision: 25361

Modified:
   gnunet/src/fs/gnunet-service-fs_stream.c
Log:
do not queue more than one write at a time to stream, -- hopefully fixing #2672

Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c    2012-12-10 14:25:53 UTC (rev 25360)
+++ gnunet/src/fs/gnunet-service-fs_stream.c    2012-12-10 14:34:16 UTC (rev 25361)
@@ -40,6 +40,28 @@
 
 
 /**
+ * A message in the queue to be written to the stream.
+ */
+struct WriteQueueItem
+{
+  /**
+   * Next item in the client's write queue (DLL link).
+   */
+  struct WriteQueueItem *next;
+
+  /**
+   * Previous item in the client's write queue (DLL link).
+   */
+  struct WriteQueueItem *prev;
+
+  /**
+   * Number of payload bytes stored directly after this struct (at &wqi[1]).
+   */
+  size_t msize;
+};
+
+
+/**
  * Information we keep around for each active streaming client.
  */
 struct StreamClient
@@ -68,6 +90,16 @@
    * Handle for active write operation, or NULL.
    */ 
   struct GNUNET_STREAM_IOWriteHandle *wh;
+
+  /**
+   * Head of write queue.
+   */
+  struct WriteQueueItem *wqi_head;
+
+  /**
+   * Tail of write queue.
+   */
+  struct WriteQueueItem *wqi_tail;
   
   /**
    * Tokenizer for requests.
@@ -890,6 +922,16 @@
     GNUNET_DATASTORE_cancel (sc->qe);
   GNUNET_SERVER_mst_destroy (sc->mst);
   GNUNET_STREAM_close (sc->socket);
+  struct WriteQueueItem *wqi;
+  while (NULL != (wqi = sc->wqi_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                                sc->wqi_tail,
+                                wqi);
+    GNUNET_free (wqi);
+  }
+
+
   GNUNET_CONTAINER_DLL_remove (sc_head,
                               sc_tail,
                               sc);
@@ -1062,6 +1104,15 @@
 
 
 /**
+ * Transmit the next entry from the write queue.  Does nothing while a
+ * write is still pending; calls continue_reading() if the queue is empty.
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct StreamClient *sc);
+
+
+/**
  * Sending a reply was completed, continue processing.
  *
  * @param cls closure with the struct StreamClient which sent the query
@@ -1085,7 +1136,10 @@
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# Blocks transferred via stream"), 1,
                              GNUNET_NO);
-    continue_reading (sc);
+    if (NULL != sc->wqi_head)
+      continue_writing (sc);
+    else
+      continue_reading (sc);
   }
   else
   {
@@ -1097,6 +1151,37 @@
 
 
 /**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct StreamClient *sc)
+{
+  struct WriteQueueItem *wqi;
+
+  if (NULL != sc->wh)
+    return; /* write already pending; its continuation resumes the queue */
+  if (NULL == (wqi = sc->wqi_head))
+  {
+    continue_reading (sc); /* nothing left to send, go back to reading */
+    return; /* wqi is NULL here: must not fall through and dereference it */
+  }
+  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                              sc->wqi_tail,
+                              wqi);
+  sc->wh = GNUNET_STREAM_write (sc->socket,
+                               &wqi[1], wqi->msize, /* payload follows header */
+                               GNUNET_TIME_UNIT_FOREVER_REL,
+                               &write_continuation,
+                               sc);
+  /* NOTE(review): assumes GNUNET_STREAM_write copied the payload -- confirm */
+  GNUNET_free (wqi);
+  if (NULL == sc->wh)
+    terminate_stream (sc); /* write could not be queued: drop the client */
+}
+
+
+/**
  * Process a datum that was stored in the datastore.
  *
  * @param cls closure with the struct StreamClient which sent the query
@@ -1122,8 +1207,8 @@
 {
   struct StreamClient *sc = cls;
   size_t msize = size + sizeof (struct StreamReplyMessage);
-  char buf[msize] GNUNET_ALIGN;
-  struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
+  struct WriteQueueItem *wqi;
+  struct StreamReplyMessage *srm;
 
   sc->qe = NULL;
   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
@@ -1149,22 +1234,18 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Starting transmission of %u byte reply via stream\n",
              (unsigned int) size);
+  wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
+  srm = (struct StreamReplyMessage *) &wqi[1];
   srm->header.size = htons ((uint16_t) msize);
   srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
   srm->type = htonl (type);
   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
   memcpy (&srm[1], data, size);
   sc->reply_size = msize;
-  sc->wh = GNUNET_STREAM_write (sc->socket,
-                               buf, msize,
-                               GNUNET_TIME_UNIT_FOREVER_REL,
-                               &write_continuation,
-                               sc);
-  if (NULL == sc->wh)
-  {
-    terminate_stream (sc);
-    return;
-  }
+  GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
+                              sc->wqi_tail,
+                              wqi);
+  continue_writing (sc);
 }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]