bug-coreutils
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH] sort: Add --threads option, which parallelizes internal sort


From: Chen Guo
Subject: Re: [PATCH] sort: Add --threads option, which parallelizes internal sort.
Date: Sat, 17 Oct 2009 15:21:50 -0700 (PDT)

Hi all,
    In my last patch submission I noted while sorting in LC_ALL the endline 
characters of a couple of lines would be randomly cut off. The cause was 
memcoll being not threadsafe, I've since included a workaround.

diff --git a/bootstrap.conf b/bootstrap.conf
index 6671027..a0959b8 100644
--- a/bootstrap.conf
+++ b/bootstrap.conf
@@ -153,6 +153,7 @@ gnulib_modules="
   modechange
   mountlist
   mpsort
+  nproc
   obstack
   pathmax
   perl
@@ -163,6 +164,7 @@ gnulib_modules="
   priv-set
   progname
   propername
+  pthread
   putenv
   quote
   quotearg
diff --git a/src/sort.c b/src/sort.c
index 62ddd49..31ff732 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -23,6 +23,7 @@
 #include <config.h>
 
 #include <getopt.h>
+#include <pthread.h>
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <signal.h>
@@ -32,6 +33,7 @@
 #include "filevercmp.h"
 #include "hash.h"
 #include "md5.h"
+#include "nproc.h"
 #include "physmem.h"
 #include "posixver.h"
 #include "quote.h"
@@ -83,6 +85,13 @@ struct rlimit { size_t rlim_cur; };
 # define OPEN_MAX 20
 #endif
 
+/* Heuristic value for the number of lines for which it is worth
+   creating a subthread, during an internal merge sort, on a machine
+   that has processors galore.  Currently this number is just a guess.
+   This value must be at least 4.  We don't know of any machine where
+   this number has any practical effect.  */
+enum { SUBTHREAD_LINES_HEURISTIC = 4 };
+
 #define UCHAR_LIM (UCHAR_MAX + 1)
 
 #ifndef DEFAULT_TMPDIR
@@ -292,8 +301,6 @@ static char const *compress_program;
    number are present, temp files will be used. */
 static unsigned int nmerge = NMERGE_DEFAULT;
 
-static void sortlines_temp (struct line *, size_t, struct line *);
-
 /* Report MESSAGE for FILE, then clean up and exit.
    If FILE is null, it represents standard output.  */
 
@@ -386,6 +393,7 @@ Other options:\n\
   -t, --field-separator=SEP  use SEP instead of non-blank to blank 
transition\n\
   -T, --temporary-directory=DIR  use DIR for temporaries, not $TMPDIR or %s;\n\
                               multiple options specify multiple directories\n\
+      --threads=N           use no more than N threads to improve 
parallelism\n\
   -u, --unique              with -c, check for strict ordering;\n\
                               without -c, output only the first of an equal 
run\n\
 "), DEFAULT_TMPDIR);
@@ -429,7 +437,8 @@ enum
   FILES0_FROM_OPTION,
   NMERGE_OPTION,
   RANDOM_SOURCE_OPTION,
-  SORT_OPTION
+  SORT_OPTION,
+  THREADS_OPTION
 };
 
 static char const short_options[] = "-bcCdfghik:mMno:rRsS:t:T:uVy:z";
@@ -462,6 +471,7 @@ static struct option const long_options[] =
   {"temporary-directory", required_argument, NULL, 'T'},
   {"unique", no_argument, NULL, 'u'},
   {"zero-terminated", no_argument, NULL, 'z'},
+  {"threads", required_argument, NULL, THREADS_OPTION},
   {GETOPT_HELP_OPTION_DECL},
   {GETOPT_VERSION_OPTION_DECL},
   {NULL, 0, NULL, 0},
@@ -1262,6 +1272,21 @@ specify_sort_size (int oi, char c, char const *s)
   xstrtol_fatal (e, oi, c, long_options, s);
 }
 
+/* Specify the number of threads to spawn during internal sort.  */
+static unsigned long int
+specify_nthreads (int oi, char c, char const *s)
+{
+  unsigned long int nthreads;
+  enum strtol_error e = xstrtoul (s, NULL, 10, &nthreads, "");
+  if (e == LONGINT_OVERFLOW)
+    return ULONG_MAX;
+  if (e != LONGINT_OK)
+    xstrtol_fatal (e, oi, c, long_options, s);
+  if (nthreads == 0)
+    error (SORT_FAILURE, 0, _("number of threads must be nonzero"));
+  return nthreads;
+}
+
 /* Return the default sort size.  */
 static size_t
 default_sort_size (void)
@@ -1473,7 +1498,7 @@ limfield (const struct line *line, const struct keyfield 
*key)
       {
     while (ptr < lim && *ptr != tab)
       ++ptr;
-    if (ptr < lim && (eword | echar))
+        if (ptr < lim && (eword || echar))
       ++ptr;
       }
   else
@@ -1701,9 +1726,28 @@ check_mixed_SI_IEC (char prefix, struct keyfield *key)
 static int
 find_unit_order (const char *number, struct keyfield *key)
 {
-  static const char orders [UCHAR_LIM] = {
+  static const char orders [UCHAR_LIM] =
+    {
+#if SOME_DAY_WE_WILL_REQUIRE_C99
     ['K']=1, ['M']=2, ['G']=3, ['T']=4, ['P']=5, ['E']=6, ['Z']=7, ['Y']=8,
     ['k']=1,
+#else
+      /* Generate the following table with this command:
+         perl -e 'my %a=(k=>1, K=>1, M=>2, G=>3, T=>4, P=>5, E=>6, Z=>7, Y=>8);
+         foreach my $i (0..255) {my $c=chr($i); $a{$c} ||= 0;print "$a{$c}, 
"}'\
+         |fmt  */
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 3,
+      0, 0, 0, 1, 0, 2, 0, 0, 5, 0, 0, 0, 4, 0, 0, 0, 0, 8, 7, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+      0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+#endif
   };
 
   const unsigned char *p = number;
@@ -1982,7 +2026,7 @@ compare_version (char *restrict texta, size_t lena,
 static int
 keycompare (const struct line *a, const struct line *b)
 {
-  struct keyfield *key = keylist;
+  struct keyfield const *key = keylist;
 
   /* For the first iteration only, the key positions have been
      precomputed for us. */
@@ -2010,7 +2054,7 @@ keycompare (const struct line *a, const struct line *b)
 
       if (key->random)
     diff = compare_random (texta, lena, textb, lenb);
-      else if (key->numeric | key->general_numeric | key->human_numeric)
+      else if (key->numeric || key->general_numeric || key->human_numeric)
     {
       char savea = *lima, saveb = *limb;
 
@@ -2175,7 +2219,7 @@ compare (const struct line *a, const struct line *b)
   if (keylist)
     {
       diff = keycompare (a, b);
-      if (diff | unique | stable)
+      if (diff || unique || stable)
     return diff;
     }
 
@@ -2535,10 +2579,13 @@ mergefiles (struct sortfile *files, size_t ntemps, 
size_t nfiles,
    NHI must be positive, and HI - NHI must equal T - (NLO + NHI).  */
 
 static inline void
-mergelines (struct line *t,
-        struct line const *lo, size_t nlo,
-        struct line const *hi, size_t nhi)
+mergelines (struct line *t, size_t nlines,
+            struct line const *restrict lo)
 {
+  size_t nlo = nlines / 2;
+  size_t nhi = nlines - nlo;
+  struct line const *hi = t - nlo;
+
   for (;;)
     if (compare (lo - 1, hi - 1) <= 0)
       {
@@ -2565,28 +2612,69 @@ mergelines (struct line *t,
       }
 }
 
-/* Sort the array LINES with NLINES members, using TEMP for temporary space.
-   NLINES must be at least 2.
+static void sortlines_merge (struct line *restrict, size_t, struct line 
*restrict,
+                     size_t, bool);
+
+/* Thread arguments for sortlines_thread.  */
+struct merge_args
+{
+  struct line *lines;
+  size_t nlines;
+  struct line *temp;
+  unsigned long int nthreads;
+  bool to_temp;
+};
+
+/* Like sortlines, except with a signature acceptable to pthread_create.  */
+static void *
+sortlines_merge_thread (void *data)
+{
+  struct merge_args const *args = data;
+  sortlines_merge (args->lines, args->nlines, args->temp, args->nthreads,
+               args->to_temp);
+  return NULL;
+}
+
+/* Sort the array LINES with NLINES members, using TEMP for temporary space,
+   spawning NTHREADS threads.  NLINES must be at least 2.
    The input and output arrays are in reverse order, and LINES and
    TEMP point just past the end of their respective arrays.
 
+   If TO_TEMP, place the result in TEMP (trashing LINES in the
+   process); otherwise, place the result back into LINES so that it is
+   an in-place sort (trashing TEMP in the process).
+
    Use a recursive divide-and-conquer algorithm, in the style
-   suggested by Knuth volume 3 (2nd edition), exercise 5.2.4-23.  Use
-   the optimization suggested by exercise 5.2.4-10; this requires room
-   for only 1.5*N lines, rather than the usual 2*N lines.  Knuth
-   writes that this memory optimization was originally published by
-   D. A. Bell, Comp J. 1 (1958), 75.  */
+   suggested by Knuth volume 3 (2nd edition), exercise 5.2.4-23.  If
+   multithreaded, this requires that TEMP contain NLINES entries; if
+   singlethreaded, use the optimization suggested by Knuth exercise
+   5.2.4-10, which requires room for only 1.5*N lines, rather than the
+   usual 2*N lines.  Knuth writes that this memory optimization was
+   originally published by D. A. Bell, Comp J. 1 (1958), 75.
+
+   This function is inline so that its tests for multthreadedness and
+   inplacedness can be optimized away in common cases.  */
 
 static void
-sortlines (struct line *lines, size_t nlines, struct line *temp)
+sortlines_merge (struct line *restrict lines, size_t nlines,
+             struct line *restrict temp, size_t nthreads, bool to_temp)
 {
   if (nlines == 2)
     {
-      if (0 < compare (&lines[-1], &lines[-2]))
+      /* Declare `swap' as int, not bool, to work around a bug
+     <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
+     in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
+      int swap = (0 < compare (&lines[-1], &lines[-2]));
+      if (to_temp)
     {
-      struct line tmp = lines[-1];
+      temp[-1] = lines[-1 - swap];
+      temp[-2] = lines[-2 + swap];
+    }
+      else if (swap)
+    {
+      temp[-1] = lines[-1];
       lines[-1] = lines[-2];
-      lines[-2] = tmp;
+      lines[-2] = temp[-1];
     }
     }
   else
@@ -2595,46 +2683,386 @@ sortlines (struct line *lines, size_t nlines, struct 
line *temp)
       size_t nhi = nlines - nlo;
       struct line *lo = lines;
       struct line *hi = lines - nlo;
-      struct line *sorted_lo = temp;
+      unsigned long int child_subthreads = nthreads / 2;
+      unsigned long int my_subthreads = nthreads - child_subthreads;
+      pthread_t thread;
+      struct merge_args args = {hi, nhi, temp - nlo, child_subthreads, 
to_temp};
 
-      sortlines (hi, nhi, temp);
+      if (child_subthreads != 0 && SUBTHREAD_LINES_HEURISTIC <= nlines
+      && pthread_create (&thread, NULL, sortlines_merge_thread, &args) == 0)
+    {
+      /* Guarantee that nlo and nhi are each at least 2.  */
+      verify (4 <= SUBTHREAD_LINES_HEURISTIC);
+
+      sortlines_merge (lo, nlo, temp, my_subthreads, !to_temp);
+      pthread_join (thread, NULL);
+    }
+      else
+    {
+      sortlines_merge (hi, nhi, temp - (to_temp ? nlo : 0), 1, to_temp);
       if (1 < nlo)
-    sortlines_temp (lo, nlo, sorted_lo);
+        sortlines_merge (lo, nlo, temp, 1, !to_temp);
+      else if (!to_temp)
+        temp[-1] = lo[-1];
+    }
+
+      struct line *dest;
+      struct line const *sorted_lo;
+      if (to_temp)
+    {
+      dest = temp;
+      sorted_lo = lines;
+    }
       else
-    sorted_lo[-1] = lo[-1];
+    {
+      dest = lines;
+      sorted_lo = temp;
+    }
+      mergelines (dest, nlines, sorted_lo);
+    }
+}
+
+
+struct sortlines_args
+{
+  struct line *lines;
+  size_t nlines;
+  int nthreads;
+  struct line* temp;
+};
+
+static void *
+sortlines_merge_thread2 (void *data)
+{
+  struct sortlines_args const *args = data;
+  sortlines_merge (args->lines, args->nlines, args->temp, args->nthreads, 0);
+  return NULL;
+}
 
-      mergelines (lines, sorted_lo, nlo, hi, nhi);
+static inline void swap_lines (struct line* line1, struct line* line2)
+{
+  struct line tmp = *line1;
+  *line1 = *line2;
+  *line2 = tmp;
+}
+
+/* Finds the median of the three passed in lines. If swap is true, arrange the
+   three lines in order. If swap is false, just return the pointer to the
+   median line.  */
+static struct line*
+medianof3 (struct line* first, struct line* middle, struct line* last,
+           int swap)
+{
+  if (0 < compare (middle, first))            // f < m
+    {
+      if (0 < compare (first, last))          // l < f < m
+        {
+          if (swap)
+            {
+              struct line tmp = *middle;
+              *middle = *first;
+              *first = *last;
+              *last = tmp;
+            }
+          else
+            return first;
+        }
+      else if (0 < compare (middle, last))    // f < l < m
+        {
+          if (swap)
+            swap_lines (middle, last);
+          else
+            return last;
+        }
+      // Else, first < middle < last. Perfect.
+    }
+  else                                        // m < f
+    {
+      if (0 < compare (last, first))          // m < f < l
+        {
+          if (swap)
+            swap_lines (middle, first);
+          else
+            return first;
+        }
+      else if (0 < compare (last, middle))    // m < l < f
+        {
+          if (swap)
+            {
+              struct line tmp = *middle;
+              *middle = *last;
+              *last = *first;
+              *first = tmp;
+            }
+          else
+            return last;
     }
+      else if (swap)                          // l < m < f
+        swap_lines (first, last);
+    }
+  return middle;
+}
+
+
+struct dist_args
+{
+  struct line* iter;
+  size_t nlines;
+  struct line* buf;
+  struct line* pivot;
+};
+
+struct counters
+{
+  size_t nless;
+  size_t neq;
+  size_t ngrt;
+};
+
+/* Distribute lines of a section of the buffer into three piles, depending
+   on how they compare to the pivot. */
+
+static struct counters*
+distribute (struct line *iter, size_t nlines, struct line* buf,
+            struct line* pivot)
+{
+  struct line* bufless = buf;
+  struct line* bufeq = buf - nlines;
+  struct line* bufgrt = bufeq;
+  size_t i = 0;
+  while (i++ < nlines)
+    {
+      int result = compare (pivot, --iter);
+      if (result >= 0)
+        *(--bufless) = *iter;
+      else if (result == 0)
+        *(bufeq++) = *iter;
+      else
+        *(--bufgrt) = *iter;
+    }
+  struct counters* ret = malloc (sizeof (struct counters));
+  ret->nless = buf - bufless;
+  ret->neq = bufeq - buf + nlines;
+  ret->ngrt = buf - nlines- bufgrt;
+  return ret;
+}
+
+static void *
+distribute_thread (void *data)
+{
+  struct dist_args const *args = data;
+  pthread_exit(distribute (args->iter, args->nlines, args->buf, args->pivot));
+  return NULL;
+}
+
+
+struct copy_args
+{
+  struct line* buf;
+  struct line* destless;
+  struct line* desteq;
+  struct line* destgrt;
+  size_t nlines;
+  size_t nless;
+  size_t neq;
+};
+
+/* Copy the three piles of lines created by distribute_lines back into the
+   original buffer at the calculated passed in destinations.  */
+
+static void *
+copy_lines (void* data)
+{
+  struct copy_args* args = data;
+  struct line *bufeq = args->buf - args->nlines;
+  struct line *bufgrt = bufeq;
+  size_t ngrt = args->nlines - args->nless - args->neq;
+  while (args->nless-- != 0)
+    *(--args->destless) = *(--args->buf);
+  while (args->neq-- != 0)
+    *(--args->desteq) = *(bufeq++);
+  while (ngrt-- != 0)
+    *(--args->destgrt) = *(--bufgrt);
+  return NULL;
+}
+
+/* Unfortunately, memcoll isn't threadsafe. This is a workaround so the pivot
+   line isn't corrupted.  */
+
+static void
+copy_pivot (struct line* pivotp, struct line* pivot)
+{
+  pivot->text = malloc (pivotp->length);
+  pivot->text = memcpy (pivot->text, pivotp->text, pivotp->length);
+  pivot->length = pivotp->length;
+  pivot->keybeg = pivotp->keybeg;
+  pivot->keylim = pivotp->keylim;
+  return;
 }
 
-/* Like sortlines (LINES, NLINES, TEMP), except output into TEMP
-   rather than sorting in place.  */
 
 static void
-sortlines_temp (struct line *lines, size_t nlines, struct line *temp)
+sortlines (struct line *restrict, size_t, size_t, struct line *restrict);
+
+static void *
+sortlines_thread (void *data)
+{
+  struct sortlines_args const *args = data;
+  sortlines (args->lines, args->nlines, args->nthreads, args->temp);
+  return NULL;
+}
+
+/* Dutch national flag quicksort. First, threads are spun off to distribute
+   each 1/nthreads of the buffer into a <, ==, and > pile, per how lines
+   compare to the pivot. Then, the destinations of all 3*nthreads piles are
+   calculated, and threads are spun off to copy them from the piles to the
+   appropriate destination in the original buffer.  */
+
+static void
+sortlines (struct line *restrict lines, size_t nlines, size_t nthreads,
+           struct line *restrict buf)
 {
   if (nlines == 2)
     {
-      /* Declare `swap' as int, not bool, to work around a bug
-     <http://lists.gnu.org/archive/html/bug-coreutils/2005-10/msg00086.html>
-     in the IBM xlc 6.0.0.0 compiler in 64-bit mode.  */
-      int swap = (0 < compare (&lines[-1], &lines[-2]));
-      temp[-1] = lines[-1 - swap];
-      temp[-2] = lines[-2 + swap];
+      if (0 < compare (&lines[-1], &lines[-2]))
+    {
+      struct line tmp = lines[-1];
+      lines[-1] = lines[-2];
+      lines[-2] = tmp;
     }
-  else
+    }
+  else if (nlines > 2)
     {
-      size_t nlo = nlines / 2;
-      size_t nhi = nlines - nlo;
-      struct line *lo = lines;
-      struct line *hi = lines - nlo;
-      struct line *sorted_hi = temp - nlo;
-
-      sortlines_temp (hi, nhi, sorted_hi);
-      if (1 < nlo)
-    sortlines (lo, nlo, temp);
+      // The array is backwards, so right is first, left is last
+      struct line* middle = &lines[-(nlines+1)/2];
+      struct line* right = &lines[-1];
+      struct line* left = &lines[-nlines];
+      struct line* pivotp;
+      struct line pivot;
 
-      mergelines (temp, lo, nlo, sorted_hi, nhi);
+      // Guess. Main goal is so that pivot selection is not predictable, and
+      // thus not exploitable by attackers.
+      if (nlines < 10000)
+        {
+          pivotp = medianof3 (right, middle, left, 1);
+          if (nlines == 3)
+            return;
+        }
+      else
+        {
+          // srand(time(0)) is called in sort()
+          middle = medianof3 (right, middle, left, 0);
+          struct line* middle2 = medianof3 (left + rand() % nlines,
+                                            left + rand() % nlines,
+                                            left + rand() % nlines, 0);
+          struct line* middle3 = medianof3 (right - rand() % nlines,
+                                            right - rand() % nlines,
+                                            right - rand() % nlines, 0);
+          pivotp = (medianof3 (middle2, middle, middle3, 0));
+        }
+      // Workaround for non-threadsafe memcoll
+      copy_pivot (pivotp, &pivot);
+
+      // For each of nthread subsections of array, construct a <, ==, and >
+      // section.
+      pthread_t* threads = malloc ((nthreads-1)*sizeof(pthread_t));
+      size_t threadlines = nlines/nthreads;
+      size_t* nless_array = malloc (nthreads*sizeof(size_t));
+      size_t* neq_array = malloc (nthreads*sizeof(size_t));
+      size_t* ngrt_array = malloc (nthreads*sizeof(size_t));
+      struct dist_args* d_args = malloc ((nthreads-1)*sizeof(struct 
dist_args));
+      size_t i = 0;
+      for (; i < nthreads-1; i++)
+        {
+          d_args[i].iter = lines - i*threadlines;
+          d_args[i].nlines = threadlines;
+          d_args[i].buf = buf - 2*i*threadlines;
+          d_args[i].pivot = &pivot;
+          pthread_create (&threads[i], NULL, distribute_thread, &d_args[i]);
+        }
+      struct counters* temp = distribute (lines - i*threadlines, nlines - 
i*threadlines,
+                                          buf - 2*i*threadlines, &pivot);
+      nless_array[nthreads-1] = temp->nless;
+      neq_array[nthreads-1] = temp->neq;
+      ngrt_array[nthreads-1] = temp->ngrt;
+      free(temp);
+      for (i = 0; i < nthreads-1; i++)
+        {
+          pthread_join (threads[i], (void**) &temp);
+          nless_array[i] = temp->nless;
+          neq_array[i] = temp->neq;
+          ngrt_array[i] = temp->ngrt;
+          free(temp);
+        }
+      free (d_args);
+      free (pivot.text);
+
+      // Calculate destinations and copy lines back.
+      size_t n1 = 0;
+      size_t n2 = 0;
+      for (i = 0; i < nthreads; i++)
+        {
+          n1 += nless_array[i];
+          n2 += ngrt_array[i];
+        }
+      struct line* destless = lines;
+      struct line* desteq = lines - n1;
+      struct line* destgrt = lines - nlines + n2;
+      struct copy_args* c_args = malloc((nthreads-1)*sizeof(struct copy_args));
+      for (i = 0; i < nthreads-1; i++)
+        {
+          c_args[i].buf = buf - 2*i*threadlines;
+          c_args[i].destless = destless;
+          c_args[i].desteq = desteq;
+          c_args[i].destgrt = destgrt;
+          c_args[i].nlines = threadlines;
+          c_args[i].nless = nless_array[i];
+          c_args[i].neq = neq_array[i];
+          pthread_create (&threads[i], NULL, copy_lines, &c_args[i]);
+          destless -= nless_array[i];
+          desteq -= neq_array[i];
+          destgrt -= ngrt_array[i];
+        }
+      struct copy_args lastone =
+          {buf - 2*i*threadlines, destless, desteq, destgrt,
+           nlines - i*threadlines, nless_array[i], neq_array[i]};
+      copy_lines (&lastone);
+      for (i = 0; i < nthreads-1; i++)
+        pthread_join(threads[i], NULL);
+
+      free (c_args);
+      free (threads);
+      free (nless_array);
+      free (neq_array);
+      free (ngrt_array);
+
+      if (nthreads > 1 && n1 > 1 && n2 > 1)
+        {
+          size_t child_threads = (n1 * nthreads / nlines) + 0.5;
+          if (child_threads == 0)
+            child_threads = 1;
+          struct sortlines_args args = {lines, n1, child_threads, buf};
+          pthread_t new_thread;
+
+          if (child_threads < 3)
+            pthread_create (&new_thread, NULL, sortlines_merge_thread2, &args);
+          else
+            pthread_create (&new_thread, NULL, sortlines_thread, &args);
+          if (nthreads - child_threads < 3)
+              sortlines_merge (lines-nlines+n2, n2, buf - 2*n1,
+                               nthreads - child_threads, 0);
+          else
+              sortlines (lines-nlines+n2, n2, nthreads-child_threads,
+                         buf - 2*n1);
+          pthread_join (new_thread, NULL);
+        }
+      else
+        {
+          if (n1 > 1)
+              sortlines_merge (lines, n1, buf, 1, 0);
+          if (n2 > 1)
+              sortlines_merge (lines - nlines + n2, n2, buf - n1, 1, 0);
+        }
     }
 }
 
@@ -2840,13 +3268,15 @@ merge (struct sortfile *files, size_t ntemps, size_t 
nfiles,
 /* Sort NFILES FILES onto OUTPUT_FILE. */
 
 static void
-sort (char * const *files, size_t nfiles, char const *output_file)
+sort (char * const *files, size_t nfiles, char const *output_file,
+      size_t nthreads)
 {
   struct buffer buf;
   size_t ntemps = 0;
   bool output_file_created = false;
 
   buf.alloc = 0;
+  srand (time (0));
 
   while (nfiles)
     {
@@ -2854,8 +3284,11 @@ sort (char * const *files, size_t nfiles, char const 
*output_file)
       char const *file = *files;
       FILE *fp = xfopen (file, "r");
       FILE *tfp;
-      size_t bytes_per_line = (2 * sizeof (struct line)
-                   - sizeof (struct line) / 2);
+
+      /* If singlethreaded, the merge uses the memory optimization
+     suggested in Knuth exercise 5.2.4-10; see sortlines.  */
+      size_t bytes_per_line = 3*sizeof (struct line);
+                  - (1 < nthreads ? 0 : sizeof (struct line)*3/2);
 
       if (! buf.alloc)
     initbuf (&buf, bytes_per_line,
@@ -2883,7 +3316,12 @@ sort (char * const *files, size_t nfiles, char const 
*output_file)
       line = buffer_linelim (&buf);
       linebase = line - buf.nlines;
       if (1 < buf.nlines)
-        sortlines (line, buf.nlines, linebase);
+            {
+              if (nthreads > 2)         
+                sortlines (line, buf.nlines, nthreads, linebase);
+              else
+                sortlines_merge (line, buf.nlines, linebase, nthreads, false);
+            }
       if (buf.eof && !nfiles && !ntemps && !buf.left)
         {
           xfclose (fp, file);
@@ -3142,6 +3580,7 @@ main (int argc, char **argv)
   char *random_source = NULL;
   bool need_random = false;
   size_t nfiles = 0;
+  size_t nthreads = 0;
   bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL);
   bool obsolete_usage = (posix2_version () < 200112);
   char **files;
@@ -3279,7 +3718,7 @@ main (int argc, char **argv)
         {
           bool minus_pos_usage = (optind != argc && argv[optind][0] == '-'
                       && ISDIGIT (argv[optind][1]));
-          obsolete_usage |= minus_pos_usage & ~posixly_correct;
+              obsolete_usage |= minus_pos_usage && !posixly_correct;
           if (obsolete_usage)
         {
           /* Treat +POS1 [-POS2] as a key if possible; but silently
@@ -3288,7 +3727,7 @@ main (int argc, char **argv)
           s = parse_field_count (optarg + 1, &key->sword, NULL);
           if (s && *s == '.')
             s = parse_field_count (s + 1, &key->schar, NULL);
-          if (! (key->sword | key->schar))
+                  if (! (key->sword || key->schar))
             key->sword = SIZE_MAX;
           if (! s || *set_ordering (s, key, bl_start))
             key = NULL;
@@ -3379,7 +3818,7 @@ main (int argc, char **argv)
           badfieldspec (optarg, N_("character offset is zero"));
         }
         }
-      if (! (key->sword | key->schar))
+          if (! (key->sword || key->schar))
         key->sword = SIZE_MAX;
       s = set_ordering (s, key, bl_start);
       if (*s != ',')
@@ -3466,6 +3905,10 @@ main (int argc, char **argv)
       add_temp_dir (optarg);
       break;
 
+    case THREADS_OPTION:
+      nthreads = specify_nthreads (oi, c, optarg);
+      break;
+
     case 'u':
       unique = true;
       break;
@@ -3568,14 +4011,14 @@ main (int argc, char **argv)
       if (! (key->ignore
          || key->translate
          || (key->skipsblanks
-         | key->reverse
-         | key->skipeblanks
-         | key->month
-         | key->numeric
-         | key->version
-         | key->general_numeric
-         | key->human_numeric
-         | key->random)))
+                 || key->reverse
+                 || key->skipeblanks
+                 || key->month
+                 || key->numeric
+                 || key->version
+                 || key->general_numeric
+                 || key->human_numeric
+                 || key->random)))
         {
           key->ignore = gkey.ignore;
           key->translate = gkey.translate;
@@ -3596,13 +4039,13 @@ main (int argc, char **argv)
   if (!keylist && (gkey.ignore
            || gkey.translate
            || (gkey.skipsblanks
-               | gkey.skipeblanks
-               | gkey.month
-               | gkey.numeric
-               | gkey.general_numeric
-               | gkey.human_numeric
-               | gkey.random
-               | gkey.version)))
+                       || gkey.skipeblanks
+                       || gkey.month
+                       || gkey.numeric
+                       || gkey.general_numeric
+                       || gkey.human_numeric
+                       || gkey.random
+                       || gkey.version)))
     {
       insertkey (&gkey);
       need_random |= gkey.random;
@@ -3614,6 +4057,9 @@ main (int argc, char **argv)
 
   if (need_random)
     {
+      /* Threading does not lock the randread_source structure, so
+     downgrade to one thread to avoid race conditions. */
+      nthreads = 1;
       randread_source = randread_new (random_source, MD5_DIGEST_SIZE);
       if (! randread_source)
     die (_("open failed"), random_source);
@@ -3668,8 +4114,20 @@ main (int argc, char **argv)
       IF_LINT (free (sortfiles));
     }
   else
-    sort (files, nfiles, outfile);
-
+    {
+      if (!nthreads)
+    {
+      /* The default NTHREADS is 2 ** floor (log2 (number of processors)).
+         If comparisons do not vary in cost and threads are
+         scheduled evenly, with the current algorithm there is no
+         performance advantage to using a number of threads that
+         is not a power of 2.  */
+      unsigned long int np2 = num_processors () / 2;
+      for (nthreads = 1; nthreads <= np2; nthreads *= 2)
+        continue;
+    }
+      sort (files, nfiles, outfile, nthreads);
+    }
   if (have_read_stdin && fclose (stdin) == EOF)
     die (_("close failed"), "-");





reply via email to

[Prev in Thread] Current Thread [Next in Thread]