gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r37473 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r37473 - gnunet/src/transport
Date: Thu, 7 Jul 2016 00:52:03 +0200

Author: grothoff
Date: 2016-07-07 00:52:03 +0200 (Thu, 07 Jul 2016)
New Revision: 37473

Modified:
   gnunet/src/transport/transport_api_monitor_plugins.c
Log:
converting monitor plugin functionality to MQ

Modified: gnunet/src/transport/transport_api_monitor_plugins.c
===================================================================
--- gnunet/src/transport/transport_api_monitor_plugins.c        2016-07-06 22:10:48 UTC (rev 37472)
+++ gnunet/src/transport/transport_api_monitor_plugins.c        2016-07-06 22:52:03 UTC (rev 37473)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2014 GNUnet e.V.
+     Copyright (C) 2014, 2016 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -41,7 +41,7 @@
   /**
    * Connection to the service.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_MQ_Handle *mq;
 
   /**
    * Our configuration.
@@ -72,7 +72,7 @@
   /**
    * Task ID for reconnect.
    */
-  struct GNUNET_SCHEDULER_Task * reconnect_task;
+  struct GNUNET_SCHEDULER_Task *reconnect_task;
 
 };
 
@@ -95,57 +95,16 @@
 };
 
 
-/**
- * Function called with responses from the service.
- *
- * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param msg NULL on timeout or error, otherwise presumably a
- *        message with the human-readable address
- */
-static void
-response_processor (void *cls,
-                    const struct GNUNET_MessageHeader *msg);
 
-
 /**
- * Send our subscription request to the service.
- *
- * @param pal_ctx our context
- */
-static void
-send_plugin_mon_request (struct GNUNET_TRANSPORT_PluginMonitor *pm)
-{
-  struct GNUNET_MessageHeader msg;
-
-  msg.size = htons (sizeof (struct GNUNET_MessageHeader));
-  msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CLIENT_transmit_and_get_response (pm->client,
-                                                          &msg,
-                                                          GNUNET_TIME_UNIT_FOREVER_REL,
-                                                          GNUNET_YES,
-                                                          &response_processor,
-                                                          pm));
-}
-
-
-/**
  * Task run to re-establish the connection.
  *
  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
  */
 static void
-do_plugin_connect (void *cls)
-{
-  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+do_plugin_connect (void *cls);
 
-  pm->reconnect_task = NULL;
-  pm->client = GNUNET_CLIENT_connect ("transport", pm->cfg);
-  GNUNET_assert (NULL != pm->client);
-  send_plugin_mon_request (pm);
-}
 
-
 /**
  * Free the session entry and notify the callback about its demise.
  *
@@ -184,8 +143,8 @@
 static void
 reconnect_plugin_ctx (struct GNUNET_TRANSPORT_PluginMonitor *pm)
 {
-  GNUNET_CLIENT_disconnect (pm->client);
-  pm->client = NULL;
+  GNUNET_MQ_destroy (pm->mq);
+  pm->mq = NULL;
   GNUNET_CONTAINER_multihashmap32_iterate (pm->sessions,
                                            &free_entry,
                                            pm);
@@ -257,15 +216,44 @@
  * Function called with responses from the service.
  *
  * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
- * @param msg NULL on timeout or error, otherwise presumably a
- *        message with the human-readable address
+ * @param tpmm message with event data
+ * @return #GNUNET_OK if message is well-formed
  */
+static int
+check_event (void *cls,
+             const struct TransportPluginMonitorMessage *tpmm)
+{
+  const char *pname;
+  size_t pname_len;
+  size_t paddr_len;
+
+  pname = (const char *) &tpmm[1];
+  pname_len = ntohs (tpmm->plugin_name_len);
+  paddr_len = ntohs (tpmm->plugin_address_len);
+  if ( (pname_len +
+        paddr_len +
+        sizeof (struct TransportPluginMonitorMessage) != ntohs (tpmm->header.size)) ||
+       ( (0 != pname_len) &&
+         ('\0' != pname[pname_len - 1]) ) )
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Function called with responses from the service.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @param tpmm message with event data
+ */
 static void
-response_processor (void *cls,
-                    const struct GNUNET_MessageHeader *msg)
+handle_event (void *cls,
+              const struct TransportPluginMonitorMessage *tpmm)
 {
   struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
-  const struct TransportPluginMonitorMessage *tpmm;
   struct GNUNET_TRANSPORT_PluginSession *ps;
   const char *pname;
   const void *paddr;
@@ -276,47 +264,9 @@
   struct GNUNET_HELLO_Address addr;
   struct SearchContext rv;
 
-  if (NULL == msg)
-  {
-    reconnect_plugin_ctx (pm);
-    return;
-  }
-  if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC == ntohs (msg->type)) &&
-       (sizeof (struct GNUNET_MessageHeader) == ntohs (msg->size)) )
-  {
-    /* we are in sync */
-    pm->cb (pm->cb_cls,
-            NULL,
-            NULL,
-            NULL);
-    GNUNET_CLIENT_receive (pm->client,
-                           &response_processor,
-                           pm,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-    return;
-  }
-
-  if ( (GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT != ntohs (msg->type)) ||
-       (sizeof (struct TransportPluginMonitorMessage) > ntohs (msg->size)) )
-  {
-    GNUNET_break (0);
-    reconnect_plugin_ctx (pm);
-    return;
-  }
-  tpmm = (const struct TransportPluginMonitorMessage *) msg;
   pname = (const char *) &tpmm[1];
   pname_len = ntohs (tpmm->plugin_name_len);
   paddr_len = ntohs (tpmm->plugin_address_len);
-  if ( (pname_len +
-        paddr_len +
-        sizeof (struct TransportPluginMonitorMessage) != ntohs (msg->size)) ||
-       ( (0 != pname_len) &&
-         ('\0' != pname[pname_len - 1]) ) )
-  {
-    GNUNET_break (0);
-    reconnect_plugin_ctx (pm);
-    return;
-  }
   paddr = &pname[pname_len];
   ps = NULL;
   ss = (enum GNUNET_TRANSPORT_SessionState) ntohs (tpmm->session_state);
@@ -372,14 +322,87 @@
                                                            ps));
     GNUNET_free (ps);
   }
-  GNUNET_CLIENT_receive (pm->client,
-                         &response_processor,
-                         pm,
-                         GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
 
 /**
+ * Function called with sync responses from the service.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @param msg message from the service
+ */
+static void
+handle_sync (void *cls,
+             const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+
+  /* we are in sync, notify callback */
+  pm->cb (pm->cb_cls,
+          NULL,
+          NULL,
+          NULL);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_TRANSPORT_PluginMonitor *`
+ * @param error error code
+ */
+static void
+mq_error_handler (void *cls,
+                  enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+
+  reconnect_plugin_ctx (pm);
+}
+
+
+/**
+ * Task run to re-establish the connection.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_PluginMonitor *`
+ */
+static void
+do_plugin_connect (void *cls)
+{
+  GNUNET_MQ_hd_var_size (event,
+                         GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_EVENT,
+                         struct TransportPluginMonitorMessage);
+  GNUNET_MQ_hd_fixed_size (sync,
+                           GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_SYNC,
+                           struct GNUNET_MessageHeader);
+  struct GNUNET_TRANSPORT_PluginMonitor *pm = cls;
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    make_event_handler (pm),
+    make_sync_handler (pm),
+    GNUNET_MQ_handler_end ()
+  };
+  struct GNUNET_MessageHeader *msg;
+  struct GNUNET_MQ_Envelope *env;
+
+  pm->reconnect_task = NULL;
+  pm->mq = GNUNET_CLIENT_connecT (pm->cfg,
+                                  "transport",
+                                  handlers,
+                                  &mq_error_handler,
+                                  pm);
+  if (NULL == pm->mq)
+    return;
+  env = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_TRANSPORT_MONITOR_PLUGIN_START);
+  GNUNET_MQ_send (pm->mq,
+                  env);
+}
+
+
+/**
  * Install a plugin session state monitor callback.  The callback
  * will be notified whenever the session changes.
  *
@@ -394,19 +417,18 @@
                                   void *cb_cls)
 {
   struct GNUNET_TRANSPORT_PluginMonitor *pm;
-  struct GNUNET_CLIENT_Connection *client;
 
-  client = GNUNET_CLIENT_connect ("transport",
-                                  cfg);
-  if (NULL == client)
-    return NULL;
   pm = GNUNET_new (struct GNUNET_TRANSPORT_PluginMonitor);
   pm->cb = cb;
   pm->cb_cls = cb_cls;
   pm->cfg = cfg;
-  pm->client = client;
+  do_plugin_connect (pm);
+  if (NULL == pm->mq)
+  {
+    GNUNET_free (pm);
+    return NULL;
+  }
   pm->sessions = GNUNET_CONTAINER_multihashmap32_create (128);
-  send_plugin_mon_request (pm);
   return pm;
 }
 
@@ -422,10 +444,10 @@
 void
 GNUNET_TRANSPORT_monitor_plugins_cancel (struct GNUNET_TRANSPORT_PluginMonitor *pm)
 {
-  if (NULL != pm->client)
+  if (NULL != pm->mq)
   {
-    GNUNET_CLIENT_disconnect (pm->client);
-    pm->client = NULL;
+    GNUNET_MQ_destroy (pm->mq);
+    pm->mq = NULL;
   }
   if (NULL != pm->reconnect_task)
   {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]