bug-coreutils
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: Multi-threading in sort(or core-utils)


From: Bo Borgerson
Subject: Re: Multi-threading in sort(or core-utils)
Date: Wed, 25 Jun 2008 20:22:09 -0400
User-agent: Thunderbird 2.0.0.14 (X11/20080505)

Bo Borgerson wrote:
> Cons:
> - It's limited by the feeder.  If the sorters were able to read at their
> own pace I think this would scale better.
> - It uses N+2 processes.  When sorters are run in parallel there are two
> helper processes, one feeding input and one merging output.

Hello again.

In light of the drawbacks mentioned for my previous parallel sort patch,
I've made the attached modifications.

If all inputs are regular files then SORTERS read directly rather than
being fed by an extra process.

This exhibits better performance with a concurrency of 2, but still does
not realize the full benefit of greater concurrency that I was expecting:

-----

$ for i in 0 1 2 3; do cat /dev/urandom | base64 | head -2000000 | cut -da -f1 > p$i; done

$ time ~/sort p0 p1 p2 p3 > /dev/null

real    0m31.444s

$ time ~/sort --concurrency=2 p0 p1 p2 p3 > /dev/null

real    0m16.908s

$ time ~/sort --concurrency=4 p0 p1 p2 p3 > /dev/null

real    0m15.353s

$ time ~/sort -m <(~/sort p0 p1) <(~/sort p2 p3) > /dev/null

real    0m17.066s <-- similar to --concurrency=2

$ time ~/sort -m <(~/sort p0) <(~/sort p1) <(~/sort p2) <(~/sort p3) > /dev/null

real    0m10.832s <-- _this_ is what I want from --concurrency=4!

-----

Jim pointed out a mistake in my performance testing script that was
causing me to use a smaller sample size for each data point than I
intended.  I've attached a version with his patch.

Thanks,

Bo

PNG image

From 298d6871aee1a1a506015681fb88ce5ba2b24644 Mon Sep 17 00:00:00 2001
From: Bo Borgerson <address@hidden>
Date: Tue, 24 Jun 2008 14:02:22 -0400
Subject: [PATCH] o sort: Don't use a feeder for regular file concurrency.

* src/sort.c (xlseek): Try to lseek, complain on failure.  Stolen from
src/tail.c.
(fillbuf): If SORTER_BYTES_LEFT is non-negative, treat it as a limit.
(sort): When running multiple sorters concurrently, if all inputs are regular
files set sorters up to read directly rather than spawning a feeder to
distribute work among them.
---
 src/sort.c |  220 ++++++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 files changed, 194 insertions(+), 26 deletions(-)

diff --git a/src/sort.c b/src/sort.c
index 18a8882..4aa7609 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -307,6 +307,11 @@ static unsigned int nmerge = NMERGE_DEFAULT;
    their output through a pipe to the parent who will merge. */
 static int sorter_output_fd = -1;
 
+/* If multiple sorters are each reading their own input rather
+   than being fed by a single process then they'll have a cap
+   on how much they can read. */
+static size_t sorter_bytes_left = -1;
+
 static void sortlines_temp (struct line *, size_t, struct line *);
 
 /* Report MESSAGE for FILE, then clean up and exit.
@@ -831,6 +836,45 @@ xfclose (FILE *fp, char const *file)
     }
 }
 
+/* Call lseek with the specified arguments, where file descriptor FD
+   corresponds to the file, FILENAME.
+   Give a diagnostic and exit nonzero if lseek fails.
+   Otherwise, return the resulting offset.
+
+   This is stolen from src/tail.c */
+
+static off_t
+xlseek (int fd, off_t offset, int whence, char const *filename)
+{
+  off_t new_offset = lseek (fd, offset, whence);
+  char buf[INT_BUFSIZE_BOUND (off_t)];
+  char *s;
+
+  if (0 <= new_offset)
+    return new_offset;
+
+  s = offtostr (offset, buf);
+  switch (whence)
+    {
+    case SEEK_SET:
+      error (0, errno, _("%s: cannot seek to offset %s"),
+            filename, s);
+      break;
+    case SEEK_CUR:
+      error (0, errno, _("%s: cannot seek to relative offset %s"),
+            filename, s);
+      break;
+    case SEEK_END:
+      error (0, errno, _("%s: cannot seek to end-relative offset %s"),
+            filename, s);
+      break;
+    default:
+      abort ();
+    }
+
+  exit (EXIT_FAILURE);
+}
+
 static void
 dup2_or_die (int oldfd, int newfd)
 {
@@ -1556,7 +1600,8 @@ fillbuf (struct buffer *buf, FILE *fp, char const *file)
   size_t line_bytes = buf->line_bytes;
   size_t mergesize = merge_buffer_size - MIN_MERGE_BUFFER_SIZE;
 
-  if (buf->eof)
+
+  if (buf->eof || 0 == sorter_bytes_left)
     return false;
 
   if (buf->used != buf->left)
@@ -1582,9 +1627,23 @@ fillbuf (struct buffer *buf, FILE *fp, char const *file)
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */
          size_t readsize = (avail - 1) / (line_bytes + 1);
-         size_t bytes_read = fread (ptr, 1, readsize, fp);
-         char *ptrlim = ptr + bytes_read;
+         size_t bytes_read;
+         char *ptrlim;
          char *p;
+
+         if (0 < sorter_bytes_left)
+           readsize = MIN (readsize, sorter_bytes_left);
+
+         bytes_read = fread (ptr, 1, readsize, fp);
+
+         if (0 < sorter_bytes_left)
+           sorter_bytes_left -= bytes_read;
+
+         /* This is a fake end-of-file for this sorter child. */
+         if (0 == sorter_bytes_left)
+           buf->eof = true;
+
+         ptrlim = ptr + bytes_read;
          avail -= bytes_read;
 
          if (bytes_read != readsize)
@@ -2627,6 +2686,9 @@ sort (char * const *incoming_files, size_t nfiles, char const *output_file)
   bool is_sort_feeder = false;
   size_t feeder_rotator = -1;
   FILE **feeder_fps = NULL;
+  size_t sorter_seek_to = 0;
+  size_t bytes_per_line = (2 * sizeof (struct line)
+                          - sizeof (struct line) / 2);
 
   buf.alloc = 0;
 
@@ -2646,11 +2708,98 @@ sort (char * const *incoming_files, size_t nfiles, char const *output_file)
       int pipe_from_fds[2];
       int *pipe_to_sorter = xnmalloc (n_sorters, sizeof *pipe_to_sorter);
       int *pipe_from_sorter = xnmalloc (n_sorters, sizeof *pipe_from_sorter);
+      size_t *file_sizes = xnmalloc (nfiles, sizeof *file_sizes);
+      size_t total_size = 0;
+      bool all_reg_files = true;
+      size_t sorter_limit_lo = 0;
+      size_t sorter_limit_hi = 0;
+      size_t first_file = 0;
+      size_t last_file = 0;
+
+      for (i = 0; all_reg_files && i < nfiles; i++)
+       {
+         struct stat st;
+         off_t file_size;
+         size_t worst_case;
+
+         if (STREQ (files[i], "-"))
+           all_reg_files = false;
+         else
+           {
+             if (stat (files[i], &st))
+               die (_("stat failed"), files[i]);
+
+             if (S_ISREG (st.st_mode))
+               {
+                 file_sizes[i] = st.st_size;
+                 total_size += st.st_size;
+               }
+             else
+               all_reg_files = false;
+           }
+       }
 
       /* Fork off N_SORTERS children.  Only continue to fork if we're
         the parent and we haven't failed a fork yet. */
       for (i = 0; i < n_sorters && 0 < pid; i++)
        {
+         sorter_seek_to = 0;
+
+         if (all_reg_files)
+           {
+             size_t ff_skipped_size = 0;
+             size_t lf_skipped_size = 0;
+
+             sorter_limit_lo = sorter_limit_hi;
+             sorter_limit_hi = sorter_limit_lo + total_size / n_sorters - 1;
+
+             first_file = 0;
+
+             while ((first_file + 1 < nfiles)
+                    && ff_skipped_size + file_sizes[first_file] <=
+                       sorter_limit_lo)
+               ff_skipped_size += file_sizes[first_file++];
+
+             last_file = first_file;
+             lf_skipped_size = ff_skipped_size;
+             while ((last_file + 1 < nfiles)
+                    && lf_skipped_size + file_sizes[last_file] <=
+                       sorter_limit_hi)
+               lf_skipped_size += file_sizes[last_file++];
+
+
+             if (i + 1 < n_sorters)
+               {
+                 struct buffer seekbuf;
+                 size_t seek_probe = 0;
+                 FILE *sfp;
+
+                 seek_probe = sorter_limit_hi - lf_skipped_size;
+
+                 sfp = xfopen (files[last_file], "r", -1);
+
+                 xlseek (fileno (sfp), seek_probe, SEEK_SET,
+                         files[last_file]);
+
+                 initbuf (&seekbuf, bytes_per_line, 1);
+
+                 if (fillbuf (&seekbuf, sfp, files[last_file]))
+                   {
+                     struct line *first_line = buffer_linelim (&seekbuf) - 1;
+                     sorter_limit_hi += first_line->length;
+                   }
+                 else
+                   sorter_limit_hi += file_sizes[last_file] - seek_probe;
+
+                 xfclose (sfp, files[last_file]);
+               }
+             else
+               sorter_limit_hi = total_size;
+
+             sorter_bytes_left = sorter_limit_hi - sorter_limit_lo;
+             sorter_seek_to = sorter_limit_lo - ff_skipped_size;
+           }
+
          if (pipe (pipe_from_fds) < 0)
            error (SORT_FAILURE, errno, _("pipe failed"));
 
@@ -2685,9 +2834,18 @@ sort (char * const *incoming_files, size_t nfiles, char const *output_file)
              sorter_input_fd = pipe_to_fds[0];
              sorter_output_fd = pipe_from_fds[1];
 
-             nfiles = 1;
-             files = null_files;
              output_file = NULL;
+
+             if (all_reg_files)
+               {
+                 nfiles = last_file - first_file + 1;
+                 files += first_file;
+               }
+             else
+               {
+                 nfiles = 1;
+                 files = null_files;
+               }
            }
          else
            {
@@ -2707,31 +2865,15 @@ sort (char * const *incoming_files, size_t nfiles, char const *output_file)
 
       /* Only try to fork off a feeder if we're the parent and
          we didn't fail a fork above. */
-      if (0 < pid)
+      if (0 < pid && !all_reg_files)
        {
 
          /* We haven't created any temp files yet, so don't need
             to worry about a critical section here. */
          pid = fork ();
-         if (0 < pid)
-           {
-             /* Parent. */
-             struct sortfile *mfiles = xnmalloc (n_sorters, sizeof *mfiles);
 
-             nmerge = n_sorters;
-
-             register_proc (pid);
-             for (i = 0; i < n_sorters; i++)
-               {
-                 close (pipe_to_sorter[i]);
-                 mfiles[i].name = NULL;
-                 mfiles[i].pid = 0;
-                 mfiles[i].fd = pipe_from_sorter[i];
-               }
-             mergefps (mfiles, 0, n_sorters, NULL, output_file);
-             free (mfiles);
-             exit (EXIT_SUCCESS);
-           }
+         if (0 < pid)
+           register_proc (pid);
          else if (0 == pid)
            {
              /* Child. */
@@ -2761,9 +2903,30 @@ sort (char * const *incoming_files, size_t nfiles, char const *output_file)
 
 
        }
+       if (0 < pid)
+         {
+           /* Parent. */
+           struct sortfile *mfiles = xnmalloc (n_sorters, sizeof *mfiles);
+
+           nmerge = n_sorters;
+
+           sorter_bytes_left = -1;
+
+           for (i = 0; i < n_sorters; i++)
+             {
+               close (pipe_to_sorter[i]);
+               mfiles[i].name = NULL;
+               mfiles[i].pid = 0;
+               mfiles[i].fd = pipe_from_sorter[i];
+             }
+           mergefps (mfiles, 0, n_sorters, NULL, output_file);
+           free (mfiles);
+           exit (EXIT_SUCCESS);
+         }
 
       free (pipe_to_sorter);
       free (pipe_from_sorter);
+      free (file_sizes);
     }
 
   while (nfiles)
@@ -2772,8 +2935,6 @@ sort (char * const *incoming_files, size_t nfiles, char const *output_file)
       char const *file = *files;
       FILE *fp = xfopen (file, "r", sorter_input_fd);
       FILE *tfp;
-      size_t bytes_per_line = (2 * sizeof (struct line)
-                              - sizeof (struct line) / 2);
 
       if (! buf.alloc)
        {
@@ -2782,6 +2943,12 @@ sort (char * const *incoming_files, size_t nfiles, char const *output_file)
          initbuf (&buf, bytes_per_line, is_sort_feeder
                   ? FEEDER_BUF_SIZE
                   : MAX (size / n_sorters, MIN_SORT_SIZE));
+
+         if (0 < sorter_seek_to)
+           {
+             xlseek (fileno (fp), sorter_seek_to, SEEK_SET, file);
+             sorter_seek_to = 0;
+           }
        }
       buf.eof = false;
       files++;
@@ -2870,6 +3037,7 @@ sort (char * const *incoming_files, size_t nfiles, char const *output_file)
          tempfiles[i].fd = -1;
          node = node->next;
        }
+      sorter_bytes_left = -1;
       merge (tempfiles, ntemps, ntemps, output_file);
       free (tempfiles);
     }
-- 
1.5.4.3

   100000|   0.1907500|   0.1212137|   0.1492970|   0.1426613| 63.55| 78.27| 
74.79
   200000|   0.4407075|   0.2594357|   0.3089027|   0.2900586| 58.87| 70.09| 
65.82
   300000|   0.7126391|   0.4144977|   0.4767102|   0.4515018| 58.16| 66.89| 
63.36
   400000|   1.0011743|   0.5756823|   0.6545527|   0.6031176| 57.50| 65.38| 
60.24
   500000|   1.2983972|   0.7421886|   0.8412851|   0.7720997| 57.16| 64.79| 
59.47
   600000|   1.6070564|   0.9166016|   1.0290445|   0.9489639| 57.04| 64.03| 
59.05
   700000|   1.9241715|   1.0897433|   1.2080779|   1.1169183| 56.63| 62.78| 
58.05
   800000|   2.2455566|   1.2735297|   1.4052373|   1.2942373| 56.71| 62.58| 
57.64
   900000|   2.5722278|   1.4487721|   1.5964728|   1.4631221| 56.32| 62.07| 
56.88
  1000000|   2.9058448|   1.6322845|   1.7933541|   1.6398010| 56.17| 61.72| 
56.43
  1100000|   3.2413821|   1.8218724|   1.9856010|   1.8181271| 56.21| 61.26| 
56.09
  1200000|   3.5877815|   2.0090074|   2.1863550|   2.0018847| 56.00| 60.94| 
55.80
  1300000|   3.9344238|   2.1981795|   2.3838138|   2.1827234| 55.87| 60.59| 
55.48
  1400000|   4.2869315|   2.4001298|   2.5800343|   2.3631609| 55.99| 60.18| 
55.12
  1500000|   4.6423150|   2.5941071|   2.7762429|   2.5345204| 55.88| 59.80| 
54.60
  1600000|   4.9974098|   2.7849455|   2.9755218|   2.7180162| 55.73| 59.54| 
54.39
  1700000|   5.3550738|   2.9819159|   3.1908292|   2.9051223| 55.68| 59.59| 
54.25
  1800000|   5.7168665|   3.1765866|   3.3953169|   3.1062621| 55.57| 59.39| 
54.34
  1900000|   6.0777430|   3.3841768|   3.5842696|   3.2766509| 55.68| 58.97| 
53.91
  2000000|   6.4494099|   3.5889102|   3.8042020|   3.4713289| 55.65| 58.99| 
53.82
  2100000|   6.8131444|   3.7754872|   3.9977934|   3.6440647| 55.41| 58.68| 
53.49
  2200000|   7.1904283|   3.9888608|   4.2002222|   3.8260563| 55.47| 58.41| 
53.21
  2300000|   7.5661058|   4.1902994|   4.4112677|   4.0186554| 55.38| 58.30| 
53.11
  2400000|   7.9428476|   4.4105409|   4.6193866|   4.2025506| 55.53| 58.16| 
52.91
  2500000|   8.3262551|   4.5975206|   4.8391130|   4.3705808| 55.22| 58.12| 
52.49
  2600000|   8.7071159|   4.8165302|   5.0661341|   4.5983404| 55.32| 58.18| 
52.81
  2700000|   9.0928882|   5.0205970|   5.2572794|   4.7585346| 55.21| 57.82| 
52.33
  2800000|   9.4781096|   5.2287590|   5.4770362|   4.9688201| 55.17| 57.79| 
52.42
  2900000|   9.8713534|   5.4430972|   5.6842840|   5.1797704| 55.14| 57.58| 
52.47
  3000000|  10.2584994|   5.6546786|   5.8811572|   5.3612936| 55.12| 57.33| 
52.26
  3100000|  10.6475192|   5.8613536|   6.1200937|   5.5399095| 55.05| 57.48| 
52.03
  3200000|  11.0416567|   6.0699580|   6.3079900|   5.7265803| 54.97| 57.13| 
51.86
  3300000|  11.4381700|   6.2870910|   6.5351404|   5.9162662| 54.97| 57.13| 
51.72
  3400000|  11.8291353|   6.5215042|   6.7382136|   6.0843453| 55.13| 56.96| 
51.44
  3500000|  12.2307064|   6.7267673|   6.9559792|   6.3108958| 55.00| 56.87| 
51.60
  3600000|  12.6218697|   6.9284535|   7.1772278|   6.4626065| 54.89| 56.86| 
51.20
  3700000|  13.0177764|   7.1543026|   7.3906161|   6.6695140| 54.96| 56.77| 
51.23
  3800000|  13.4183712|   7.3735513|   7.6140454|   6.8889141| 54.95| 56.74| 
51.34
  3900000|  13.8235417|   7.5995789|   7.8115756|   7.0817510| 54.98| 56.51| 
51.23
  4000000|  14.2274189|   7.8114545|   8.0328980|   7.2631327| 54.90| 56.46| 
51.05
  4100000|  14.6265697|   8.0184248|   8.2756556|   7.4423216| 54.82| 56.58| 
50.88
  4200000|  15.0284704|   8.2306172|   8.4773390|   7.6623220| 54.77| 56.41| 
50.99
  4300000|  15.4470305|   8.4465486|   8.6921658|   7.8294720| 54.68| 56.27| 
50.69
  4400000|  15.8557857|   8.6858353|   8.8931192|   8.0536212| 54.78| 56.09| 
50.79
  4500000|  16.2791507|   8.9154484|   9.1254121|   8.2392632| 54.77| 56.06| 
50.61
  4600000|  16.6930788|   9.1410664|   9.3356111|   8.4336024| 54.76| 55.93| 
50.52
  4700000|  17.1026763|   9.3607884|   9.5205901|   8.5923598| 54.73| 55.67| 
50.24
  4800000|  17.5223779|   9.5671944|   9.7902250|   8.8637957| 54.60| 55.87| 
50.59
  4900000|  17.9375180|   9.8011485|  10.0237976|   9.0387738| 54.64| 55.88| 
50.39
  5000000|  18.3691483|  10.0212573|  10.2543922|   9.2044217| 54.55| 55.82| 
50.11
  5100000|  18.7820712|  10.2439085|  10.4461926|   9.4317166| 54.54| 55.62| 
50.22
  5200000|  19.2000576|  10.5021854|  10.6648112|   9.6201342| 54.70| 55.55| 
50.10
  5300000|  19.6268853|  10.7259188|  10.8855893|   9.8038512| 54.65| 55.46| 
49.95
  5400000|  20.0472636|  10.9477037|  11.0792249|  10.0622558| 54.61| 55.27| 
50.19
  5500000|  20.4808984|  11.1545736|  11.3508855|  10.2263413| 54.46| 55.42| 
49.93
  5600000|  20.9024041|  11.4072540|  11.5796600|  10.4164473| 54.57| 55.40| 
49.83
  5700000|  21.3219777|  11.6174535|  11.7728860|  10.6100948| 54.49| 55.21| 
49.76
  5800000|  21.7558453|  11.8172338|  11.9923465|  10.7985256| 54.32| 55.12| 
49.64
  5900000|  22.1975734|  12.0638658|  12.1874312|  11.0635708| 54.35| 54.90| 
49.84
  6000000|  22.6178266|  12.2907313|  12.3991687|  11.1829701| 54.34| 54.82| 
49.44
  6100000|  23.0555613|  12.5697571|  12.6771984|  11.4066584| 54.52| 54.99| 
49.47
  6200000|  23.4759880|  12.7715191|  12.8634446|  11.6243555| 54.40| 54.79| 
49.52
  6300000|  23.9116348|  12.9877583|  13.0994780|  11.8180951| 54.32| 54.78| 
49.42
  6400000|  24.3472945|  13.2202427|  13.3083804|  11.9960815| 54.30| 54.66| 
49.27
  6500000|  24.7743772|  13.4866485|  13.5164406|  12.2114529| 54.44| 54.56| 
49.29
  6600000|  25.2166468|  13.7175530|  13.7916486|  12.4206740| 54.40| 54.69| 
49.26
  6700000|  25.6542254|  13.9371482|  14.0397197|  12.6462823| 54.33| 54.73| 
49.30
  6800000|  26.0939097|  14.1675297|  14.2136584|  12.7595433| 54.29| 54.47| 
48.90
  6900000|  26.5086095|  14.4303756|  14.4640741|  13.0125710| 54.44| 54.56| 
49.09
  7000000|  26.9556531|  14.6363618|  14.6941602|  13.2644810| 54.30| 54.51| 
49.21
#!/usr/bin/perl -w

# Benchmark a sort program at concurrency levels 1..MAX_CONCURRENCY.
#
# Usage: SCRIPT SORT_PROGRAM [MAX_CONCURRENCY]
#
# For each input size (100,000 .. 10,000,000 lines, in steps of 100,000)
# the script generates fresh random input several times, times
# "SORT_PROGRAM --concurrency=K bench_in" for every K, drops the two
# fastest and two slowest samples per K, and prints one '|'-separated
# row per input size on standard output.
#
# Side effects: (re)writes the file "bench_in" in the current directory.

use strict;
use Time::HiRes qw(gettimeofday tv_interval);

my ($prog, $max_conc) = @ARGV;

$max_conc ||= 4;

$prog or die "usage: $0 SORT_PROGRAM [MAX_CONCURRENCY]\n";

# The value is used as a range bound and as an array size below, so
# reject anything that is not a positive integer up front.
$max_conc =~ /^[1-9][0-9]*$/
  or die "usage: $0 SORT_PROGRAM [MAX_CONCURRENCY]\n";

# Input sizes (number of lines) to benchmark.
my @points = map { $_ * 100_000 } 1..100;

# Number of samples (excluding outliers) for each point.
my $n = 10;

# Concurrency levels to test; index J holds level J+1.
my @conc = 1..$max_conc;

# Order in which the concurrency levels are run for the current sample.
my @rot = 0..($max_conc - 1);

my @times = ();

my $t0;
for my $i (@points) {
  # One list of raw samples per concurrency level.
  @times = map { [] } 1..$max_conc;

  # Make fresh data for each sample.  Rotate who gets first crack
  # at the data.  $n + 4 samples are taken so that $n remain after
  # two outliers are dropped on each end.
  for (0..$n+3) {
    system ("cat /dev/urandom | base64 | head -$i | cut -da -f1 > bench_in") == 0
      or die "$0: failed to generate bench_in (wait status $?)\n";
    for my $j (0..($max_conc - 1)) {
      my $level = $conc[$rot[$j]];
      $t0 = [gettimeofday()];
      my $status = system "$prog --concurrency=$level bench_in > /dev/null";
      my $elapsed = tv_interval($t0);
      # A failed run would otherwise be averaged in as a valid timing.
      $status == 0
        or die "$0: $prog --concurrency=$level failed (wait status $status)\n";
      push @{$times[$rot[$j]]}, $elapsed;
    }
    push @rot, shift @rot; # rotate who goes first
  }

  # Take the average time for each concurrency level.
  for my $j (0..($max_conc - 1)){
    my $total = 0;
    # Drop two outliers on each end
    for my $val ((sort {$a<=>$b} @{$times[$j]})[2..$n+1]) {
      $total += $val;
    }
    # From here on $times[$j] is the mean, no longer the sample list.
    $times[$j] = $total / $n;
  }

  # Output
  # 1. Number of records
  # 2. Average time for each concurrency (including single process)
  # 3. Percentage of single-process time each concurrency level took
  #    (starts at index 1: the single-process run is the baseline)
  print join("|",
    sprintf ('%9d', $i),
    +(map { sprintf ('%12.7f', $_) } @times[0..($max_conc - 1)]),
    +(map { sprintf ('%6.2f', $_/$times[0]*100) } @times[1..($max_conc - 1)]),
  ),"\n";
}

reply via email to

[Prev in Thread] Current Thread [Next in Thread]