gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r27364 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r27364 - gnunet/src/fs
Date: Tue, 4 Jun 2013 15:10:48 +0200

Author: grothoff
Date: 2013-06-04 15:10:47 +0200 (Tue, 04 Jun 2013)
New Revision: 27364

Modified:
   gnunet/src/fs/Makefile.am
   gnunet/src/fs/gnunet-service-fs_stream.c
Log:
-towards using mesh instead of stream

Modified: gnunet/src/fs/Makefile.am
===================================================================
--- gnunet/src/fs/Makefile.am   2013-06-04 13:08:52 UTC (rev 27363)
+++ gnunet/src/fs/Makefile.am   2013-06-04 13:10:47 UTC (rev 27364)
@@ -193,7 +193,7 @@
  $(top_builddir)/src/block/libgnunetblock.la \
  $(top_builddir)/src/datastore/libgnunetdatastore.la \
  $(top_builddir)/src/statistics/libgnunetstatistics.la \
- $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/mesh/libgnunetmesh.la \
  $(top_builddir)/src/ats/libgnunetats.la \
  $(top_builddir)/src/core/libgnunetcore.la \
  $(top_builddir)/src/util/libgnunetutil.la \

Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c    2013-06-04 13:08:52 UTC (rev 27363)
+++ gnunet/src/fs/gnunet-service-fs_stream.c    2013-06-04 13:10:47 UTC (rev 27364)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2012 Christian Grothoff (and other contributing authors)
+     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -22,11 +22,26 @@
  * @file fs/gnunet-service-fs_stream.c
  * @brief non-anonymous file-transfer
  * @author Christian Grothoff
+ *
+ * TODO:
+ * - update comments on functions (still matches 'stream')
+ * - MESH2 API doesn't allow flow control for server yet (needed!)
+ * - likely need to register clean up handler with mesh to handle
+ *   client disconnect (likely leaky right now)
+ * - server is optional, currently client code will NPE if we have
+ *   no server, again MESH2 API requirement forcing this for now
+ * - message handlers are symmetric for client/server, should be
+ *   separated (currently clients can get requests and servers can
+ *   handle answers, not good)
+ * - code is entirely untested
+ * - might have overlooked a few possible simplifications
+ * - PORT is set to old application type, unsure if we should keep
+ *   it that way (fine for now)
  */
 #include "platform.h"
 #include "gnunet_constants.h"
 #include "gnunet_util_lib.h"
-#include "gnunet_stream_lib.h"
+#include "gnunet_mesh2_service.h"
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
 #include "gnunet-service-fs.h"
@@ -84,17 +99,12 @@
   /**
    * Socket for communication.
    */ 
-  struct GNUNET_STREAM_Socket *socket;
+  struct GNUNET_MESH_Tunnel *socket;
 
   /**
-   * Handle for active read operation, or NULL.
-   */ 
-  struct GNUNET_STREAM_ReadHandle *rh;
-
-  /**
    * Handle for active write operation, or NULL.
    */ 
-  struct GNUNET_STREAM_WriteHandle *wh;
+  struct GNUNET_MESH_TransmitHandle *wh;
 
   /**
    * Head of write queue.
@@ -107,11 +117,6 @@
   struct WriteQueueItem *wqi_tail;
   
   /**
-   * Tokenizer for requests.
-   */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
-  
-  /**
    * Current active request to the datastore, if we have one pending.
    */
   struct GNUNET_DATASTORE_QueueEntry *qe;
@@ -263,24 +268,14 @@
   /**
    * Connection to the other peer.
    */
-  struct GNUNET_STREAM_Socket *stream;
+  struct GNUNET_MESH_Tunnel *stream;
 
   /**
-   * Handle for active read operation, or NULL.
-   */ 
-  struct GNUNET_STREAM_ReadHandle *rh;
-
-  /**
    * Handle for active write operation, or NULL.
    */ 
-  struct GNUNET_STREAM_WriteHandle *wh;
+  struct GNUNET_MESH_TransmitHandle *wh;
 
   /**
-   * Tokenizer for replies.
-   */
-  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
-
-  /**
    * Which peer does this stream go to?
    */ 
   struct GNUNET_PeerIdentity target;
@@ -310,7 +305,7 @@
 /**
  * Listen socket for incoming requests.
  */
-static struct GNUNET_STREAM_ListenSocket *listen_socket;
+static struct GNUNET_MESH_Handle *listen_socket;
 
 /**
  * Head of DLL of stream clients.
@@ -387,14 +382,12 @@
                                         &free_waiting_entry,
                                         sh);
   if (NULL != sh->wh)
-    GNUNET_STREAM_write_cancel (sh->wh);
-  if (NULL != sh->rh)
-    GNUNET_STREAM_read_cancel (sh->rh);
+    GNUNET_MESH_notify_transmit_ready_cancel (sh->wh);
   if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
     GNUNET_SCHEDULER_cancel (sh->timeout_task);
   if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
     GNUNET_SCHEDULER_cancel (sh->reset_task);
-  GNUNET_STREAM_close (sh->stream);
+  GNUNET_MESH_tunnel_destroy (sh->stream);
   GNUNET_assert (GNUNET_OK ==
                 GNUNET_CONTAINER_multihashmap_remove (stream_map,
                                                       &sh->target.hashPubKey,
@@ -414,23 +407,6 @@
 
 
 /**
- * Function called once the stream is ready for transmission.
- *
- * @param cls the 'struct StreamHandle'
- * @param socket stream socket handle
- */
-static void
-stream_ready_cb (void *cls,
-                struct GNUNET_STREAM_Socket *socket)
-{
-  struct StreamHandle *sh = cls;
-
-  sh->is_ready = GNUNET_YES;
-  transmit_pending (sh);
-}
-
-
-/**
  * Iterator called on each entry in a waiting map to 
  * move it back to the pending list.
  *
@@ -470,21 +446,15 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Resetting stream to %s\n",
              GNUNET_i2s (&sh->target));
-  if (NULL != sh->rh)
-  {
-    GNUNET_STREAM_read_cancel (sh->rh);
-    sh->rh = NULL;
-  }
-  GNUNET_STREAM_close (sh->stream);
+  GNUNET_MESH_tunnel_destroy (sh->stream);
   sh->is_ready = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
                                         &move_to_pending,
                                         sh);
-  sh->stream = GNUNET_STREAM_open (GSF_cfg,
-                                  &sh->target,
-                                  GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
-                                  &stream_ready_cb, sh,
-                                  GNUNET_STREAM_OPTION_END);
+  sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
+                                         sh,                               
+                                         &sh->target,
+                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
 }
 
 
@@ -542,105 +512,33 @@
 
 
 /**
- * We got a reply from the stream.  Process it.
- *
- * @param cls the struct StreamHandle 
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
- */
-static size_t
-handle_stream_reply (void *cls,
-                    enum GNUNET_STREAM_Status status,
-                    const void *data,
-                    size_t size)
-{
-  struct StreamHandle *sh = cls;
-
-  sh->rh = NULL;
-  GNUNET_SCHEDULER_cancel (sh->reset_task);
-  sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
-                                                &reset_stream_task,
-                                                sh);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received %u bytes from stream to %s\n",
-             (unsigned int) size,
-             GNUNET_i2s (&sh->target));
-  if (GNUNET_SYSERR == 
-      GNUNET_SERVER_mst_receive (sh->mst,
-                                NULL,
-                                data, size,
-                                GNUNET_NO, GNUNET_NO))
-  {
-    GNUNET_break_op (0);
-    reset_stream_async (sh);
-    return size;
-  }
-  if (NULL == sh->rh)
-    sh->rh = GNUNET_STREAM_read (sh->stream,
-                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                &handle_stream_reply,
-                                sh);
-  return size;
-}
-
-
-/**
- * Functions of this signature are called whenever we transmitted a
+ * Functions of this signature are called whenever we are ready to transmit
  * query via a stream.
  *
  * @param cls the struct StreamHandle for which we did the write call
- * @param status the status of the stream at the time this function is called;
- *          GNUNET_OK if writing to stream was completed successfully,
- *          GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
- *          mean time.
- * @param size the number of bytes written
+ * @param size the number of bytes that can be written to 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
  */
-static void
-query_write_continuation (void *cls,
-                         enum GNUNET_STREAM_Status status,
-                         size_t size)
+static size_t
+transmit_sqm (void *cls,
+             size_t size,
+             void *buf)
 {
   struct StreamHandle *sh = cls;
+  struct StreamQueryMessage sqm;
+  struct GSF_StreamRequest *sr;
 
   sh->wh = NULL;
-  if ( (GNUNET_STREAM_OK != status) ||
-       (sizeof (struct StreamQueryMessage) != size) )
+  if (NULL == buf)
   {
     reset_stream (sh);
-    return;
+    return 0;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Successfully transmitted %u bytes via stream to %s\n",
-             (unsigned int) size,
-             GNUNET_i2s (&sh->target));
-  if (NULL == sh->rh)
-    sh->rh = GNUNET_STREAM_read (sh->stream,
-                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                &handle_stream_reply,
-                                sh);
-  transmit_pending (sh);
-}
-         
-
-/**
- * Transmit pending requests via the stream.
- *
- * @param sh stream to process
- */
-static void
-transmit_pending (struct StreamHandle *sh)
-{
-  struct StreamQueryMessage sqm;
-  struct GSF_StreamRequest *sr;
-
-  if (NULL != sh->wh)
-    return;
   sr = sh->pending_head;
   if (NULL == sr)
-    return;
+    return 0;
+  GNUNET_assert (size >= sizeof (struct StreamQueryMessage));
   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
                               sh->pending_tail,
                               sr);
@@ -656,14 +554,33 @@
   sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
   sqm.type = htonl (sr->type);
   sqm.query = sr->query;
-  sh->wh = GNUNET_STREAM_write (sh->stream,
-                               &sqm, sizeof (sqm),
-                               GNUNET_TIME_UNIT_FOREVER_REL,
-                               &query_write_continuation,
-                               sh);
+  memcpy (buf, &sqm, sizeof (sqm));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Successfully transmitted %u bytes via mesh to %s\n",
+             (unsigned int) size,
+             GNUNET_i2s (&sh->target));
+  transmit_pending (sh);
+  return sizeof (sqm);
 }
+         
 
+/**
+ * Transmit pending requests via the stream.
+ *
+ * @param sh stream to process
+ */
+static void
+transmit_pending (struct StreamHandle *sh)
+{
+  if (NULL != sh->wh)
+    return;
+  sh->wh = GNUNET_MESH_notify_transmit_ready (sh->stream, GNUNET_YES /* allow cork */,
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             sizeof (struct StreamQueryMessage),
+                                             &transmit_sqm, sh);
+}
 
+
 /**
  * Closure for 'handle_reply'.
  */
@@ -729,8 +646,6 @@
  * Functions with this signature are called whenever a
  * complete reply is received.
  *
- * Do not call GNUNET_SERVER_mst_destroy in callback
- *
  * @param cls closure with the 'struct StreamHandle'
  * @param client identification of the client, NULL
  * @param message the actual message
@@ -738,10 +653,12 @@
  */
 static int
 reply_cb (void *cls,
-         void *client,
+         struct GNUNET_MESH_Tunnel *tunnel,
+         void **tunnel_ctx,
+         const struct GNUNET_PeerIdentity *sender,
          const struct GNUNET_MessageHeader *message)
 {
-  struct StreamHandle *sh = cls;
+  struct StreamHandle *sh = *tunnel_ctx;
   const struct StreamReplyMessage *srm;
   struct HandleReplyClosure hrc;
   uint16_t msize;
@@ -749,55 +666,47 @@
   struct GNUNET_HashCode query;
 
   msize = ntohs (message->size);
-  switch (ntohs (message->type))
+  if (sizeof (struct StreamReplyMessage) > msize)
   {
-  case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
-    if (sizeof (struct StreamReplyMessage) > msize)
-    {
-      GNUNET_break_op (0);
-      reset_stream_async (sh);
-      return GNUNET_SYSERR;
-    }
-    srm = (const struct StreamReplyMessage *) message;
-    msize -= sizeof (struct StreamReplyMessage);
-    type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
-    if (GNUNET_YES !=
-       GNUNET_BLOCK_get_key (GSF_block_ctx,
-                             type,
-                             &srm[1], msize, &query))
-    {
-      GNUNET_break_op (0); 
-      reset_stream_async (sh);
-      return GNUNET_SYSERR;
-    }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Received reply `%s' via stream\n",
-               GNUNET_h2s (&query));
-    GNUNET_STATISTICS_update (GSF_stats,
-                             gettext_noop ("# replies received via stream"), 1,
-                             GNUNET_NO);
-    hrc.data = &srm[1];
-    hrc.data_size = msize;
-    hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
-    hrc.type = type;
-    hrc.found = GNUNET_NO;
-    GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
-                                               &query,
-                                               &handle_reply,
-                                               &hrc);
-    if (GNUNET_NO == hrc.found)
-    {
-      GNUNET_STATISTICS_update (GSF_stats,
-                               gettext_noop ("# replies received via stream dropped"), 1,
-                               GNUNET_NO);
-      return GNUNET_OK;
-    }
-    return GNUNET_OK;
-  default:
     GNUNET_break_op (0);
     reset_stream_async (sh);
     return GNUNET_SYSERR;
   }
+  srm = (const struct StreamReplyMessage *) message;
+  msize -= sizeof (struct StreamReplyMessage);
+  type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
+  if (GNUNET_YES !=
+      GNUNET_BLOCK_get_key (GSF_block_ctx,
+                           type,
+                           &srm[1], msize, &query))
+  {
+    GNUNET_break_op (0); 
+    reset_stream_async (sh);
+    return GNUNET_SYSERR;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received reply `%s' via stream\n",
+             GNUNET_h2s (&query));
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# replies received via stream"), 1,
+                           GNUNET_NO);
+  hrc.data = &srm[1];
+  hrc.data_size = msize;
+  hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+  hrc.type = type;
+  hrc.found = GNUNET_NO;
+  GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
+                                             &query,
+                                             &handle_reply,
+                                             &hrc);
+  if (GNUNET_NO == hrc.found)
+  {
+    GNUNET_STATISTICS_update (GSF_stats,
+                             gettext_noop ("# replies received via stream dropped"), 1,
+                             GNUNET_NO);
+    return GNUNET_OK;
+  }
+  return GNUNET_OK;
 }
 
 
@@ -829,15 +738,12 @@
   sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
                                                 &reset_stream_task,
                                                 sh);
-  sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
-                                     sh);
   sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
   sh->target = *target;
-  sh->stream = GNUNET_STREAM_open (GSF_cfg,
-                                  &sh->target,
-                                  GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
-                                  &stream_ready_cb, sh,
-                                  GNUNET_STREAM_OPTION_END);
+  sh->stream = GNUNET_MESH_tunnel_create (listen_socket,
+                                         sh,
+                                         &sh->target,
+                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER);
   GNUNET_assert (GNUNET_OK ==
                 GNUNET_CONTAINER_multihashmap_put (stream_map,
                                                    &sh->target.hashPubKey,
@@ -933,14 +839,11 @@
     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
   if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
     GNUNET_SCHEDULER_cancel (sc->timeout_task); 
- if (NULL != sc->rh)
-    GNUNET_STREAM_read_cancel (sc->rh);
   if (NULL != sc->wh)
-    GNUNET_STREAM_write_cancel (sc->wh);
+    GNUNET_MESH_notify_transmit_ready_cancel (sc->wh);
   if (NULL != sc->qe)
     GNUNET_DATASTORE_cancel (sc->qe);
-  GNUNET_SERVER_mst_destroy (sc->mst);
-  GNUNET_STREAM_close (sc->socket);
+  GNUNET_MESH_tunnel_destroy (sc->socket);
   struct WriteQueueItem *wqi;
   while (NULL != (wqi = sc->wqi_head))
   {
@@ -949,8 +852,6 @@
                                 wqi);
     GNUNET_free (wqi);
   }
-
-
   GNUNET_CONTAINER_DLL_remove (sc_head,
                               sc_tail,
                               sc);
@@ -960,23 +861,6 @@
 
 
 /**
- * Task run to asynchronously terminate the stream.
- *
- * @param cls the 'struct StreamClient'
- * @param tc scheduler context
- */ 
-static void
-terminate_stream_task (void *cls,
-                      const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct StreamClient *sc = cls;
-
-  sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
-  terminate_stream (sc);
-}
-
-
-/**
  * Task run to asynchronously terminate the stream due to timeout.
  *
  * @param cls the 'struct StreamClient'
@@ -1010,39 +894,6 @@
 
 
 /**
- * We had a serious error, termiante stream,
- * but do so asynchronously.
- *
- * @param sc stream to reset
- */
-static void
-terminate_stream_async (struct StreamClient *sc)
-{
-  if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
-    sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
-                                                  sc);
-}
-
-
-/**
- * Functions of this signature are called whenever data is available from the
- * stream.
- *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
- */
-static size_t 
-process_request (void *cls,
-                enum GNUNET_STREAM_Status status,
-                const void *data,
-                size_t size);
-
-
-/**
  * We're done handling a request from a client, read the next one.
  *
  * @param sc client to continue reading requests from
@@ -1050,22 +901,7 @@
 static void
 continue_reading (struct StreamClient *sc)
 {
-  int ret;
-
-  ret = 
-    GNUNET_SERVER_mst_receive (sc->mst,
-                              NULL,
-                              NULL, 0,
-                              GNUNET_NO, GNUNET_NO);
-  if (GNUNET_NO == ret)
-    return; 
   refresh_timeout_task (sc);
-  if (NULL != sc->rh)
-    return;
-  sc->rh = GNUNET_STREAM_read (sc->socket,
-                              GNUNET_TIME_UNIT_FOREVER_REL,
-                              &process_request,
-                              sc);      
 }
 
 
@@ -1079,93 +915,49 @@
 
 
 /**
- * Functions of this signature are called whenever data is available from the
- * stream.
+ * Send a reply now, mesh is ready.
  *
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called
- * @param data traffic from the other side
- * @param size the number of bytes available in data read; will be 0 on timeout 
- * @return number of bytes of processed from 'data' (any data remaining should be
- *         given to the next time the read processor is called).
- */
-static size_t 
-process_request (void *cls,
-                enum GNUNET_STREAM_Status status,
-                const void *data,
-                size_t size)
-{
-  struct StreamClient *sc = cls;
-  int ret;
-
-  sc->rh = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Received %u byte query via stream\n",
-             (unsigned int) size);
-  switch (status)
-  {
-  case GNUNET_STREAM_OK:
-    ret = 
-      GNUNET_SERVER_mst_receive (sc->mst,
-                                NULL,
-                                data, size,
-                                GNUNET_NO, GNUNET_NO);
-    if (GNUNET_NO == ret)
-      return size; /* more messages in MST */
-    if (GNUNET_SYSERR == ret)
-    {
-      GNUNET_break_op (0);
-      terminate_stream_async (sc);
-      return size;
-    }
-    break;
-  case GNUNET_STREAM_TIMEOUT:
-  case GNUNET_STREAM_SHUTDOWN:
-  case GNUNET_STREAM_SYSERR:
-    terminate_stream_async (sc);
-    return size;
-  default:
-    GNUNET_break (0);
-    return size;
-  }
-  continue_writing (sc);
-  return size;
-}
-
-
-/**
- * Sending a reply was completed, continue processing.
- *
  * @param cls closure with the struct StreamClient which sent the query
- * @param status result code for the operation
- * @param size number of bytes that were transmitted
+ * @param size number of bytes available in 'buf'
+ * @param buf where to write the message
+ * @return number of bytes written to 'buf'
  */
-static void
+static size_t
 write_continuation (void *cls,
-                   enum GNUNET_STREAM_Status status,
-                   size_t size)
+                   size_t size,
+                   void *buf)
 {
   struct StreamClient *sc = cls;
-  
+  struct WriteQueueItem *wqi;
+  size_t ret;
+
   sc->wh = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Write continuation called on 'server' side with status %d\n",
-             status);
-  if ( (GNUNET_STREAM_OK != status) ||
-       (size != sc->reply_size) )
+  if (NULL == (wqi = sc->wqi_head))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Write queue empty, reading more requests\n");
+    return 0;
+  }
+  if (0 == size)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Transmission of reply failed, terminating stream\n");
     terminate_stream (sc);    
-    return;
+    return 0;
   }
+  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                              sc->wqi_tail,
+                              wqi);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Transmitted %u byte reply via stream\n",
              (unsigned int) size);
   GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# Blocks transferred via stream"), 1,
                            GNUNET_NO);
+  memcpy (buf, &wqi[1], ret = wqi->msize);
+  GNUNET_free (wqi);
   continue_writing (sc);
+  return ret;
 }
 
 
@@ -1192,19 +984,11 @@
     continue_reading (sc);
     return;
   }
-  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  sc->wh = GNUNET_STREAM_write (sc->socket,
-                               &wqi[1], wqi->msize,
-                               GNUNET_TIME_UNIT_FOREVER_REL,
-                               &write_continuation,
-                               sc);
-  if (NULL != sc->wh)
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Gave %u bytes for stream for transmission\n",
-               (unsigned int) wqi->msize);
-  GNUNET_free (wqi);
+  sc->wh = GNUNET_MESH_notify_transmit_ready (sc->socket, GNUNET_NO,
+                                             GNUNET_TIME_UNIT_FOREVER_REL,
+                                             wqi->msize,                       
              
+                                             &write_continuation,
+                                             sc);
   if (NULL == sc->wh)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1302,50 +1086,37 @@
  */
 static int
 request_cb (void *cls,
-           void *client,
+           struct GNUNET_MESH_Tunnel *tunnel,
+           void **tunnel_ctx,
+           const struct GNUNET_PeerIdentity *sender,
            const struct GNUNET_MessageHeader *message)
 {
-  struct StreamClient *sc = cls;
+  struct StreamClient *sc = *tunnel_ctx;
   const struct StreamQueryMessage *sqm;
 
-  switch (ntohs (message->type))
+  sqm = (const struct StreamQueryMessage *) message;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received query for `%s' via stream\n",
+             GNUNET_h2s (&sqm->query));
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# queries received via stream"), 1,
+                           GNUNET_NO);
+  refresh_timeout_task (sc);
+  sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                    0,
+                                    &sqm->query,
+                                    ntohl (sqm->type),
+                                    0 /* priority */, 
+                                    GSF_datastore_queue_size,
+                                    GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &handle_datastore_reply, sc);
+  if (NULL == sc->qe)
   {
-  case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
-    if (sizeof (struct StreamQueryMessage) != 
-       ntohs (message->size))
-    {
-      GNUNET_break_op (0);
-      terminate_stream_async (sc);
-      return GNUNET_SYSERR;
-    }
-    sqm = (const struct StreamQueryMessage *) message;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Received query for `%s' via stream\n",
-               GNUNET_h2s (&sqm->query));
-    GNUNET_STATISTICS_update (GSF_stats,
-                             gettext_noop ("# queries received via stream"), 1,
-                             GNUNET_NO);
-    refresh_timeout_task (sc);
-    sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
-                                      0,
-                                      &sqm->query,
-                                      ntohl (sqm->type),
-                                      0 /* priority */, 
-                                      GSF_datastore_queue_size,
-                                      GNUNET_TIME_UNIT_FOREVER_REL,
-                                      &handle_datastore_reply, sc);
-    if (NULL == sc->qe)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Queueing request with datastore failed (queue full?)\n");
-      continue_writing (sc);
-    }
-    return GNUNET_OK;
-  default:
-    GNUNET_break_op (0);
-    terminate_stream_async (sc);
-    return GNUNET_SYSERR;
+               "Queueing request with datastore failed (queue full?)\n");
+    continue_writing (sc);
   }
+  return GNUNET_OK;
 }
 
 
@@ -1355,27 +1126,27 @@
  * GNUNET_STREAM_listen() is already taken.
  *
  * @param cls the closure from GNUNET_STREAM_listen
- * @param socket the socket representing the stream; NULL on binding error
+ * @param socket the socket representing the stream
  * @param initiator the identity of the peer who wants to establish a stream
  *            with us; NULL on binding error
- * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
- *             stream (the socket will be invalid after the call)
+ * @return initial tunnel context (our 'struct StreamClient')
  */
-static int 
+static void *
 accept_cb (void *cls,
-          struct GNUNET_STREAM_Socket *socket,
-          const struct GNUNET_PeerIdentity *initiator)
+          struct GNUNET_MESH_Tunnel *socket,
+          const struct GNUNET_PeerIdentity *initiator,
+          uint32_t port)
 {
   struct StreamClient *sc;
 
-  if (NULL == socket)
-    return GNUNET_SYSERR;
+  GNUNET_assert (NULL != socket);
   if (sc_count >= sc_count_max)
   {
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# stream client connections rejected"), 1,
                              GNUNET_NO);
-    return GNUNET_SYSERR;
+    GNUNET_MESH_tunnel_destroy (socket);
+    return NULL;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Accepting inbound stream connection from `%s'\n",
@@ -1385,18 +1156,12 @@
                            GNUNET_NO);
   sc = GNUNET_malloc (sizeof (struct StreamClient));
   sc->socket = socket;
-  sc->mst = GNUNET_SERVER_mst_create (&request_cb,
-                                     sc);
-  sc->rh = GNUNET_STREAM_read (sc->socket,
-                              GNUNET_TIME_UNIT_FOREVER_REL,
-                              &process_request,
-                              sc);
   GNUNET_CONTAINER_DLL_insert (sc_head,
                               sc_tail,
                               sc);
   sc_count++;
   refresh_timeout_task (sc);
-  return GNUNET_OK;
+  return sc;
 }
 
 
@@ -1406,6 +1171,16 @@
 void
 GSF_stream_start ()
 {
+  static const struct GNUNET_MESH_MessageHandler handlers[] = {
+    { &request_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY, sizeof (struct StreamQueryMessage)},
+    { &reply_cb, GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY, 0 },
+    { NULL, 0, 0 }
+  };
+  static const uint32_t ports[] = {
+    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+    0
+  };
+
   stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
   if (GNUNET_YES ==
       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
@@ -1413,10 +1188,12 @@
                                             "MAX_STREAM_CLIENTS",
                                             &sc_count_max))
   {
-    listen_socket = GNUNET_STREAM_listen (GSF_cfg,
-                                         GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
-                                         &accept_cb, NULL,
-                                         GNUNET_STREAM_OPTION_END);
+    listen_socket = GNUNET_MESH_connect (GSF_cfg,
+                                        NULL,
+                                        &accept_cb,
+                                        NULL /* FIXME: have a cleanup callback? */,
+                                        handlers,
+                                        ports);
   } 
 }
 
@@ -1453,7 +1230,7 @@
     terminate_stream (sc);
   if (NULL != listen_socket)
   {
-    GNUNET_STREAM_listen_close (listen_socket);
+    GNUNET_MESH_disconnect (listen_socket);
     listen_socket = NULL;
   }
   GNUNET_CONTAINER_multihashmap_iterate (stream_map,




reply via email to

[Prev in Thread] Current Thread [Next in Thread]