gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [taler-exchange] branch master updated: add wire_out tracki


From: gnunet
Subject: [GNUnet-SVN] [taler-exchange] branch master updated: add wire_out tracking to exchangedb, including deferred constraint, and to aggregator
Date: Sat, 18 Mar 2017 03:45:02 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 6a98b07  add wire_out tracking to exchangedb, including deferred 
constraint, and to aggregator
6a98b07 is described below

commit 6a98b07ff2e75a429982eaf6b00ce54c95a28e8e
Author: Christian Grothoff <address@hidden>
AuthorDate: Sat Mar 18 03:44:59 2017 +0100

    add wire_out tracking to exchangedb, including deferred constraint, and to 
aggregator
---
 ChangeLog                                   |   4 +
 src/exchange/taler-exchange-aggregator.c    |  50 ++++++--
 src/exchangedb/plugin_exchangedb_postgres.c | 188 ++++++++++++++++++----------
 src/exchangedb/test_exchangedb.c            |  41 ++++--
 src/include/taler_exchangedb_plugin.h       |  15 +++
 5 files changed, 212 insertions(+), 86 deletions(-)

diff --git a/ChangeLog b/ChangeLog
index 2c512a6..40743eb 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+Sat Mar 18 03:44:38 CET 2017
+       Add 'wire_out' table to exchange DB to track outgoing
+       wire transfers. -CG
+
 Fri Nov 18 18:53:30 CET 2016
        Improved error reporting for bogus wire specifications.
        Releasing taler-exchange 0.2.0. -CG
diff --git a/src/exchange/taler-exchange-aggregator.c 
b/src/exchange/taler-exchange-aggregator.c
index ae4ee24..44154d2 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -705,8 +705,8 @@ run_aggregation (void *cls)
     return;
   }
   if (GNUNET_OK !=
-      db_plugin->start (db_plugin->cls,
-                        session))
+      db_plugin->start_deferred_wire_out (db_plugin->cls,
+                                          session))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to start database transaction!\n");
@@ -908,11 +908,6 @@ prepare_cb (void *cls,
 {
   struct TALER_EXCHANGEDB_Session *session = au->session;
 
-  if (NULL != au->wire)
-  {
-    json_decref (au->wire);
-    au->wire = NULL;
-  }
   GNUNET_free_non_null (au->additional_rows);
   if (NULL == buf)
   {
@@ -922,6 +917,11 @@ prepare_cb (void *cls,
     /* start again */
     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                      NULL);
+    if (NULL != au->wire)
+    {
+      json_decref (au->wire);
+      au->wire = NULL;
+    }
     GNUNET_free (au);
     au = NULL;
     return;
@@ -941,10 +941,46 @@ prepare_cb (void *cls,
     /* start again */
     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                      NULL);
+    if (NULL != au->wire)
+    {
+      json_decref (au->wire);
+      au->wire = NULL;
+    }
+    GNUNET_free (au);
+    au = NULL;
+    return;
+  }
+
+  /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
+     table constraints */
+  if (GNUNET_OK !=
+      db_plugin->store_wire_transfer_out (db_plugin->cls,
+                                          session,
+                                          au->execution_time,
+                                          &au->wtid,
+                                          au->wire,
+                                          &au->total_amount))
+  {
+    GNUNET_break (0); /* why? how to best recover? */
+    db_plugin->rollback (db_plugin->cls,
+                         session);
+    /* start again */
+    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+                                     NULL);
+    if (NULL != au->wire)
+    {
+      json_decref (au->wire);
+      au->wire = NULL;
+    }
     GNUNET_free (au);
     au = NULL;
     return;
   }
+  if (NULL != au->wire)
+  {
+    json_decref (au->wire);
+    au->wire = NULL;
+  }
   GNUNET_free (au);
   au = NULL;
 
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c 
b/src/exchangedb/plugin_exchangedb_postgres.c
index b7a3b5f..f686a8c 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -40,13 +40,23 @@
 
 
 /**
- * Log a really unexpected PQ error.
+ * Log a really unexpected PQ error with all the details we can get hold of.
  *
  * @param result PQ result object of the PQ operation that failed
+ * @param conn SQL connection that was used
  */
-#define BREAK_DB_ERR(result) do { \
+#define BREAK_DB_ERR(result,conn) do {                                      \
+    char *err = PQresultVerboseErrorMessage (result, PQERRORS_VERBOSE, 
PQSHOW_CONTEXT_ALWAYS); \
     GNUNET_break (0); \
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Database failure: %s (%s)\n", 
PQresultErrorMessage (result), PQresStatus (PQresultStatus (result))); \
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
+                "Database failure: %s/%s/%s/%s/%s/%s", \
+                PQresultErrorField (result, PG_DIAG_MESSAGE_PRIMARY), \
+                PQresultErrorField (result, PG_DIAG_MESSAGE_DETAIL), \
+                PQresultErrorMessage (result), \
+                PQresStatus (PQresultStatus (result)), \
+                PQerrorMessage(conn), \
+                err);                 \
+    PQfreemem (err); \
   } while (0)
 
 
@@ -75,7 +85,7 @@
     PGresult *result = PQexec (conn, sql);                              \
     if (PGRES_COMMAND_OK != PQresultStatus (result))                    \
     {                                                                   \
-      BREAK_DB_ERR (result);                                            \
+      BREAK_DB_ERR (result, conn);                                      \
       PQclear (result);                                                 \
       goto SQLEXEC_fail;                                                \
     }                                                                   \
@@ -134,7 +144,7 @@ struct PostgresClosure
  * We already log whenever we care, so this function does nothing
  * and merely exists to silence the libpq logging.
  *
- * @param arg NULL
+ * @param arg the SQL connection that was used
  * @param res information about some libpq event
  */
 static void
@@ -149,7 +159,7 @@ pq_notice_receiver_cb (void *arg,
  * Function called by libpq whenever it wants to log something.
  * We log those using the Taler logger.
  *
- * @param arg NULL
+ * @param arg the SQL connection that was used
  * @param message information about some libpq event
  */
 static void
@@ -186,10 +196,10 @@ connect_to_postgres (struct PostgresClosure *pc)
   }
   PQsetNoticeReceiver (conn,
                        &pq_notice_receiver_cb,
-                       NULL);
+                       conn);
   PQsetNoticeProcessor (conn,
                         &pq_notice_processor_cb,
-                        NULL);
+                        conn);
   return conn;
 }
 
@@ -465,11 +475,23 @@ postgres_create_tables (void *cls)
           ",PRIMARY KEY (coin_pub, merchant_pub, h_proposal_data, 
rtransaction_id)" /* this combo must be unique, and we usually select by 
coin_pub */
           ") ");
 
+  /* This table contains the data for
+     wire transfers the exchange has executed. */
+  SQLEXEC("CREATE TABLE IF NOT EXISTS wire_out "
+          "(wireout_uuid BIGSERIAL PRIMARY KEY"
+          ",execution_date INT8 NOT NULL"
+          ",wtid_raw BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid_raw)=" 
TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")"
+          ",wire_target TEXT NOT NULL"
+          ",amount_val INT8 NOT NULL"
+          ",amount_frac INT4 NOT NULL"
+          ",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
+          ")");
+
   /* Table for the tracking API, mapping from wire transfer identifiers
      to transactions and back */
   SQLEXEC("CREATE TABLE IF NOT EXISTS aggregation_tracking "
           "(deposit_serial_id INT8 PRIMARY KEY REFERENCES deposits 
(deposit_serial_id) ON DELETE CASCADE"
-          ",wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=" 
TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")"
+          ",wtid_raw BYTEA  CONSTRAINT wire_out_ref REFERENCES 
wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE"
           ",execution_time INT8 NOT NULL"
           ")");
   /* Index for lookup_transactions statement on wtid */
@@ -505,18 +527,6 @@ postgres_create_tables (void *cls)
   SQLEXEC_INDEX("CREATE INDEX prepare_iteration_index "
                 "ON prewire(type,finished)");
 
-  /* This table contains the data for
-     wire transfers the exchange has executed. */
-  SQLEXEC("CREATE TABLE IF NOT EXISTS wire_out "
-          "(wireout_uuid BIGSERIAL PRIMARY KEY"
-          ",execution_date INT8 NOT NULL"
-          ",wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=" 
TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")"
-          ",wire_target TEXT NOT NULL"
-          ",amount_val INT8 NOT NULL"
-          ",amount_frac INT4 NOT NULL"
-          ",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
-          ")");
-
 #undef SQLEXEC
 #undef SQLEXEC_INDEX
 
@@ -545,7 +555,7 @@ postgres_prepare (PGconn *db_conn)
     result = PQprepare (db_conn, name, sql, __VA_ARGS__);       \
     if (PGRES_COMMAND_OK != PQresultStatus (result))            \
     {                                                           \
-      BREAK_DB_ERR (result);                                    \
+      BREAK_DB_ERR (result, db_conn);                           \
       PQclear (result); result = NULL;                          \
       return GNUNET_SYSERR;                                     \
     }                                                           \
@@ -1270,7 +1280,7 @@ postgres_prepare (PGconn *db_conn)
            "INSERT INTO aggregation_tracking "
            "(deposit_serial_id"
            ",wtid_raw"
-           ",execution_time"
+           ",execution_time" /* TODO: this field should be eliminated and 
obtained from wire_out */
            ") VALUES "
            "($1, $2, $3)",
            3, NULL);
@@ -1601,7 +1611,7 @@ postgres_insert_denomination_info (void *cls,
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
     ret = GNUNET_SYSERR;
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
   }
   else
   {
@@ -2311,7 +2321,7 @@ postgres_have_deposit (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -2402,7 +2412,7 @@ postgres_mark_deposit_tiny (void *cls,
   if (PGRES_COMMAND_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -2441,7 +2451,7 @@ postgres_test_deposit_done (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -2507,7 +2517,7 @@ postgres_mark_deposit_done (void *cls,
   if (PGRES_COMMAND_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -2549,7 +2559,7 @@ postgres_get_ready_deposit (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -2651,7 +2661,7 @@ postgres_iterate_matching_deposits (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -2742,7 +2752,7 @@ get_known_coin (void *cls,
                                    params);
   if (PGRES_TUPLES_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -2809,7 +2819,7 @@ insert_known_coin (void *cls,
                                    params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -2874,7 +2884,7 @@ postgres_insert_deposit (void *cls,
                                     params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     ret = GNUNET_SYSERR;
   }
   else
@@ -2919,7 +2929,7 @@ postgres_insert_refund (void *cls,
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
     ret = GNUNET_SYSERR;
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
   }
   else
   {
@@ -2959,7 +2969,7 @@ postgres_get_refresh_session (void *cls,
                                     params);
   if (PGRES_TUPLES_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -3076,7 +3086,7 @@ postgres_create_refresh_session (void *cls,
                                    params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -3124,7 +3134,7 @@ postgres_insert_refresh_order (void *cls,
     }
     if (PGRES_COMMAND_OK != PQresultStatus (result))
     {
-      BREAK_DB_ERR (result);
+      BREAK_DB_ERR (result, session->conn);
       PQclear (result);
       return GNUNET_SYSERR;
     }
@@ -3199,7 +3209,7 @@ postgres_get_refresh_order (void *cls,
     }
     if (PGRES_TUPLES_OK != PQresultStatus (result))
     {
-      BREAK_DB_ERR (result);
+      BREAK_DB_ERR (result, session->conn);
       PQclear (result);
       free_dpk_result (denom_pubs, i);
       return GNUNET_SYSERR;
@@ -3275,7 +3285,7 @@ postgres_insert_refresh_commit_coins (void *cls,
     }
     if (PGRES_COMMAND_OK != PQresultStatus (result))
     {
-      BREAK_DB_ERR (result);
+      BREAK_DB_ERR (result, session->conn);
       PQclear (result);
       return GNUNET_SYSERR;
     }
@@ -3354,7 +3364,7 @@ postgres_get_refresh_commit_coins (void *cls,
                                      params);
     if (PGRES_TUPLES_OK != PQresultStatus (result))
     {
-      BREAK_DB_ERR (result);
+      BREAK_DB_ERR (result, session->conn);
       PQclear (result);
       postgres_free_refresh_commit_coins (cls,
                                           i,
@@ -3426,7 +3436,7 @@ postgres_insert_refresh_transfer_public_key (void *cls,
                                     params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -3470,7 +3480,7 @@ postgres_get_refresh_transfer_public_key (void *cls,
                                     params);
   if (PGRES_TUPLES_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -3538,7 +3548,7 @@ postgres_get_refresh_out (void *cls,
                                     params);
   if (PGRES_TUPLES_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -3595,7 +3605,7 @@ postgres_insert_refresh_out (void *cls,
                                     params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -3633,7 +3643,7 @@ postgres_get_link_data_list (void *cls,
   ldl = NULL;
   if (PGRES_TUPLES_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return NULL;
   }
@@ -3719,7 +3729,7 @@ postgres_get_transfer (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -3872,7 +3882,7 @@ postgres_get_coin_transactions (void *cls,
                                       params);
     if (PGRES_TUPLES_OK != PQresultStatus (result))
     {
-      BREAK_DB_ERR (result);
+      BREAK_DB_ERR (result, session->conn);
       PQclear (result);
       goto cleanup;
     }
@@ -3943,7 +3953,7 @@ postgres_get_coin_transactions (void *cls,
                                       params);
     if (PGRES_TUPLES_OK != PQresultStatus (result))
     {
-      BREAK_DB_ERR (result);
+      BREAK_DB_ERR (result, session->conn);
       PQclear (result);
       goto cleanup;
     }
@@ -4043,7 +4053,7 @@ postgres_lookup_wire_transfer (void *cls,
                                     params);
   if (PGRES_TUPLES_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4159,7 +4169,7 @@ postgres_wire_lookup_deposit_wtid (void *cls,
                                     params);
   if (PGRES_TUPLES_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4187,7 +4197,7 @@ postgres_wire_lookup_deposit_wtid (void *cls,
                                         params2);
       if (PGRES_TUPLES_OK != PQresultStatus (result))
       {
-        BREAK_DB_ERR (result);
+        BREAK_DB_ERR (result, session->conn);
         PQclear (result);
         return GNUNET_SYSERR;
       }
@@ -4301,7 +4311,7 @@ postgres_insert_aggregation_tracking (void *cls,
                                    params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4361,7 +4371,7 @@ postgres_get_wire_fee (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4465,7 +4475,7 @@ postgres_insert_wire_fee (void *cls,
                                    params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4509,7 +4519,7 @@ postgres_wire_prepare_data_insert (void *cls,
                                    params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4543,7 +4553,7 @@ postgres_wire_prepare_data_mark_finished (void *cls,
   if (PGRES_COMMAND_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4634,6 +4644,51 @@ postgres_wire_prepare_data_get (void *cls,
 
 
 /**
+ * Start a transaction where we transiently violate the foreign
+ * constraints on the "wire_out" table as we insert aggregations
+ * and only add the wire transfer out at the end.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session connection to use
+ * @return #GNUNET_OK on success
+ */
+static int
+postgres_start_deferred_wire_out (void *cls,
+                                  struct TALER_EXCHANGEDB_Session *session)
+{
+  PGresult *result;
+  ExecStatusType ex;
+
+  result = PQexec (session->conn,
+                   "START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+  if (PGRES_COMMAND_OK !=
+      (ex = PQresultStatus (result)))
+  {
+    TALER_LOG_ERROR ("Failed to start transaction (%s): %s\n",
+                     PQresStatus (ex),
+                     PQerrorMessage (session->conn));
+    GNUNET_break (0);
+    PQclear (result);
+    return GNUNET_SYSERR;
+  }
+  result = PQexec (session->conn,
+                   "SET CONSTRAINTS wire_out_ref DEFERRED");
+  if (PGRES_COMMAND_OK !=
+      (ex = PQresultStatus (result)))
+  {
+    TALER_LOG_ERROR ("Failed to defer wire_out_ref constraint on transaction 
(%s): %s\n",
+                     PQresStatus (ex),
+                     PQerrorMessage (session->conn));
+    GNUNET_break (0);
+    PQclear (result);
+    return GNUNET_SYSERR;
+  }
+  PQclear (result);
+  return GNUNET_OK;
+}
+
+
+/**
  * Store information about an outgoing wire transfer that was executed.
  *
  * @param cls closure
@@ -4667,7 +4722,7 @@ postgres_store_wire_transfer_out (void *cls,
                                     params);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4714,7 +4769,7 @@ postgres_gc (void *cls)
                                     params_none);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, conn);
     PQclear (result);
     PQfinish (conn);
     return GNUNET_SYSERR;
@@ -4725,7 +4780,7 @@ postgres_gc (void *cls)
                                     params_time);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, conn);
     PQclear (result);
     PQfinish (conn);
     return GNUNET_SYSERR;
@@ -4736,7 +4791,7 @@ postgres_gc (void *cls)
                                     params_time);
   if (PGRES_COMMAND_OK != PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, conn);
     PQclear (result);
     PQfinish (conn);
     return GNUNET_SYSERR;
@@ -4777,7 +4832,7 @@ postgres_select_deposits_above_serial_id (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4881,7 +4936,7 @@ postgres_select_refreshs_above_serial_id (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -4978,7 +5033,7 @@ postgres_select_refunds_above_serial_id (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -5069,7 +5124,7 @@ postgres_select_reserves_in_above_serial_id (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -5164,7 +5219,7 @@ postgres_select_reserves_out_above_serial_id (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -5266,7 +5321,7 @@ postgres_select_wire_out_above_serial_id (void *cls,
   if (PGRES_TUPLES_OK !=
       PQresultStatus (result))
   {
-    BREAK_DB_ERR (result);
+    BREAK_DB_ERR (result, session->conn);
     PQclear (result);
     return GNUNET_SYSERR;
   }
@@ -5417,6 +5472,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
   plugin->wire_prepare_data_insert = &postgres_wire_prepare_data_insert;
   plugin->wire_prepare_data_mark_finished = 
&postgres_wire_prepare_data_mark_finished;
   plugin->wire_prepare_data_get = &postgres_wire_prepare_data_get;
+  plugin->start_deferred_wire_out = &postgres_start_deferred_wire_out;
   plugin->store_wire_transfer_out = &postgres_store_wire_transfer_out;
   plugin->gc = &postgres_gc;
   plugin->select_deposits_above_serial_id = 
&postgres_select_deposits_above_serial_id;
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index fba1625..bfa1e6a 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -1241,20 +1241,13 @@ test_wire_out (struct TALER_EXCHANGEDB_Session *session,
   GNUNET_assert (GNUNET_OK ==
                  TALER_string_to_amount (CURRENCY ":1",
                                          &wire_out_amount));
+
+  /* we will transiently violate the wtid constraint on
+     the aggregation table, so we need to start the special
+     transaction where this is allowed... */
   FAILIF (GNUNET_OK !=
-          plugin->store_wire_transfer_out (plugin->cls,
-                                           session,
-                                           wire_out_date,
-                                           &wire_out_wtid,
-                                           wire_out_account,
-                                           &wire_out_amount));
-  FAILIF (GNUNET_OK !=
-          plugin->select_wire_out_above_serial_id (plugin->cls,
-                                                   session,
-                                                   0,
-                                                   &audit_wire_cb,
-                                                   NULL));
-  FAILIF (1 != auditor_row_cnt);
+          plugin->start_deferred_wire_out (plugin->cls,
+                                           session));
 
   /* setup values for wire transfer aggregation data */
   merchant_pub_wt = deposit->merchant_pub;
@@ -1289,6 +1282,7 @@ test_wire_out (struct TALER_EXCHANGEDB_Session *session,
                                               &cb_wtid_never,
                                               NULL));
   }
+  wtid_wt = wire_out_wtid; /* to statisfy foreign constraint */
   /* insert WT data */
   FAILIF (GNUNET_OK !=
           plugin->insert_aggregation_tracking (plugin->cls,
@@ -1312,6 +1306,27 @@ test_wire_out (struct TALER_EXCHANGEDB_Session *session,
                                             &cb_wtid_check,
                                             &cb_wtid_never));
 
+  /* Now let's fix the transient constraint violation by
+     putting in the WTID into the wire_out table */
+  FAILIF (GNUNET_OK !=
+          plugin->store_wire_transfer_out (plugin->cls,
+                                           session,
+                                           wire_out_date,
+                                           &wire_out_wtid,
+                                           wire_out_account,
+                                           &wire_out_amount));
+  FAILIF (GNUNET_OK !=
+          plugin->select_wire_out_above_serial_id (plugin->cls,
+                                                   session,
+                                                   0,
+                                                   &audit_wire_cb,
+                                                   NULL));
+  FAILIF (1 != auditor_row_cnt);
+
+  /* And now the commit should still succeed! */
+  FAILIF (GNUNET_OK !=
+          plugin->commit (plugin->cls,
+                          session));
 
   return GNUNET_OK;
  drop:
diff --git a/src/include/taler_exchangedb_plugin.h 
b/src/include/taler_exchangedb_plugin.h
index ef49074..4ab3e4a 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -1640,6 +1640,21 @@ struct TALER_EXCHANGEDB_Plugin
 
 
   /**
+   * Start a transaction where we transiently violate the foreign
+   * constraints on the "wire_out" table as we insert aggregations
+   * and only add the wire transfer out at the end.
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param session connection to use
+   * @return #GNUNET_OK on success
+   */
+  int
+  (*start_deferred_wire_out) (void *cls,
+                              struct TALER_EXCHANGEDB_Session *session);
+
+
+
+  /**
    * Store information about an outgoing wire transfer that was executed.
    *
    * @param cls closure

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]