diff --git a/libpoke/Makefile.am b/libpoke/Makefile.am
index bab9793f..51abef3a 100644
--- a/libpoke/Makefile.am
+++ b/libpoke/Makefile.am
@@ -55,7 +55,9 @@ libpoke_la_SOURCES = libpoke.h libpoke.c \
pvm-program.h pvm-program.c \
pvm.jitter \
ios.c ios.h ios-dev.h \
- ios-dev-file.c ios-dev-mem.c
+ ios-dev-file.c ios-dev-mem.c \
+ ios-buffer.h ios-buffer.c \
+ ios-dev-stream.c
libpoke_la_SOURCES += ../common/pk-utils.c ../common/pk-utils.h
diff --git a/libpoke/ios-buffer.c b/libpoke/ios-buffer.c
new file mode 100644
index 00000000..f3e1ad63
--- /dev/null
+++ b/libpoke/ios-buffer.c
@@ -0,0 +1,255 @@
+/* ios-buffer.c - The buffer for IO devices. */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+#include
+
+#include "ios.h"
+#include "ios-dev.h"
+
+#define IOB_CHUNK_SIZE 2048
+#define IOB_BUCKET_COUNT 8
+
+#define IOB_CHUNK_OFFSET(offset) \
+ ((offset) % IOB_CHUNK_SIZE)
+
+#define IOB_CHUNK_NO(offset) \
+ ((offset) / IOB_CHUNK_SIZE)
+
+#define IOB_BUCKET_NO(chunk_no) \
+ ((chunk_no) % IOB_BUCKET_COUNT)
+
+struct ios_buffer_chunk
+{
+ uint8_t bytes[IOB_CHUNK_SIZE];
+ int chunk_no;
+ struct ios_buffer_chunk *next;
+};
+
+/* begin_offset is the first offset that's not yet flushed, initilized as 0.
+ end_offset of an instream is the next byte to read to. end_offset of an
+ outstream is the successor of the greatest offset that is written to. */
+
+struct ios_buffer
+{
+ struct ios_buffer_chunk* chunks[IOB_BUCKET_COUNT];
+ ios_dev_off begin_offset;
+ ios_dev_off end_offset;
+ int next_chunk_no;
+};
+
+ios_dev_off
+ios_buffer_get_begin_offset (struct ios_buffer *buffer) {
+ return buffer->begin_offset;
+}
+
+ios_dev_off
+ios_buffer_get_end_offset (struct ios_buffer *buffer) {
+ return buffer->end_offset;
+}
+
+struct ios_buffer *
+ios_buffer_init ()
+{
+ struct ios_buffer *bio = calloc (1, sizeof (struct ios_buffer));
+ return bio;
+}
+
+int
+ios_buffer_free (struct ios_buffer *buffer)
+{
+ struct ios_buffer_chunk *chunk, *chunk_next;
+ for (int i = 0; i < IOB_BUCKET_COUNT; i++)
+ {
+ chunk = buffer->chunks[i];
+ while (chunk)
+ {
+ chunk_next = chunk->next;
+ free (chunk);
+ chunk = chunk_next;
+ }
+ }
+
+ free (buffer);
+ return 1;
+}
+
+struct ios_buffer_chunk*
+ios_buffer_get_chunk (struct ios_buffer *buffer, int chunk_no)
+{
+ int bucket_no = IOB_BUCKET_NO (chunk_no);
+ struct ios_buffer_chunk *chunk = buffer->chunks[bucket_no];
+
+ for ( ; chunk; chunk = chunk->next)
+ if (chunk->chunk_no == chunk_no)
+ return chunk;
+
+ return NULL;
+}
+
+int
+ios_buffer_allocate_new_chunk (struct ios_buffer *buffer, int final_chunk_no,
+ struct ios_buffer_chunk **final_chunk)
+{
+ struct ios_buffer_chunk *chunk;
+ int bucket_no;
+
+ assert (buffer->next_chunk_no <= final_chunk_no);
+
+ do
+ {
+ chunk = calloc (1, sizeof (struct ios_buffer_chunk));
+ if (!chunk)
+ return IOD_ERROR;
+ /* Place the new chunk into the buffer. */
+ chunk->chunk_no = buffer->next_chunk_no;
+ bucket_no = IOB_BUCKET_NO (chunk->chunk_no);
+ chunk->next = buffer->chunks[bucket_no];
+ buffer->chunks[bucket_no] = chunk;
+ buffer->next_chunk_no++;
+ }
+ while (buffer->next_chunk_no <= final_chunk_no);
+
+ /* end_offset is updated as the buffer is written to. Therefore, it is not
+ updated here, but in ios_buffer_pwrite. */
+ *final_chunk = chunk;
+ return 0;
+}
+
+/* Since ios_dev_stream_pread already needs to check begin_offset and
+ end_offset, so this function does not. It assumes that the given range
+ already exists in the buffer. */
+
+int
+ios_buffer_pread (struct ios_buffer *buffer, void *buf, size_t count,
+ ios_dev_off offset)
+{
+ int chunk_no;
+ struct ios_buffer_chunk *chunk;
+ ios_dev_off chunk_offset;
+ size_t already_read_count = 0,
+ to_be_read_count = 0;
+
+ chunk_no = IOB_CHUNK_NO (offset);
+ chunk_offset = IOB_CHUNK_OFFSET (offset);
+ chunk = ios_buffer_get_chunk (buffer, chunk_no);
+ if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+ return IOD_ERROR;
+
+ /* The amount we read from this chunk is the maximum of
+ the COUNT requested and the size of the rest of this chunk. */
+ to_be_read_count = IOB_CHUNK_SIZE - chunk_offset > count
+ ? count
+ : IOB_CHUNK_SIZE - chunk_offset;
+
+ memcpy (buf, (void *) chunk + chunk_offset, to_be_read_count);
+
+ while ((already_read_count += to_be_read_count) < count)
+ {
+ to_be_read_count = count - already_read_count > IOB_CHUNK_SIZE
+ ? IOB_CHUNK_SIZE
+ : count - already_read_count;
+
+ chunk = ios_buffer_get_chunk (buffer, ++chunk_no);
+ if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+ return IOD_ERROR;
+ memcpy (buf + already_read_count, chunk, to_be_read_count);
+ };
+
+ return 0;
+}
+
+/* Since ios_dev_stream_pwrite already needs to check begin_offset, this
+ function does not. It assumes the given range is not discarded. It also
+ allocates new chunks when necessary. */
+
+int
+ios_buffer_pwrite (struct ios_buffer *buffer, const void *buf, size_t count,
+ ios_dev_off offset)
+{
+ int chunk_no;
+ struct ios_buffer_chunk *chunk;
+ ios_dev_off chunk_offset;
+ size_t already_written_count = 0,
+ to_be_written_count = 0;
+
+ chunk_no = IOB_CHUNK_NO (offset);
+ chunk_offset = IOB_CHUNK_OFFSET (offset);
+ chunk = ios_buffer_get_chunk (buffer, chunk_no);
+ if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+ return IOD_ERROR;
+
+ /* The amount we write to this chunk is the maximum of the COUNT requested
+ and the size of the rest of this chunk. */
+ to_be_written_count = IOB_CHUNK_SIZE - chunk_offset > count
+ ? count
+ : IOB_CHUNK_SIZE - chunk_offset;
+
+ memcpy ((void *) chunk + chunk_offset, buf, to_be_written_count);
+
+ while ((already_written_count += to_be_written_count) < count)
+ {
+ to_be_written_count = count - already_written_count > IOB_CHUNK_SIZE
+ ? IOB_CHUNK_SIZE
+ : count - already_written_count;
+
+ chunk = ios_buffer_get_chunk (buffer, ++chunk_no);
+ if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+ return IOD_ERROR;
+ memcpy (chunk, buf + already_written_count, to_be_written_count);
+ };
+
+ /* Lastly, keep track of the greatest offset we wrote to in the buffer.
+ (In fact, end_offset is the least offset we have not written to yet.) */
+ if (buffer->end_offset < offset + count)
+ buffer->end_offset = offset + count;
+
+ return 0;
+}
+
+int
+ios_buffer_forget_till (struct ios_buffer *buffer, ios_dev_off offset)
+{
+ struct ios_buffer_chunk *chunk, *chunk_next;
+ int chunk_no = IOB_CHUNK_NO (offset);
+
+ for (int i = 0; i < IOB_BUCKET_COUNT; i++)
+ {
+ chunk = buffer->chunks[i];
+ buffer->chunks[i] = NULL;
+ while (chunk)
+ {
+ chunk_next = chunk->next;
+ if (chunk->chunk_no >= chunk_no)
+ {
+ chunk->next = buffer->chunks[i];
+ buffer->chunks[i] = chunk;
+ }
+ else
+ free (chunk);
+ chunk = chunk_next;
+ }
+ }
+
+ buffer->begin_offset = chunk_no * IOB_CHUNK_SIZE;
+ assert (buffer->end_offset >= buffer->begin_offset);
+ assert (buffer->begin_offset <= offset);
+ return 0;
+}
diff --git a/libpoke/ios-buffer.h b/libpoke/ios-buffer.h
new file mode 100644
index 00000000..838999d6
--- /dev/null
+++ b/libpoke/ios-buffer.h
@@ -0,0 +1,47 @@
+/* ios-buffer.h - The buffer for IO devices. */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+struct ios_buffer;
+
+struct ios_buffer * ios_buffer_init ();
+
+int ios_buffer_free (struct ios_buffer *buffer);
+
+ios_dev_off
+ios_buffer_get_begin_offset (struct ios_buffer *buffer);
+
+ios_dev_off
+ios_buffer_get_end_offset (struct ios_buffer *buffer);
+
+struct ios_buffer_chunk*
+ios_buffer_get_chunk (struct ios_buffer *buffer, int chunk_no);
+
+int
+ios_buffer_allocate_new_chunk (struct ios_buffer *buffer, int final_chunk_no,
+ struct ios_buffer_chunk **final_chunk);
+
+int
+ios_buffer_pread (struct ios_buffer *buffer, void *buf, size_t count,
+ ios_dev_off offset);
+
+int
+ios_buffer_pwrite (struct ios_buffer *buffer, const void *buf, size_t count,
+ ios_dev_off offset);
+
+int
+ios_buffer_forget_till (struct ios_buffer *buffer, ios_dev_off offset);
diff --git a/libpoke/ios-dev-stream.c b/libpoke/ios-dev-stream.c
new file mode 100644
index 00000000..1258bfe5
--- /dev/null
+++ b/libpoke/ios-dev-stream.c
@@ -0,0 +1,240 @@
+/* ios-dev-stream.c - Streaming IO devices. */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+#include "ios.h"
+#include "ios-dev.h"
+#include "ios-buffer.h"
+
+#define IOS_STDIN_HANDLER ("")
+#define IOS_STDOUT_HANDLER ("")
+#define IOS_STDERR_HANDLER ("")
+
+/* State associated with a stream device. */
+
+struct ios_dev_stream
+{
+ char *handler;
+ FILE *file;
+ uint64_t flags;
+ union
+ {
+ struct ios_buffer *buffer;
+ uint64_t write_offset;
+ };
+};
+
+static char *
+ios_dev_stream_get_dev_if_name () {
+ return "STREAM";
+}
+
+static char *
+ios_dev_stream_handler_normalize (const char *handler, uint64_t flags)
+{
+ /* TODO handle the case where strdup fails. */
+ if (!strcmp (handler, IOS_STDIN_HANDLER)
+ || !strcmp (handler, IOS_STDOUT_HANDLER)
+ || !strcmp (handler, IOS_STDERR_HANDLER))
+ return strdup (handler);
+ else
+ return NULL;
+}
+
+static void *
+ios_dev_stream_open (const char *handler, uint64_t flags, int *error)
+{
+ struct ios_dev_stream *sio;
+
+ sio = malloc (sizeof (struct ios_dev_stream));
+ if (!sio)
+ goto error;
+
+ sio->handler = strdup (handler);
+ if (!sio->handler)
+ goto error;
+
+ if (!strcmp (handler, IOS_STDIN_HANDLER))
+ {
+ sio->file = stdin;
+ sio->flags = IOS_F_READ;
+ sio->buffer = ios_buffer_init ();
+ if (!sio->buffer)
+ goto error;
+ }
+ else if (!strcmp (handler, IOS_STDOUT_HANDLER))
+ {
+ sio->file = stdout;
+ sio->flags = IOS_F_WRITE;
+ sio->write_offset = 0;
+ }
+ else if (!strcmp (handler, IOS_STDERR_HANDLER))
+ {
+ sio->file = stderr;
+ sio->flags = IOS_F_WRITE;
+ sio->write_offset = 0;
+ }
+ else
+ goto error;
+
+ return sio;
+
+error:
+ if (sio)
+ {
+ if (sio->handler)
+ free (sio->handler);
+ free (sio);
+ }
+ *error = IOD_ERROR;
+ return NULL;
+}
+
+static int
+ios_dev_stream_close (void *iod)
+{
+ struct ios_dev_stream *sio = iod;
+
+ ios_buffer_free (sio->buffer);
+ free (sio);
+
+ return 1;
+}
+
+static uint64_t
+ios_dev_stream_get_flags (void *iod)
+{
+ struct ios_dev_stream *sio = iod;
+ return sio->flags;
+}
+
+static int
+ios_dev_stream_pread (void *iod, void *buf, size_t count, ios_dev_off offset)
+{
+ struct ios_dev_stream *sio = iod;
+ struct ios_buffer *buffer = sio->buffer;
+ size_t read_count, total_read_count = 0;
+
+ if (sio->flags & IOS_F_WRITE)
+ return IOD_ERROR;
+
+ /* If the beginning of the buffer is discarded, return EOF. */
+ if (ios_buffer_get_begin_offset(buffer) > offset)
+ return IOD_EOF;
+
+ /* If the requsted range is in the buffer, return it. */
+ if (ios_buffer_get_end_offset(buffer) >= offset + count)
+ return ios_buffer_pread (buffer, buf, count, offset);
+
+ /* What was last read into the buffer may be before or after the
+ offset that this function is provided with. */
+ if (ios_buffer_get_end_offset(buffer) == offset)
+ {
+ do
+ {
+ read_count = fread (buf + total_read_count, count, 1, sio->file);
+ total_read_count += read_count;
+ }
+ while (total_read_count < count && read_count);
+
+ if (ios_buffer_pwrite (buffer, buf, total_read_count, offset)
+ || total_read_count < count)
+ return IOD_ERROR;
+
+ return IOS_OK;
+ }
+ else
+ {
+ size_t to_be_read = (offset + count) - ios_buffer_get_end_offset(buffer);
+ void *temp = malloc (to_be_read);
+ fread (temp, to_be_read, 1, sio->file);
+ if (ios_buffer_pwrite (buffer, temp, to_be_read, ios_buffer_get_end_offset(buffer)))
+ return IOD_ERROR;
+ free (temp);
+ return ios_buffer_pread (buffer, buf, count, offset);
+ }
+}
+
+static int
+ios_dev_stream_pwrite (void *iod, const void *buf, size_t count,
+ ios_dev_off offset)
+{
+ struct ios_dev_stream *sio = iod;
+
+ if (sio->flags & IOS_F_READ)
+ return IOD_ERROR;
+
+ /* If the offset we want to write to is already written out,
+ we return an error. */
+ if (sio->write_offset > offset)
+ return IOD_EOF;
+
+ if (offset > sio->write_offset)
+ /* TODO: Write this more efficiently. */
+ for (int i=0; i < (offset - sio->write_offset); i++)
+ fputc (0, sio->file);
+
+ fwrite (buf, count, 1, sio->file);
+ sio->write_offset = offset + count;
+
+ return IOS_OK;
+}
+
+static ios_dev_off
+ios_dev_stream_size (void *iod)
+{
+ struct ios_dev_stream *sio = iod;
+ if (sio->flags & IOS_F_READ)
+ return ios_buffer_get_end_offset(sio->buffer);
+ else
+ return sio->write_offset;
+}
+
+static int
+ios_dev_stream_flush (void *iod, ios_dev_off offset)
+{
+ struct ios_dev_stream *sio = iod;
+ if (sio->flags & IOS_F_READ
+ && offset > ios_buffer_get_begin_offset(sio->buffer)
+ && offset <= ios_buffer_get_end_offset(sio->buffer))
+ return ios_buffer_forget_till (sio->buffer, offset);
+ else
+ return IOS_OK;
+}
+
+struct ios_dev_if ios_dev_stream
+ __attribute__ ((visibility ("hidden"))) =
+ {
+ .get_if_name = ios_dev_stream_get_dev_if_name,
+ .handler_normalize = ios_dev_stream_handler_normalize,
+ .open = ios_dev_stream_open,
+ .close = ios_dev_stream_close,
+ .pread = ios_dev_stream_pread,
+ .pwrite = ios_dev_stream_pwrite,
+ .get_flags = ios_dev_stream_get_flags,
+ .size = ios_dev_stream_size,
+ .flush = ios_dev_stream_flush
+ };
diff --git a/libpoke/ios.c b/libpoke/ios.c
index 001c4f66..9658b0d9 100644
--- a/libpoke/ios.c
+++ b/libpoke/ios.c
@@ -85,6 +85,7 @@ static struct ios *cur_io;
extern struct ios_dev_if ios_dev_mem; /* ios-dev-mem.c */
extern struct ios_dev_if ios_dev_file; /* ios-dev-file.c */
+extern struct ios_dev_if ios_dev_stream; /* ios-dev-stream.c */
#ifdef HAVE_LIBNBD
extern struct ios_dev_if ios_dev_nbd; /* ios-dev-nbd.c */
#endif
@@ -92,6 +93,7 @@ extern struct ios_dev_if ios_dev_nbd; /* ios-dev-nbd.c */
static struct ios_dev_if *ios_dev_ifs[] =
{
&ios_dev_mem,
+ &ios_dev_stream,
#ifdef HAVE_LIBNBD
&ios_dev_nbd,
#endif