gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: -implement long polling support


From: gnunet
Subject: [taler-exchange] branch master updated: -implement long polling support on reserve status (but not yet in C client library)
Date: Sun, 22 Aug 2021 00:12:23 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 10f9272e -implement long polling support on reserve status (but not 
yet in C client library)
10f9272e is described below

commit 10f9272e45ea97d1b8f8059c9d285049ff4b606d
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sun Aug 22 00:12:18 2021 +0200

    -implement long polling support on reserve status (but not yet in C client 
library)
---
 contrib/gana                                     |   2 +-
 src/exchange/taler-exchange-httpd.c              |  26 +++-
 src/exchange/taler-exchange-httpd_reserves_get.c | 176 +++++++++++++++++++++--
 src/exchange/taler-exchange-httpd_reserves_get.h |   9 ++
 src/exchangedb/plugin_exchangedb_postgres.c      | 102 +++++++++++--
 src/include/taler_exchangedb_plugin.h            |  31 +++-
 6 files changed, 313 insertions(+), 33 deletions(-)

diff --git a/contrib/gana b/contrib/gana
index 2e967c48..efb2a1fd 160000
--- a/contrib/gana
+++ b/contrib/gana
@@ -1 +1 @@
-Subproject commit 2e967c48b395a3edb85982e2e349cb82e76dcb27
+Subproject commit efb2a1fd64e17159c56ff3674083837b5a657a64
diff --git a/src/exchange/taler-exchange-httpd.c 
b/src/exchange/taler-exchange-httpd.c
index 80649c0b..c06695e4 100644
--- a/src/exchange/taler-exchange-httpd.c
+++ b/src/exchange/taler-exchange-httpd.c
@@ -1430,8 +1430,14 @@ run_single_request (void)
     }
     MHD_run (mhd);
   }
-  TEH_resume_keys_requests (true);
-  MHD_stop_daemon (mhd);
+  {
+    MHD_socket sock = MHD_quiesce_daemon (mhd);
+
+    TEH_resume_keys_requests (true);
+    TEH_reserves_get_cleanup ();
+    MHD_stop_daemon (mhd);
+    GNUNET_break (0 == close (sock));
+  }
   mhd = NULL;
   if (cld != waitpid (cld,
                       &status,
@@ -1494,8 +1500,15 @@ run_main_loop (int fh,
   {
   case GNUNET_OK:
   case GNUNET_SYSERR:
-    TEH_resume_keys_requests (true);
-    MHD_stop_daemon (mhd);
+    {
+      MHD_socket sock = MHD_quiesce_daemon (mhd);
+
+      TEH_resume_keys_requests (true);
+      TEH_reserves_get_cleanup ();
+      MHD_stop_daemon (mhd);
+      GNUNET_break (0 == close (sock));
+    }
+    mhd = NULL;
     break;
   case GNUNET_NO:
     {
@@ -1507,7 +1520,9 @@ run_main_loop (int fh,
       flags = fcntl (sock, F_GETFD);
       GNUNET_assert (-1 != flags);
       flags &= ~FD_CLOEXEC;
-      GNUNET_assert (-1 != fcntl (sock, F_SETFD, flags));
+      GNUNET_assert (-1 != fcntl (sock,
+                                  F_SETFD,
+                                  flags));
       chld = fork ();
       if (-1 == chld)
       {
@@ -1551,6 +1566,7 @@ run_main_loop (int fh,
         sleep (1);
       /* Now we're really done, practice clean shutdown */
       TEH_resume_keys_requests (true);
+      TEH_reserves_get_cleanup ();
       MHD_stop_daemon (mhd);
     }
     break;
diff --git a/src/exchange/taler-exchange-httpd_reserves_get.c 
b/src/exchange/taler-exchange-httpd_reserves_get.c
index d08543a4..6501f600 100644
--- a/src/exchange/taler-exchange-httpd_reserves_get.c
+++ b/src/exchange/taler-exchange-httpd_reserves_get.c
@@ -1,6 +1,6 @@
 /*
   This file is part of TALER
-  Copyright (C) 2014-2020 Taler Systems SA
+  Copyright (C) 2014-2021 Taler Systems SA
 
   TALER is free software; you can redistribute it and/or modify it under the
   terms of the GNU Affero General Public License as published by the Free 
Software
@@ -25,6 +25,7 @@
 #include <jansson.h>
 #include "taler_mhd_lib.h"
 #include "taler_json_lib.h"
+#include "taler_dbevents.h"
 #include "taler-exchange-httpd_reserves_get.h"
 #include "taler-exchange-httpd_responses.h"
 
@@ -49,25 +50,113 @@ struct ReservePoller
    */
   struct MHD_Connection *connection;
 
-  /**
-   * Entry in the timeout heap.
-   */
-  struct GNUNET_CONTAINER_HeapNode *hn;
-
   /**
    * Subscription for the database event we are
    * waiting for.
    */
-  struct GNUNET_DB_EventHandler *eh;
+  struct TALER_EXCHANGEDB_EventHandler *eh;
 
   /**
    * When will this request time out?
    */
   struct GNUNET_TIME_Absolute timeout;
 
+  /**
+   * True if we are still suspended.
+   */
+  bool suspended;
+
 };
 
 
+/**
+ * Head of list of requests in long polling.
+ */
+static struct ReservePoller *rp_head;
+
+/**
+ * Tail of list of requests in long polling.
+ */
+static struct ReservePoller *rp_tail;
+
+
+void
+TEH_reserves_get_cleanup ()
+{
+  struct ReservePoller *rp;
+
+  while (NULL != (rp = rp_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (rp_head,
+                                 rp_tail,
+                                 rp);
+    if (rp->suspended)
+    {
+      rp->suspended = false;
+      MHD_resume_connection (rp->connection);
+    }
+  }
+}
+
+
+/**
+ * Function called once a connection is done to
+ * clean up the `struct ReservePoller` state.
+ *
+ * @param rc context to clean up for
+ */
+static void
+rp_cleanup (struct TEH_RequestContext *rc)
+{
+  struct ReservePoller *rp = rc->rh_ctx;
+
+  if (NULL != rp->eh)
+  {
+    TEH_plugin->event_listen_cancel (TEH_plugin->cls,
+                                     rp->eh);
+    rp->eh = NULL;
+  }
+  GNUNET_free (rp);
+}
+
+
+/**
+ * Function called on events received from Postgres.
+ * Wakes up long pollers.
+ *
+ * @param cls the `struct TEH_RequestContext *`
+ * @param extra additional event data provided
+ * @param extra_size number of bytes in @a extra
+ */
+static void
+db_event_cb (void *cls,
+             const void *extra,
+             size_t extra_size)
+{
+  struct TEH_RequestContext *rc = cls;
+  struct ReservePoller *rp = rc->rh_ctx;
+  struct GNUNET_AsyncScopeSave old_scope;
+
+  (void) extra;
+  (void) extra_size;
+  if (NULL == rp)
+    return; /* event triggered while main transaction
+               was still running */
+  if (! rp->suspended)
+    return; /* might get multiple wake-up events */
+  rp->suspended = false;
+  GNUNET_async_scope_enter (&rc->async_scope_id,
+                            &old_scope);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Resuming from long-polling on reserve\n");
+  GNUNET_CONTAINER_DLL_remove (rp_head,
+                               rp_tail,
+                               rp);
+  MHD_resume_connection (rp->connection);
+  GNUNET_async_scope_restore (&old_scope);
+}
+
+
 /**
  * Send reserve history to client.
  *
@@ -157,6 +246,8 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
 {
   struct ReserveHistoryContext rsc;
   MHD_RESULT mhd_ret;
+  struct GNUNET_TIME_Relative timeout;
+  struct TALER_EXCHANGEDB_EventHandler *eh = NULL;
 
   if (GNUNET_OK !=
       GNUNET_STRINGS_string_to_data (args[0],
@@ -170,6 +261,47 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
                                        
TALER_EC_MERCHANT_GENERIC_RESERVE_PUB_MALFORMED,
                                        args[0]);
   }
+  {
+    const char *long_poll_timeout_ms;
+
+    long_poll_timeout_ms
+      = MHD_lookup_connection_value (rc->connection,
+                                     MHD_GET_ARGUMENT_KIND,
+                                     "timeout_ms");
+    if (NULL != long_poll_timeout_ms)
+    {
+      unsigned int timeout_ms;
+      char dummy;
+
+      if (1 != sscanf (long_poll_timeout_ms,
+                       "%u%c",
+                       &timeout_ms,
+                       &dummy))
+      {
+        GNUNET_break_op (0);
+        return TALER_MHD_reply_with_error (rc->connection,
+                                           MHD_HTTP_BAD_REQUEST,
+                                           
TALER_EC_GENERIC_PARAMETER_MALFORMED,
+                                           "timeout_ms (must be non-negative 
number)");
+      }
+      timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                               timeout_ms);
+    }
+  }
+  if (! GNUNET_TIME_relative_is_zero (timeout))
+  {
+    struct TALER_ReserveEventP rep = {
+      .header.size = htons (sizeof (rep)),
+      .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
+      .reserve_pub = rsc.reserve_pub
+    };
+
+    eh = TEH_plugin->event_listen (TEH_plugin->cls,
+                                   timeout,
+                                   &rep.header,
+                                   &db_event_cb,
+                                   rc);
+  }
   rsc.rh = NULL;
   if (GNUNET_OK !=
       TEH_DB_run_transaction (rc->connection,
@@ -178,13 +310,33 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
                               &reserve_history_transaction,
                               &rsc))
     return mhd_ret;
-
   /* generate proper response */
   if (NULL == rsc.rh)
-    return TALER_MHD_reply_with_error (rc->connection,
-                                       MHD_HTTP_NOT_FOUND,
-                                       
TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN,
-                                       args[0]);
+  {
+    struct ReservePoller *rp = rc->rh_ctx;
+
+    if ( (NULL != rp) ||
+         (GNUNET_TIME_relative_is_zero (timeout)) )
+    {
+      return TALER_MHD_reply_with_error (rc->connection,
+                                         MHD_HTTP_NOT_FOUND,
+                                         
TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN,
+                                         args[0]);
+    }
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Long-polling on reserve for %s\n",
+                GNUNET_STRINGS_relative_time_to_string (timeout,
+                                                        GNUNET_YES));
+    rp = GNUNET_new (struct ReservePoller);
+    rp->connection = rc->connection;
+    rp->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+    rp->eh = eh;
+    rc->rh_ctx = rp;
+    rc->rh_cleaner = &rp_cleanup;
+    rp->suspended = true;
+    MHD_suspend_connection (rc->connection);
+    return MHD_YES;
+  }
   mhd_ret = reply_reserve_history_success (rc->connection,
                                            rsc.rh);
   TEH_plugin->free_reserve_history (TEH_plugin->cls,
diff --git a/src/exchange/taler-exchange-httpd_reserves_get.h 
b/src/exchange/taler-exchange-httpd_reserves_get.h
index 1eb9ab60..30c6559f 100644
--- a/src/exchange/taler-exchange-httpd_reserves_get.h
+++ b/src/exchange/taler-exchange-httpd_reserves_get.h
@@ -27,6 +27,15 @@
 #include "taler-exchange-httpd.h"
 
 
+/**
+ * Shutdown reserves-get subsystem.  Resumes all
+ * suspended long-polling clients and cleans up
+ * data structures.
+ */
+void
+TEH_reserves_get_cleanup (void);
+
+
 /**
  * Handle a GET "/reserves/" request.  Parses the
  * given "reserve_pub" in @a args (which should contain the
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c 
b/src/exchangedb/plugin_exchangedb_postgres.c
index 1d05fb49..2d7ca057 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -109,6 +109,39 @@ struct TALER_EXCHANGEDB_Session
 };
 
 
+/**
+ * Event registration record.
+ */
+struct TALER_EXCHANGEDB_EventHandler
+{
+  /**
+   * Underlying GNUnet event handler.
+   */
+  struct GNUNET_DB_EventHandler *geh;
+
+  /**
+   * Entry in the heap.
+   */
+  struct GNUNET_CONTAINER_HeapNode *hn;
+
+  /**
+   * Our timeout.
+   */
+  struct GNUNET_TIME_Absolute timeout;
+
+  /**
+   * Callback to invoke (on @e timeout).
+   */
+  GNUNET_DB_EventCallback cb;
+
+  /**
+   * Closure for @e cb.
+   */
+  void *cb_cls;
+
+};
+
+
 /**
  * Type of the "cls" argument given to each of the functions in
  * our API.
@@ -132,6 +165,12 @@ struct PostgresClosure
    */
   char *sql_dir;
 
+  /**
+   * Heap of `struct TALER_EXCHANGEDB_EventHandler`
+   * by timeout.
+   */
+  struct GNUNET_CONTAINER_Heap *event_heap;
+
   /**
    * After how long should idle reserves be closed?
    */
@@ -2832,18 +2871,41 @@ handle_events (void *cls)
     }
   };
   nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2;
+  struct TALER_EXCHANGEDB_EventHandler *r;
 
   GNUNET_assert (0 ==
                  pthread_mutex_lock (&pg->event_lock));
   while (0 != pg->listener_count)
   {
     int ret;
+    int timeout = -1; /* no timeout */
 
     GNUNET_assert (0 ==
                    pthread_mutex_unlock (&pg->event_lock));
+    while (1)
+    {
+      r = GNUNET_CONTAINER_heap_peek (pg->event_heap);
+      if (NULL == r)
+        break;
+      if (GNUNET_TIME_absolute_is_future (r->timeout))
+        break;
+      GNUNET_assert (r ==
+                     GNUNET_CONTAINER_heap_remove_root (pg->event_heap));
+      r->hn = NULL;
+      r->cb (r->cb_cls,
+             NULL,
+             0);
+    }
+    if (NULL != r)
+    {
+      struct GNUNET_TIME_Relative rem;
+
+      rem = GNUNET_TIME_absolute_get_remaining (r->timeout);
+      timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us;
+    }
     ret = poll (pfds,
                 nfds,
-                -1 /* no timeout */);
+                timeout);
     if (-1 == ret)
       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
                            "poll");
@@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls,
  * @param cb_cls closure for @a cb
  * @return handle useful to cancel the listener
  */
-static struct GNUNET_DB_EventHandler *
+static struct TALER_EXCHANGEDB_EventHandler *
 postgres_event_listen (void *cls,
-                       struct TALER_EXCHANGEDB_Session *session,
+                       struct GNUNET_TIME_Relative timeout,
                        const struct GNUNET_DB_EventHeaderP *es,
                        GNUNET_DB_EventCallback cb,
                        void *cb_cls)
 {
   struct PostgresClosure *pg = cls;
-  struct GNUNET_DB_EventHandler *eh;
+  struct TALER_EXCHANGEDB_EventHandler *eh;
+  struct TALER_EXCHANGEDB_Session *session;
 
+  session = postgres_get_session (pg);
+  eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler);
+  eh->cb = cb;
+  eh->cb_cls = cb_cls;
+  eh->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+  eh->geh = GNUNET_PQ_event_listen (session->conn,
+                                    es,
+                                    cb,
+                                    cb_cls);
+  GNUNET_assert (NULL != eh->geh);
+  eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap,
+                                         eh,
+                                         eh->timeout.abs_value_us);
   GNUNET_assert (0 ==
                  pthread_mutex_lock (&pg->event_lock));
   pg->listener_count++;
@@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls,
   }
   GNUNET_assert (0 ==
                  pthread_mutex_unlock (&pg->event_lock));
-  eh = GNUNET_PQ_event_listen (session->conn,
-                               es,
-                               cb,
-                               cb_cls);
-  GNUNET_assert (NULL != eh);
   return eh;
 }
 
@@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls,
  */
 static void
 postgres_event_listen_cancel (void *cls,
-                              struct GNUNET_DB_EventHandler *eh)
+                              struct TALER_EXCHANGEDB_EventHandler *eh)
 {
   struct PostgresClosure *pg = cls;
 
@@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls,
   }
   GNUNET_assert (0 ==
                  pthread_mutex_unlock (&pg->event_lock));
-  GNUNET_PQ_event_listen_cancel (eh);
+  if (NULL != eh->hn)
+  {
+    GNUNET_CONTAINER_heap_remove_node (eh->hn);
+    eh->hn = NULL;
+  }
+  GNUNET_PQ_event_listen_cancel (eh->geh);
+  GNUNET_free (eh);
 }
 
 
@@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
 
   pg = GNUNET_new (struct PostgresClosure);
   pg->cfg = cfg;
+  pg->event_heap = GNUNET_CONTAINER_heap_create (
+    GNUNET_CONTAINER_HEAP_ORDER_MIN);
   pg->main_self = pthread_self (); /* loaded while single-threaded! */
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_filename (cfg,
@@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls)
   GNUNET_break (0 ==
                 close (pg->event_fd));
   pthread_mutex_destroy (&pg->event_lock);
+  GNUNET_CONTAINER_heap_destroy (pg->event_heap);
   GNUNET_free (pg->sql_dir);
   GNUNET_free (pg->currency);
   GNUNET_free (pg);
diff --git a/src/include/taler_exchangedb_plugin.h 
b/src/include/taler_exchangedb_plugin.h
index 61c764a5..4cf6514f 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -73,8 +73,31 @@ struct TALER_EXCHANGEDB_DenominationKeyInformationP
 };
 
 
+/**
+ * Signature of events signalling a reseve got funding.
+ */
+struct TALER_ReserveEventP
+{
+  /**
+   * Of type #TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING.
+   */
+  struct GNUNET_DB_EventHeaderP header;
+
+  /**
+   * Public key of the reserve the event is about.
+   */
+  struct TALER_ReservePublicKeyP reserve_pub;
+};
+
+
 GNUNET_NETWORK_STRUCT_END
 
+/**
+ * Event registration record.
+ */
+struct TALER_EXCHANGEDB_EventHandler;
+
+
 /**
  * Meta data about an exchange online signing key.
  */
@@ -2149,16 +2172,16 @@ struct TALER_EXCHANGEDB_Plugin
    * Register callback to be invoked on events of type @a es.
    *
    * @param cls database context to use
-   * @param session connection to use
+   * @param timeout how long to wait at most
    * @param es specification of the event to listen for
    * @param cb function to call when the event happens, possibly
    *         multiple times (until cancel is invoked)
    * @param cb_cls closure for @a cb
    * @return handle useful to cancel the listener
    */
-  struct GNUNET_DB_EventHandler *
+  struct TALER_EXCHANGEDB_EventHandler *
   (*event_listen)(void *cls,
-                  struct TALER_EXCHANGEDB_Session *session,
+                  struct GNUNET_TIME_Relative timeout,
                   const struct GNUNET_DB_EventHeaderP *es,
                   GNUNET_DB_EventCallback cb,
                   void *cb_cls);
@@ -2171,7 +2194,7 @@ struct TALER_EXCHANGEDB_Plugin
    */
   void
   (*event_listen_cancel)(void *cls,
-                         struct GNUNET_DB_EventHandler *eh);
+                         struct TALER_EXCHANGEDB_EventHandler *eh);
 
 
   /**

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]