diff --git a/libpoke/Makefile.am b/libpoke/Makefile.am
index bab9793f..51abef3a 100644
--- a/libpoke/Makefile.am
+++ b/libpoke/Makefile.am
@@ -55,7 +55,9 @@ libpoke_la_SOURCES = libpoke.h libpoke.c \
                      pvm-program.h pvm-program.c \
                      pvm.jitter \
                      ios.c ios.h ios-dev.h \
-                     ios-dev-file.c ios-dev-mem.c
+                     ios-dev-file.c ios-dev-mem.c \
+                     ios-buffer.h ios-buffer.c \
+                     ios-dev-stream.c
 
 libpoke_la_SOURCES += ../common/pk-utils.c ../common/pk-utils.h
 
diff --git a/libpoke/ios-buffer.c b/libpoke/ios-buffer.c
new file mode 100644
index 00000000..f3e1ad63
--- /dev/null
+++ b/libpoke/ios-buffer.c
@@ -0,0 +1,255 @@
+/* ios-buffer.c - The buffer for IO devices.  */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/* NOTE(review): the header names of the four system includes were lost
+   in this copy of the patch.  The ones below are those the code in this
+   file needs (assert, uint8_t, calloc/free, memcpy) -- confirm against
+   the original submission.  */
+#include <assert.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "ios.h"
+#include "ios-dev.h"
+#include "ios-buffer.h"
+
+/* Size in bytes of one chunk of buffered data.  */
+#define IOB_CHUNK_SIZE 2048
+
+/* Number of hash buckets the chunks are distributed over.  */
+#define IOB_BUCKET_COUNT 8
+
+/* Position of the byte at OFFSET inside the chunk that holds it.  */
+#define IOB_CHUNK_OFFSET(offset)	\
+  ((offset) % IOB_CHUNK_SIZE)
+
+/* Number of the chunk that holds the byte at OFFSET.  */
+#define IOB_CHUNK_NO(offset)		\
+  ((offset) / IOB_CHUNK_SIZE)
+
+/* Bucket in which the chunk numbered CHUNK_NO is stored.  */
+#define IOB_BUCKET_NO(chunk_no)		\
+  ((chunk_no) % IOB_BUCKET_COUNT)
+
+/* One IOB_CHUNK_SIZE-byte piece of buffered data.  CHUNK_NO identifies
+   which piece of the stream it holds and NEXT links the chunks that
+   share a bucket.  */
+
+struct ios_buffer_chunk
+{
+  uint8_t bytes[IOB_CHUNK_SIZE];
+  int chunk_no;
+  struct ios_buffer_chunk *next;
+};
+
+/* begin_offset is the first offset that's not yet flushed, initialized as 0.
+   end_offset of an instream is the next byte to read to.
+   end_offset of an outstream is the successor of the greatest offset that
+   is written to.  */
+
+struct ios_buffer
+{
+  /* Chunk lists, indexed by IOB_BUCKET_NO (chunk_no).  */
+  struct ios_buffer_chunk* chunks[IOB_BUCKET_COUNT];
+  ios_dev_off begin_offset;
+  ios_dev_off end_offset;
+  /* Number that the next newly allocated chunk receives.  */
+  int next_chunk_no;
+};
+
+/* Return the begin_offset of BUFFER.  */
+
+ios_dev_off
+ios_buffer_get_begin_offset (struct ios_buffer *buffer)
+{
+  return buffer->begin_offset;
+}
+
+/* Return the end_offset of BUFFER.  */
+
+ios_dev_off
+ios_buffer_get_end_offset (struct ios_buffer *buffer)
+{
+  return buffer->end_offset;
+}
+
+/* Allocate an empty buffer: no chunks, both offsets zero.  Return NULL
+   if the allocation fails.  The result is released with
+   ios_buffer_free.  */
+
+struct ios_buffer *
+ios_buffer_init (void)
+{
+  return calloc (1, sizeof (struct ios_buffer));
+}
+
+/* Release BUFFER together with every chunk it still holds.  A NULL
+   BUFFER is accepted and ignored.  Always returns 1.  */
+
+int
+ios_buffer_free (struct ios_buffer *buffer)
+{
+  if (buffer == NULL)
+    return 1;
+
+  for (int i = 0; i < IOB_BUCKET_COUNT; i++)
+    {
+      struct ios_buffer_chunk *chunk = buffer->chunks[i];
+
+      while (chunk)
+        {
+          struct ios_buffer_chunk *chunk_next = chunk->next;
+
+          free (chunk);
+          chunk = chunk_next;
+        }
+    }
+
+  free (buffer);
+  return 1;
+}
+
+/* Return the chunk of BUFFER numbered CHUNK_NO, or NULL if the buffer
+   does not hold such a chunk.  */
+
+struct ios_buffer_chunk*
+ios_buffer_get_chunk (struct ios_buffer *buffer, int chunk_no)
+{
+  struct ios_buffer_chunk *chunk = buffer->chunks[IOB_BUCKET_NO (chunk_no)];
+
+  while (chunk && chunk->chunk_no != chunk_no)
+    chunk = chunk->next;
+
+  return chunk;
+}
+
+/* Allocate zero-filled chunks numbered from BUFFER's next_chunk_no up to
+   and including FINAL_CHUNK_NO, and store the last one in *FINAL_CHUNK.
+
+   Return 0 on success.  Return IOD_ERROR, leaving *FINAL_CHUNK untouched,
+   if memory runs out or if FINAL_CHUNK_NO is below next_chunk_no.  The
+   latter used to be guarded by an assert only, so builds with NDEBUG
+   handed back a chunk carrying a different number than the one
+   requested.  */
+
+int
+ios_buffer_allocate_new_chunk (struct ios_buffer *buffer, int final_chunk_no,
+                               struct ios_buffer_chunk **final_chunk)
+{
+  struct ios_buffer_chunk *chunk;
+  int bucket_no;
+
+  if (final_chunk_no < buffer->next_chunk_no)
+    return IOD_ERROR;
+
+  do
+    {
+      chunk = calloc (1, sizeof (struct ios_buffer_chunk));
+      if (!chunk)
+        return IOD_ERROR;
+      /* Place the new chunk at the head of its bucket.  */
+      chunk->chunk_no = buffer->next_chunk_no;
+      bucket_no = IOB_BUCKET_NO (chunk->chunk_no);
+      chunk->next = buffer->chunks[bucket_no];
+      buffer->chunks[bucket_no] = chunk;
+      buffer->next_chunk_no++;
+    }
+  while (buffer->next_chunk_no <= final_chunk_no);
+
+  /* end_offset is updated as the buffer is written to.  Therefore, it is
+     not updated here, but in ios_buffer_pwrite.  */
+  *final_chunk = chunk;
+  return 0;
+}
+
+/* Copy COUNT bytes starting at OFFSET from BUFFER into BUF.
+
+   ios_dev_stream_pread already checks begin_offset and end_offset, so
+   this function does not; it assumes the requested range exists in the
+   buffer.  A chunk that is missing is allocated (zero-filled) through
+   ios_buffer_allocate_new_chunk.
+
+   Return 0 on success and IOD_ERROR if such an allocation fails.  */
+
+int
+ios_buffer_pread (struct ios_buffer *buffer, void *buf, size_t count,
+                  ios_dev_off offset)
+{
+  uint8_t *dst = buf;
+  int chunk_no = IOB_CHUNK_NO (offset);
+  size_t chunk_offset = IOB_CHUNK_OFFSET (offset);
+  size_t done = 0;
+
+  /* A do-while so that, as before, the first chunk is looked up (and
+     allocated if absent) even when COUNT is zero.  */
+  do
+    {
+      struct ios_buffer_chunk *chunk = ios_buffer_get_chunk (buffer, chunk_no);
+      /* The amount taken from this chunk is the minimum of what is still
+         requested and what is left of the chunk.  */
+      size_t room = IOB_CHUNK_SIZE - chunk_offset;
+      size_t n = count - done < room ? count - done : room;
+
+      if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+        return IOD_ERROR;
+
+      /* Address the data through the BYTES member instead of relying on
+         void-pointer arithmetic and on BYTES being the first member.  */
+      if (n > 0)
+        memcpy (dst + done, chunk->bytes + chunk_offset, n);
+
+      done += n;
+      chunk_no++;
+      /* Every chunk after the first one is read from its beginning.  */
+      chunk_offset = 0;
+    }
+  while (done < count);
+
+  return 0;
+}
+
+/* Since ios_dev_stream_pwrite already needs to check begin_offset, this
+   function does not.  It assumes the given range is not discarded.  It also
+   allocates new chunks when necessary.
+   Returns 0 on success and IOD_ERROR if a chunk cannot be allocated; in
+   the latter case end_offset is left unchanged.  */
+
+int
+ios_buffer_pwrite (struct ios_buffer *buffer, const void *buf, size_t count,
+                   ios_dev_off offset)
+{
+  const uint8_t *src = buf;
+  int chunk_no = IOB_CHUNK_NO (offset);
+  size_t chunk_offset = IOB_CHUNK_OFFSET (offset);
+  size_t done = 0;
+
+  /* A do-while so that, as before, the first chunk is looked up (and
+     allocated if absent) even when COUNT is zero.  */
+  do
+    {
+      struct ios_buffer_chunk *chunk = ios_buffer_get_chunk (buffer, chunk_no);
+      /* The amount put into this chunk is the minimum of what is still
+         to be written and what is left of the chunk.  */
+      size_t room = IOB_CHUNK_SIZE - chunk_offset;
+      size_t n = count - done < room ? count - done : room;
+
+      if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+        return IOD_ERROR;
+
+      /* Address the data through the BYTES member instead of relying on
+         void-pointer arithmetic and on BYTES being the first member.  */
+      if (n > 0)
+        memcpy (chunk->bytes + chunk_offset, src + done, n);
+
+      done += n;
+      chunk_no++;
+      /* Every chunk after the first one is filled from its beginning.  */
+      chunk_offset = 0;
+    }
+  while (done < count);
+
+  /* Lastly, keep track of the greatest offset we wrote to in the buffer.
+     (In fact, end_offset is the least offset we have not written to yet.)  */
+  if (buffer->end_offset < offset + count)
+    buffer->end_offset = offset + count;
+
+  return 0;
+}
+
+/* Discard the chunks of BUFFER that lie entirely before the chunk holding
+   OFFSET, and move begin_offset to the start of that chunk.  Because
+   whole chunks are dropped, begin_offset may end up lower than OFFSET.
+   The chunks that are kept end up in reversed order within their bucket.
+   Always returns 0.  */
+
+int
+ios_buffer_forget_till (struct ios_buffer *buffer, ios_dev_off offset)
+{
+  int chunk_no = IOB_CHUNK_NO (offset);
+
+  for (int i = 0; i < IOB_BUCKET_COUNT; i++)
+    {
+      struct ios_buffer_chunk *chunk = buffer->chunks[i];
+
+      /* Rebuild the bucket from scratch with the surviving chunks.  */
+      buffer->chunks[i] = NULL;
+      while (chunk)
+        {
+          struct ios_buffer_chunk *chunk_next = chunk->next;
+
+          if (chunk->chunk_no >= chunk_no)
+            {
+              chunk->next = buffer->chunks[i];
+              buffer->chunks[i] = chunk;
+            }
+          else
+            free (chunk);
+          chunk = chunk_next;
+        }
+    }
+
+  /* Widen before multiplying: the product of two ints overflows as soon
+     as OFFSET reaches 2 GiB.  */
+  buffer->begin_offset = (ios_dev_off) chunk_no * IOB_CHUNK_SIZE;
+  assert (buffer->end_offset >= buffer->begin_offset);
+  assert (buffer->begin_offset <= offset);
+  return 0;
+}
diff --git a/libpoke/ios-buffer.h b/libpoke/ios-buffer.h
new file mode 100644
index 00000000..838999d6
--- /dev/null
+++ b/libpoke/ios-buffer.h
@@ -0,0 +1,47 @@
+/* ios-buffer.h - The buffer for IO devices.  */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef IOS_BUFFER_H
+#define IOS_BUFFER_H
+
+#include <stddef.h>
+
+/* This header does not declare ios_dev_off itself.  The files that use
+   it include "ios.h" and "ios-dev.h" first, as ios-dev-stream.c does.  */
+
+struct ios_buffer;
+struct ios_buffer_chunk;
+
+/* Allocate an empty buffer, or return NULL if out of memory.  */
+struct ios_buffer * ios_buffer_init (void);
+
+/* Release BUFFER and all of its chunks.  */
+int ios_buffer_free (struct ios_buffer *buffer);
+
+/* First offset that is still held in BUFFER.  */
+ios_dev_off
+ios_buffer_get_begin_offset (struct ios_buffer *buffer);
+
+/* Successor of the greatest offset written into BUFFER.  */
+ios_dev_off
+ios_buffer_get_end_offset (struct ios_buffer *buffer);
+
+/* Chunk numbered CHUNK_NO, or NULL if BUFFER does not hold it.  */
+struct ios_buffer_chunk*
+ios_buffer_get_chunk (struct ios_buffer *buffer, int chunk_no);
+
+/* Allocate every chunk up to FINAL_CHUNK_NO and store the last one in
+   *FINAL_CHUNK.  */
+int
+ios_buffer_allocate_new_chunk (struct ios_buffer *buffer, int final_chunk_no,
+                               struct ios_buffer_chunk **final_chunk);
+
+/* Copy COUNT bytes at OFFSET from BUFFER into BUF.  */
+int
+ios_buffer_pread (struct ios_buffer *buffer, void *buf, size_t count,
+                  ios_dev_off offset);
+
+/* Copy COUNT bytes from BUF into BUFFER at OFFSET.  */
+int
+ios_buffer_pwrite (struct ios_buffer *buffer, const void *buf, size_t count,
+                   ios_dev_off offset);
+
+/* Discard the chunks that lie entirely before the one holding OFFSET.  */
+int
+ios_buffer_forget_till (struct ios_buffer *buffer, ios_dev_off offset);
+
+#endif /* ! IOS_BUFFER_H */
diff --git a/libpoke/ios-dev-stream.c b/libpoke/ios-dev-stream.c
new file mode 100644
index 00000000..1258bfe5
--- /dev/null
+++ b/libpoke/ios-dev-stream.c
@@ -0,0 +1,240 @@
+/* ios-dev-stream.c - Streaming IO devices.  */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+/* NOTE(review): the header names of the system includes were lost in
+   this copy of the patch.  The ones below are those the code in this
+   file needs (uint64_t, FILE/fread/fwrite, malloc/free, strcmp/strdup)
+   -- confirm against the original submission.  */
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "ios.h"
+#include "ios-dev.h"
+#include "ios-buffer.h"
+
+/* Handlers recognized by this device.  They must differ from each other:
+   ios_dev_stream_open compares them in turn and takes the first match.
+   NOTE(review): the text between the quotes was lost in this copy of the
+   patch; the names below are the conventional ones -- confirm.  */
+#define IOS_STDIN_HANDLER       ("<stdin>")
+#define IOS_STDOUT_HANDLER      ("<stdout>")
+#define IOS_STDERR_HANDLER      ("<stderr>")
+
+/* State associated with a stream device.
+   HANDLER is a heap copy of the name the device was opened with, FILE
+   the stdio stream behind it, and FLAGS is IOS_F_READ for stdin and
+   IOS_F_WRITE for stdout and stderr.  The union members share storage:
+   BUFFER is only valid for read streams and WRITE_OFFSET only for write
+   streams, so FLAGS must be checked before touching either.  */
+
+struct ios_dev_stream
+{
+  char *handler;
+  FILE *file;
+  uint64_t flags;
+  union
+  {
+    struct ios_buffer *buffer;
+    uint64_t write_offset;
+  };
+};
+
+/* Return the name of this device interface.  */
+
+static char *
+ios_dev_stream_get_dev_if_name (void)
+{
+  return "STREAM";
+}
+
+/* Return a heap copy of HANDLER if it names one of the three streams
+   handled by this device, and NULL otherwise (or if strdup fails).
+   FLAGS is not used.  */
+
+static char *
+ios_dev_stream_handler_normalize (const char *handler, uint64_t flags)
+{
+  /* TODO handle the case where strdup fails.  */
+  if (!strcmp (handler, IOS_STDIN_HANDLER)
+      || !strcmp (handler, IOS_STDOUT_HANDLER)
+      || !strcmp (handler, IOS_STDERR_HANDLER))
+    return strdup (handler);
+  else
+    return NULL;
+}
+
+/* Open the stream named by HANDLER and return its state.  The FLAGS
+   argument is not consulted: the access mode is fixed by the stream.
+   On failure, free whatever was allocated, store IOD_ERROR in *ERROR
+   (when ERROR is not NULL) and return NULL.  */
+
+static void *
+ios_dev_stream_open (const char *handler, uint64_t flags, int *error)
+{
+  struct ios_dev_stream *sio;
+
+  sio = malloc (sizeof (struct ios_dev_stream));
+  if (!sio)
+    goto error;
+
+  sio->handler = strdup (handler);
+  if (!sio->handler)
+    goto error;
+
+  if (!strcmp (handler, IOS_STDIN_HANDLER))
+    {
+      sio->file = stdin;
+      sio->flags = IOS_F_READ;
+      sio->buffer = ios_buffer_init ();
+      if (!sio->buffer)
+        goto error;
+    }
+  else if (!strcmp (handler, IOS_STDOUT_HANDLER))
+    {
+      sio->file = stdout;
+      sio->flags = IOS_F_WRITE;
+      sio->write_offset = 0;
+    }
+  else if (!strcmp (handler, IOS_STDERR_HANDLER))
+    {
+      sio->file = stderr;
+      sio->flags = IOS_F_WRITE;
+      sio->write_offset = 0;
+    }
+  else
+    goto error;
+
+  return sio;
+
+error:
+  if (sio)
+    {
+      free (sio->handler);
+      free (sio);
+    }
+  if (error)
+    *error = IOD_ERROR;
+  return NULL;
+}
+
+/* Release the state IOD.  The underlying stdio stream is left open.
+   Always returns 1.  */
+
+static int
+ios_dev_stream_close (void *iod)
+{
+  struct ios_dev_stream *sio = iod;
+
+  /* Only read streams own a buffer.  For write streams the same storage
+     holds WRITE_OFFSET, which must not be handed to free.  */
+  if (sio->flags & IOS_F_READ)
+    ios_buffer_free (sio->buffer);
+  free (sio->handler);
+  free (sio);
+
+  return 1;
+}
+
+/* Return the flags the stream was opened with.  */
+
+static uint64_t
+ios_dev_stream_get_flags (void *iod)
+{
+  struct ios_dev_stream *sio = iod;
+  return sio->flags;
+}
+
+/* Read up to COUNT bytes from FILE into DST, retrying after short reads.
+   Return the number of bytes stored, which is below COUNT only when
+   fread reports end of file or an error.  */
+
+static size_t
+ios_dev_stream_read_bytes (FILE *file, void *dst, size_t count)
+{
+  char *out = dst;
+  size_t total = 0;
+
+  while (total < count)
+    {
+      /* Item size 1, so that the return value is a byte count.  */
+      size_t got = fread (out + total, 1, count - total, file);
+
+      if (got == 0)
+        break;
+      total += got;
+    }
+
+  return total;
+}
+
+/* Read COUNT bytes at OFFSET into BUF.  Everything consumed from the
+   stream is kept in the buffer, so data that has not been flushed can be
+   read again.
+
+   Return IOS_OK on success, IOD_EOF if OFFSET lies before the part of
+   the stream still buffered, and IOD_ERROR for write streams, on lack of
+   memory, or if the stream ends before the requested range is
+   complete.  */
+
+static int
+ios_dev_stream_pread (void *iod, void *buf, size_t count, ios_dev_off offset)
+{
+  struct ios_dev_stream *sio = iod;
+  struct ios_buffer *buffer;
+  ios_dev_off end_offset;
+
+  /* Check the direction first: BUFFER is meaningless for write
+     streams.  */
+  if (sio->flags & IOS_F_WRITE)
+    return IOD_ERROR;
+  buffer = sio->buffer;
+
+  /* If the beginning of the buffer is discarded, return EOF.  */
+  if (ios_buffer_get_begin_offset (buffer) > offset)
+    return IOD_EOF;
+
+  /* If the requested range is in the buffer, return it.  */
+  end_offset = ios_buffer_get_end_offset (buffer);
+  if (end_offset >= offset + count)
+    return ios_buffer_pread (buffer, buf, count, offset);
+
+  /* What was last read into the buffer may be before or after the
+     offset that this function is provided with.  */
+  if (end_offset == offset)
+    {
+      /* The request starts where the buffered data ends: read straight
+         into BUF and record a copy of what arrived.  */
+      size_t got = ios_dev_stream_read_bytes (sio->file, buf, count);
+
+      if ((got > 0 && ios_buffer_pwrite (buffer, buf, got, offset))
+          || got < count)
+        return IOD_ERROR;
+
+      return IOS_OK;
+    }
+  else
+    {
+      /* Either the request overlaps the end of the buffered data or it
+         starts beyond it.  Bring everything from END_OFFSET up to the end
+         of the request into the buffer, then serve it from there.  */
+      size_t to_be_read = (offset + count) - end_offset;
+      void *temp = malloc (to_be_read);
+      size_t got;
+      int status = 0;
+
+      if (!temp)
+        return IOD_ERROR;
+
+      got = ios_dev_stream_read_bytes (sio->file, temp, to_be_read);
+      /* Keep only the bytes that really arrived.  */
+      if (got > 0)
+        status = ios_buffer_pwrite (buffer, temp, got, end_offset);
+      free (temp);
+
+      if (status || got < to_be_read)
+        return IOD_ERROR;
+
+      return ios_buffer_pread (buffer, buf, count, offset);
+    }
+}
+
+/* Write COUNT bytes from BUF at OFFSET.  The stream can only move
+   forward: a gap between the current position and OFFSET is filled with
+   zero bytes.
+
+   Return IOS_OK on success, IOD_EOF if OFFSET was already written out,
+   and IOD_ERROR for read streams or if the stream rejects the data.
+   WRITE_OFFSET always reflects the number of bytes actually emitted.  */
+
+static int
+ios_dev_stream_pwrite (void *iod, const void *buf, size_t count,
+                       ios_dev_off offset)
+{
+  struct ios_dev_stream *sio = iod;
+  size_t written;
+
+  if (sio->flags & IOS_F_READ)
+    return IOD_ERROR;
+
+  /* If the offset we want to write to is already written out,
+     we return an error.  */
+  if (sio->write_offset > offset)
+    return IOD_EOF;
+
+  /* TODO: Write this more efficiently.  */
+  while (sio->write_offset < (uint64_t) offset)
+    {
+      if (fputc (0, sio->file) == EOF)
+        return IOD_ERROR;
+      sio->write_offset++;
+    }
+
+  /* Item size 1, so that the return value is a byte count.  */
+  written = count > 0 ? fwrite (buf, 1, count, sio->file) : 0;
+  sio->write_offset += written;
+  if (written != count)
+    return IOD_ERROR;
+
+  return IOS_OK;
+}
+
+/* Return the number of bytes known so far: the end of the buffered data
+   for read streams, the write position for write streams.  */
+
+static ios_dev_off
+ios_dev_stream_size (void *iod)
+{
+  struct ios_dev_stream *sio = iod;
+  if (sio->flags & IOS_F_READ)
+    return ios_buffer_get_end_offset (sio->buffer);
+  else
+    return sio->write_offset;
+}
+
+/* For read streams, let the buffer forget the data before OFFSET when
+   OFFSET lies within the buffered range.  In every other case do nothing
+   and return IOS_OK.  */
+
+static int
+ios_dev_stream_flush (void *iod, ios_dev_off offset)
+{
+  struct ios_dev_stream *sio = iod;
+  if (sio->flags & IOS_F_READ
+      && offset > ios_buffer_get_begin_offset (sio->buffer)
+      && offset <= ios_buffer_get_end_offset (sio->buffer))
+    return ios_buffer_forget_till (sio->buffer, offset);
+  else
+    return IOS_OK;
+}
+
+/* The stream device interface, registered in ios.c.  */
+
+struct ios_dev_if ios_dev_stream
+  __attribute__ ((visibility ("hidden"))) =
+  {
+   .get_if_name = ios_dev_stream_get_dev_if_name,
+   .handler_normalize = ios_dev_stream_handler_normalize,
+   .open = ios_dev_stream_open,
+   .close = ios_dev_stream_close,
+   .pread = ios_dev_stream_pread,
+   .pwrite = ios_dev_stream_pwrite,
+   .get_flags = ios_dev_stream_get_flags,
+   .size = ios_dev_stream_size,
+   .flush = ios_dev_stream_flush
+  };
diff --git a/libpoke/ios.c b/libpoke/ios.c
index 001c4f66..9658b0d9 100644
--- a/libpoke/ios.c
+++ b/libpoke/ios.c
@@ -85,6 +85,7 @@ static struct ios *cur_io;
 
 extern struct ios_dev_if ios_dev_mem; /* ios-dev-mem.c */
 extern struct ios_dev_if ios_dev_file; /* ios-dev-file.c */
+extern struct ios_dev_if ios_dev_stream; /* ios-dev-stream.c */
 #ifdef HAVE_LIBNBD
 extern struct ios_dev_if ios_dev_nbd; /* ios-dev-nbd.c */
 #endif
@@ -92,6 +93,7 @@ extern struct ios_dev_if ios_dev_nbd; /* ios-dev-nbd.c */
 static struct ios_dev_if *ios_dev_ifs[] =
   {
    &ios_dev_mem,
+   &ios_dev_stream,
 #ifdef HAVE_LIBNBD
    &ios_dev_nbd,
 #endif