gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25116 - in gnunet/src: fs include stream


From: gnunet
Subject: [GNUnet-SVN] r25116 - in gnunet/src: fs include stream
Date: Sat, 24 Nov 2012 23:17:15 +0100

Author: grothoff
Date: 2012-11-24 23:17:15 +0100 (Sat, 24 Nov 2012)
New Revision: 25116

Modified:
   gnunet/src/fs/gnunet-service-fs_stream.c
   gnunet/src/include/gnunet_stream_lib.h
   gnunet/src/stream/stream_api.c
Log:
-ensure that either stream_api calls callbacks last or that we don't destroy a 
stream handle while it is in use below us on the stack

Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c    2012-11-24 08:08:27 UTC (rev 
25115)
+++ gnunet/src/fs/gnunet-service-fs_stream.c    2012-11-24 22:17:15 UTC (rev 
25116)
@@ -25,7 +25,6 @@
  *
  * TODO:
  * - limit # concurrent clients, have timeouts for server-side
- * - stream shutdown in callbacks from stream may not always work right now 
(check with stream_api!)
  */
 #include "platform.h"
 #include "gnunet_constants.h"
@@ -78,6 +77,11 @@
   struct GNUNET_DATASTORE_QueueEntry *qe;
 
   /**
+   * Task that is scheduled to asynchronously terminate the connection.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
+  /**
    * Size of the last write that was initiated.
    */ 
   size_t reply_size;
@@ -248,6 +252,13 @@
   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
 
   /**
+   * Task to reset streams that had errors (asynchronously,
+   * as we may not be able to do it immediately during a
+   * callback from the stream API).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
+  /**
    * Is this stream ready for transmission?
    */
   int is_ready;
@@ -378,6 +389,55 @@
 
 
 /**
+ * Task called when it is time to destroy an inactive stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+stream_timeout (void *cls,
+               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamHandle *sh = cls;
+
+  sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  destroy_stream_handle (sh);
+}
+
+
+/**
+ * Task called when it is time to reset an stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+reset_stream_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamHandle *sh = cls;
+
+  sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
+  reset_stream (sh);
+}
+
+
+/**
+ * We had a serious error, tear down and re-create stream from scratch,
+ * but do so asynchronously.
+ *
+ * @param sh stream to reset
+ */
+static void
+reset_stream_async (struct StreamHandle *sh)
+{
+  if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
+    sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
+                                              sh);
+}
+
+
+/**
  * We got a reply from the stream.  Process it.
  *
  * @param cls the struct StreamHandle 
@@ -403,7 +463,7 @@
                                 GNUNET_NO, GNUNET_NO))
   {
     GNUNET_break_op (0);
-    reset_stream (sh);
+    reset_stream_async (sh);
     return size;
   }
   sh->rh = GNUNET_STREAM_read (sh->stream,
@@ -513,6 +573,7 @@
     if (sizeof (struct StreamReplyMessage) > msize)
     {
       GNUNET_break_op (0);
+      reset_stream_async (sh);
       return GNUNET_SYSERR;
     }
     srm = (const struct StreamReplyMessage *) message;
@@ -523,7 +584,8 @@
                              type,
                              &srm[1], msize, &query))
     {
-      GNUNET_break_op (0);
+      GNUNET_break_op (0); 
+      reset_stream_async (sh);
       return GNUNET_SYSERR;
     }
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -553,6 +615,7 @@
     return GNUNET_OK;
   default:
     GNUNET_break_op (0);
+    reset_stream_async (sh);
     return GNUNET_SYSERR;
   }
 }
@@ -633,23 +696,6 @@
 
 
 /**
- * Task called when it is time to destroy an inactive stream.
- *
- * @param cls the 'struct StreamHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-stream_timeout (void *cls,
-               const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct StreamHandle *sh = cls;
-
-  sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
-  destroy_stream_handle (sh);
-}
-
-
-/**
  * Cancel an active request; must not be called after 'proc'
  * was calld.
  *
@@ -691,7 +737,9 @@
   GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# stream connections active"), -1,
                            GNUNET_NO);
-  if (NULL != sc->rh)
+  if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+    GNUNET_SCHEDULER_cancel (sc->terminate_task); 
+ if (NULL != sc->rh)
     GNUNET_STREAM_io_read_cancel (sc->rh);
   if (NULL != sc->wh)
     GNUNET_STREAM_io_write_cancel (sc->wh);
@@ -707,6 +755,38 @@
 
 
 /**
+ * Task run to asynchronously terminate the stream.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */ 
+static void
+terminate_stream_task (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamClient *sc = cls;
+
+  sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
+  terminate_stream (sc);
+}
+
+
+/**
+ * We had a serious error, termiante stream,
+ * but do so asynchronously.
+ *
+ * @param sc stream to reset
+ */
+static void
+terminate_stream_async (struct StreamClient *sc)
+{
+  if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
+    sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
+                                                  sc);
+}
+
+
+/**
  * Functions of this signature are called whenever data is available from the
  * stream.
  *
@@ -782,7 +862,7 @@
     if (GNUNET_SYSERR == ret)
     {
       GNUNET_break_op (0);
-      terminate_stream (sc);
+      terminate_stream_async (sc);
       return size;
     }
     break;
@@ -790,7 +870,7 @@
   case GNUNET_STREAM_SHUTDOWN:
   case GNUNET_STREAM_SYSERR:
   case GNUNET_STREAM_BROKEN:
-    terminate_stream (sc);
+    terminate_stream_async (sc);
     return size;
   default:
     GNUNET_break (0);
@@ -922,6 +1002,7 @@
        ntohs (message->size))
     {
       GNUNET_break_op (0);
+      terminate_stream_async (sc);
       return GNUNET_SYSERR;
     }
     sqm = (const struct StreamQueryMessage *) message;
@@ -944,6 +1025,7 @@
     return GNUNET_OK;
   default:
     GNUNET_break_op (0);
+    terminate_stream_async (sc);
     return GNUNET_SYSERR;
   }
 }

Modified: gnunet/src/include/gnunet_stream_lib.h
===================================================================
--- gnunet/src/include/gnunet_stream_lib.h      2012-11-24 08:08:27 UTC (rev 
25115)
+++ gnunet/src/include/gnunet_stream_lib.h      2012-11-24 22:17:15 UTC (rev 
25116)
@@ -246,12 +246,10 @@
  * Listens for stream connections for a specific application ports
  *
  * @param cfg the configuration to use
- *
  * @param app_port the application port for which new streams will be
  *         accepted. If another stream is listening on the same port the
  *         listen_cb will be called to signal binding error and the returned
  *         ListenSocket will be invalidated.
- *
  * @param listen_cb this function will be called when a peer tries to establish
  *            a stream with us
  * @param listen_cb_cls closure for listen_cb

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-11-24 08:08:27 UTC (rev 25115)
+++ gnunet/src/stream/stream_api.c      2012-11-24 22:17:15 UTC (rev 25116)
@@ -1917,13 +1917,16 @@
      that that stream has been shutdown */
   if (NULL != socket->write_handle)
   {
-    // FIXME: this breaks if 'write_cont' decides to 
-    // call SOCKET_close!
-    if (NULL != socket->write_handle->write_cont)
-      socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
-                                        GNUNET_STREAM_SHUTDOWN, 0);
+    GNUNET_STREAM_CompletionContinuation wc;
+    void *wc_cls;
+
+    wc = socket->write_handle->write_cont;
+    wc_cls = socket->write_handle->write_cont_cls;
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     socket->write_handle = NULL;
+    if (NULL != wc)
+      wc (wc_cls,
+         GNUNET_STREAM_SHUTDOWN, 0);
   }
   return GNUNET_OK;
 }
@@ -2041,13 +2044,16 @@
      that that stream has been shutdown */
   if (NULL != socket->write_handle)
   {
-    // FIXME: this breaks if 'write_cont' decides to 
-    // call SOCKET_close!
-    if (NULL != socket->write_handle->write_cont)
-      socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
-                                        GNUNET_STREAM_SHUTDOWN, 0);
+    GNUNET_STREAM_CompletionContinuation wc;
+    void *wc_cls;
+
+    wc = socket->write_handle->write_cont;
+    wc_cls = socket->write_handle->write_cont_cls;
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     socket->write_handle = NULL;
+    if (NULL != wc)
+      wc (wc_cls,
+         GNUNET_STREAM_SHUTDOWN, 0);
   }
   return GNUNET_OK;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]