[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH 1/3] split: accept new output --filter=CMD option
From: Jim Meyering
Subject: [PATCH 1/3] split: accept new output --filter=CMD option
Date: Tue, 3 May 2011 10:54:10 +0200
From: Karl Heuer <address@hidden>
* src/split.c: Include <signal.h>, <sys/wait.h> and "sig2str.h".
(FILTER_OPTION): New anonymous enum member.
(filter_command, filter_pid): New globals.
(open_pipes, open_pipes_alloc, n_open_pipes): Likewise.
(oldblocked, newblocked): Likewise.
(longopts): Add "filter".
(usage): Document --filter.
(create): Extend to create a pipe and fork "sh -c CMD".
(closeout): Adapt to close a pipe and wait for child process.
(cwrite): Call closeout, not just close.
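(lines_chunk_split): Ignore EPIPE write errors when filtering.
(bytes_chunk_extract): Likewise.
(opid): New of_t member, holding the filter's process ID.
(ofile_open): Record filter_pid in opid, then reset filter_pid.
Ignore EPIPE on fclose when filtering, and clear the closed stream pointer.
(lines_rr): Initialize opid. Ignore EPIPE write and close errors when
filtering. Track whether the file index has wrapped, and close each
output via closeout.
(main): Handle --filter. Block SIGPIPE while splitting when filtering,
unless it is already ignored. Call closeout, not just close.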
(ignorable): New function, to encapsulate EPIPE test.
---
src/split.c | 227 +++++++++++++++++++++++++++++++++++++++++++++++++++++------
1 files changed, 204 insertions(+), 23 deletions(-)
diff --git a/src/split.c b/src/split.c
index 65e44dd..05315e6 100644
--- a/src/split.c
+++ b/src/split.c
@@ -25,7 +25,9 @@
#include <assert.h>
#include <stdio.h>
#include <getopt.h>
+#include <signal.h>
#include <sys/types.h>
+#include <sys/wait.h>
#include "system.h"
#include "error.h"
@@ -35,6 +37,7 @@
#include "full-write.h"
#include "quote.h"
#include "safe-read.h"
+#include "sig2str.h"
#include "xfreopen.h"
#include "xstrtol.h"
@@ -45,6 +48,21 @@
proper_name_utf8 ("Torbjorn Granlund", "Torbj\303\266rn Granlund"), \
proper_name ("Richard M. Stallman")
+/* Shell command to filter through, instead of creating files. */
+static char const *filter_command;
+
+/* Process ID of the filter. */
+static int filter_pid;
+
+/* Array of open pipes. */
+static int *open_pipes;
+static size_t open_pipes_alloc;
+static size_t n_open_pipes;
+
+/* Blocked signals. */
+static sigset_t oldblocked;
+static sigset_t newblocked;
+
/* Base name of output files. */
static char const *outbase;
@@ -90,6 +108,7 @@ enum Split_type
enum
{
VERBOSE_OPTION = CHAR_MAX + 1,
+ FILTER_OPTION,
IO_BLKSIZE_OPTION
};
@@ -103,6 +122,7 @@ static struct option const longopts[] =
{"unbuffered", no_argument, NULL, 'u'},
{"suffix-length", required_argument, NULL, 'a'},
{"numeric-suffixes", no_argument, NULL, 'd'},
+ {"filter", required_argument, NULL, FILTER_OPTION},
{"verbose", no_argument, NULL, VERBOSE_OPTION},
{"-io-blksize", required_argument, NULL,
IO_BLKSIZE_OPTION}, /* do not document */
@@ -111,6 +131,13 @@ static struct option const longopts[] =
{NULL, 0, NULL, 0}
};
+/* Return true if the errno value, ERR, is ignorable. */
+static inline bool
+ignorable (int err)
+{
+ return filter_command && err == EPIPE;
+}
+
static void
set_suffix_length (uintmax_t n_units, enum Split_type split_type)
{
@@ -170,6 +197,7 @@ Mandatory arguments to long options are mandatory for short options too.\n\
-C, --line-bytes=SIZE put at most SIZE bytes of lines per output file\n\
-d, --numeric-suffixes use numeric suffixes instead of alphabetic\n\
-e, --elide-empty-files do not generate empty output files with `-n'\n\
+ --filter=COMMAND write to shell COMMAND; file name is $FILE\n\
-l, --lines=NUMBER put NUMBER lines per output file\n\
-n, --number=CHUNKS generate CHUNKS output files. See below\n\
-u, --unbuffered immediately copy input to output with `-n r/...'\n\
@@ -256,10 +284,123 @@ next_file_name (void)
static int
create (const char *name)
{
- if (verbose)
- fprintf (stdout, _("creating file %s\n"), quote (name));
- return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
- (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH));
+ if (!filter_command)
+ {
+ if (verbose)
+ fprintf (stdout, _("creating file %s\n"), quote (name));
+ return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
+ (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH));
+ }
+ else
+ {
+ int fd_pair[2];
+ pid_t child_pid;
+ char const *shell_prog = getenv ("SHELL");
+ if (shell_prog == NULL)
+ shell_prog = "/bin/sh";
+ if (setenv ("FILE", name, 1) != 0)
+ error (EXIT_FAILURE, errno,
+ _("failed to set FILE environment variable"));
+ if (verbose)
+ fprintf (stdout, _("executing with FILE=%s\n"), quote (name));
+ if (pipe (fd_pair) != 0)
+ error (EXIT_FAILURE, errno, _("failed to create pipe"));
+ child_pid = fork ();
+ if (child_pid == 0)
+ {
+ /* This is the child process. If an error occurs here, the
+ parent will eventually learn about it after doing a wait,
+ at which time it will emit its own error message. */
+ int j;
+ /* We have to close any pipes that were opened during an
+ earlier call, otherwise this process will be holding a
+ write-pipe that will prevent the earlier process from
+ reading an EOF on the corresponding read-pipe. */
+ for (j = 0; j < n_open_pipes; ++j)
+ if (close (open_pipes[j]) != 0)
+ error (EXIT_FAILURE, errno, _("closing prior pipe"));
+ if (close (fd_pair[1]))
+ error (EXIT_FAILURE, errno, _("closing output pipe"));
+ if (fd_pair[0] != STDIN_FILENO)
+ {
+ if (dup2 (fd_pair[0], STDIN_FILENO) != STDIN_FILENO)
+ error (EXIT_FAILURE, errno, _("moving input pipe"));
+ if (close (fd_pair[0]) != 0)
+ error (EXIT_FAILURE, errno, _("closing input pipe"));
+ }
+ sigprocmask (SIG_SETMASK, &oldblocked, NULL);
+ execl (shell_prog, last_component (shell_prog), "-c",
+ filter_command, (char *) NULL);
+ error (EXIT_FAILURE, errno, _("failed to run command: \"%s -c %s\""),
+ shell_prog, filter_command);
+ }
+ if (child_pid == -1)
+ error (EXIT_FAILURE, errno, _("fork system call failed"));
+ if (close (fd_pair[0]) != 0)
+ error (EXIT_FAILURE, errno, _("failed to close input pipe"));
+ filter_pid = child_pid;
+ if (n_open_pipes == open_pipes_alloc)
+ open_pipes = x2nrealloc (open_pipes, &open_pipes_alloc,
+ sizeof *open_pipes);
+ open_pipes[n_open_pipes++] = fd_pair[1];
+ return fd_pair[1];
+ }
+}
+
+/* Close the output file, and do any associated cleanup.
+ If FP and FD are both specified, they refer to the same open file;
+ in this case FP is closed, but FD is still used in cleanup. */
+static void
+closeout (FILE *fp, int fd, pid_t pid, char const *name)
+{
+ if (fp != NULL && fclose (fp) != 0 && ! ignorable (errno))
+ error (EXIT_FAILURE, errno, "%s", name);
+ if (fd >= 0)
+ {
+ if (fp == NULL && close (fd) < 0)
+ error (EXIT_FAILURE, errno, "%s", name);
+ int j;
+ for (j = 0; j < n_open_pipes; ++j)
+ {
+ if (open_pipes[j] == fd)
+ {
+ open_pipes[j] = open_pipes[--n_open_pipes];
+ break;
+ }
+ }
+ }
+ if (pid > 0)
+ {
+ int wstatus = 0;
+ if (waitpid (pid, &wstatus, 0) == -1 && errno != ECHILD)
+ error (EXIT_FAILURE, errno, _("waiting for child process"));
+ if (WIFSIGNALED (wstatus))
+ {
+ int sig = WTERMSIG (wstatus);
+ if (sig != SIGPIPE)
+ {
+ char signame[MAX (SIG2STR_MAX, INT_BUFSIZE_BOUND (int))];
+ if (sig2str (sig, signame) != 0)
+ sprintf (signame, "%d", sig);
+ error (sig + 128, 0,
+ _("with FILE=%s, signal %s (%s) from command: %s"),
+ name, signame, strsignal (sig), filter_command);
+ }
+ }
+ else if (WIFEXITED (wstatus))
+ {
+ int ex = WEXITSTATUS (wstatus);
+ if (ex != 0)
+ error (ex, 0, _("with FILE=%s, exit %d from command: %s"),
+ name, ex, filter_command);
+ }
+ else
+ {
+ /* shouldn't happen. */
+ error (EXIT_FAILURE, 0,
+ _("unknown status from command (0x%X)"), wstatus);
+ }
+ }
}
/* Write BYTES bytes at BP to an output file.
@@ -273,13 +414,12 @@ cwrite (bool new_file_flag, const char *bp, size_t bytes)
{
if (!bp && bytes == 0 && elide_empty_files)
return;
- if (output_desc >= 0 && close (output_desc) < 0)
- error (EXIT_FAILURE, errno, "%s", outfile);
+ closeout (NULL, output_desc, filter_pid, outfile);
next_file_name ();
if ((output_desc = create (outfile)) < 0)
error (EXIT_FAILURE, errno, "%s", outfile);
}
- if (full_write (output_desc, bp, bytes) != bytes)
+ if (full_write (output_desc, bp, bytes) != bytes && ! ignorable (errno))
error (EXIT_FAILURE, errno, "%s", outfile);
}
@@ -501,7 +641,8 @@ lines_chunk_split (uintmax_t k, uintmax_t n, char *buf, size_t bufsize,
/* We don't use the stdout buffer here since we're writing
large chunks from an existing file, so it's more efficient
to write out directly. */
- if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
+ if (full_write (STDOUT_FILENO, bp, to_write) != to_write
+ && ! ignorable (errno))
error (EXIT_FAILURE, errno, "%s", _("write error"));
}
else
@@ -564,7 +705,8 @@ bytes_chunk_extract (uintmax_t k, uintmax_t n, char *buf, size_t bufsize,
error (EXIT_FAILURE, errno, "%s", infile);
else if (n_read == 0)
break; /* eof. */
- if (full_write (STDOUT_FILENO, buf, n_read) != n_read)
+ if (full_write (STDOUT_FILENO, buf, n_read) != n_read
+ && ! ignorable (errno))
error (EXIT_FAILURE, errno, "%s", quote ("-"));
start += n_read;
}
@@ -575,6 +717,7 @@ typedef struct of_info
char *of_name;
int ofd;
FILE *ofile;
+ int opid;
} of_t;
enum
@@ -637,14 +780,17 @@ ofile_open (of_t *files, size_t i_check, size_t nfiles)
error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
}
- if (fclose (files[i_reopen].ofile) != 0)
+ if (fclose (files[i_reopen].ofile) != 0 && ! ignorable (errno))
error (EXIT_FAILURE, errno, "%s", files[i_reopen].of_name);
+ files[i_reopen].ofile = NULL;
files[i_reopen].ofd = OFD_APPEND;
}
files[i_check].ofd = fd;
if (!(files[i_check].ofile = fdopen (fd, "a")))
error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
+ files[i_check].opid = filter_pid;
+ filter_pid = 0;
}
return file_limit;
@@ -658,6 +804,7 @@ ofile_open (of_t *files, size_t i_check, size_t nfiles)
static void
lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
{
+ bool wrapped = false;
bool file_limit;
size_t i_file;
of_t *files IF_LINT (= NULL);
@@ -678,6 +825,7 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
files[i_file].of_name = xstrdup (outfile);
files[i_file].ofd = OFD_NEW;
files[i_file].ofile = NULL;
+ files[i_file].opid = 0;
}
i_file = 0;
file_limit = false;
@@ -715,10 +863,12 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
{
if (line_no == k && unbuffered)
{
- if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
+ if (full_write (STDOUT_FILENO, bp, to_write) != to_write
+ && ! ignorable (errno))
error (EXIT_FAILURE, errno, "%s", _("write error"));
}
- else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1)
+ else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1
+ && ! ignorable (errno))
{
clearerr (stdout); /* To silence close_stdout(). */
error (EXIT_FAILURE, errno, "%s", _("write error"));
@@ -734,19 +884,25 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
{
/* Note writing to fd, rather than flushing the FILE gives
an 8% performance benefit, due to reduced data copying. */
- if (full_write (files[i_file].ofd, bp, to_write) != to_write)
+ if (full_write (files[i_file].ofd, bp, to_write) != to_write
+ && ! ignorable (errno))
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
}
- else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1)
+ else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1
+ && ! ignorable (errno))
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
if (file_limit)
{
- if (fclose (files[i_file].ofile) != 0)
+ if (fclose (files[i_file].ofile) != 0 && ! ignorable (errno))
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
+ files[i_file].ofile = NULL;
files[i_file].ofd = OFD_APPEND;
}
if (next && ++i_file == n)
- i_file = 0;
+ {
+ wrapped = true;
+ i_file = 0;
+ }
}
bp = bp_out;
@@ -757,11 +913,18 @@ lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
and to signal any waiting fifo consumers.
Also, close any open file descriptors.
FIXME: Should we do this before EXIT_FAILURE? */
- for (i_file = 0; !k && !elide_empty_files && i_file < n; i_file++)
+ if (!k)
{
- file_limit |= ofile_open (files, i_file, n);
- if (fclose (files[i_file].ofile) != 0)
- error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
+ int ceiling = (wrapped ? n : i_file);
+ for (i_file = 0; i_file < n; i_file++)
+ {
+ if (i_file >= ceiling && !elide_empty_files)
+ file_limit |= ofile_open (files, i_file, n);
+ if (files[i_file].ofd >= 0)
+ closeout (files[i_file].ofile, files[i_file].ofd,
+ files[i_file].opid, files[i_file].of_name);
+ files[i_file].ofd = OFD_APPEND;
+ }
}
}
@@ -824,7 +987,8 @@ main (int argc, char **argv)
int this_optind = optind ? optind : 1;
char *slash;
- c = getopt_long (argc, argv, "0123456789C:a:b:del:n:u", longopts, NULL);
+ c = getopt_long (argc, argv, "0123456789C:a:b:del:n:u",
+ longopts, NULL);
if (c == -1)
break;
@@ -955,6 +1119,10 @@ main (int argc, char **argv)
elide_empty_files = true;
break;
+ case FILTER_OPTION:
+ filter_command = optarg;
+ break;
+
case IO_BLKSIZE_OPTION:
{
uintmax_t tmp_blk_size;
@@ -1048,6 +1216,18 @@ main (int argc, char **argv)
buf = ptr_align (xmalloc (in_blk_size + 1 + page_size - 1), page_size);
+ /* When filtering, closure of one pipe must not terminate the process,
+ as there may still be other streams expecting input from us. */
+ sigemptyset (&newblocked);
+ if (filter_command)
+ {
+ struct sigaction act;
+ sigaction (SIGPIPE, NULL, &act);
+ if (act.sa_handler != SIG_IGN)
+ sigaddset (&newblocked, SIGPIPE);
+ }
+ sigprocmask (SIG_BLOCK, &newblocked, &oldblocked);
+
switch (split_type)
{
case type_digits:
@@ -1084,10 +1264,11 @@ main (int argc, char **argv)
abort ();
}
+ sigprocmask (SIG_SETMASK, &oldblocked, NULL);
+
if (close (STDIN_FILENO) != 0)
error (EXIT_FAILURE, errno, "%s", infile);
- if (output_desc >= 0 && close (output_desc) < 0)
- error (EXIT_FAILURE, errno, "%s", outfile);
+ closeout (NULL, output_desc, filter_pid, outfile);
exit (EXIT_SUCCESS);
}
--
1.7.5.141.g791a