gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (98fead2b7 -> 0e73a0143)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (98fead2b7 -> 0e73a0143)
Date: Tue, 31 Jan 2017 05:09:08 +0100

This is an automated email from the git hooks/post-receive script.

bart-polot pushed a change to branch master
in repository gnunet.

    from 98fead2b7 fair, global message buffer implemented
     new b2c3389e8 CADET MQ API documentation improvements
     new bc38effee Implement the connect and create_channel call for mq api
     new ae5354d85 Add note for clarification
     new 18197ec85 Implementation of port opening and handling in MQ
     new 7a469336d Fix free of NULL pointer
     new bdfdbcc9d Implement channel cleanup in MQ API, simplify destroy channel
     new e6f317e83 Refactor reconnect code
     new 9112c6ee1 Reconnect to service instead of aborting on a malformed data 
message
     new 0e73a0143 Implement incoming traffic handling on MQ

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/cadet/cadet_api.c              | 573 ++++++++++++++++++++++++++++++++-----
 src/include/gnunet_cadet_service.h |  45 ++-
 2 files changed, 517 insertions(+), 101 deletions(-)

diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 2b50f781c..7640a924a 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -38,6 +38,8 @@
 
 /**
  * Transmission queue to the service
+ *
+ * @deprecated
  */
 struct GNUNET_CADET_TransmitHandle
 {
@@ -117,17 +119,26 @@ union CadetInfoCB
 struct GNUNET_CADET_Handle
 {
   /**
+   * Flag to indicate old or MQ API.
+   */
+  int mq_api;
+
+  /**
    * Message queue (if available).
    */
   struct GNUNET_MQ_Handle *mq;
 
   /**
    * Set of handlers used for processing incoming messages in the channels
+   *
+   * @deprecated
    */
   const struct GNUNET_CADET_MessageHandler *message_handlers;
 
   /**
    * Number of handlers in the handlers array.
+   *
+   * @deprecated
    */
   unsigned int n_handlers;
 
@@ -153,16 +164,22 @@ struct GNUNET_CADET_Handle
 
   /**
    * Closure for all the handlers given by the client
+   *
+   * @deprecated
    */
   void *cls;
 
   /**
    * Messages to send to the service, head.
+   *
+   * @deprecated
    */
   struct GNUNET_CADET_TransmitHandle *th_head;
 
   /**
    * Messages to send to the service, tail.
+   *
+   * @deprecated
    */
   struct GNUNET_CADET_TransmitHandle *th_tail;
 
@@ -241,9 +258,9 @@ struct GNUNET_CADET_Channel
   struct GNUNET_CADET_ClientChannelNumber ccn;
 
   /**
-   * Channel's port, if any.
+   * Channel's port, if incoming.
    */
-  struct GNUNET_CADET_Port *port;
+  struct GNUNET_CADET_Port *incoming_port;
 
   /**
    * Other end of the channel.
@@ -262,9 +279,27 @@ struct GNUNET_CADET_Channel
 
   /**
    * Are we allowed to send to the service?
+   *
+   * @deprecated?
    */
   unsigned int allow_send;
 
+  /*****************************    MQ     
************************************/
+  /**
+   * Message Queue for the channel.
+   */
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Window change handler.
+   */
+  GNUNET_CADET_WindowSizeEventHandler window_changes;
+
+  /**
+   * Disconnect handler.
+   */
+  GNUNET_CADET_DisconnectEventHandler disconnects;
+
 };
 
 
@@ -292,6 +327,38 @@ struct GNUNET_CADET_Port
    * Closure for @a handler.
    */
   void *cls;
+
+  /*****************************    MQ     
************************************/
+
+  /**
+   * Port "number"
+   */
+  struct GNUNET_HashCode id;
+
+  /**
+   * Handler for incoming channels on this port
+   */
+  GNUNET_CADET_ConnectEventHandler connects;
+
+  /**
+   * Closure for @ref connects
+   */
+  void * connects_cls;
+
+  /**
+   * Window size change handler.
+   */
+  GNUNET_CADET_WindowSizeEventHandler window_changes;
+
+  /**
+   * Handler called when an incoming channel is destroyed..
+   */
+  GNUNET_CADET_DisconnectEventHandler disconnects;
+
+  /**
+   * Payload handlers for incoming channels.
+   */
+  const struct GNUNET_MQ_MessageHandler *handlers;
 };
 
 
@@ -313,6 +380,41 @@ struct CadetMQState
 };
 
 
+
+/******************************************************************************/
+/*********************      FUNCTION DECLARATIONS     
*************************/
+/******************************************************************************/
+
+/**
+ * Reconnect to the service, retransmit all infomation to try to restore the
+ * original state.
+ *
+ * @param h Handle to the CADET service.
+ */
+static void
+schedule_reconnect (struct GNUNET_CADET_Handle *h);
+
+
+/**
+ * Reconnect callback: tries to reconnect again after a failer previous
+ * reconnection.
+ *
+ * @param cls Closure (cadet handle).
+ */
+static void
+reconnect_cbk (void *cls);
+
+
+/**
+ * Reconnect to the service, retransmit all infomation to try to restore the
+ * original state.
+ *
+ * @param h handle to the cadet
+ */
+static void
+reconnect (struct GNUNET_CADET_Handle *h);
+
+
 
/******************************************************************************/
 /***********************     AUXILIARY FUNCTIONS      
*************************/
 
/******************************************************************************/
@@ -424,10 +526,8 @@ create_channel (struct GNUNET_CADET_Handle *h,
  *
  * @return Handle to the required channel or NULL if not found.
  */
-// FIXME: simplify: call_cleaner is always #GNUNET_YES!!!
 static void
-destroy_channel (struct GNUNET_CADET_Channel *ch,
-                 int call_cleaner)
+destroy_channel (struct GNUNET_CADET_Channel *ch)
 {
   struct GNUNET_CADET_Handle *h;
   struct GNUNET_CADET_TransmitHandle *th;
@@ -449,13 +549,28 @@ destroy_channel (struct GNUNET_CADET_Channel *ch,
                                ch);
 
   /* signal channel destruction */
-  if ( (NULL != h->cleaner) &&
-       (0 != ch->peer) &&
-       (GNUNET_YES == call_cleaner) )
+  if (0 != ch->peer)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         " calling cleaner\n");
-    h->cleaner (h->cls, ch, ch->ctx);
+    if (NULL != h->cleaner)
+    {
+      /** @a deprecated  */
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+          " calling cleaner\n");
+      h->cleaner (h->cls, ch, ch->ctx);
+    }
+    else if (NULL != ch->disconnects)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+          " calling disconnect handler\n");
+      ch->disconnects (ch->ctx, ch);
+    }
+    else
+    {
+      /* Application won't be aware of the channel destruction and use
+       * a pointer to free'd memory.
+       */
+      GNUNET_assert (0);
+    }
   }
 
   /* check that clients did not leave messages behind in the queue */
@@ -515,6 +630,114 @@ remove_from_queue (struct GNUNET_CADET_TransmitHandle *th)
 }
 
 
+/******************************************************************************/
+/***********************      MQ API CALLBACKS     
****************************/
+/******************************************************************************/
+
+
+/**
+ * Implement sending functionality of a message queue for
+ * us sending messages to a peer.
+ *
+ * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
+ * in order to label the message with the channel ID and send the
+ * encapsulated message to the service.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+                    const struct GNUNET_MessageHeader *msg,
+                    void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+  struct GNUNET_CADET_Handle *h = ch->cadet;
+  uint16_t msize;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_LocalData *cadet_msg;
+
+
+  if (NULL == h->mq)
+  {
+    /* We're currently reconnecting, pretend this worked */
+    GNUNET_MQ_impl_send_continue (mq);
+    return;
+  }
+
+  /* check message size for sanity */
+  msize = ntohs (msg->size);
+  if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    GNUNET_MQ_impl_send_continue (mq);
+    return;
+  }
+
+  env = GNUNET_MQ_msg_nested_mh (cadet_msg,
+                                 GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
+                                 msg);
+  cadet_msg->ccn = ch->ccn;
+  GNUNET_MQ_send (h->mq, env);
+  GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Handle destruction of a message queue.  Implementations must not
+ * free @a mq, but should take care of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
+                       void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+
+  GNUNET_assert (mq == ch->mq);
+  ch->mq = NULL;
+}
+
+
+/**
+ * We had an error processing a message we forwarded from a peer to
+ * the CADET service.  We should just complain about it but otherwise
+ * continue processing.
+ *
+ * @param cls closure
+ * @param error error code
+ */
+static void
+cadet_mq_error_handler (void *cls,
+                        enum GNUNET_MQ_Error error)
+{
+  GNUNET_break_op (0);
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ * Should basically undo whatever #mq_send_impl() did.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
+                     void *impl_state)
+{
+  struct GNUNET_CADET_Channel *ch = impl_state;
+
+  LOG (GNUNET_ERROR_TYPE_WARNING,
+       "Cannot cancel mq message on channel %X of %p\n",
+       ch->ccn.channel_of_client, ch->cadet);
+
+  GNUNET_break (0);
+}
+
 
 
/******************************************************************************/
 /***********************      RECEIVE HANDLERS     
****************************/
@@ -589,15 +812,17 @@ handle_channel_created (void *cls,
   port = find_port (h, port_number);
   if (NULL == port)
   {
+    /* We could have closed the port but the service didn't know about it yet
+     * This is not an error.
+     */
     struct GNUNET_CADET_LocalChannelDestroyMessage *d_msg;
     struct GNUNET_MQ_Envelope *env;
 
     GNUNET_break (0);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "No handler for incoming channel %X [%s]\n",
+         "No handler for incoming channel %X (on port %s, recently closed?)\n",
          ntohl (ccn.channel_of_client),
          GNUNET_h2s (port_number));
-    /* FIXME: should disconnect instead, this is a serious error! */
     env = GNUNET_MQ_msg (d_msg,
                          GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_DESTROY);
     d_msg->ccn = msg->ccn;
@@ -611,19 +836,40 @@ handle_channel_created (void *cls,
   ch->peer = GNUNET_PEER_intern (&msg->peer);
   ch->cadet = h;
   ch->ccn = ccn;
-  ch->port = port;
+  ch->incoming_port = port;
   ch->options = ntohl (msg->opt);
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Creating incoming channel %X [%s] %p\n",
        ntohl (ccn.channel_of_client),
        GNUNET_h2s (port_number),
        ch);
-  ch->ctx = port->handler (port->cls,
-                           ch,
-                           &msg->peer,
-                           port->hash,
-                           ch->options);
+
+  if (NULL != port->handler)
+  {
+    /** @deprecated */
+    /* Old style API */
+    ch->ctx = port->handler (port->cls,
+                            ch,
+                            &msg->peer,
+                            port->hash,
+                            ch->options);
+  } else {
+    /* MQ API */
+    GNUNET_assert (NULL != port->connects);
+    ch->window_changes = port->window_changes;
+    ch->disconnects = port->disconnects;
+    ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
+                                            &cadet_mq_destroy_impl,
+                                            &cadet_mq_cancel_impl,
+                                            ch,
+                                            port->handlers,
+                                            &cadet_mq_error_handler,
+                                            ch);
+    ch->ctx = port->connects (port->cadet->cls,
+                              ch,
+                              &msg->peer);
+    GNUNET_MQ_set_handlers_closure (ch->mq, ch->ctx);
+  }
 }
 
 
@@ -655,8 +901,7 @@ handle_channel_destroy (void *cls,
          ntohl (ccn.channel_of_client));
     return;
   }
-  destroy_channel (ch,
-                   GNUNET_YES);
+  destroy_channel (ch);
 }
 
 
@@ -713,7 +958,12 @@ handle_local_data (void *cls,
 
   ch = retrieve_channel (h,
                          message->ccn);
-  GNUNET_assert (NULL != ch);
+  if (NULL == ch)
+  {
+    GNUNET_break_op (0);
+    reconnect (h);
+    return;
+  }
 
   payload = (struct GNUNET_MessageHeader *) &message[1];
   type = ntohs (payload->type);
@@ -725,6 +975,12 @@ handle_local_data (void *cls,
        ntohl (message->ccn.channel_of_client),
        GC_m2s (type),
        type);
+  if (NULL != ch->mq)
+  {
+    GNUNET_MQ_inject_message (ch->mq, payload);
+    return;
+  }
+  /** @a deprecated */
   for (unsigned i=0;i<h->n_handlers;i++)
   {
     handler = &h->message_handlers[i];
@@ -756,6 +1012,8 @@ handle_local_data (void *cls,
  *
  * @param h Cadet handle.
  * @param message Message itself.
+ *
+ * FIXME either delete or port to MQ
  */
 static void
 handle_local_ack (void *cls,
@@ -793,27 +1051,6 @@ handle_local_ack (void *cls,
   }
 }
 
-/**
- * Reconnect to the service, retransmit all infomation to try to restore the
- * original state.
- *
- * @param h handle to the cadet
- *
- * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service 
down...)
- */
-static void
-reconnect (struct GNUNET_CADET_Handle *h);
-
-
-/**
- * Reconnect callback: tries to reconnect again after a failer previous
- * reconnection.
- *
- * @param cls closure (cadet handle)
- */
-static void
-reconnect_cbk (void *cls);
-
 
 /**
  * Generic error handler, called with the appropriate error code and
@@ -1245,16 +1482,14 @@ handle_get_tunnel (void *cls,
 }
 
 
-
 /**
  * Reconnect to the service, retransmit all infomation to try to restore the
  * original state.
  *
  * @param h handle to the cadet
- * @return #GNUNET_YES in case of success, #GNUNET_NO otherwise (service 
down...)
  */
-static int
-do_reconnect (struct GNUNET_CADET_Handle *h)
+static void
+reconnect (struct GNUNET_CADET_Handle *h)
 {
   struct GNUNET_MQ_MessageHandler handlers[] = {
     GNUNET_MQ_hd_fixed_size (channel_created,
@@ -1289,16 +1524,28 @@ do_reconnect (struct GNUNET_CADET_Handle *h)
                            GNUNET_MESSAGE_TYPE_CADET_LOCAL_INFO_TUNNEL,
                            struct GNUNET_CADET_LocalInfoTunnel,
                            h),
-  // FIXME
+// FIXME
 //   GNUNET_MQ_hd_fixed_Y       size (channel_destroyed,
-//                            
GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED,
-//                            struct GNUNET_CADET_ChannelDestroyMessage);
+//                        
GNUNET_MESSAGE_TYPE_CADET_CHANNEL_OPEN_NACK_DEPRECATED,
+//                        struct GNUNET_CADET_ChannelDestroyMessage);
     GNUNET_MQ_handler_end ()
   };
+  struct GNUNET_CADET_Channel *ch;
+
+  while (NULL != (ch = h->channels_head))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Destroying channel due to a reconnect\n");
+    destroy_channel (ch);
+  }
 
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CADET\n");
 
-  GNUNET_assert (NULL == h->mq);
+  if (NULL != h->mq)
+  {
+    GNUNET_MQ_destroy (h->mq);
+    h->mq = NULL;
+  }
   h->mq = GNUNET_CLIENT_connect (h->cfg,
                                  "cadet",
                                  handlers,
@@ -1306,14 +1553,13 @@ do_reconnect (struct GNUNET_CADET_Handle *h)
                                  h);
   if (NULL == h->mq)
   {
-    reconnect (h);
-    return GNUNET_NO;
+    schedule_reconnect (h);
+    return;
   }
   else
   {
     h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
   }
-  return GNUNET_YES;
 }
 
 /**
@@ -1328,7 +1574,7 @@ reconnect_cbk (void *cls)
   struct GNUNET_CADET_Handle *h = cls;
 
   h->reconnect_task = NULL;
-  do_reconnect (h);
+  reconnect (h);
 }
 
 
@@ -1341,17 +1587,14 @@ reconnect_cbk (void *cls)
  * @return #GNUNET_YES in case of sucess, #GNUNET_NO otherwise (service 
down...)
  */
 static void
-reconnect (struct GNUNET_CADET_Handle *h)
+schedule_reconnect (struct GNUNET_CADET_Handle *h)
 {
-  struct GNUNET_CADET_Channel *ch;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Requested RECONNECT, destroying all channels\n");
-  while (NULL != (ch = h->channels_head))
-    destroy_channel (ch, GNUNET_YES);
   if (NULL == h->reconnect_task)
+  {
     h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_time,
                                                       &reconnect_cbk, h);
+    h->reconnect_time = GNUNET_TIME_STD_BACKOFF (h->reconnect_time);
+  }
 }
 
 
@@ -1374,7 +1617,7 @@ GNUNET_CADET_connect (const struct 
GNUNET_CONFIGURATION_Handle *cfg,
   h->cfg = cfg;
   h->cleaner = cleaner;
   h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
-  do_reconnect (h);
+  reconnect (h);
   if (h->mq == NULL)
   {
     GNUNET_break (0);
@@ -1423,8 +1666,7 @@ GNUNET_CADET_disconnect (struct GNUNET_CADET_Handle 
*handle)
            "channel %X not destroyed\n",
            ntohl (ch->ccn.channel_of_client));
     }
-    destroy_channel (ch,
-                     GNUNET_YES);
+    destroy_channel (ch);
     ch = aux;
   }
   while (NULL != (th = handle->th_head))
@@ -1526,13 +1768,15 @@ GNUNET_CADET_close_port (struct GNUNET_CADET_Port *p)
 {
   struct GNUNET_CADET_PortMessage *msg;
   struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_HashCode *id;
 
   env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_CLOSE);
 
-  msg->port = *p->hash;
+  id = NULL != p->hash ? p->hash : &p->id;
+  msg->port = *id;
   GNUNET_MQ_send (p->cadet->mq, env);
-  GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, p->hash, p);
-  GNUNET_free (p->hash);
+  GNUNET_CONTAINER_multihashmap_remove (p->cadet->ports, id, p);
+  GNUNET_free_non_null (p->hash);
   GNUNET_free (p);
 }
 
@@ -1629,8 +1873,7 @@ GNUNET_CADET_channel_destroy (struct GNUNET_CADET_Channel 
*channel)
   GNUNET_MQ_send (h->mq,
                   env);
 
-  destroy_channel (channel,
-                   GNUNET_YES);
+  destroy_channel (channel);
 }
 
 
@@ -2047,9 +2290,9 @@ cadet_mq_ntr (void *cls, size_t size,
  * @param impl_state state of the implementation
  */
 static void
-cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
-                    const struct GNUNET_MessageHeader *msg,
-                    void *impl_state)
+cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
+                        const struct GNUNET_MessageHeader *msg,
+                        void *impl_state)
 {
   struct CadetMQState *state = impl_state;
 
@@ -2075,8 +2318,8 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
  * @param impl_state state of the implementation
  */
 static void
-cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
-                       void *impl_state)
+cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
+                           void *impl_state)
 {
   struct CadetMQState *state = impl_state;
 
@@ -2104,8 +2347,8 @@ GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel 
*channel)
   state = GNUNET_new (struct CadetMQState);
   state->channel = channel;
 
-  mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
-                                      &cadet_mq_destroy_impl,
+  mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
+                                      &cadet_mq_destroy_impl_old,
                                       NULL, /* FIXME: cancel impl. */
                                       state,
                                       NULL, /* no msg handlers */
@@ -2136,3 +2379,177 @@ GC_u2h (uint32_t port)
 
   return &hash;
 }
+
+
+
+/******************************************************************************/
+/******************************* MQ-BASED API 
*********************************/
+/******************************************************************************/
+
+/**
+ * Connect to the MQ-based cadet service.
+ *
+ * @param cfg Configuration to use.
+ *
+ * @return Handle to the cadet service NULL on error.
+ */
+struct GNUNET_CADET_Handle *
+GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct GNUNET_CADET_Handle *h;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "GNUNET_CADET_connecT()\n");
+  h = GNUNET_new (struct GNUNET_CADET_Handle);
+  h->cfg = cfg;
+  h->mq_api = GNUNET_YES;
+  h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
+  reconnect (h);
+  if (NULL == h->mq)
+  {
+    GNUNET_break (0);
+    GNUNET_CADET_disconnect (h);
+    return NULL;
+  }
+  h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
+  h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
+  h->reconnect_task = NULL;
+
+  return h;
+}
+
+
+/**
+ * Open a port to receive incomming MQ-based channels.
+ *
+ * @param h CADET handle.
+ * @param port Hash identifying the port.
+ * @param connects Function called when an incoming channel is connected.
+ * @param connects_cls Closure for the @a connects handler.
+ * @param window_changes Function called when the transmit window size changes.
+ * @param disconnects Function called when a channel is disconnected.
+ * @param handlers Callbacks for messages we care about, NULL-terminated.
+ *
+ * @return Port handle.
+ */
+struct GNUNET_CADET_Port *
+GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
+                        const struct GNUNET_HashCode *port,
+                        GNUNET_CADET_ConnectEventHandler connects,
+                        void * connects_cls,
+                        GNUNET_CADET_WindowSizeEventHandler window_changes,
+                        GNUNET_CADET_DisconnectEventHandler disconnects,
+                        const struct GNUNET_MQ_MessageHandler *handlers)
+{
+  struct GNUNET_CADET_PortMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_CADET_Port *p;
+
+  GNUNET_assert (NULL != connects);
+  GNUNET_assert (NULL != disconnects);
+
+  p = GNUNET_new (struct GNUNET_CADET_Port);
+  p->cadet = h;
+  p->id = *port;
+  p->connects = connects;
+  p->cls = connects_cls;
+  p->window_changes = window_changes;
+  p->disconnects = disconnects;
+  p->handlers = handlers;
+
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_put (h->ports,
+                                                   p->hash,
+                                                   p,
+                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+
+  env = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_CADET_LOCAL_PORT_OPEN);
+  msg->port = p->id;
+  GNUNET_MQ_send (h->mq, env);
+
+  return p;
+}
+
+
+/**
+ * Create a new channel towards a remote peer.
+ *
+ * If the destination port is not open by any peer or the destination peer
+ * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
+ * for this channel.
+ *
+ * @param h CADET handle.
+ * @param channel_cls Closure for the channel. It's given to:
+ *                    - The disconnect handler @a disconnects
+ *                    - Each message type callback in @a handlers
+ * @param destination Peer identity the channel should go to.
+ * @param port Identification of the destination port.
+ * @param options CadetOption flag field, with all desired option bits set to 
1.
+ * @param window_changes Function called when the transmit window size changes.
+ * @param disconnects Function called when the channel is disconnected.
+ * @param handlers Callbacks for messages we care about, NULL-terminated.
+ *
+ * @return Handle to the channel.
+ */
+struct GNUNET_CADET_Channel *
+GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
+                             void *channel_cls,
+                             const struct GNUNET_PeerIdentity *destination,
+                             const struct GNUNET_HashCode *port,
+                             enum GNUNET_CADET_ChannelOption options,
+                             GNUNET_CADET_WindowSizeEventHandler 
window_changes,
+                             GNUNET_CADET_DisconnectEventHandler disconnects,
+                             const struct GNUNET_MQ_MessageHandler *handlers)
+{
+  struct GNUNET_CADET_Channel *ch;
+  struct GNUNET_CADET_ClientChannelNumber ccn;
+  struct GNUNET_CADET_LocalChannelCreateMessage *msg;
+  struct GNUNET_MQ_Envelope *env;
+
+  GNUNET_assert (NULL != disconnects);
+
+  /* Save parameters */
+  ccn.channel_of_client = htonl (0);
+  ch = create_channel (h, ccn);
+  ch->ctx = channel_cls;
+  ch->peer = GNUNET_PEER_intern (destination);
+  ch->options = options;
+  ch->window_changes = window_changes;
+  ch->disconnects = disconnects;
+
+  /* Create MQ for channel */
+  ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
+                                          &cadet_mq_destroy_impl,
+                                          &cadet_mq_cancel_impl,
+                                          ch,
+                                          handlers,
+                                          &cadet_mq_error_handler,
+                                          ch);
+  GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
+
+  /* Request channel creation to service */
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
+  msg->ccn = ch->ccn;
+  msg->port = *port;
+  msg->peer = *destination;
+  msg->opt = htonl (options);
+  GNUNET_MQ_send (h->mq,
+                  env);
+
+  return ch;
+}
+
+
+/**
+ * Obtain the message queue for a connected peer.
+ *
+ * @param channel The channel handle from which to get the MQ.
+ *
+ * @return NULL if @a channel is not yet connected.
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
+{
+  return channel->mq;
+}
diff --git a/src/include/gnunet_cadet_service.h 
b/src/include/gnunet_cadet_service.h
index 99d74f223..1434180f4 100644
--- a/src/include/gnunet_cadet_service.h
+++ b/src/include/gnunet_cadet_service.h
@@ -697,26 +697,29 @@ GC_u2h (uint32_t port);
 
/******************************************************************************/
 
 /**
- * Method called whenever a given peer connects in MQ-based CADET.
+ * Method called whenever a peer connects to a port in MQ-based CADET.
  *
- * @param cls Closure from @a GNUNET_CADET_open_porT.
+ * @param cls Closure from #GNUNET_CADET_open_porT.
  * @param channel New handle to the channel.
  * @param source Peer that started this channel.
- *
- * @return Closure for the incoming channel. It's given to:
- *         - The @a GNUNET_CADET_DisconnectEventHandler when the channel dies.
- *         - Each the @a GNUNET_MQ_MessageCallback for each message.
+ * FIXME: Add port that this channel is created for, or is cls enough?
+ *        Port cannot be closed yet, #handle_channel_create would have
+ *        rejected it.
+ * @return Closure for the incoming @a channel. It's given to:
+ *         - The #GNUNET_CADET_DisconnectEventHandler (given to
+ *           #GNUNET_CADET_open_porT) when the channel dies.
+ *         - Each the #GNUNET_MQ_MessageCallback handlers for each message
+ *           received on the @a channel.
  */
 typedef void *
 (*GNUNET_CADET_ConnectEventHandler) (void *cls,
                                      struct GNUNET_CADET_Channel *channel,
                                      const struct GNUNET_PeerIdentity *source);
 
-
 /**
  * Function called whenever an MQ-channel is destroyed, even if the destruction
- * was requested by @a GNUNET_CADET_channel_destroy.
- * It must NOT call @a GNUNET_CADET_channel_destroy on the channel.
+ * was requested by #GNUNET_CADET_channel_destroy.
+ * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
  *
  * It should clean up any associated state, including cancelling any pending
  * transmission on this channel.
@@ -728,7 +731,6 @@ typedef void
 (*GNUNET_CADET_DisconnectEventHandler) (void *cls,
                                         const struct GNUNET_CADET_Channel 
*channel);
 
-
 /**
  * Function called whenever an MQ-channel's transmission window size changes.
  *
@@ -736,29 +738,27 @@ typedef void
  * and will mean the channel is connected to the destination.
  *
  * For an incoming channel it will be called immediately after the
- * @a GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
  *
  * @param cls Channel closure.
  * @param channel Connection to the other end (henceforth invalid).
- * @param window_size New window size.
+ * @param window_size New window size. If the is more messages than buffer size
+ *                    this value will be negative..
  */
 typedef void
 (*GNUNET_CADET_WindowSizeEventHandler) (void *cls,
                                         const struct GNUNET_CADET_Channel 
*channel,
                                         int window_size);
 
-
 /**
  * Connect to the MQ-based cadet service.
  *
  * @param cfg Configuration to use.
- *
  * @return Handle to the cadet service NULL on error.
  */
 struct GNUNET_CADET_Handle *
 GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg);
 
-
 /**
  * Open a port to receive incomming MQ-based channels.
  *
@@ -767,21 +767,20 @@ GNUNET_CADET_connecT (const struct 
GNUNET_CONFIGURATION_Handle *cfg);
  * @param connects Function called when an incoming channel is connected.
  * @param connects_cls Closure for the @a connects handler.
  * @param window_changes Function called when the transmit window size changes.
+ *                       Can be NULL.
  * @param disconnects Function called when a channel is disconnected.
  * @param handlers Callbacks for messages we care about, NULL-terminated.
- *
  * @return Port handle.
  */
 struct GNUNET_CADET_Port *
 GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
                         const struct GNUNET_HashCode *port,
                         GNUNET_CADET_ConnectEventHandler connects,
-                        void * connects_cls,
+                        void *connects_cls,
                         GNUNET_CADET_WindowSizeEventHandler window_changes,
                         GNUNET_CADET_DisconnectEventHandler disconnects,
                         const struct GNUNET_MQ_MessageHandler *handlers);
 
-
 /**
  * Create a new channel towards a remote peer.
  *
@@ -791,15 +790,17 @@ GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
  *
  * @param h CADET handle.
  * @param channel_cls Closure for the channel. It's given to:
+ *                    - The management handler @a window_changes.
  *                    - The disconnect handler @a disconnects
  *                    - Each message type callback in @a handlers
  * @param destination Peer identity the channel should go to.
  * @param port Identification of the destination port.
  * @param options CadetOption flag field, with all desired option bits set to 
1.
  * @param window_changes Function called when the transmit window size changes.
+ *                       Can be NULL if this data is of no interest.
+ * TODO                  Not yet implemented.
  * @param disconnects Function called when the channel is disconnected.
  * @param handlers Callbacks for messages we care about, NULL-terminated.
- *
  * @return Handle to the channel.
  */
 struct GNUNET_CADET_Channel *
@@ -812,13 +813,11 @@ GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle 
*h,
                              GNUNET_CADET_DisconnectEventHandler disconnects,
                              const struct GNUNET_MQ_MessageHandler *handlers);
 
-
 /**
- * Obtain the message queue for a connected peer.
+ * Obtain the message queue for a connected channel.
  *
  * @param channel The channel handle from which to get the MQ.
- *
- * @return NULL if @a channel is not yet connected.
+ * @return The message queue of the channel.
  */
 struct GNUNET_MQ_Handle *
 GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel);

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]