>From d78b916f9cafeb4bb123084c7609b0bbc82d620e Mon Sep 17 00:00:00 2001 From: Paolo Bonzini Date: Sun, 5 Jun 2011 17:53:10 +0200 Subject: [PATCH] rewrite async call queue --- configure.ac | 4 + libgst/events.c | 16 ++-- libgst/events.h | 2 +- libgst/gstpriv.h | 11 ++- libgst/interp-bc.inl | 20 +---- libgst/interp-jit.inl | 20 +---- libgst/interp.c | 203 ++++++++++++++++++++++++----------------- libgst/interp.h | 22 +++++- libgst/oop.c | 7 ++- libgst/prims.def | 5 +- libgst/sysdep.c | 1 + libgst/sysdep.h | 1 - libgst/sysdep/cygwin/timer.c | 2 +- libgst/sysdep/posix/events.c | 7 +- 14 files changed, 175 insertions(+), 146 deletions(-) diff --git a/configure.ac b/configure.ac index 50a9f2f..e661b9d 100644 --- a/configure.ac +++ b/configure.ac @@ -238,6 +238,10 @@ dnl ------------------------------ C COMPILER / OS ------------ { echo; echo "${term_bold}Platform environment:${term_norm}"; } >& AS_MESSAGE_FD GST_C_SYNC_BUILTINS +if test $gst_cv_have_sync_fetch_and_add = no; then + AC_MSG_ERROR([Synchronization primitives not found, please use a newer compiler.]) +fi + GST_LOCK AC_SYS_LARGEFILE AC_C_INLINE diff --git a/libgst/events.c b/libgst/events.c index bc1b312..4ca6668 100644 --- a/libgst/events.c +++ b/libgst/events.c @@ -57,7 +57,7 @@ /* Holds the semaphores to be signaled when the operating system sends us a C-style signal. */ -volatile OOP _gst_sem_int_vec[NSIG]; +async_queue_entry _gst_sem_int_vec[NSIG]; /* Signals _gst_sem_int_vec[SIG] and removes the semaphore from the vector (because C-style signal handlers are one-shot). */ @@ -67,14 +67,10 @@ static RETSIGTYPE signal_handler (int sig); RETSIGTYPE signal_handler (int sig) { - _gst_disable_interrupts (true); - if (_gst_sem_int_vec[sig]) + if (_gst_sem_int_vec[sig].data) { - if (IS_CLASS (_gst_sem_int_vec[sig], _gst_semaphore_class)) - { - _gst_async_signal_and_unregister (_gst_sem_int_vec[sig]); - _gst_sem_int_vec[sig] = NULL; - } + if (IS_CLASS (_gst_sem_int_vec[sig].data, _gst_semaphore_class)) + _gst_async_call_internal (&_gst_sem_int_vec[sig]); else { _gst_errorf @@ -83,7 +79,6 @@ signal_handler (int sig) } } - _gst_enable_interrupts (true); _gst_set_signal_handler (sig, SIG_DFL); _gst_wakeup (); } @@ -95,8 +90,9 @@ _gst_async_interrupt_wait (OOP semaphoreOOP, if (sig < 0 || sig >= NSIG) return; - _gst_sem_int_vec[sig] = semaphoreOOP; _gst_register_oop (semaphoreOOP); + _gst_sem_int_vec[sig].func = _gst_do_async_signal_and_unregister; + _gst_sem_int_vec[sig].data = semaphoreOOP; _gst_set_signal_handler (sig, signal_handler); /* should probably package up the old interrupt state here for return diff --git a/libgst/events.h b/libgst/events.h index 85d1656..464d8a3 100644 --- a/libgst/events.h +++ b/libgst/events.h @@ -54,7 +54,7 @@ #define GST_EVENTS_H /* Array of semaphores associated to the C signals. */ -extern volatile OOP _gst_sem_int_vec[NSIG] +extern async_queue_entry _gst_sem_int_vec[NSIG] ATTRIBUTE_HIDDEN; /* Initialize the data structures used to hold information about diff --git a/libgst/gstpriv.h b/libgst/gstpriv.h index 1ad17e7..50ee317 100644 --- a/libgst/gstpriv.h +++ b/libgst/gstpriv.h @@ -199,9 +199,10 @@ DO_PREFETCH ((x), 0, (k)); /* Synchronization primitives. */ -#if !defined HAVE_SYNC_BUILTINS && defined __GNUC__ -#define __sync_synchronize() __asm__ ("" : : : "memory") -#endif +#define __sync_exchange(ptr, val) \ + ({ __typeof__ (*(ptr)) _x; \ + do _x = *(ptr); while (!__sync_bool_compare_and_swap ((ptr), (_x), (val))); \ + _x; }) /* Kill a warning when using GNU C. Note that this allows using break or continue inside a macro, unlike do...while(0) */ @@ -518,8 +519,10 @@ extern OOP _gst_nil_oop #ifdef __GNUC__ #define no_opt(x) ({ __typeof__ ((x)) _result; \ asm ("" : "=r" (_result) : "0" ((x))); _result; }) +#define barrier() asm ("") #else #define no_opt(x) (x) +#define barrier() #endif /* integer conversions and some information on SmallIntegers. */ @@ -597,7 +600,6 @@ extern OOP _gst_nil_oop #include "callin.h" #include "cint.h" #include "dict.h" -#include "events.h" #include "heap.h" #include "lex.h" #include "tree.h" @@ -608,6 +610,7 @@ extern OOP _gst_nil_oop #include "byte.h" #include "comp.h" #include "interp.h" +#include "events.h" #include "opt.h" #include "save.h" #include "str.h" diff --git a/libgst/interp-bc.inl b/libgst/interp-bc.inl index 9a1243e..8819481 100644 --- a/libgst/interp-bc.inl +++ b/libgst/interp-bc.inl @@ -525,25 +525,7 @@ monitor_byte_codes: /* First, deal with any async signals. */ if (async_queue_enabled) - { - gl_lock_lock (async_queue_lock); - _gst_disable_interrupts (false); - __sync_synchronize (); - - if UNCOMMON (async_queue_index || async_queue_index_sig) - { - int i; - for (i = 0; i < async_queue_index; i++) - queued_async_signals[i].func (queued_async_signals[i].data); - for (i = 0; i < async_queue_index_sig; i++) - queued_async_signals_sig[i].func (queued_async_signals_sig[i].data); - - async_queue_index = async_queue_index_sig = 0; - } - - _gst_enable_interrupts (false); - gl_lock_unlock (async_queue_lock); - } + empty_async_queue (); if UNCOMMON (time_to_preempt) ACTIVE_PROCESS_YIELD (); diff --git a/libgst/interp-jit.inl b/libgst/interp-jit.inl index f254730..0e8fa27 100644 --- a/libgst/interp-jit.inl +++ b/libgst/interp-jit.inl @@ -413,25 +413,7 @@ _gst_interpret (OOP processOOP) /* First, deal with any async signals. */ if (async_queue_enabled) - { - gl_lock_lock (async_queue_lock); - _gst_disable_interrupts (false); - __sync_synchronize (); - - if UNCOMMON (async_queue_index || async_queue_index_sig) - { - int i; - for (i = 0; i < async_queue_index; i++) - queued_async_signals[i].func (queued_async_signals[i].data); - for (i = 0; i < async_queue_index_sig; i++) - queued_async_signals_sig[i].func (queued_async_signals_sig[i].data); - - async_queue_index = async_queue_index_sig = 0; - } - - _gst_enable_interrupts (false); - gl_lock_unlock (async_queue_lock); - } + empty_async_queue (); thisContext = (gst_method_context) OOP_TO_OBJ (_gst_this_context_oop); diff --git a/libgst/interp.c b/libgst/interp.c index 88e3f4c..c83fbef 100644 --- a/libgst/interp.c +++ b/libgst/interp.c @@ -113,13 +113,6 @@ to speed up these messages for Arrays, Strings, and ByteArrays. */ #define METHOD_CACHE_SIZE (1 << 11) -typedef struct async_queue_entry -{ - void (*func) (OOP); - OOP data; -} -async_queue_entry; - typedef struct interp_jmp_buf { jmp_buf jmpBuf; @@ -247,14 +240,10 @@ static int class_cache_prim; #endif /* Queue for async (outside the interpreter) semaphore signals */ -gl_lock_define (static, async_queue_lock); static mst_Boolean async_queue_enabled = true; -static int async_queue_index; -static int async_queue_size; -static async_queue_entry *queued_async_signals; - -static int async_queue_index_sig; -static async_queue_entry queued_async_signals_sig[NSIG]; +static async_queue_entry queued_async_signals_tail; +static async_queue_entry *queued_async_signals = &queued_async_signals_tail; +static async_queue_entry *queued_async_signals_sig = &queued_async_signals_tail; /* When not NULL, this causes the byte code interpreter to immediately send the message whose selector is here to the current stack @@ -295,6 +284,9 @@ static inline mst_Boolean cached_index_oop_put_primitive (OOP rec, OOP val, intptr_t spec); +/* Empty the queue of asynchronous calls. */ +static void empty_async_queue (void); + /* Try to find another process with higher or same priority as the active one. Return whether there is one. */ static mst_Boolean would_reschedule_process (void); @@ -1608,73 +1600,130 @@ _gst_sync_signal (OOP semaphoreOOP, mst_Boolean incr_if_empty) return true; } -static void -do_async_signal (OOP semaphoreOOP) +void +_gst_do_async_signal (OOP semaphoreOOP) { _gst_sync_signal (semaphoreOOP, true); } void -_gst_async_call (void (*func) (OOP), OOP arg) +_gst_do_async_signal_and_unregister (OOP semaphoreOOP) { - if (_gst_signal_count) - { - /* Async-signal-safe version. Because the discriminant is - _gst_signal_count, can only be called from within libgst.la - (in particular events.c). Also, because of that we assume - interrupts are already disabled. */ - - if (async_queue_index_sig == NSIG) - { - static const char errmsg[] = "Asynchronous signal queue full!\n"; - write (2, errmsg, sizeof (errmsg) - 1); - abort (); - } + _gst_sync_signal (semaphoreOOP, true); + _gst_unregister_oop (semaphoreOOP); +} - queued_async_signals_sig[async_queue_index_sig].func = func; - queued_async_signals_sig[async_queue_index_sig++].data = arg; - } - else - { - /* Thread-safe version for the masses. */ +/* Async-signal-safe version, does no allocation. Using an atomic operation + is still the simplest choice, but on top of that we check that the entry + is not already in the list. Also, the datum and next field are NULLed + automatically when the call is made. */ +void +_gst_async_call_internal (async_queue_entry *e) +{ + /* For async-signal safety, we need to check that the entry is not + already in the list. Checking that atomically with CAS is the + simplest way. */ + do + if (__sync_val_compare_and_swap(&e->next, NULL, queued_async_signals_sig)) + return; + while (!__sync_bool_compare_and_swap (&queued_async_signals_sig, e->next, e)); + SET_EXCEPT_FLAG (true); +} - gl_lock_lock (async_queue_lock); - if (async_queue_index == async_queue_size) - { - async_queue_size *= 2; - queued_async_signals = - realloc (queued_async_signals, - sizeof (async_queue_entry) * async_queue_size); - } +void +_gst_async_call (void (*func) (OOP), OOP arg) +{ + /* Thread-safe version for the masses. This lockless stack + is reversed in the interpreter loop to get FIFO behavior. */ + async_queue_entry *sig = xmalloc (sizeof (async_queue_entry)); + sig->func = func; + sig->data = arg; - queued_async_signals[async_queue_index].func = func; - queued_async_signals[async_queue_index++].data = arg; - gl_lock_unlock (async_queue_lock); - _gst_wakeup (); - } - + do + sig->next = queued_async_signals; + while (!__sync_bool_compare_and_swap (&queued_async_signals, + sig->next, sig)); + _gst_wakeup (); SET_EXCEPT_FLAG (true); } mst_Boolean _gst_have_pending_async_calls () { - return async_queue_index_sig > 0 || async_queue_index > 0; + return (queued_async_signals != &queued_async_signals_tail + || queued_async_signals_sig != &queued_async_signals_tail); +} + +void +empty_async_queue () +{ + async_queue_entry *sig, *sig_reversed; + + /* Process a batch of asynchronous requests. These are pushed + in LIFO order by _gst_async_call. By reversing the list + in place before walking it, we get FIFO order. */ + sig = __sync_exchange (&queued_async_signals, &queued_async_signals_tail); + sig_reversed = &queued_async_signals_tail; + while (sig != &queued_async_signals_tail) + { + async_queue_entry *next = sig->next; + sig->next = sig_reversed; + sig_reversed = sig; + sig = next; + } + + sig = sig_reversed; + while (sig != &queued_async_signals_tail) + { + async_queue_entry *next = sig->next; + sig->func (sig->data); + free (sig); + sig = next; + } + + /* For async-signal-safe processing, we need to avoid entering + the same item twice into the list. So we use NEXT to mark + items that have been added... */ + sig = __sync_exchange (&queued_async_signals_sig, &queued_async_signals_tail); + sig_reversed = &queued_async_signals_tail; + while (sig != &queued_async_signals_tail) + { + async_queue_entry *next = sig->next; + sig->next = sig_reversed; + sig_reversed = sig; + sig = next; + } + + sig = sig_reversed; + while (sig != &queued_async_signals_tail) + { + async_queue_entry *next = sig->next; + void (*func) (OOP) = sig->func; + OOP data = sig->data; + barrier (); + + sig->data = NULL; + barrier (); + + /* ... and we only NULL it after a signal handler can start + writing to it. */ + sig->next = NULL; + barrier (); + func (data); + sig = next; + } } void _gst_async_signal (OOP semaphoreOOP) { - _gst_async_call (do_async_signal, semaphoreOOP); + _gst_async_call (_gst_do_async_signal, semaphoreOOP); } void _gst_async_signal_and_unregister (OOP semaphoreOOP) { - _gst_disable_interrupts (false); /* block out everything! */ - _gst_async_call (do_async_signal, semaphoreOOP); - _gst_async_call (_gst_unregister_oop, semaphoreOOP); - _gst_enable_interrupts (false); + _gst_async_call (_gst_do_async_signal_and_unregister, semaphoreOOP); } void @@ -2204,12 +2253,6 @@ _gst_init_interpreter (void) #endif _gst_this_context_oop = _gst_nil_oop; - async_queue_index = 0; - async_queue_index_sig = 0; - async_queue_size = 32; - queued_async_signals = malloc (sizeof (async_queue_entry) * async_queue_size); - gl_lock_init (async_queue_lock); - for (i = 0; i < MAX_LIFO_DEPTH; i++) lifo_contexts[i].flags = F_POOLED | F_CONTEXT; @@ -2378,16 +2421,14 @@ _gst_copy_processor_registers (void) void copy_semaphore_oops (void) { - int i; - - _gst_disable_interrupts (false); /* block out everything! */ + async_queue_entry *sig; - for (i = 0; i < async_queue_index; i++) - if (queued_async_signals[i].data) - MAYBE_COPY_OOP (queued_async_signals[i].data); - for (i = 0; i < async_queue_index_sig; i++) - if (queued_async_signals_sig[i].data) - MAYBE_COPY_OOP (queued_async_signals_sig[i].data); + for (sig = queued_async_signals; sig != &queued_async_signals_tail; + sig = sig->next) + MAYBE_COPY_OOP (sig->data); + for (sig = queued_async_signals_sig; sig != &queued_async_signals_tail; + sig = sig->next) + MAYBE_COPY_OOP (sig->data); /* there does seem to be a window where this is not valid */ if (single_step_semaphore) @@ -2395,8 +2436,6 @@ copy_semaphore_oops (void) /* there does seem to be a window where this is not valid */ MAYBE_COPY_OOP (switch_to_process); - - _gst_enable_interrupts (false); } @@ -2416,16 +2455,14 @@ _gst_mark_processor_registers (void) void mark_semaphore_oops (void) { - int i; - - _gst_disable_interrupts (false); /* block out everything! */ + async_queue_entry *sig; - for (i = 0; i < async_queue_index; i++) - if (queued_async_signals[i].data) - MAYBE_MARK_OOP (queued_async_signals[i].data); - for (i = 0; i < async_queue_index_sig; i++) - if (queued_async_signals_sig[i].data) - MAYBE_MARK_OOP (queued_async_signals_sig[i].data); + for (sig = queued_async_signals; sig != &queued_async_signals_tail; + sig = sig->next) + MAYBE_MARK_OOP (sig->data); + for (sig = queued_async_signals_sig; sig != &queued_async_signals_tail; + sig = sig->next) + MAYBE_MARK_OOP (sig->data); /* there does seem to be a window where this is not valid */ if (single_step_semaphore) @@ -2433,8 +2470,6 @@ mark_semaphore_oops (void) /* there does seem to be a window where this is not valid */ MAYBE_MARK_OOP (switch_to_process); - - _gst_enable_interrupts (false); } diff --git a/libgst/interp.h b/libgst/interp.h index 22d1c55..7696625 100644 --- a/libgst/interp.h +++ b/libgst/interp.h @@ -327,6 +327,14 @@ typedef gst_uchar *ip_type; extern ip_type ip ATTRIBUTE_HIDDEN; +typedef struct async_queue_entry +{ + void (*func) (OOP); + OOP data; + struct async_queue_entry *next; +} +async_queue_entry; + /* When not NULL, this causes the byte code interpreter to immediately send the message whose selector is here to the current stack top. */ @@ -401,7 +409,19 @@ extern mst_Boolean _gst_have_pending_async_calls (void) /* Set up so that FUNC will be called, with ARGOOP as its argument, as soon as the next sequence point is reached. */ extern void _gst_async_call (void (*func) (OOP), - OOP argOOP) + OOP argOOP) + ATTRIBUTE_HIDDEN; + +/* Worker functions for _gst_async_call_internal. */; +extern void _gst_do_async_signal (OOP semaphoreOOP) + ATTRIBUTE_HIDDEN; +extern void _gst_do_async_signal_and_unregister (OOP semaphoreOOP) + ATTRIBUTE_HIDDEN; + +/* Set up so that ENTRY->FUNC will be called, with ENTRY->DATA as its + argument, as soon as the next sequence point is reached. Async-signal + safe version. */ +extern void _gst_async_call_internal (async_queue_entry *entry) ATTRIBUTE_HIDDEN; /* Signal SEMAPHOREOOP so that one of the processes waiting on that diff --git a/libgst/oop.c b/libgst/oop.c index 0a34f95..2e8b7fd 100644 --- a/libgst/oop.c +++ b/libgst/oop.c @@ -1529,7 +1529,12 @@ mourn_objects (void) array = new_instance_with (_gst_array_class, size, &processor->gcArray); _gst_copy_buffer (array->data); if (!IS_NIL (processor->gcSemaphore)) - _gst_async_signal (processor->gcSemaphore); + { + static async_queue_entry e; + e.func = _gst_do_async_signal; + e.data = processor->gcSemaphore; + _gst_async_call_internal (&e); + } else { _gst_errorf ("Running finalizers before initialization."); diff --git a/libgst/prims.def b/libgst/prims.def index 5eff664..2b14aaa 100644 --- a/libgst/prims.def +++ b/libgst/prims.def @@ -3861,7 +3861,10 @@ primitive VMpr_Processor_disableEnableInterrupts : if (id == prim_id (VMpr_Processor_disableInterrupts) && count++ == 0) async_queue_enabled = false; else if (id == prim_id (VMpr_Processor_enableInterrupts) && --count == 0) - async_queue_enabled = true; + { + async_queue_enabled = true; + SET_EXCEPT_FLAG (true); + } process->interrupts = FROM_INT (count); diff --git a/libgst/sysdep.c b/libgst/sysdep.c index 1c3dcd7..dfb5415 100644 --- a/libgst/sysdep.c +++ b/libgst/sysdep.c @@ -67,6 +67,7 @@ #include "sysdep/cygwin/files.c" #include "sysdep/cygwin/mem.c" #elif !defined WIN32 +#include #include "sysdep/posix/findexec.c" #include "sysdep/posix/timer.c" #include "sysdep/posix/signals.c" diff --git a/libgst/sysdep.h b/libgst/sysdep.h index af77ccc..2cebe69 100644 --- a/libgst/sysdep.h +++ b/libgst/sysdep.h @@ -113,7 +113,6 @@ extern void _gst_signal_after (int deltaMilli, /* Initialize system dependent stuff. */ extern void _gst_init_sysdep (void) ATTRIBUTE_HIDDEN; - extern void _gst_init_sysdep_win32 (void) ATTRIBUTE_HIDDEN; diff --git a/libgst/sysdep/cygwin/timer.c b/libgst/sysdep/cygwin/timer.c index 37e9b42..648fe93 100644 --- a/libgst/sysdep/cygwin/timer.c +++ b/libgst/sysdep/cygwin/timer.c @@ -96,7 +96,7 @@ alarm_thread (unused) } void -_gst_init_sysdep_win32 () +_gst_init_sysdep_win32 (void) { HANDLE hthread; DWORD tid; diff --git a/libgst/sysdep/posix/events.c b/libgst/sysdep/posix/events.c index e3d3c36..ab2a88e 100644 --- a/libgst/sysdep/posix/events.c +++ b/libgst/sysdep/posix/events.c @@ -207,7 +207,7 @@ _gst_async_timed_wait (OOP semaphoreOOP, mst_Boolean _gst_is_timeout_programmed (void) { - return (!IS_NIL (_gst_sem_int_vec[SIGALRM])); + return (!IS_NIL (no_opt (_gst_sem_int_vec[SIGALRM].data))); } void @@ -341,9 +341,8 @@ file_polling_handler (int sig) { if (num_used_pollfds > 0) { - _gst_disable_interrupts (true); - _gst_async_call (async_signal_polled_files, NULL); - _gst_enable_interrupts (true); + static async_queue_entry e = { async_signal_polled_files, NULL, NULL }; + _gst_async_call_internal (&e); } _gst_set_signal_handler (sig, file_polling_handler); -- 1.7.4.4