pspp-dev
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[bug37999 10/11] RANK: Adopt a new ranking implementation.


From: Ben Pfaff
Subject: [bug37999 10/11] RANK: Adopt a new ranking implementation.
Date: Thu, 31 Jan 2013 22:03:32 -0800

Before this commit, the implementation of RANK made multiple passes
through the active file, opening and closing it (with proc_open()
and proc_commit()) as many times as there were input variables.
This worked in simple cases, but it could never work with
TEMPORARY since the second proc_open() will see a different set
of data from the first one.

This commit rewrites RANK to open and read the active file only
once.  It does not make RANK properly work with TEMPORARY, but
it brings it much closer.  It may also be faster in some cases
because, although it makes the same number of passes through
the input data (necessarily), each pass discards all the input
columns except the ones that are really need for that pass.
---
 src/language/stats/rank.c |  334 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 237 insertions(+), 97 deletions(-)

diff --git a/src/language/stats/rank.c b/src/language/stats/rank.c
index a4f8185..19861cc 100644
--- a/src/language/stats/rank.c
+++ b/src/language/stats/rank.c
@@ -144,8 +144,8 @@ enum fraction
 struct rank_spec
 {
   enum rank_func rfunc;
-  struct variable **destvars;
   const char **dest_names;
+  const char **dest_labels;
 };
 
 /* If NEW_NAME exists in DICT or NEW_NAMES, returns NULL without changing
@@ -538,29 +538,24 @@ sum_weights (const struct casereader *input, int 
weight_idx)
 static void
 rank_sorted_file (struct casereader *input,
                   struct casewriter *output,
-                  const struct dictionary *dict,
-                  int dest_idx,
-                 const struct rank *cmd
-                 )
+                  int weight_idx,
+                 const struct rank *cmd)
 {
-  struct variable *weight_var = dict_get_weight (dict);
-  int weight_idx = weight_var ? var_get_case_index (weight_var) : -1;
   struct casegrouper *tie_grouper;
   struct casereader *tied_cases;
+  struct subcase input_var;
+  int tie_group = 1;
   struct ccase *c;
   double cc = 0.0;
   double w;
-  int tie_group = 1;
-
-  input = casereader_create_filter_missing (input, &cmd->vars[dest_idx], 1,
-                                            cmd->exclude, NULL, output);
-  input = casereader_create_filter_weight (input, dict, NULL, output);
 
   /* Get total group weight. */
   w = sum_weights (input, weight_idx);
 
   /* Do ranking. */
-  tie_grouper = casegrouper_create_vars (input, &cmd->vars[dest_idx], 1);
+  subcase_init (&input_var, 0, 0, SC_ASCEND);
+  tie_grouper = casegrouper_create_subcase (input, &input_var);
+  subcase_destroy (&input_var);
   for (; casegrouper_get_next_group (tie_grouper, &tied_cases);
        casereader_destroy (tied_cases))
     {
@@ -572,20 +567,22 @@ rank_sorted_file (struct casereader *input,
                        casewriter_get_taint (output));
 
       /* Rank tied cases. */
-      while ((c = casereader_read (tied_cases)) != NULL)
+      for (; (c = casereader_read (tied_cases)) != NULL; case_unref (c))
         {
+          struct ccase *out_case;
           size_t i;
 
-          c = case_unshare (c);
+          out_case = case_create (casewriter_get_proto (output));
+          case_data_rw_idx (out_case, 0)->f = case_num_idx (c, 1);
           for (i = 0; i < cmd->n_rs; ++i)
             {
-              const struct variable *dst_var = cmd->rs[i].destvars[dest_idx];
-              double *dst_value = &case_data_rw (c, dst_var)->f;
-              *dst_value = rank_func[cmd->rs[i].rfunc] (cmd, tw, cc, cc_1, 
tie_group, w);
+              rank_function_t func = rank_func[cmd->rs[i].rfunc];
+              double rank = func (cmd, tw, cc, cc_1, tie_group, w);
+              case_data_rw_idx (out_case, i + 1)->f = rank;
             }
-          casewriter_write (output, c);
-        }
 
+          casewriter_write (output, out_case);
+        }
       tie_group++;
     }
   casegrouper_destroy (tie_grouper);
@@ -664,9 +661,7 @@ cmd_rank (struct lexer *lexer, struct dataset *ds)
 {
   struct string_set new_names;
   struct rank rank;
-  struct variable *order;
   struct rank_spec *rs;
-  bool result = true;
   int i;
 
   subcase_init_empty (&rank.sc);
@@ -803,6 +798,7 @@ cmd_rank (struct lexer *lexer, struct dataset *ds)
       rs = pool_calloc (rank.pool, 1, sizeof *rs);
       rs->rfunc = RANK;
       rs->dest_names = pool_calloc (rank.pool, 1, sizeof *rs->dest_names);
+      rs->dest_labels = pool_calloc (rank.pool, 1, sizeof *rs->dest_labels);
 
       rank.rs = rs;
       rank.n_rs = 1;
@@ -814,6 +810,8 @@ cmd_rank (struct lexer *lexer, struct dataset *ds)
     {
       int v;
 
+      rs->dest_labels = pool_calloc (rank.pool, rank.n_vars,
+                                     sizeof *rs->dest_labels);
       for ( v = 0 ; v < rank.n_vars ;  v ++ )
         {
           const char **dst_name = &rs->dest_names[v];
@@ -825,26 +823,9 @@ cmd_rank (struct lexer *lexer, struct dataset *ds)
               if (*dst_name == NULL)
                 goto error;
             }
-        }
-    }
-
-  /* Create variables. */
-  for (rs = rank.rs; rs < &rank.rs[rank.n_rs]; rs++)
-    rs->destvars = pool_nmalloc (rank.pool, rank.n_vars, sizeof *rs->destvars);
-  for ( i = 0 ; i < rank.n_vars ;  i ++ )
-    {
-      struct variable *var;
 
-      for (rs = rank.rs; rs < &rank.rs[rank.n_rs]; rs++)
-        {
-          var = dict_create_var_assert (dataset_dict (ds),
-                                        rs->dest_names[i], 0);
-          var_set_both_formats (var, &dest_format[rs->rfunc]);
-          var_set_label (var, create_var_label (&rank, rank.vars[i],
-                                                rs->rfunc),
-                         false);
-
-          rs->destvars[i] = var;
+          rs->dest_labels[v] = create_var_label (&rank, rank.vars[v],
+                                                 rs->rfunc);
         }
     }
 
@@ -918,32 +899,8 @@ cmd_rank (struct lexer *lexer, struct dataset *ds)
        }
     }
 
-  /* Add a variable which we can sort by to get back the original
-     order */
-  order = dict_create_var_assert (dataset_dict (ds), "$ORDER_", 0);
-
-  add_transformation (ds, create_resort_key, 0, order);
-
   /* Do the ranking */
-  result = rank_cmd (ds, &rank);
-  
-  /* Put the active dataset back in its original order.  Delete
-     our sort key, which we don't need anymore.  */
-  {
-    struct casereader *sorted;
-
-
-    /* FIXME: loses error conditions. */
-
-    proc_discard_output (ds);
-    sorted = sort_execute_1var (proc_open (ds), order);
-    result = proc_commit (ds) && result;
-
-    dict_delete_var (dataset_dict (ds), order);
-    result = dataset_set_source (ds, sorted) && result;
-    if ( result != true)
-      goto error;
-  }
+  rank_cmd (ds, &rank);
 
   destroy_rank (&rank);
   string_set_destroy (&new_names);
@@ -956,61 +913,244 @@ cmd_rank (struct lexer *lexer, struct dataset *ds)
   return CMD_FAILURE;
 }
 
+/* RANK transformation. */
+struct rank_trns
+  {
+    int order_case_idx;
+
+    struct rank_trns_input_var *input_vars;
+    size_t n_input_vars;
+
+    size_t n_funcs;
+  };
+
+struct rank_trns_input_var
+  {
+    struct casereader *input;
+    struct ccase *current;
+
+    struct variable **output_vars;
+  };
+
+static void
+advance_ranking (struct rank_trns_input_var *iv)
+{
+  case_unref (iv->current);
+  iv->current = casereader_read (iv->input);
+}
 
+static int
+rank_trns_proc (void *trns_, struct ccase **c, casenumber case_idx UNUSED)
+{
+  struct rank_trns *trns = trns_;
+  double order = case_num_idx (*c, trns->order_case_idx);
+  struct rank_trns_input_var *iv;
+
+  *c = case_unshare (*c);
+  for (iv = trns->input_vars; iv < &trns->input_vars[trns->n_input_vars]; iv++)
+    while (iv->current != NULL)
+      {
+        double iv_order = case_num_idx (iv->current, 0);
+        if (iv_order == order)
+          {
+            size_t i;
+
+            for (i = 0; i < trns->n_funcs; i++)
+              case_data_rw (*c, iv->output_vars[i])->f
+                = case_num_idx (iv->current, i + 1);
+            advance_ranking (iv);
+            break;
+          }
+        else if (iv_order > order)
+          break;
+        else
+          advance_ranking (iv);
+      }
+  return TRNS_CONTINUE;
+}
+
+static bool
+rank_trns_free (void *trns_)
+{
+  struct rank_trns *trns = trns_;
+  struct rank_trns_input_var *iv;
+
+  for (iv = trns->input_vars; iv < &trns->input_vars[trns->n_input_vars]; iv++)
+    {
+      casereader_destroy (iv->input);
+      case_unref (iv->current);
+
+      free (iv->output_vars);
+    }
+  free (trns->input_vars);
+  free (trns);
+
+  return true;
+}
 
 static bool
 rank_cmd (struct dataset *ds, const struct rank *cmd)
 {
   struct dictionary *d = dataset_dict (ds);
+  struct variable *weight_var = dict_get_weight (d);
+  struct casewriter **outputs;
+  struct variable *order_var;
+  struct casereader *input;
+  struct rank_trns *trns;
   bool ok = true;
   int i;
 
-  for (i = 0 ; i < subcase_get_n_fields (&cmd->sc) ; ++i )
+  /* Add a variable which we can sort by to get back the original
+     order */
+  order_var = dict_create_var_assert (dataset_dict (ds), "$ORDER", 0);
+
+  add_transformation (ds, create_resort_key, 0, order_var);
+
+  /* Create output files. */
+  {
+    struct caseproto *output_proto;
+    struct subcase by_order;
+
+    output_proto = caseproto_create ();
+    for (i = 0; i < cmd->n_rs + 1; i++)
+      output_proto = caseproto_add_width (output_proto, 0);
+
+    subcase_init (&by_order, 0, 0, SC_ASCEND);
+
+    outputs = xnmalloc (cmd->n_vars, sizeof *outputs);
+    for (i = 0; i < cmd->n_vars; i++)
+      outputs[i] = sort_create_writer (&by_order, output_proto);
+
+    subcase_destroy (&by_order);
+    caseproto_unref (output_proto);
+  }
+
+  /* Open the active file and make one pass per input variable. */
+  input = proc_open (ds);
+  input = casereader_create_filter_weight (input, d, NULL, NULL);
+  for (i = 0 ; i < cmd->n_vars ; ++i )
     {
-      /* Rank variable at index I in SC. */
+      const struct variable *input_var = cmd->vars[i];
+      struct casereader *input_pass;
       struct casegrouper *split_grouper;
       struct casereader *split_group;
-      struct casewriter *output;
-
-      proc_discard_output (ds);
-      split_grouper = casegrouper_create_splits (proc_open (ds), d);
-      output = autopaging_writer_create (dict_get_proto (d));
+      struct subcase rank_ordering;
+      struct subcase projection;
+      struct subcase split_vars;
+      struct subcase group_vars;
+      int weight_idx;
+      int j;
 
+      /* Discard cases that have missing values of input variable. */
+      input_pass = i == cmd->n_vars - 1 ? input : casereader_clone (input);
+      input_pass = casereader_create_filter_missing (input_pass, &input_var, 1,
+                                                     cmd->exclude, NULL, NULL);
+
+      /* Keep only the columns we really need, to save time and space when we
+         sort them just below.
+
+         After this projection, the input_pass case indexes look like:
+
+           - 0: input_var.
+           - 1: order_var.
+           - 2 and up: cmd->n_group_vars group variables
+           - 2 + cmd->n_group_vars and up: split variables
+           - 2 + cmd->n_group_vars + n_split_vars: weight var
+      */
+      subcase_init_empty (&projection);
+      subcase_add_var_always (&projection, input_var, SC_ASCEND);
+      subcase_add_var_always (&projection, order_var, SC_ASCEND);
+      subcase_add_vars_always (&projection,
+                               cmd->group_vars, cmd->n_group_vars);
+      subcase_add_vars_always (&projection, dict_get_split_vars (d),
+                               dict_get_split_cnt (d));
+      if (weight_var != NULL)
+        {
+          subcase_add_var_always (&projection, weight_var, SC_ASCEND);
+          weight_idx = 2 + cmd->n_group_vars + dict_get_split_cnt (d);
+        }
+      else
+        weight_idx = -1;
+      input_pass = casereader_project (input_pass, &projection);
+      subcase_destroy (&projection);
+
+      /* Prepare 'group_vars' as the set of grouping variables. */
+      subcase_init_empty (&group_vars);
+      for (j = 0; j < cmd->n_group_vars; j++)
+        subcase_add_always (&group_vars,
+                            j + 2, var_get_width (cmd->group_vars[j]),
+                            SC_ASCEND);
+
+      /* Prepare 'rank_ordering' for sorting with the group variables as
+         primary key and the input variable as secondary key. */
+      subcase_clone (&rank_ordering, &group_vars);
+      subcase_add (&rank_ordering, 0, 0, subcase_get_direction (&cmd->sc, i));
+
+      /* Group by split variables */
+      subcase_init_empty (&split_vars);
+      for (j = 0; j < dict_get_split_cnt (d); j++)
+        subcase_add_always (&split_vars, 2 + j + cmd->n_group_vars,
+                            var_get_width (dict_get_split_vars (d)[j]),
+                            SC_ASCEND);
+      split_grouper = casegrouper_create_subcase (input_pass, &split_vars);
+      subcase_destroy (&split_vars);
       while (casegrouper_get_next_group (split_grouper, &split_group))
         {
-          struct subcase ordering;
           struct casereader *ordered;
           struct casegrouper *by_grouper;
           struct casereader *by_group;
 
-          /* Sort this split group by the BY variables as primary
-             keys and the rank variable as secondary key. */
-          subcase_init_vars (&ordering, cmd->group_vars, cmd->n_group_vars);
-          subcase_add_var (&ordering, cmd->vars[i],
-                           subcase_get_direction (&cmd->sc, i));
-          ordered = sort_execute (split_group, &ordering);
-          subcase_destroy (&ordering);
-
-          /* Rank the rank variable within this split group. */
-          by_grouper = casegrouper_create_vars (ordered,
-                                                cmd->group_vars, 
cmd->n_group_vars);
+          ordered = sort_execute (split_group, &rank_ordering);
+          by_grouper = casegrouper_create_subcase (ordered, &group_vars);
           while (casegrouper_get_next_group (by_grouper, &by_group))
-            {
-              /* Rank the rank variable within this BY group
-                 within the split group. */
+            rank_sorted_file (by_group, outputs[i], weight_idx, cmd);
+          ok = casegrouper_destroy (by_grouper) && ok;
+        }
+      subcase_destroy (&group_vars);
+      subcase_destroy (&rank_ordering);
 
-              rank_sorted_file (by_group, output, d,  i, cmd);
+      ok = casegrouper_destroy (split_grouper) && ok;
+    }
+  ok = proc_commit (ds) && ok;
+
+  /* Re-fetch the dictionary and order variable, because if TEMPORARY was in
+     effect then there's a new dictionary. */
+  d = dataset_dict (ds);
+  order_var = dict_lookup_var_assert (d, "$ORDER");
+
+  /* Merge the original data set with the ranks (which we already sorted on
+     $ORDER). */
+  trns = xmalloc (sizeof *trns);
+  trns->order_case_idx = var_get_case_index (order_var);
+  trns->input_vars = xnmalloc (cmd->n_vars, sizeof *trns->input_vars);
+  trns->n_input_vars = cmd->n_vars;
+  trns->n_funcs = cmd->n_rs;
+  for (i = 0; i < trns->n_input_vars; i++)
+    {
+      struct rank_trns_input_var *iv = &trns->input_vars[i];
+      int j;
 
-            }
-          ok = casegrouper_destroy (by_grouper) && ok;
+      iv->input = casewriter_make_reader (outputs[i]);
+      iv->current = casereader_read (iv->input);
+      iv->output_vars = xnmalloc (trns->n_funcs, sizeof *iv->output_vars);
+      for (j = 0; j < trns->n_funcs; j++)
+        {
+          struct rank_spec *rs = &cmd->rs[j];
+          struct variable *var;
+
+          var = dict_create_var_assert (d, rs->dest_names[i], 0);
+          var_set_both_formats (var, &dest_format[rs->rfunc]);
+          var_set_label (var, rs->dest_labels[i], false);
+
+          iv->output_vars[j] = var;
         }
-      ok = casegrouper_destroy (split_grouper);
-      ok = proc_commit (ds) && ok;
-      ok = (dataset_set_source (ds, casewriter_make_reader (output))
-            && ok);
-      if (!ok)
-        break;
     }
+  free (outputs);
+
+  add_transformation (ds, rank_trns_proc, rank_trns_free, trns);
+
+  /* Delete our sort key, which we don't need anymore. */
+  dict_delete_var (d, order_var);
 
   return ok;
 }
-- 
1.7.10.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]