[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r20225 - gnunet/src/stream
From: |
gnunet |
Subject: |
[GNUnet-SVN] r20225 - gnunet/src/stream |
Date: |
Sat, 3 Mar 2012 15:57:24 +0100 |
Author: harsha
Date: 2012-03-03 15:57:24 +0100 (Sat, 03 Mar 2012)
New Revision: 20225
Modified:
gnunet/src/stream/Makefile.am
gnunet/src/stream/stream_api.c
gnunet/src/stream/test_stream_local.c
gnunet/src/stream/test_stream_local.conf
Log:
-modified testcase according to modified interface
Modified: gnunet/src/stream/Makefile.am
===================================================================
--- gnunet/src/stream/Makefile.am 2012-03-03 14:48:28 UTC (rev 20224)
+++ gnunet/src/stream/Makefile.am 2012-03-03 14:57:24 UTC (rev 20225)
@@ -33,7 +33,8 @@
test_stream_local.c
test_stream_local_LDADD = \
$(top_builddir)/src/stream/libgnunetstream.la \
- $(top_builddir)/src/util/libgnunetutil.la
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
#test_stream_halfclose_SOURCES = \
# test_stream_halfclose.c
Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c 2012-03-03 14:48:28 UTC (rev 20224)
+++ gnunet/src/stream/stream_api.c 2012-03-03 14:57:24 UTC (rev 20225)
@@ -18,6 +18,10 @@
Boston, MA 02111-1307, USA.
*/
+/* TODO:
+ * Checks for matching the sender and socket->other_peer in server
+ * message handlers */
+
/**
* @file stream/stream_api.c
* @brief Implementation of the stream library
@@ -257,6 +261,11 @@
unsigned int retries;
/**
+ * The application port number (type: uint32_t)
+ */
+ GNUNET_MESH_ApplicationType app_port;
+
+ /**
* The session id associated with this stream connection
* FIXME: Not used currently, may be removed
*/
@@ -328,6 +337,7 @@
/**
* The service port
+ * FIXME: Remove if not required!
*/
GNUNET_MESH_ApplicationType port;
};
@@ -1314,6 +1324,12 @@
struct GNUNET_STREAM_HelloAckMessage *reply;
GNUNET_assert (socket->tunnel == tunnel);
+
+ /* Catch possible protocol breaks */
+ GNUNET_break_op (0 != memcmp (&socket->other_peer,
+ sender,
+ sizeof (struct GNUNET_PeerIdentity)));
+
if (STATE_INIT == socket->state)
{
/* Get the random sequence number */
@@ -1791,6 +1807,78 @@
}
+/**
+ * Method called whenever a peer creates a tunnel to us
+ *
+ * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param atsi performance information for the tunnel
+ * @return initial tunnel context for the tunnel
+ * (can be NULL -- that's not an error)
+ */
+static void *
+new_tunnel_notify (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *initiator,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+ struct GNUNET_STREAM_Socket *socket;
+
+ socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
+ socket->tunnel = tunnel;
+ socket->session_id = 0; /* FIXME */
+ socket->other_peer = *initiator;
+ socket->state = STATE_INIT;
+
+ if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
+ socket,
+ &socket->other_peer))
+ {
+ socket->state = STATE_CLOSED;
+ /* FIXME: Send CLOSE message and then free */
+ GNUNET_free (socket);
+ GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
+ }
+ return socket;
+}
+
+
+/**
+ * Function called whenever an inbound tunnel is destroyed. Should clean up
+ * any associated state. This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ * with the tunnel is stored
+ */
+static void
+tunnel_cleaner (void *cls,
+ const struct GNUNET_MESH_Tunnel *tunnel,
+ void *tunnel_ctx)
+{
+ struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer %s has terminated connection abruptly\n",
+ GNUNET_i2s (&socket->other_peer));
+
+ socket->status = GNUNET_STREAM_SHUTDOWN;
+ /* Clear Transmit handles */
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ socket->tunnel = NULL;
+}
+
+
/*****************/
/* API functions */
/*****************/
@@ -1820,6 +1908,7 @@
struct GNUNET_STREAM_Socket *socket;
enum GNUNET_STREAM_Option option;
va_list vargs; /* Variable arguments */
+ GNUNET_MESH_ApplicationType no_port;
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *target;
@@ -1845,15 +1934,19 @@
}
} while (GNUNET_STREAM_OPTION_END != option);
va_end (vargs); /* End of variable args parsing */
-
+ no_port = 0;
socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
1, /* QUEUE size as parameter? */
socket, /* cls */
NULL, /* No inbound tunnel handler */
NULL, /* No inbound tunnel cleaner */
client_message_handlers,
- NULL); /* We don't get inbound tunnels */
- // FIXME: if (NULL == socket->mesh) ...
+ &no_port); /* We don't get inbound
tunnels */
+ if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
+ {
+ GNUNET_free (socket);
+ return NULL;
+ }
/* Now create the mesh tunnel to target */
socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
@@ -1868,6 +1961,20 @@
/**
+ * Shutdown the stream for reading or writing (man 2 shutdown).
+ *
+ * @param socket the stream socket
+ * @param how SHUT_RD, SHUT_WR or SHUT_RDWR
+ */
+void
+GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
+ int how)
+{
+ return;
+}
+
+
+/**
* Closes the stream
*
* @param socket the stream socket
@@ -1926,78 +2033,6 @@
/**
- * Method called whenever a peer creates a tunnel to us
- *
- * @param cls closure
- * @param tunnel new handle to the tunnel
- * @param initiator peer that started the tunnel
- * @param atsi performance information for the tunnel
- * @return initial tunnel context for the tunnel
- * (can be NULL -- that's not an error)
- */
-static void *
-new_tunnel_notify (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- const struct GNUNET_PeerIdentity *initiator,
- const struct GNUNET_ATS_Information *atsi)
-{
- struct GNUNET_STREAM_ListenSocket *lsocket = cls;
- struct GNUNET_STREAM_Socket *socket;
-
- socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
- socket->tunnel = tunnel;
- socket->session_id = 0; /* FIXME */
- socket->other_peer = *initiator;
- socket->state = STATE_INIT;
-
- if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
- socket,
- &socket->other_peer))
- {
- socket->state = STATE_CLOSED;
- /* FIXME: Send CLOSE message and then free */
- GNUNET_free (socket);
- GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
- }
- return socket;
-}
-
-
-/**
- * Function called whenever an inbound tunnel is destroyed. Should clean up
- * any associated state. This function is NOT called if the client has
- * explicitly asked for the tunnel to be destroyed using
- * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
- * the tunnel.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end (henceforth invalid)
- * @param tunnel_ctx place where local state associated
- * with the tunnel is stored
- */
-static void
-tunnel_cleaner (void *cls,
- const struct GNUNET_MESH_Tunnel *tunnel,
- void *tunnel_ctx)
-{
- struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer %s has terminated connection abruptly\n",
- GNUNET_i2s (&socket->other_peer));
-
- socket->status = GNUNET_STREAM_SHUTDOWN;
- /* Clear Transmit handles */
- if (NULL != socket->transmit_handle)
- {
- GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
- socket->transmit_handle = NULL;
- }
- socket->tunnel = NULL;
-}
-
-
-/**
* Listens for stream connections for a specific application ports
*
* @param cfg the configuration to use
@@ -2178,3 +2213,27 @@
socket);
return read_handle;
}
+
+
+/**
+ * Cancel pending write operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
+{
+ return;
+}
+
+
+/**
+ * Cancel pending read operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
+{
+ return;
+}
Modified: gnunet/src/stream/test_stream_local.c
===================================================================
--- gnunet/src/stream/test_stream_local.c 2012-03-03 14:48:28 UTC (rev
20224)
+++ gnunet/src/stream/test_stream_local.c 2012-03-03 14:57:24 UTC (rev
20225)
@@ -30,6 +30,7 @@
#include "gnunet_util_lib.h"
#include "gnunet_mesh_service.h"
#include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
#define VERBOSE 1
@@ -44,11 +45,16 @@
struct GNUNET_STREAM_Socket *socket;
/**
- * Peer's io handle
+ * Peer's io write handle
*/
- struct GNUNET_STREAM_IOHandle *io_handle;
+ struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
/**
+ * Peer's io read handle
+ */
+ struct GNUNET_STREAM_IOReadHandle *io_read_handle;
+
+ /**
* Bytes the peer has written
*/
unsigned int bytes_wrote;
@@ -63,6 +69,7 @@
static struct PeerData peer1;
static struct PeerData peer2;
static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static struct GNUNET_CONFIGURATION_Handle *config;
static GNUNET_SCHEDULER_TaskIdentifier abort_task;
static GNUNET_SCHEDULER_TaskIdentifier test_task;
@@ -78,7 +85,8 @@
do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_STREAM_close (peer1.socket);
- GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2.socket)
+ GNUNET_STREAM_close (peer2.socket);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
if (0 != abort_task)
{
@@ -90,6 +98,8 @@
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
+ /* Free the duplicated configuration */
+ GNUNET_CONFIGURATION_destroy (config);
GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid));
GNUNET_OS_process_close (arm_pid);
}
@@ -115,7 +125,23 @@
do_shutdown (cls, tc);
}
+/**
+ * Signature for input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should
be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size);
+
/**
* The write completion function; called upon writing some data to stream or
* upon error
@@ -138,26 +164,28 @@
if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
{
- peer->io_handle = GNUNET_STREAM_write (peer->socket,
- (void *) data,
- strlen(data) - peer->bytes_wrote,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
- cls);
- GNUNET_assert (NULL != peer->io_handle);
+ peer->io_write_handle =
+ GNUNET_STREAM_write (peer->socket,
+ (void *) data,
+ strlen(data) - peer->bytes_wrote,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
+ GNUNET_assert (NULL != peer->io_write_handle);
}
else
{
if (&peer1 == peer) /* Peer1 has finished writing; should read now */
{
- peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- cls);
- GNUNET_assert (NULL!=peer->io_handle);
+ peer->io_read_handle =
+ GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ cls);
+ GNUNET_assert (NULL!=peer->io_read_handle);
}
}
}
@@ -175,18 +203,19 @@
{
struct PeerData *peer;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
peer = (struct PeerData *) cls;
peer->bytes_wrote = 0;
GNUNET_assert (socket == peer1.socket);
GNUNET_assert (socket == peer->socket);
- peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */
- (void *) data, /* data */
- strlen(data),
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
+ peer->io_write_handle = GNUNET_STREAM_write (peer->socket, /* socket */
+ (void *) data, /* data */
+ strlen(data),
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
cls);
- GNUNET_assert (NULL != peer->io_handle);
+ GNUNET_assert (NULL != peer->io_write_handle);
}
@@ -210,7 +239,7 @@
peer = (struct PeerData *) cls;
- GNUNET_assert (GNUNET_STERAM_OK == status);
+ GNUNET_assert (GNUNET_STREAM_OK == status);
GNUNET_assert (size < strlen (data));
GNUNET_assert (strncmp ((const char *) data + peer->bytes_read,
(const char *) input_data,
@@ -219,27 +248,28 @@
if (peer->bytes_read < strlen (data))
{
- peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- cls);
- GNUNET_assert (NULL != peer->io_handle);
+ peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket
*)
+ peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ cls);
+ GNUNET_assert (NULL != peer->io_read_handle);
}
else
{
if (&peer2 == peer) /* Peer2 has completed reading; should write */
{
peer->bytes_wrote = 0;
- peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket
*)
- peer->socket,
- (void *) data,
- strlen(data),
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
- cls);
+ peer->io_write_handle =
+ GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ (void *) data,
+ strlen(data),
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
}
else /* Peer1 has completed reading. End of tests */
{
@@ -261,12 +291,13 @@
GNUNET_assert (NULL != cls);
peer2.bytes_read = 0;
GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
- peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- (void *) &peer2);
- GNUNET_assert (NULL != peer2.io_handle);
+ peer2.io_read_handle =
+ GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ (void *) &peer2);
+ GNUNET_assert (NULL != peer2.io_read_handle);
}
@@ -289,6 +320,9 @@
GNUNET_assert (NULL == initiator); /* Local peer=NULL? */
GNUNET_assert (socket != peer1.socket);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer connected: %s\n", GNUNET_i2s(initiator));
+
peer2.socket = socket;
read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
return GNUNET_OK;
@@ -297,23 +331,33 @@
/**
* Testing function
+ *
+ * @param cls NULL
+ * @param tc the task context
*/
static void
test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct GNUNET_PeerIdentity self;
+
test_task = GNUNET_SCHEDULER_NO_TASK;
+ /* Get our identity */
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config,
+ &self));
+ peer2_listen_socket = GNUNET_STREAM_listen (config,
+ 10, /* App port */
+ &stream_listen_cb,
+ NULL);
+ GNUNET_assert (NULL != peer2_listen_socket);
+
/* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */
+ peer1.socket = GNUNET_STREAM_open (config,
+ &self, /* Null for local peer? */
10, /* App port */
&stream_open_cb,
(void *) &peer1);
- GNUNET_assert (NULL != peer1.socket);
- peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */
- &stream_listen_cb,
- NULL);
- GNUNET_assert (NULL != peer2_listen_socket);
-
+ GNUNET_assert (NULL != peer1.socket);
}
/**
@@ -330,6 +374,8 @@
"WARNING",
#endif
NULL);
+ /* Duplicate the configuration */
+ config = GNUNET_CONFIGURATION_dup (cfg);
arm_pid =
GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
"gnunet-service-arm",
@@ -342,9 +388,9 @@
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 20), &do_abort,
NULL);
+
+ test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
- test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg);
-
}
/**
@@ -355,7 +401,7 @@
int ret;
char *const argv2[] = { "test-stream-local",
- "-c", "test_stream.conf",
+ "-c", "test_stream_local.conf",
#if VERBOSE
"-L", "DEBUG",
#endif
Modified: gnunet/src/stream/test_stream_local.conf
===================================================================
--- gnunet/src/stream/test_stream_local.conf 2012-03-03 14:48:28 UTC (rev
20224)
+++ gnunet/src/stream/test_stream_local.conf 2012-03-03 14:57:24 UTC (rev
20225)
@@ -59,8 +59,8 @@
HOSTKEY = $SERVICEHOME/.hostkey
[PATHS]
-DEFAULTCONFIG = test_mesh.conf
-SERVICEHOME = /tmp/test-mesh/
+DEFAULTCONFIG = test_stream_local.conf
+SERVICEHOME = /tmp/test-stream/
[dns]
AUTOSTART = NO
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r20225 - gnunet/src/stream,
gnunet <=