From 5403741c2d5bc814c2f722d8e33537469ab2d064 Mon Sep 17 00:00:00 2001 From: Joey Degges Date: Tue, 2 Mar 2010 23:59:28 -0800 Subject: [PATCH] sort: parallel external sort implementation - input files are broken up into groups based on device id - each group of files is sorted in a separate thread - merging is carried out as usual - sorting speedups of 75% have been measured with 4 cores and 4 devices (not considering merge or IO time) - overall speedups of 60% have been measured considering merge and IO time Authors: Matt Ball Joey Degges --- bootstrap.conf | 1 + configure.ac | 3 + src/Makefile.am | 3 + src/sort.c | 333 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 333 insertions(+), 7 deletions(-) diff --git a/bootstrap.conf b/bootstrap.conf index 9cdf984..0c69e93 100644 --- a/bootstrap.conf +++ b/bootstrap.conf @@ -171,6 +171,7 @@ gnulib_modules=" priv-set progname propername + pthread putenv quote quotearg diff --git a/configure.ac b/configure.ac index b07a52b..7f8969a 100644 --- a/configure.ac +++ b/configure.ac @@ -344,6 +344,9 @@ if test "$elf_sys" = "yes" && \ gl_ADD_PROG([optional_pkglib_progs], [libstdbuf.so]) fi +# Check for pthreads +AC_CHECK_LIB(pthread, pthread_create) + ############################################################################ mk="$srcdir/src/Makefile.am" # Extract all literal names from the definition of $(EXTRA_PROGRAMS) diff --git a/src/Makefile.am b/src/Makefile.am index ecb42a8..f44974e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -391,6 +391,9 @@ who_LDADD += $(GETADDRINFO_LIB) hostname_LDADD += $(GETHOSTNAME_LIB) uname_LDADD += $(GETHOSTNAME_LIB) +# for pthread +sort_LDADD += $(LIB_PTHREAD) + $(PROGRAMS): ../lib/libcoreutils.a # Get the release year from ../lib/version-etc.c.
diff --git a/src/sort.c b/src/sort.c
index 39cb6d6..b00bea8 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -33,6 +33,7 @@
 #include "hard-locale.h"
 #include "hash.h"
 #include "md5.h"
+#include "nproc.h"
 #include "physmem.h"
 #include "posixver.h"
 #include "quote.h"
@@ -47,6 +48,34 @@
 #include "xnanosleep.h"
 #include "xstrtol.h"
 
+#if HAVE_LIBPTHREAD
+# include <pthread.h>
+/* Exit with status SORT_FAILURE and a diagnostic naming MSG if RV, the
+   return value of a pthread function, is nonzero.  */
+# define xpthread_error(rv, msg) \
+  do \
+    { \
+      int xpthread_rv_ = (rv); \
+      if (xpthread_rv_) \
+        error (SORT_FAILURE, 0, _("%s - %d: %s"), msg, xpthread_rv_, \
+               strerror (xpthread_rv_)); \
+    } \
+  while (0)
+/* Mutex wrappers that report a failing call through xpthread_error.  */
+# define xpthread_mutex_init(mutex, attr) \
+  xpthread_error (pthread_mutex_init (mutex, attr), "pthread_mutex_init")
+# define xpthread_mutex_lock(mutex) \
+  xpthread_error (pthread_mutex_lock (mutex), "pthread_mutex_lock")
+# define xpthread_mutex_unlock(mutex) \
+  xpthread_error (pthread_mutex_unlock (mutex), "pthread_mutex_unlock")
+#else
+/* Without libpthread the wrappers expand to no-op expressions.  */
+# define xpthread_error(rv, msg) ((void) 0)
+# define xpthread_mutex_init(mutex, attr) ((void) 0)
+# define xpthread_mutex_lock(mutex) ((void) 0)
+# define xpthread_mutex_unlock(mutex) ((void) 0)
+#endif
+
 #if HAVE_SYS_RESOURCE_H
 # include <sys/resource.h>
 #endif
@@ -388,6 +417,7 @@ Other options:\n\
 -t, --field-separator=SEP use SEP instead of non-blank to blank transition\n\
 -T, --temporary-directory=DIR use DIR for temporaries, not $TMPDIR or %s;\n\
 multiple options specify multiple directories\n\
+ --threads=N use no more than N threads to improve parallelism\n\
 -u, --unique with -c, check for strict ordering;\n\
 without -c, output only the first of an equal run\n\
 "), DEFAULT_TMPDIR);
@@ -431,7 +461,8 @@ enum
   FILES0_FROM_OPTION,
   NMERGE_OPTION,
   RANDOM_SOURCE_OPTION,
-  SORT_OPTION
+  SORT_OPTION,
+  THREADS_OPTION
 };
 
 static char const short_options[] = "-bcCdfghik:mMno:rRsS:t:T:uVy:z";
@@ -464,6 +495,7 @@ static struct option const long_options[] =
   {"temporary-directory", required_argument, NULL, 'T'},
   {"unique", no_argument, NULL, 'u'},
   {"zero-terminated", no_argument, NULL, 'z'},
+  {"threads",
required_argument, NULL, THREADS_OPTION},
   {GETOPT_HELP_OPTION_DECL},
   {GETOPT_VERSION_OPTION_DECL},
   {NULL, 0, NULL, 0},
@@ -547,6 +579,8 @@ struct tempnode
 };
 static struct tempnode *volatile temphead;
 static struct tempnode *volatile *temptail = &temphead;
+/* Number of nodes appended to the temphead list by create_temp_file.  */
+static size_t total_num_temps = 0;
 
 struct sortfile
 {
@@ -704,13 +737,20 @@ wait_proc (pid_t pid)
    This doesn't block waiting for any of them, it only reaps those
    that are already dead.  */
 
+#if HAVE_LIBPTHREAD
+/* Serializes the body of reap_some.  main initializes it.  */
+static pthread_mutex_t reap_lock;
+#endif
+
 static void
 reap_some (void)
 {
   pid_t pid;
 
+  xpthread_mutex_lock (&reap_lock);
   while (0 < nprocs && (pid = reap (-1)))
     update_proc (pid);
+  xpthread_mutex_unlock (&reap_lock);
 }
 
 /* Clean up any remaining temporary files.  */
@@ -776,6 +813,7 @@ create_temp_file (int *pfd, bool survive_fd_exhaustion)
     {
       *temptail = node;
       temptail = &node->next;
+      total_num_temps++;
     }
   saved_errno = errno;
   cs_leave (cs);
@@ -930,15 +968,25 @@ pipe_fork (int pipefds[2], size_t tries)
    fails, return NULL if the failure is due to file descriptor
    exhaustion and SURVIVE_FD_EXHAUSTION; otherwise, die.  */
 
+#if HAVE_LIBPTHREAD
+/* Serializes maybe_create_temp.  Statically initialized, because main
+   does not call xpthread_mutex_init for this lock.  */
+static pthread_mutex_t temp_file_lock = PTHREAD_MUTEX_INITIALIZER;
+#endif
+
 static char *
 maybe_create_temp (FILE **pfp, pid_t *ppid, bool survive_fd_exhaustion)
 {
+  xpthread_mutex_lock (&temp_file_lock);
   int tempfd;
   struct tempnode *node = create_temp_file (&tempfd, survive_fd_exhaustion);
   char *name;
 
   if (! node)
-    return NULL;
+    {
+      xpthread_mutex_unlock (&temp_file_lock);
+      return NULL;
+    }
 
   name = node->name;
 
@@ -978,6 +1022,7 @@ maybe_create_temp (FILE **pfp, pid_t *ppid, bool survive_fd_exhaustion)
   if (ppid)
     *ppid = node->pid;
 
+  xpthread_mutex_unlock (&temp_file_lock);
   return name;
 }
 
@@ -1265,6 +1310,21 @@ specify_sort_size (int oi, char c, char const *s)
     xstrtol_fatal (e, oi, c, long_options, s);
 }
 
+/* Specify the number of threads to spawn during internal sort.
*/
+static unsigned long int
+specify_nthreads (int oi, char c, char const *s)
+{
+  unsigned long int nthreads;
+  /* The empty suffix list means S must be a bare decimal number.  */
+  enum strtol_error e = xstrtoul (s, NULL, 10, &nthreads, "");
+  /* A count too large to represent is treated as "as many as possible".  */
+  if (e == LONGINT_OVERFLOW)
+    return ULONG_MAX;
+  /* OI and C identify the offending option in the diagnostic.  */
+  if (e != LONGINT_OK)
+    xstrtol_fatal (e, oi, c, long_options, s);
+  if (nthreads == 0)
+    error (SORT_FAILURE, 0, _("number of threads must be nonzero"));
+  return nthreads;
+}
+
 /* Return the default sort size.  */
 static size_t
 default_sort_size (void)
@@ -1862,6 +1922,11 @@ getmonth (char const *month, size_t len)
 /* A source of random data.  */
 static struct randread_source *randread_source;
 
+#if HAVE_LIBPTHREAD
+/* Serializes calls to randread on randread_source.  main initializes it.  */
+static pthread_mutex_t randread_lock;
+#endif
+
 /* Return the Ith randomly-generated state.  The caller must invoke
    random_state (H) for all H less than I before invoking random_state
    (I).  */
@@ -1889,7 +1952,10 @@ random_state (size_t i)
           s = &state[i];
         }
 
+      xpthread_mutex_lock (&randread_lock);
       randread (randread_source, buf, sizeof buf);
+      xpthread_mutex_unlock (&randread_lock);
+
       md5_init_ctx (s);
       md5_process_bytes (buf, sizeof buf, s);
     }
@@ -2859,10 +2925,12 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
     }
 }
 
-/* Sort NFILES FILES onto OUTPUT_FILE. */
+/* Sort NFILES FILES into OUTPUT_FILE if should_output is true and into
+   temporary files if false.
*/
 
 static void
-sort (char * const *files, size_t nfiles, char const *output_file)
+do_sort (char * const *files, size_t nfiles, char const *output_file,
+         const bool should_output)
 {
   struct buffer buf;
   size_t ntemps = 0;
@@ -2906,7 +2974,7 @@ sort (char * const *files, size_t nfiles, char const *output_file)
           linebase = line - buf.nlines;
           if (1 < buf.nlines)
             sortlines (line, buf.nlines, linebase);
-          if (buf.eof && !nfiles && !ntemps && !buf.left)
+          if (should_output && buf.eof && !nfiles && !ntemps && !buf.left)
             {
               xfclose (fp, file);
               tfp = xfopen (output_file, "w");
@@ -2944,7 +3012,7 @@ sort (char * const *files, size_t nfiles, char const *output_file)
  finish:
   free (buf.buf);
 
-  if (! output_file_created)
+  if (should_output && ! output_file_created)
     {
       size_t i;
       struct tempnode *node = temphead;
@@ -2960,6 +3028,243 @@ sort (char * const *files, size_t nfiles, char const *output_file)
     }
 }
 
+#if HAVE_LIBPTHREAD
+/* Arguments shared by every sort_multidisk_thread instance.  */
+struct sort_multidisk_thread_args
+{
+  /* DEV_FILES[i] is the list of files on device I; a thread sets the
+     entry to NULL when it claims that list.  */
+  char ***dev_files;
+  /* Number of entries in DEV_FILES and NFILES.  */
+  size_t ndevs;
+  /* NFILES[i] is the number of files in DEV_FILES[i].  */
+  size_t *nfiles;
+  /* Held while a thread searches DEV_FILES for a list to claim.  */
+  pthread_mutex_t mutex;
+};
+#endif
+
+/* Tries to sort files from one device at a time. Multiple instances can run
+   with the same arguments concurrently. Each instance will sort the files from
+   a different device.
*/
+
+static void *
+sort_multidisk_thread (void *data)
+{
+#if HAVE_LIBPTHREAD
+  struct sort_multidisk_thread_args *args = data;
+  size_t dev = 0;
+
+  while (dev < args->ndevs)
+    {
+      char **claimed = NULL;
+      int rc;
+
+      /* Under the shared mutex, scan forward for a device list that no
+         other thread has taken yet.  */
+      rc = pthread_mutex_lock (&args->mutex);
+      xpthread_error (rc, "error while locking mutex");
+      while (dev < args->ndevs)
+        {
+          claimed = args->dev_files[dev];
+          if (claimed)
+            {
+              /* Clearing the slot marks the list as taken.  */
+              args->dev_files[dev] = NULL;
+              break;
+            }
+          dev++;
+        }
+      rc = pthread_mutex_unlock (&args->mutex);
+      xpthread_error (rc, "error while unlocking mutex");
+
+      /* Every list has been claimed; this thread is done.  */
+      if (! claimed)
+        return NULL;
+
+      /* Sort this device's files into temporary files only.  */
+      do_sort (claimed, args->nfiles[dev], NULL, false);
+
+      /* This thread holds the only remaining reference to the list.  */
+      free (claimed);
+    }
+#endif
+  return NULL;
+}
+
+/* Return true if A and B have equal st_dev fields, i.e. describe files
+   on the same device, and false otherwise.  */
+
+static bool
+device_cmp (struct stat a, struct stat b)
+{
+  bool same_device = (a.st_dev == b.st_dev);
+  return same_device;
+}
+
+/* Group each file of the NFILES in FILES by the physical device that the files
+   are located on. Files on the same device are added to the same index of
+   DEV_FILES. NDEV_FILES contains a list with the number of files on each
+   device. DEV_FILES and NDEV_FILES should already be allocated and at least
+   NFILES long.
+
+   DEV_FILES[i] is a list containing files that are all on the same device
+   NDEV_FILES[i] is the length of the DEV_FILES[i] list.
+
+   The total number of devices found is returned. New array pointers are
+   allocated for each of the device groups in DEV_FILES.
*/
+
+static size_t
+group_files_by_device (char * const *files, size_t nfiles,
+                       char ***dev_files, size_t *ndev_files)
+{
+  /* DEV_MAP[i] holds the stat data of the first file found on device I.  */
+  struct stat *dev_map = xnmalloc (nfiles, sizeof *dev_map);
+  size_t ndevs = 0;
+  size_t i;
+
+  /* Visit FILES from last to first.  An index is used so that no
+     pointer is ever formed before the start of the array.  */
+  for (i = nfiles; 0 < i; i--)
+    {
+      char *file = files[i - 1];
+      size_t dev_index;
+      struct stat st;
+      /* "-" names standard input, which cannot be looked up by name.  */
+      int status = (STREQ (file, "-")
+                    ? fstat (fileno (stdin), &st)
+                    : stat (file, &st));
+
+      if (status != 0)
+        {
+          /* Save errno first: the _() lookup may overwrite it.  */
+          int stat_errno = errno;
+          error (SORT_FAILURE, stat_errno, _("Could not stat `%s'"), file);
+        }
+
+      /* Determine if any other files from this device have been found.  */
+      for (dev_index = 0; dev_index < ndevs; dev_index++)
+        if (device_cmp (dev_map[dev_index], st))
+          break;
+
+      /* First file seen on this device: start a new group for it.  */
+      if (ndevs <= dev_index)
+        {
+          dev_index = ndevs;
+          /* This is a little wasteful, but avoids the need to realloc.  */
+          dev_files[dev_index] = xnmalloc (nfiles, sizeof **dev_files);
+          ndev_files[dev_index] = 0;
+          dev_map[dev_index] = st;
+          ndevs++;
+        }
+
+      /* Add the filename to the device's file list.  */
+      dev_files[dev_index][ndev_files[dev_index]++] = file;
+    }
+
+  free (dev_map);
+  return ndevs;
+}
+
+/* Sort NFILES FILES onto OUTPUT_FILE.
+
+   Threading approach: Break FILES up into several groups where each contains
+   only files that can be found on the same physical device (according to
+   device_cmp()). Spawn threads to execute do_sort() on each group of files in
+   parallel.
+
+   This allows for all concerned resources (storage devices and processors) to
+   be more fully utilized.
+*/
+
+static void
+sort_multidisk (char * const *files, size_t nfiles, char const *output_file,
+                unsigned long int nthreads)
+{
+#if ! HAVE_LIBPTHREAD
+  do_sort (files, nfiles, output_file, true);
+#else
+  char ***dev_files;
+  size_t *nfiles_on_dev;
+  size_t ndevs;
+
+  /* A single input file cannot be split across threads, and a zero
+     thread count would leave the input unsorted; sort directly.  */
+  if (nfiles <= 1 || nthreads == 0)
+    {
+      do_sort (files, nfiles, output_file, true);
+      return;
+    }
+
+  dev_files = xnmalloc (nfiles, sizeof *dev_files);
+  nfiles_on_dev = xnmalloc (nfiles, sizeof *nfiles_on_dev);
+  ndevs = group_files_by_device (files, nfiles, dev_files, nfiles_on_dev);
+
+  /* Only one device, so there is no need to create any threads.  */
+  if (ndevs <= 1)
+    {
+      /* NFILES is at least 2 here, so group 0 has been allocated.  */
+      free (dev_files[0]);
+      free (dev_files);
+      free (nfiles_on_dev);
+
+      do_sort (files, nfiles, output_file, true);
+      return;
+    }
+
+  {
+    /* There is no point in starting more threads than there are devices.  */
+    unsigned long int nthreads_to_use = MIN (ndevs, nthreads);
+    pthread_t *threads = xnmalloc (nthreads_to_use, sizeof *threads);
+    unsigned long int tid;
+    int ret_val;
+
+    struct sort_multidisk_thread_args args = {
+      .dev_files = dev_files,
+      .ndevs = ndevs,
+      .nfiles = nfiles_on_dev};
+
+    ret_val = pthread_mutex_init (&args.mutex, NULL);
+    xpthread_error (ret_val, "error while init'n mutex");
+
+    /* Spawn threads to sort the device lists.  The threads keep
+       running until all of the device lists have been sorted.  */
+    for (tid = 0; tid < nthreads_to_use; tid++)
+      {
+        ret_val = pthread_create (&threads[tid], NULL,
+                                  sort_multidisk_thread, &args);
+        xpthread_error (ret_val, "error while creating a thread");
+      }
+
+    /* Wait for each thread to finish before merging.  This could be
+       optimized by starting some merges while other threads are
+       still sorting.  */
+    for (tid = 0; tid < nthreads_to_use; tid++)
+      {
+        ret_val = pthread_join (threads[tid], NULL);
+        xpthread_error (ret_val, "error while joining a thread");
+      }
+
+    free (threads);
+
+    /* Free the device tables; each thread already freed the lists it
+       sorted.  */
+    free (dev_files);
+    free (nfiles_on_dev);
+
+    ret_val = pthread_mutex_destroy (&args.mutex);
+    xpthread_error (ret_val, "error while destroying mutex");
+  }
+
+  /* Merge all the temp files created by the threads.  The array is
+     sized by walking the list itself, so it always matches the number
+     of entries copied into it.  */
+  {
+    size_t ntemps = 0;
+    size_t i = 0;
+    struct tempnode *node;
+    struct sortfile *tempfiles;
+
+    for (node = temphead; node; node = node->next)
+      ntemps++;
+
+    tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
+    for (node = temphead; node; node = node->next)
+      {
+        tempfiles[i].name = node->name;
+        tempfiles[i].pid = node->pid;
+        i++;
+      }
+    merge (tempfiles, ntemps, ntemps, output_file);
+    free (tempfiles);
+  }
+#endif
+}
+
+/* Sort NFILES FILES onto OUTPUT_FILE, passing NTHREADS on to
+   sort_multidisk as the upper bound on the number of threads.  */
+static void
+sort (char * const *files, size_t nfiles, char const *output_file,
+      unsigned long int nthreads)
+{
+  sort_multidisk (files, nfiles, output_file, nthreads);
+}
+
 /* Insert a malloc'd copy of key KEY_ARG at the end of the key list.
*/
 
 static void
@@ -3164,6 +3462,7 @@ main (int argc, char **argv)
   char *random_source = NULL;
   bool need_random = false;
   size_t nfiles = 0;
+  unsigned long int nthreads = 0;
   bool posixly_correct = (getenv ("POSIXLY_CORRECT") != NULL);
   bool obsolete_usage = (posix2_version () < 200112);
   char **files;
@@ -3375,6 +3674,10 @@ main (int argc, char **argv)
           if (compress_program && !STREQ (compress_program, optarg))
             error (SORT_FAILURE, 0, _("multiple compress programs specified"));
+          /* Initialize the lock only for the first occurrence of the
+             option, so that a repeated option does not re-initialize it.  */
+          if (! compress_program)
+            xpthread_mutex_init (&reap_lock, NULL);
           compress_program = optarg;
           break;
 
         case FILES0_FROM_OPTION:
@@ -3489,6 +3789,10 @@ main (int argc, char **argv)
           add_temp_dir (optarg);
           break;
 
+        case THREADS_OPTION:
+          nthreads = specify_nthreads (oi, c, optarg);
+          break;
+
         case 'u':
           unique = true;
           break;
@@ -3637,6 +3941,7 @@ main (int argc, char **argv)
 
   if (need_random)
     {
+      xpthread_mutex_init (&randread_lock, NULL);
       randread_source = randread_new (random_source, MD5_DIGEST_SIZE);
       if (! randread_source)
         die (_("open failed"), random_source);
@@ -3691,7 +3996,21 @@ main (int argc, char **argv)
       IF_LINT (free (sortfiles));
     }
   else
-    sort (files, nfiles, outfile);
+    {
+      if (!nthreads)
+        {
+          /* The default NTHREADS is 2 ** floor (log2 (number of processors)).
+             If comparisons do not vary in cost and threads are
+             scheduled evenly, with the current algorithm there is no
+             performance advantage to using a number of threads that
+             is not a power of 2.  */
+          unsigned long int np2 = num_processors (NPROC_CURRENT) / 2;
+          for (nthreads = 1; nthreads <= np2; nthreads *= 2)
+            continue;
+        }
+
+      sort (files, nfiles, outfile, nthreads);
+    }
 
   if (have_read_stdin && fclose (stdin) == EOF)
     die (_("close failed"), "-");
-- 
1.7.0.1