commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] r5960 - in grc/trunk/src: . SignalBlockDefs


From: jblum
Subject: [Commit-gnuradio] r5960 - in grc/trunk/src: . SignalBlockDefs
Date: Sat, 14 Jul 2007 00:05:01 -0600 (MDT)

Author: jblum
Date: 2007-07-14 00:05:00 -0600 (Sat, 14 Jul 2007)
New Revision: 5960

Modified:
   grc/trunk/src/ExecFlowGraph.py
   grc/trunk/src/MathExprParser.py
   grc/trunk/src/SignalBlockDefs/Audio.py
   grc/trunk/src/SignalBlockDefs/Filters.py
   grc/trunk/src/SignalBlockDefs/Misc.py
   grc/trunk/src/SignalBlockDefs/Packet.py
Log:
split up callbacks into ones needed locks and ones that did not

Modified: grc/trunk/src/ExecFlowGraph.py
===================================================================
--- grc/trunk/src/ExecFlowGraph.py      2007-07-14 05:50:20 UTC (rev 5959)
+++ grc/trunk/src/ExecFlowGraph.py      2007-07-14 06:05:00 UTC (rev 5960)
@@ -25,7 +25,7 @@
 import Variables
 from Elements import SignalBlock
 from gnuradio import gr
-import os
+import os,time
 from Constants import FLOW_GRAPH_FILE_EXTENSION,MUTEX
 from optparse import OptionParser
 
@@ -40,6 +40,7 @@
                gr.top_block.__init__(self, file_path)  
                #internal data structures               
                self.callbacks = list()
+               self.callbacks_locked = list()
                self.var_keys = list()          
                self.runtime = gr.runtime(self)
                self.started = False
@@ -68,26 +69,49 @@
 Exiting!"""
                exit(-1)
                
-       def add_callback(self, function, data_type_params):
-               """Register a callback function with its associated data."""
+       def add_callback(self, function, *data_type_params):
+               """
+               Register a callback function with its associated data.
+               @param function the callback function
+               @param data_type_params a list of data types
+               """
                self.callbacks.append((function, data_type_params))
                
+       def add_callback_locked(self, function, *data_type_params):
+               """
+               Register a callback function with its associated data.
+               These callbacks will be called inside of a lock/unlock sequence.
+               @param function the callback function
+               @param data_type_params a list of data types
+               """
+               self.callbacks_locked.append((function, data_type_params))
+               
+       def _parse_callback(self, function, *data_type_params):
+               """
+               Parse a single callback. Call function on the data types.
+               @param function the callback function
+               @param data_type_params a list of data types
+               """
+               print "***\nBegin A callback\n%s\n\n***"%function
+               try: function(*map(lambda param: param.parse(), 
data_type_params))
+               except Exception, e: print "***\n\nerror parsing a callback -> 
ignoring\n%s...\n\n***"%e
+               print "***\nEnd A callback\n***"                
+               
        def parse_callbacks(self):
                """For each call back, parse all of the data and 
                call the registered callback function on that data."""
                MUTEX.lock()
                print "***\n\nCallback Time BEGIN\n\n***"
-               started = self.started
-               if started: self._hb.lock()
-               for function, data_type_params in self.callbacks:
-                       print "***\nBegin A callback\n%s\n\n***"%function
-                       try:
-                               if type(data_type_params) in (type(list()), 
type(tuple())):
-                                       function(*map(lambda param: 
param.parse(), data_type_params))
-                               else: function(data_type_params.parse())        
        
-                       except Exception, e: print "***\n\nerror parsing a 
callback -> ignoring\n%s...\n\n***"%e
-                       print "***\nEnd A callback\n***"
-               if started: self._hb.unlock()
+               if self.started:                        
+                       if self.callbacks:      #parse regular callbacks
+                               for function, data_type_params in 
self.callbacks:
+                                       self._parse_callback(function, 
*data_type_params)
+                       if self.callbacks_locked:       #parse locked callbacks 
                                
+                               self._hb.lock() 
+                               for function, data_type_params in 
self.callbacks_locked:
+                                       self._parse_callback(function, 
*data_type_params)
+                               self._hb.unlock()
+                               time.sleep(.005)        #sleep to lower chances 
of possible thread-lockup
                print "***\n\nCallback Time END\n\n***"
                MUTEX.unlock()
                

Modified: grc/trunk/src/MathExprParser.py
===================================================================
--- grc/trunk/src/MathExprParser.py     2007-07-14 05:50:20 UTC (rev 5959)
+++ grc/trunk/src/MathExprParser.py     2007-07-14 06:05:00 UTC (rev 5960)
@@ -240,7 +240,7 @@
 #      Boolean tests for special characters and symbols
 #########################################################
 
-def _is_list(symbol): return type(symbol) == type(list())
+def _is_list(symbol): return type(symbol) is list
 
 def _is_function(symbol): return symbol in _FUNCTIONS.keys()
 

Modified: grc/trunk/src/SignalBlockDefs/Audio.py
===================================================================
--- grc/trunk/src/SignalBlockDefs/Audio.py      2007-07-14 05:50:20 UTC (rev 
5959)
+++ grc/trunk/src/SignalBlockDefs/Audio.py      2007-07-14 06:05:00 UTC (rev 
5960)
@@ -102,7 +102,7 @@
                self._hb.unlock()               
                
 
#######################################################################################
-##     Selector Defs for Audio Source and Audio Sink
+##      Defs for Audio Source and Audio Sink
 
#######################################################################################
                                
                                
 def AudioSink(sb):
@@ -115,7 +115,7 @@
 If only one audio stream is connected, both channels will receive the 
stream.''')
        def make(fg, samp_rate):
                block = AudioHelper(samp_rate.parse(), SINK)
-               fg.add_callback(block.reconstruct, samp_rate)
+               fg.add_callback_locked(block.reconstruct, samp_rate)
                return block
        return sb, make 
        
@@ -127,7 +127,7 @@
        sb.set_docs('''The left output must be connected, The right output is 
optional.''')
        def make(fg, samp_rate):
                block = AudioHelper(samp_rate.parse(), SOURCE)
-               fg.add_callback(block.reconstruct, samp_rate)
+               fg.add_callback_locked(block.reconstruct, samp_rate)
                return block
        return sb, make 
        

Modified: grc/trunk/src/SignalBlockDefs/Filters.py
===================================================================
--- grc/trunk/src/SignalBlockDefs/Filters.py    2007-07-14 05:50:20 UTC (rev 
5959)
+++ grc/trunk/src/SignalBlockDefs/Filters.py    2007-07-14 06:05:00 UTC (rev 
5960)
@@ -162,7 +162,7 @@
        decimation = decimation.parse() 
        taps = taps_maker(*map(lambda data:data.parse(), taps_args))            
        block = filter(decimation, taps)
-       fg.add_callback(lambda *args: block.set_taps(taps_maker(*args)), 
taps_args)
+       fg.add_callback(lambda *args: block.set_taps(taps_maker(*args)), 
*taps_args)
        return block    
 
 window_choices = [

Modified: grc/trunk/src/SignalBlockDefs/Misc.py
===================================================================
--- grc/trunk/src/SignalBlockDefs/Misc.py       2007-07-14 05:50:20 UTC (rev 
5959)
+++ grc/trunk/src/SignalBlockDefs/Misc.py       2007-07-14 06:05:00 UTC (rev 
5960)
@@ -199,8 +199,8 @@
                        input_index.parse(), 
                        output_index.parse(),
                )
-               fg.add_callback(block.set_input_index, input_index)
-               fg.add_callback(block.set_output_index, output_index)
+               fg.add_callback_locked(block.set_input_index, input_index)
+               fg.add_callback_locked(block.set_output_index, output_index)
                return block
        return sb, make
 
@@ -215,8 +215,8 @@
        sb.set_docs('''When open is 0, the valve will forward data.''') 
        def make(fg, type, open, vlen):
                item_size = type.parse().get_num_bytes()*vlen.parse()
-               block = SelectorHelper(fg, item_size, 1, 1, 0, open.parse())
-               fg.add_callback(block.set_output_index, open)
+               block = SelectorHelper(item_size, 1, 1, 0, open.parse())
+               fg.add_callback_locked(block.set_output_index, open)
                return block
        return sb, make
        

Modified: grc/trunk/src/SignalBlockDefs/Packet.py
===================================================================
--- grc/trunk/src/SignalBlockDefs/Packet.py     2007-07-14 05:50:20 UTC (rev 
5959)
+++ grc/trunk/src/SignalBlockDefs/Packet.py     2007-07-14 06:05:00 UTC (rev 
5960)
@@ -89,7 +89,7 @@
                modulator.samples_per_symbol = lambda: samples_per_symbol
                modulator.bits_per_symbol = lambda: bits_per_symbol
                #create the packet modulator (handles the output data stream)
-               packet_mod = blks.mod_pkts(     
+               self.packet_mod = blks.mod_pkts(        
                        fg=self, 
                        modulator=modulator, 
                        access_code=access_code, 
@@ -98,13 +98,21 @@
                        use_whitener_offset=use_whitener_offset,
                )
                #the message sink (handles the input data stream)
-               msgq = gr.msg_queue(DEFAULT_QUEUE_LIMIT)
-               msg_sink = gr.message_sink(item_size, msgq, False)              
+               self.msgq = gr.msg_queue(DEFAULT_QUEUE_LIMIT)
+               msg_sink = gr.message_sink(item_size, self.msgq, False)         
                #connections
-               self.connect(packet_mod.tail, self)
+               self.connect(self.packet_mod.tail, self)
                self.connect(self, msg_sink)            
                #create/start the thread
-               PacketModThread(msgq, packet_mod.send_pkt, packet_length)
+               PacketModThread(self.msgq, self.packet_mod.send_pkt, 
packet_length)
+       
+       def flush(self):
+               """
+               Flush the message queues.
+               Special locked callback to avoid thread lockup.
+               """
+               self.msgq.flush()       
+               self.packet_mod._pkt_input.msgq().flush()       
 
 class PacketDemodHelper(gr.hier_block2):
        """Forward data from demod packet to the gr data stream."""
@@ -185,7 +193,7 @@
                packet_length = packet_length.parse()
                if packet_length%item_size != 0:        #verify that packet 
length is a multiple of the stream size
                        raise ValueError('The packet length: "%d" is not a 
mutiple of the stream size: "%d".'%(packet_length, item_size))
-               return PacketModHelper(
+               block = PacketModHelper(
                        item_size=item_size, 
                        packet_length=packet_length,
                        samples_per_symbol=samples_per_symbol.parse(), 
@@ -194,6 +202,8 @@
                        pad_for_usrp=pad_for_usrp.parse(), 
                        use_whitener_offset=use_whitener_offset.parse(),
                )       #build packet modulator
+               fg.add_callback_locked(block.flush)
+               return block
        return sb, make
        
 def PacketDemod(sb):





reply via email to

[Prev in Thread] Current Thread [Next in Thread]