gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r20225 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r20225 - gnunet/src/stream
Date: Sat, 3 Mar 2012 15:57:24 +0100

Author: harsha
Date: 2012-03-03 15:57:24 +0100 (Sat, 03 Mar 2012)
New Revision: 20225

Modified:
   gnunet/src/stream/Makefile.am
   gnunet/src/stream/stream_api.c
   gnunet/src/stream/test_stream_local.c
   gnunet/src/stream/test_stream_local.conf
Log:
-modified testcase according to modified interface

Modified: gnunet/src/stream/Makefile.am
===================================================================
--- gnunet/src/stream/Makefile.am       2012-03-03 14:48:28 UTC (rev 20224)
+++ gnunet/src/stream/Makefile.am       2012-03-03 14:57:24 UTC (rev 20225)
@@ -33,7 +33,8 @@
  test_stream_local.c
 test_stream_local_LDADD = \
  $(top_builddir)/src/stream/libgnunetstream.la \
- $(top_builddir)/src/util/libgnunetutil.la  
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la 
 
 #test_stream_halfclose_SOURCES = \
 # test_stream_halfclose.c

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-03-03 14:48:28 UTC (rev 20224)
+++ gnunet/src/stream/stream_api.c      2012-03-03 14:57:24 UTC (rev 20225)
@@ -18,6 +18,10 @@
   Boston, MA 02111-1307, USA.
 */
 
+/* TODO:
+ * Checks for matching the sender and socket->other_peer in server
+ * message handlers  */
+
 /**
  * @file stream/stream_api.c
  * @brief Implementation of the stream library
@@ -257,6 +261,11 @@
   unsigned int retries;
 
   /**
+   * The application port number (type: uint32_t)
+   */
+  GNUNET_MESH_ApplicationType app_port;
+
+  /**
    * The session id associated with this stream connection
    * FIXME: Not used currently, may be removed
    */
@@ -328,6 +337,7 @@
 
   /**
    * The service port
+   * FIXME: Remove if not required!
    */
   GNUNET_MESH_ApplicationType port;
 };
@@ -1314,6 +1324,12 @@
   struct GNUNET_STREAM_HelloAckMessage *reply;
 
   GNUNET_assert (socket->tunnel == tunnel);
+
+  /* Catch possible protocol breaks */
+  GNUNET_break_op (0 != memcmp (&socket->other_peer, 
+                                sender,
+                                sizeof (struct GNUNET_PeerIdentity)));
+
   if (STATE_INIT == socket->state)
     {
       /* Get the random sequence number */
@@ -1791,6 +1807,78 @@
 }
 
 
+/**
+ * Method called whenever a peer creates a tunnel to us
+ *
+ * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param atsi performance information for the tunnel
+ * @return initial tunnel context for the tunnel
+ *         (can be NULL -- that's not an error)
+ */
+static void *
+new_tunnel_notify (void *cls,
+                   struct GNUNET_MESH_Tunnel *tunnel,
+                   const struct GNUNET_PeerIdentity *initiator,
+                   const struct GNUNET_ATS_Information *atsi)
+{
+  struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+  struct GNUNET_STREAM_Socket *socket;
+
+  socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
+  socket->tunnel = tunnel;
+  socket->session_id = 0;       /* FIXME */
+  socket->other_peer = *initiator;
+  socket->state = STATE_INIT;
+
+  if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
+                                           socket,
+                                           &socket->other_peer))
+    {
+      socket->state = STATE_CLOSED;
+      /* FIXME: Send CLOSE message and then free */
+      GNUNET_free (socket);
+      GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
+    }
+  return socket;
+}
+
+
+/**
+ * Function called whenever an inbound tunnel is destroyed.  Should clean up
+ * any associated state.  This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ *                   with the tunnel is stored
+ */
+static void 
+tunnel_cleaner (void *cls,
+                const struct GNUNET_MESH_Tunnel *tunnel,
+                void *tunnel_ctx)
+{
+  struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+  
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer %s has terminated connection abruptly\n",
+              GNUNET_i2s (&socket->other_peer));
+
+  socket->status = GNUNET_STREAM_SHUTDOWN;
+  /* Clear Transmit handles */
+  if (NULL != socket->transmit_handle)
+    {
+      GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+      socket->transmit_handle = NULL;
+    }
+  socket->tunnel = NULL;
+}
+
+
 /*****************/
 /* API functions */
 /*****************/
@@ -1820,6 +1908,7 @@
   struct GNUNET_STREAM_Socket *socket;
   enum GNUNET_STREAM_Option option;
   va_list vargs;                /* Variable arguments */
+  GNUNET_MESH_ApplicationType no_port;
 
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *target;
@@ -1845,15 +1934,19 @@
       }
   } while (GNUNET_STREAM_OPTION_END != option);
   va_end (vargs);               /* End of variable args parsing */
-
+  no_port = 0;
   socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
                                       1,  /* QUEUE size as parameter? */
                                       socket, /* cls */
                                       NULL, /* No inbound tunnel handler */
                                       NULL, /* No inbound tunnel cleaner */
                                       client_message_handlers,
-                                      NULL); /* We don't get inbound tunnels */
-  // FIXME: if (NULL == socket->mesh) ...
+                                      &no_port); /* We don't get inbound tunnels */
+  if (NULL == socket->mesh)   /* Fail if we cannot connect to mesh */
+    {
+      GNUNET_free (socket);
+      return NULL;
+    }
 
   /* Now create the mesh tunnel to target */
   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
@@ -1868,6 +1961,20 @@
 
 
 /**
+ * Shutdown the stream for reading or writing (man 2 shutdown).
+ *
+ * @param socket the stream socket
+ * @param how SHUT_RD, SHUT_WR or SHUT_RDWR 
+ */
+void
+GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
+                       int how)
+{
+  return;
+}
+
+
+/**
  * Closes the stream
  *
  * @param socket the stream socket
@@ -1926,78 +2033,6 @@
 
 
 /**
- * Method called whenever a peer creates a tunnel to us
- *
- * @param cls closure
- * @param tunnel new handle to the tunnel
- * @param initiator peer that started the tunnel
- * @param atsi performance information for the tunnel
- * @return initial tunnel context for the tunnel
- *         (can be NULL -- that's not an error)
- */
-static void *
-new_tunnel_notify (void *cls,
-                   struct GNUNET_MESH_Tunnel *tunnel,
-                   const struct GNUNET_PeerIdentity *initiator,
-                   const struct GNUNET_ATS_Information *atsi)
-{
-  struct GNUNET_STREAM_ListenSocket *lsocket = cls;
-  struct GNUNET_STREAM_Socket *socket;
-
-  socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
-  socket->tunnel = tunnel;
-  socket->session_id = 0;       /* FIXME */
-  socket->other_peer = *initiator;
-  socket->state = STATE_INIT;
-
-  if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
-                                           socket,
-                                           &socket->other_peer))
-    {
-      socket->state = STATE_CLOSED;
-      /* FIXME: Send CLOSE message and then free */
-      GNUNET_free (socket);
-      GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
-    }
-  return socket;
-}
-
-
-/**
- * Function called whenever an inbound tunnel is destroyed.  Should clean up
- * any associated state.  This function is NOT called if the client has
- * explicitly asked for the tunnel to be destroyed using
- * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
- * the tunnel.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end (henceforth invalid)
- * @param tunnel_ctx place where local state associated
- *                   with the tunnel is stored
- */
-static void 
-tunnel_cleaner (void *cls,
-                const struct GNUNET_MESH_Tunnel *tunnel,
-                void *tunnel_ctx)
-{
-  struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-  
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Peer %s has terminated connection abruptly\n",
-              GNUNET_i2s (&socket->other_peer));
-
-  socket->status = GNUNET_STREAM_SHUTDOWN;
-  /* Clear Transmit handles */
-  if (NULL != socket->transmit_handle)
-    {
-      GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
-      socket->transmit_handle = NULL;
-    }
-  socket->tunnel = NULL;
-}
-
-
-/**
  * Listens for stream connections for a specific application ports
  *
  * @param cfg the configuration to use
@@ -2178,3 +2213,27 @@
                                                                socket);
   return read_handle;
 }
+
+
+/**
+ * Cancel pending write operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
+{
+  return;
+}
+
+
+/**
+ * Cancel pending read operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
+{
+  return;
+}

Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c       2012-03-03 14:48:28 UTC (rev 20224)
+++ gnunet/src/stream/test_stream_local.c       2012-03-03 14:57:24 UTC (rev 20225)
@@ -30,6 +30,7 @@
 #include "gnunet_util_lib.h"
 #include "gnunet_mesh_service.h"
 #include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
 
 #define VERBOSE 1
 
@@ -44,11 +45,16 @@
   struct GNUNET_STREAM_Socket *socket;
 
   /**
-   * Peer's io handle
+   * Peer's io write handle
    */
-  struct GNUNET_STREAM_IOHandle *io_handle;
+  struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
 
   /**
+   * Peer's io read handle
+   */
+  struct GNUNET_STREAM_IOReadHandle *io_read_handle;
+
+  /**
    * Bytes the peer has written
    */
   unsigned int bytes_wrote;
@@ -63,6 +69,7 @@
 static struct PeerData peer1;
 static struct PeerData peer2;
 static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static struct GNUNET_CONFIGURATION_Handle *config;
 
 static GNUNET_SCHEDULER_TaskIdentifier abort_task;
 static GNUNET_SCHEDULER_TaskIdentifier test_task;
@@ -78,7 +85,8 @@
 do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   GNUNET_STREAM_close (peer1.socket);
-  GNUNET_STREAM_close (peer2.socket);
+  if (NULL != peer2.socket)
+    GNUNET_STREAM_close (peer2.socket);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
   if (0 != abort_task)
   {
@@ -90,6 +98,8 @@
     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
+  /* Free the duplicated configuration */
+  GNUNET_CONFIGURATION_destroy (config);
   GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid));
   GNUNET_OS_process_close (arm_pid);
 }
@@ -115,7 +125,23 @@
   do_shutdown (cls, tc);
 }
 
+/**
+ * Signature for input processor 
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read 
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ *         given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+                 enum GNUNET_STREAM_Status status,
+                 const void *input_data,
+                 size_t size);
 
+
 /**
  * The write completion function; called upon writing some data to stream or
  * upon error
@@ -138,26 +164,28 @@
 
   if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
     {
-      peer->io_handle = GNUNET_STREAM_write (peer->socket,
-                                             (void *) data,
-                                             strlen(data) - peer->bytes_wrote,
-                                             GNUNET_TIME_relative_multiply
-                                             (GNUNET_TIME_UNIT_SECONDS, 5),
-                                             &write_completion,
-                                             cls);
-      GNUNET_assert (NULL != peer->io_handle);
+      peer->io_write_handle =
+        GNUNET_STREAM_write (peer->socket,
+                             (void *) data,
+                             strlen(data) - peer->bytes_wrote,
+                             GNUNET_TIME_relative_multiply
+                             (GNUNET_TIME_UNIT_SECONDS, 5),
+                             &write_completion,
+                             cls);
+      GNUNET_assert (NULL != peer->io_write_handle);
     }
   else
     {
       if (&peer1 == peer)   /* Peer1 has finished writing; should read now */
         {
-          peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
-                                                peer->socket,
-                                                GNUNET_TIME_relative_multiply
-                                                (GNUNET_TIME_UNIT_SECONDS, 5),
-                                                &input_processor,
-                                                cls);
-          GNUNET_assert (NULL!=peer->io_handle);
+          peer->io_read_handle =
+            GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+                                peer->socket,
+                                GNUNET_TIME_relative_multiply
+                                (GNUNET_TIME_UNIT_SECONDS, 5),
+                                &input_processor,
+                                cls);
+          GNUNET_assert (NULL!=peer->io_read_handle);
         }
     }
 }
@@ -175,18 +203,19 @@
 {
   struct PeerData *peer;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
   peer = (struct PeerData *) cls;
   peer->bytes_wrote = 0;
   GNUNET_assert (socket == peer1.socket);
   GNUNET_assert (socket == peer->socket);
-  peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */
-                                         (void *) data, /* data */
-                                         strlen(data),
-                                         GNUNET_TIME_relative_multiply
-                                         (GNUNET_TIME_UNIT_SECONDS, 5),
-                                         &write_completion,
+  peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */
+                                               (void *) data, /* data */
+                                               strlen(data),
+                                               GNUNET_TIME_relative_multiply
+                                               (GNUNET_TIME_UNIT_SECONDS, 5),
+                                               &write_completion,
                                          cls);
-  GNUNET_assert (NULL != peer->io_handle);
+  GNUNET_assert (NULL != peer->io_write_handle);
 }
 
 
@@ -210,7 +239,7 @@
 
   peer = (struct PeerData *) cls;
 
-  GNUNET_assert (GNUNET_STERAM_OK == status);
+  GNUNET_assert (GNUNET_STREAM_OK == status);
   GNUNET_assert (size < strlen (data));
   GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, 
                           (const char *) input_data,
@@ -219,27 +248,28 @@
   
   if (peer->bytes_read < strlen (data))
     {
-      peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
-                                            peer->socket,
-                                            GNUNET_TIME_relative_multiply
-                                            (GNUNET_TIME_UNIT_SECONDS, 5),
-                                            &input_processor,
-                                            cls);
-      GNUNET_assert (NULL != peer->io_handle);
+      peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+                                                 peer->socket,
+                                                 GNUNET_TIME_relative_multiply
+                                                 (GNUNET_TIME_UNIT_SECONDS, 5),
+                                                 &input_processor,
+                                                 cls);
+      GNUNET_assert (NULL != peer->io_read_handle);
     }
   else 
     {
       if (&peer2 == peer)    /* Peer2 has completed reading; should write */
         {
           peer->bytes_wrote = 0;
-          peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
-                                                 peer->socket,
-                                                 (void *) data,
-                                                 strlen(data),
-                                                 GNUNET_TIME_relative_multiply
-                                                 (GNUNET_TIME_UNIT_SECONDS, 5),
-                                                 &write_completion,
-                                                 cls);
+          peer->io_write_handle = 
+            GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
+                                 peer->socket,
+                                 (void *) data,
+                                 strlen(data),
+                                 GNUNET_TIME_relative_multiply
+                                 (GNUNET_TIME_UNIT_SECONDS, 5),
+                                 &write_completion,
+                                 cls);
         }
       else                      /* Peer1 has completed reading. End of tests */
         {
@@ -261,12 +291,13 @@
   GNUNET_assert (NULL != cls);
   peer2.bytes_read = 0;
   GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
-  peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
-                                        GNUNET_TIME_relative_multiply
-                                        (GNUNET_TIME_UNIT_SECONDS, 5),
-                                        &input_processor,
-                                        (void *) &peer2);
-  GNUNET_assert (NULL != peer2.io_handle);
+  peer2.io_read_handle =
+    GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
+                        GNUNET_TIME_relative_multiply
+                        (GNUNET_TIME_UNIT_SECONDS, 5),
+                        &input_processor,
+                        (void *) &peer2);
+  GNUNET_assert (NULL != peer2.io_read_handle);
 }
 
 
@@ -289,6 +320,9 @@
   GNUNET_assert (NULL == initiator);   /* Local peer=NULL? */
   GNUNET_assert (socket != peer1.socket);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Peer connected: %s\n", GNUNET_i2s(initiator));
+
   peer2.socket = socket;
   read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
   return GNUNET_OK;
@@ -297,23 +331,33 @@
 
 /**
  * Testing function
+ *
+ * @param cls NULL
+ * @param tc the task context
  */
 static void
 test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct GNUNET_PeerIdentity self;
+
   test_task = GNUNET_SCHEDULER_NO_TASK;
+  /* Get our identity */
+  GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+                                                                &self));
 
+  peer2_listen_socket = GNUNET_STREAM_listen (config,
+                                              10, /* App port */
+                                              &stream_listen_cb,
+                                              NULL);
+  GNUNET_assert (NULL != peer2_listen_socket);
+
   /* Connect to stream library */
-  peer1.socket = GNUNET_STREAM_open (NULL,         /* Null for local peer? */
+  peer1.socket = GNUNET_STREAM_open (config,
+                                     &self,         /* Null for local peer? */
                                      10,           /* App port */
                                      &stream_open_cb,
                                      (void *) &peer1);
-  GNUNET_assert (NULL != peer1.socket);
-  peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */
-                                              &stream_listen_cb,
-                                              NULL);
-  GNUNET_assert (NULL != peer2_listen_socket);
-                  
+  GNUNET_assert (NULL != peer1.socket);                  
 }
 
 /**
@@ -330,6 +374,8 @@
                     "WARNING",
 #endif
                     NULL);
+   /* Duplicate the configuration */
+   config = GNUNET_CONFIGURATION_dup (cfg);
    arm_pid =
      GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
                               "gnunet-service-arm",
@@ -342,9 +388,9 @@
      GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
                                    (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort,
                                     NULL);
+   
+   test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
 
-   test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg);
-
 }
 
 /**
@@ -355,7 +401,7 @@
   int ret;
 
   char *const argv2[] = { "test-stream-local",
-                          "-c", "test_stream.conf",
+                          "-c", "test_stream_local.conf",
 #if VERBOSE
                           "-L", "DEBUG",
 #endif

Modified: gnunet/src/stream/test_stream_local.conf
===================================================================
--- gnunet/src/stream/test_stream_local.conf    2012-03-03 14:48:28 UTC (rev 20224)
+++ gnunet/src/stream/test_stream_local.conf    2012-03-03 14:57:24 UTC (rev 20225)
@@ -59,8 +59,8 @@
 HOSTKEY = $SERVICEHOME/.hostkey
 
 [PATHS]
-DEFAULTCONFIG = test_mesh.conf
-SERVICEHOME = /tmp/test-mesh/
+DEFAULTCONFIG = test_stream_local.conf
+SERVICEHOME = /tmp/test-stream/
 
 [dns]
 AUTOSTART = NO




reply via email to

[Prev in Thread] Current Thread [Next in Thread]