From 5403741c2d5bc814c2f722d8e33537469ab2d064 Mon Sep 17 00:00:00 2001
From: Joey Degges
Date: Tue, 2 Mar 2010 23:59:28 -0800
Subject: [PATCH] sort: parallel external sort implementation
- input files are broken up into groups based on device id
- each group of files is sorted in a separate thread
- merging is carried out as usual
- sorting speedups of 75% have been measured with 4 cores and 4 devices
(not considering merge or IO time)
- overall speedups of 60% have been measured considering merge and IO time
Authors:
Matt Ball
Joey Degges
---
bootstrap.conf | 1 +
configure.ac | 3 +
src/Makefile.am | 3 +
src/sort.c | 333 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
4 files changed, 333 insertions(+), 7 deletions(-)
diff --git a/bootstrap.conf b/bootstrap.conf
index 9cdf984..0c69e93 100644
--- a/bootstrap.conf
+++ b/bootstrap.conf
@@ -171,6 +171,7 @@ gnulib_modules="
priv-set
progname
propername
+ pthread
putenv
quote
quotearg
diff --git a/configure.ac b/configure.ac
index b07a52b..7f8969a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -344,6 +344,9 @@ if test "$elf_sys" = "yes" && \
gl_ADD_PROG([optional_pkglib_progs], [libstdbuf.so])
fi
+# Check for pthreads
+AC_CHECK_LIB(pthread, pthread_create)
+
############################################################################
mk="$srcdir/src/Makefile.am"
# Extract all literal names from the definition of $(EXTRA_PROGRAMS)
diff --git a/src/Makefile.am b/src/Makefile.am
index ecb42a8..f44974e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -391,6 +391,9 @@ who_LDADD += $(GETADDRINFO_LIB)
hostname_LDADD += $(GETHOSTNAME_LIB)
uname_LDADD += $(GETHOSTNAME_LIB)
+# for pthread
+sort_LDADD += $(LIB_PTHREAD)
+
$(PROGRAMS): ../lib/libcoreutils.a
# Get the release year from ../lib/version-etc.c.
diff --git a/src/sort.c b/src/sort.c
index 39cb6d6..b00bea8 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -33,6 +33,7 @@
#include "hard-locale.h"
#include "hash.h"
#include "md5.h"
+#include "nproc.h"
#include "physmem.h"
#include "posixver.h"
#include "quote.h"
@@ -47,6 +48,34 @@
#include "xnanosleep.h"
#include "xstrtol.h"
+#if HAVE_LIBPTHREAD
+# include <pthread.h>
+
+/* If RV, the return value of a pthread call, is nonzero, report MSG
+   together with the system's text for error number RV and exit with
+   status SORT_FAILURE.  error () appends that text and the trailing
+   newline itself, and does not return when its status is nonzero.
+   RV is evaluated exactly once.  */
+# define xpthread_error(rv, msg) \
+  do \
+    { \
+      int xpthread_rv_ = (rv); \
+      if (xpthread_rv_) \
+        error (SORT_FAILURE, xpthread_rv_, "%s", msg); \
+    } \
+  while (0)
+
+/* Checked wrappers around the pthread mutex calls: any failure is fatal.
+   Each expands to exactly one statement, so it can safely be used as the
+   sole body of an if or else.  */
+# define xpthread_mutex_init(mutex, attr) \
+  xpthread_error (pthread_mutex_init (mutex, attr), "pthread_mutex_init")
+# define xpthread_mutex_lock(mutex) \
+  xpthread_error (pthread_mutex_lock (mutex), "pthread_mutex_lock")
+# define xpthread_mutex_unlock(mutex) \
+  xpthread_error (pthread_mutex_unlock (mutex), "pthread_mutex_unlock")
+#else
+/* Built without libpthread: the wrappers do nothing and do not evaluate
+   their arguments.  They take the same parameters as the real ones and
+   still expand to a single expression statement.  */
+# define xpthread_error(rv, msg) ((void) 0)
+# define xpthread_mutex_init(mutex, attr) ((void) 0)
+# define xpthread_mutex_lock(mutex) ((void) 0)
+# define xpthread_mutex_unlock(mutex) ((void) 0)
+#endif
+
#if HAVE_SYS_RESOURCE_H
# include
#endif
@@ -388,6 +417,7 @@ Other options:\n\
-t, --field-separator=SEP use SEP instead of non-blank to blank transition\n\
-T, --temporary-directory=DIR use DIR for temporaries, not $TMPDIR or %s;\n\
multiple options specify multiple directories\n\
+ --threads=N use no more than N threads to improve parallelism\n\
-u, --unique with -c, check for strict ordering;\n\
without -c, output only the first of an equal run\n\
"), DEFAULT_TMPDIR);
@@ -431,7 +461,8 @@ enum
FILES0_FROM_OPTION,
NMERGE_OPTION,
RANDOM_SOURCE_OPTION,
- SORT_OPTION
+ SORT_OPTION,
+ THREADS_OPTION
};
static char const short_options[] = "-bcCdfghik:mMno:rRsS:t:T:uVy:z";
@@ -464,6 +495,7 @@ static struct option const long_options[] =
{"temporary-directory", required_argument, NULL, 'T'},
{"unique", no_argument, NULL, 'u'},
{"zero-terminated", no_argument, NULL, 'z'},
+ {"threads", required_argument, NULL, THREADS_OPTION},
{GETOPT_HELP_OPTION_DECL},
{GETOPT_VERSION_OPTION_DECL},
{NULL, 0, NULL, 0},
@@ -547,6 +579,7 @@ struct tempnode
};
static struct tempnode *volatile temphead;
static struct tempnode *volatile *temptail = &temphead;
+size_t total_num_temps = 0;
struct sortfile
{
@@ -704,13 +737,17 @@ wait_proc (pid_t pid)
This doesn't block waiting for any of them, it only reaps those
that are already dead. */
+/* Held by reap_some for the duration of its reaping loop.
+   NOTE(review): the only visible initialization is in main's
+   compress-program option case, so it runs once per occurrence of that
+   option and not at all when the option is absent -- confirm that this
+   is intended.  */
+pthread_mutex_t reap_lock;
+
static void
reap_some (void)
{
pid_t pid;
+ xpthread_mutex_lock (&reap_lock);
while (0 < nprocs && (pid = reap (-1)))
update_proc (pid);
+ xpthread_mutex_unlock (&reap_lock);
}
/* Clean up any remaining temporary files. */
@@ -776,6 +813,7 @@ create_temp_file (int *pfd, bool survive_fd_exhaustion)
{
*temptail = node;
temptail = &node->next;
+ total_num_temps++;
}
saved_errno = errno;
cs_leave (cs);
@@ -930,15 +968,21 @@ pipe_fork (int pipefds[2], size_t tries)
fails, return NULL if the failure is due to file descriptor
exhaustion and SURVIVE_FD_EXHAUSTION; otherwise, die. */
+/* Serializes maybe_create_temp across sorting threads.  It is initialized
+   statically because no run-time initialization call exists for it.  */
+static pthread_mutex_t temp_file_lock = PTHREAD_MUTEX_INITIALIZER;
+
static char *
maybe_create_temp (FILE **pfp, pid_t *ppid, bool survive_fd_exhaustion)
{
+ xpthread_mutex_lock (&temp_file_lock);
int tempfd;
struct tempnode *node = create_temp_file (&tempfd, survive_fd_exhaustion);
char *name;
if (! node)
- return NULL;
+ {
+ xpthread_mutex_unlock (&temp_file_lock);
+ return NULL;
+ }
name = node->name;
@@ -978,6 +1022,7 @@ maybe_create_temp (FILE **pfp, pid_t *ppid, bool survive_fd_exhaustion)
if (ppid)
*ppid = node->pid;
+ xpthread_mutex_unlock (&temp_file_lock);
return name;
}
@@ -1265,6 +1310,21 @@ specify_sort_size (int oi, char c, char const *s)
xstrtol_fatal (e, oi, c, long_options, s);
}
+/* Parse S as the thread count given to an option.  OI and C identify that
+   option and are only passed through to xstrtol_fatal for its diagnostic.
+   A LONGINT_OVERFLOW result yields ULONG_MAX; any other parse failure is
+   handed to xstrtol_fatal, and a count of zero is reported with exit
+   status SORT_FAILURE.  */
+static unsigned long int
+specify_nthreads (int oi, char c, char const *s)
+{
+  unsigned long int count;
+  enum strtol_error status = xstrtoul (s, NULL, 10, &count, "");
+
+  switch (status)
+    {
+    case LONGINT_OVERFLOW:
+      return ULONG_MAX;
+    case LONGINT_OK:
+      break;
+    default:
+      xstrtol_fatal (status, oi, c, long_options, s);
+    }
+
+  if (! count)
+    error (SORT_FAILURE, 0, _("number of threads must be nonzero"));
+  return count;
+}
+
/* Return the default sort size. */
static size_t
default_sort_size (void)
@@ -1862,6 +1922,9 @@ getmonth (char const *month, size_t len)
/* A source of random data. */
static struct randread_source *randread_source;
+/* Held around the randread call on randread_source in random_state.
+   main initializes it when need_random is set.  */
+pthread_mutex_t randread_lock;
+
/* Return the Ith randomly-generated state. The caller must invoke
random_state (H) for all H less than I before invoking random_state
(I). */
@@ -1889,7 +1952,10 @@ random_state (size_t i)
s = &state[i];
}
+ xpthread_mutex_lock (&randread_lock);
randread (randread_source, buf, sizeof buf);
+ xpthread_mutex_unlock (&randread_lock);
+
md5_init_ctx (s);
md5_process_bytes (buf, sizeof buf, s);
}
@@ -2859,10 +2925,12 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
}
}
-/* Sort NFILES FILES onto OUTPUT_FILE. */
+/* Sort NFILES FILES into OUTPUT_FILE if should_output is true and into
+ temporary files if false. */
static void
-sort (char * const *files, size_t nfiles, char const *output_file)
+do_sort (char * const *files, size_t nfiles, char const *output_file,
+ const bool should_output)
{
struct buffer buf;
size_t ntemps = 0;
@@ -2906,7 +2974,7 @@ sort (char * const *files, size_t nfiles, char const *output_file)
linebase = line - buf.nlines;
if (1 < buf.nlines)
sortlines (line, buf.nlines, linebase);
- if (buf.eof && !nfiles && !ntemps && !buf.left)
+ if (should_output && buf.eof && !nfiles && !ntemps && !buf.left)
{
xfclose (fp, file);
tfp = xfopen (output_file, "w");
@@ -2944,7 +3012,7 @@ sort (char * const *files, size_t nfiles, char const *output_file)
finish:
free (buf.buf);
- if (! output_file_created)
+ if (should_output && ! output_file_created)
{
size_t i;
struct tempnode *node = temphead;
@@ -2960,6 +3028,236 @@ sort (char * const *files, size_t nfiles, char const *output_file)
}
}
+/* Arguments shared by every thread running sort_multidisk_thread.  */
+struct sort_multidisk_thread_args
+{
+ /* Per-device file lists, NDEVS entries.  A thread sets an entry to NULL
+    when it claims that list.  */
+ char ***dev_files;
+ /* Number of entries in DEV_FILES and in NFILES.  */
+ size_t ndevs;
+ /* NFILES[I] is the number of names in DEV_FILES[I].  */
+ size_t *nfiles;
+ /* Held while a thread scans DEV_FILES for an unclaimed list.  */
+ pthread_mutex_t mutex;
+};
+
+/* Thread entry point.  DATA points to a struct sort_multidisk_thread_args
+   that several threads may share.  Repeatedly claim the next unclaimed
+   device file list while holding the shared mutex, sort that list into
+   temporary files with do_sort, then free it.  Return NULL once no
+   unclaimed list remains.  Without libpthread this does nothing.  */
+
+static void *
+sort_multidisk_thread (void *data)
+{
+#if HAVE_LIBPTHREAD
+  struct sort_multidisk_thread_args *shared = data;
+  size_t dev = 0;
+
+  while (dev < shared->ndevs)
+    {
+      char **claimed = NULL;
+      int rc;
+
+      /* Scan forward from DEV for a list nobody has taken yet.  Clearing
+         the slot while the mutex is held marks it as taken.  */
+      rc = pthread_mutex_lock (&shared->mutex);
+      xpthread_error (rc, "error while locking mutex");
+      while (dev < shared->ndevs && shared->dev_files[dev] == NULL)
+        dev++;
+      if (dev < shared->ndevs)
+        {
+          claimed = shared->dev_files[dev];
+          shared->dev_files[dev] = NULL;
+        }
+      rc = pthread_mutex_unlock (&shared->mutex);
+      xpthread_error (rc, "error while unlocking mutex");
+
+      if (claimed == NULL)
+        break;
+
+      do_sort (claimed, shared->nfiles[dev], NULL, false);
+
+      /* This thread holds the only remaining reference to the list.  */
+      free (claimed);
+    }
+#endif
+  return NULL;
+}
+
+/* Return true if A and B carry the same st_dev value, that is, if stat
+   reported the same containing device for both files; return false
+   otherwise.  */
+
+static bool
+device_cmp (struct stat a, struct stat b)
+{
+  dev_t const dev_a = a.st_dev;
+  dev_t const dev_b = b.st_dev;
+
+  return dev_a == dev_b;
+}
+
+/* Group the NFILES names in FILES by the device each file resides on, as
+   reported in st_dev by stat.  The name "-" is examined with fstat on
+   standard input instead of being looked up as a file name.
+
+   On return, for each I below the returned device count, DEV_FILES[I] is
+   a newly allocated array holding the names that share one device, in
+   the same relative order as in FILES, and NDEV_FILES[I] is the number
+   of names in it.  The name strings themselves are not copied.
+
+   DEV_FILES and NDEV_FILES must already have room for NFILES entries.
+   The caller owns each DEV_FILES[I] array and must free it.
+
+   A file that cannot be examined is a fatal error, reported via die.  */
+
+static size_t
+group_files_by_device (char * const *files, size_t nfiles,
+                       char ***dev_files, size_t *ndev_files)
+{
+  struct stat *dev_map = xnmalloc (nfiles, sizeof *dev_map);
+  size_t ndevs = 0;
+  size_t i;
+
+  for (i = 0; i < nfiles; i++)
+    {
+      char *name = files[i];
+      size_t dev_index;
+      struct stat st;
+      int status;
+
+      /* "-" names standard input, not a file in the working directory.  */
+      if (STREQ (name, "-"))
+        status = fstat (STDIN_FILENO, &st);
+      else
+        status = stat (name, &st);
+      if (status != 0)
+        die (_("stat failed"), name);
+
+      /* Look for a group already opened for this file's device.  */
+      for (dev_index = 0; dev_index < ndevs; dev_index++)
+        if (device_cmp (dev_map[dev_index], st))
+          break;
+
+      if (dev_index == ndevs)
+        {
+          /* First file seen on this device.  Only this file and the ones
+             after it can still join the group, so NFILES - I slots are
+             always enough.  */
+          dev_files[ndevs] = xnmalloc (nfiles - i, sizeof **dev_files);
+          ndev_files[ndevs] = 0;
+          dev_map[ndevs] = st;
+          ndevs++;
+        }
+
+      dev_files[dev_index][ndev_files[dev_index]++] = name;
+    }
+
+  free (dev_map);
+  return ndevs;
+}
+
+#if HAVE_LIBPTHREAD
+/* Merge every temporary file currently on the temphead list into
+   OUTPUT_FILE.  The entries are counted by walking the list itself, so
+   the array handed to merge is always exactly as long as the list.  */
+
+static void
+merge_all_temps (char const *output_file)
+{
+  struct tempnode *node;
+  struct sortfile *tempfiles;
+  size_t ntemps = 0;
+  size_t i = 0;
+
+  for (node = temphead; node; node = node->next)
+    ntemps++;
+
+  tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
+  for (node = temphead; node; node = node->next)
+    {
+      tempfiles[i].name = node->name;
+      tempfiles[i].pid = node->pid;
+      i++;
+    }
+
+  merge (tempfiles, ntemps, ntemps, output_file);
+  free (tempfiles);
+}
+#endif
+
+/* Sort NFILES FILES onto OUTPUT_FILE, using at most NTHREADS threads.
+
+   Threading approach: split FILES into groups whose members all report
+   the same device (according to device_cmp), then start threads that run
+   do_sort on the groups, each group producing temporary files.  Once
+   every thread has been joined, merge all temporary files into
+   OUTPUT_FILE.  This lets several storage devices and processors be
+   busy at the same time.
+
+   With fewer than two files, with all files on one device, or when built
+   without libpthread, sort directly in the calling thread instead.
+
+   NOTE(review): temporary files join the shared list in whatever order
+   the threads create them, so the order in which merge sees them is not
+   tied to the order of FILES -- confirm this is acceptable for options
+   whose result depends on input order.  */
+
+static void
+sort_multidisk (char * const *files, size_t nfiles, char const *output_file,
+                unsigned long int nthreads)
+{
+#if HAVE_LIBPTHREAD
+  char ***dev_files;
+  size_t *nfiles_on_dev;
+  size_t ndevs;
+
+  /* A single input file cannot be split across threads.  */
+  if (nfiles <= 1)
+    {
+      do_sort (files, nfiles, output_file, true);
+      return;
+    }
+
+  dev_files = xnmalloc (nfiles, sizeof *dev_files);
+  nfiles_on_dev = xnmalloc (nfiles, sizeof *nfiles_on_dev);
+  ndevs = group_files_by_device (files, nfiles, dev_files, nfiles_on_dev);
+
+  /* Only one device, so there is no need to create any threads.  */
+  if (ndevs <= 1)
+    {
+      free (dev_files[0]);
+      free (dev_files);
+      free (nfiles_on_dev);
+
+      do_sort (files, nfiles, output_file, true);
+      return;
+    }
+
+  {
+    /* More threads than devices would leave some with nothing to claim.
+       Fewer than one would leave every group unsorted, so never go
+       below a single thread.  */
+    unsigned long int nthreads_to_use = MIN (ndevs, nthreads);
+    pthread_t *threads;
+    unsigned long int tid;
+    int ret_val;
+
+    struct sort_multidisk_thread_args args = {
+      .dev_files = dev_files,
+      .ndevs = ndevs,
+      .nfiles = nfiles_on_dev};
+
+    if (nthreads_to_use == 0)
+      nthreads_to_use = 1;
+    threads = xnmalloc (nthreads_to_use, sizeof *threads);
+
+    ret_val = pthread_mutex_init (&args.mutex, NULL);
+    xpthread_error (ret_val, "error while init'n mutex");
+
+    /* Start the threads.  Each keeps claiming device lists until none
+       is left, so every list is sorted even when there are more
+       devices than threads.  */
+    for (tid = 0; tid < nthreads_to_use; tid++)
+      {
+        ret_val = pthread_create (&threads[tid], NULL,
+                                  sort_multidisk_thread, &args);
+        xpthread_error (ret_val, "error while creating a thread");
+      }
+
+    /* Wait for every thread before merging.  Starting some merges while
+       other threads are still sorting is a possible later refinement.  */
+    for (tid = 0; tid < nthreads_to_use; tid++)
+      {
+        ret_val = pthread_join (threads[tid], NULL);
+        xpthread_error (ret_val, "error while joining a thread");
+      }
+
+    free (threads);
+
+    /* The per-device lists were freed by the threads that claimed them;
+       only the two outer arrays remain.  */
+    free (dev_files);
+    free (nfiles_on_dev);
+
+    ret_val = pthread_mutex_destroy (&args.mutex);
+    xpthread_error (ret_val, "error while destroying mutex");
+
+    /* Merge all the temp files created by the threads.  */
+    merge_all_temps (output_file);
+  }
+#else
+  do_sort (files, nfiles, output_file, true);
+#endif
+}
+
+/* Sort NFILES FILES onto OUTPUT_FILE using no more than NTHREADS threads.
+   This is a plain forwarding call to sort_multidisk.  */
+static void
+sort (char * const *files, size_t nfiles, char const *output_file,
+ unsigned long int nthreads)
+{
+ sort_multidisk (files, nfiles, output_file, nthreads);
+}
+
/* Insert a malloc'd copy of key KEY_ARG at the end of the key list. */
static void
@@ -3164,6 +3462,7 @@ main (int argc, char **argv)
char *random_source = NULL;
bool need_random = false;
size_t nfiles = 0;
+ unsigned long int nthreads = 0;
bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL);
bool obsolete_usage = (posix2_version () < 200112);
char **files;
@@ -3375,6 +3674,7 @@ main (int argc, char **argv)
if (compress_program && !STREQ (compress_program, optarg))
error (SORT_FAILURE, 0, _("multiple compress programs specified"));
compress_program = optarg;
+ xpthread_mutex_init (&reap_lock, NULL);
break;
case FILES0_FROM_OPTION:
@@ -3489,6 +3789,10 @@ main (int argc, char **argv)
add_temp_dir (optarg);
break;
+ case THREADS_OPTION:
+ nthreads = specify_nthreads (oi, c, optarg);
+ break;
+
case 'u':
unique = true;
break;
@@ -3637,6 +3941,7 @@ main (int argc, char **argv)
if (need_random)
{
+ xpthread_mutex_init (&randread_lock, NULL);
randread_source = randread_new (random_source, MD5_DIGEST_SIZE);
if (! randread_source)
die (_("open failed"), random_source);
@@ -3691,7 +3996,21 @@ main (int argc, char **argv)
IF_LINT (free (sortfiles));
}
else
- sort (files, nfiles, outfile);
+ {
+ if (!nthreads)
+ {
+ /* The default NTHREADS is 2 ** floor (log2 (number of processors)).
+ If comparisons do not vary in cost and threads are
+ scheduled evenly, with the current algorithm there is no
+ performance advantage to using a number of threads that
+ is not a power of 2. */
+ unsigned long int np2 = num_processors (NPROC_CURRENT) / 2;
+ for (nthreads = 1; nthreads <= np2; nthreads *= 2)
+ continue;
+ }
+
+ sort (files, nfiles, outfile, nthreads);
+ }
if (have_read_stdin && fclose (stdin) == EOF)
die (_("close failed"), "-");
--
1.7.0.1