[Top][All Lists]

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Discuss-gnuradio] Python gr_block implementation

From: Dev Ramudit
Subject: [Discuss-gnuradio] Python gr_block implementation
Date: Tue, 26 Jun 2007 13:23:29 -0400
User-agent: Icedove (X11/20070329)

Hello everyone,

I'm trying to implement a collection of networked USRPs for an application I'm working on. I need to create blocks that retrieve data from various remote sources, and the library I've been using this far has been pyro (pyro.sourceforge.net), which is extremely simple and straightforward for the kinds of networked tests I need to run. Unfortunately, I can't send the USRP back as it is through pyro, because it is a SWIG object using local file descriptors, which pyro is unable to serialize. I'd like to implement my own blocks in python to sit between the remote USRPs' data and my local application, to handle the various network transfer and configuration issues involved. I realize doing this is possible in C++ with SWIG, but I'd thought I'd try to do it first in Python and see if it was doable (the speed issues aren't really a factor, as the network connection is going to be my main bottleneck).

I've read through the various .c, .h and .i files for gr_block, gr_basic_block and flow_graph, and created my python class accordingly. As far as I can see, I've implemented every method necessary to make my gr_pyblock class a drop-in replacement for the gr_block class. I've created a test very similar to the "writing your own block" document, and everything seems to work without errors (I've done more than a day's worth of dropping in print statements everywhere in the library to determine that the io_signature and block_details of my class are receiving the right values), however, when the flow_graph gets forked to a different thread, it stalls indefinitely. Replacing my gr_pyblock implementation with the example code provided makes it work again. I'm stumped as to where the problem is, and I've attached all of my work so far. I'd greatly appreciate any help in this matter, or suggestions as to where to look for problems.

Thank you!
Dev Ramudit

P.S. I apologize if attachments aren't to be sent on this list, please let me know if I'm incorrect in doing so :)
#!/usr/bin/env python
# Dev Ramudit

class gr_pyblock:
    def __init__(self, name, input_signature, output_signature):
        self.d_name = name
        self.d_input_signature = input_signature
        self.d_output_signature = output_signature
#       need a way to access basic_block and get both of these (?):
#       d_unique_id = s_next_id++
        self.d_unique_id = 2
#       s_ncurrently_allocated++
        self.d_output_multiple  = 1
        self.d_relative_rate = 1.0
        self.d_history = 1
        self.d_fixed_rate = False

    def name(self):
        return self.d_name

    def input_signature(self):
        return self.d_input_signature

    def output_signature(self):
        return self.d_output_signature

    def unique_id(self):
        return self.d_unique_id

    def history(self):
        return self.d_history

    def set_history(self, hist):
        self.d_history = hist

    def fixed_rate(self):
        return self.d_fixed_rate

    def forecast(self, noutput_items, ninput_items_required):
        for item in xrange(len(ninput_items_required)):
            ninuput_items_required[item] = noutput_items

    def general_work(self, noutput_items, ninput_items, input_items, 
        raise NotImplementedError("gr_pyblock.general_work")

    def check_topology(self, ninputs, noutputs):
#        raise NotImplementedError("gr_pyblock.check_topology")
        return True

    def start(self):
        return True

    def stop(self):
        return True

    def set_output_multiple(self, multiple):
        if multiple < 1:
            raise RuntimeError ("Invalid argument to: 
        self.d_output_multiple = multiple

    def output_multiple(self):
        return self.d_output_multiple

    def consume(self, which_input, how_many_items):
        self.d_detail.consume(which_input, how_many_items)

    def consume_each(self, how_many_items):

    def set_relative_rate(self, relative_rate):
        if relative_rate < 0.0:
            raise RuntimeError ("Invalid argument to: 
        self.d_relative_rate = relative_rate

    def relative_rate(self):
        return self.d_relative_rate

    def fixed_rate_ninput_to_noutput(self, ninput):
        raise NotImplementedError("gr_pyblock.fixed_rate_ninput_to_noutput")

    def fixed_rate_noutput_to_ninput(self, noutput):
        raise NotImplementedError("gr_pyblock.fixed_rate_noutput_to_ninput")

    def set_fixed_rate(self, fixed_rate):
        self.d_fixed_rate = fixed_rate

    def set_detail(self, detail):
        self.d_detail = detail

    def detail(self):
        return self.d_detail

    def block(self):
        return self
#!/usr/bin/env python

from gr_pyblock import gr_pyblock
import struct
from gnuradio.gr.gnuradio_swig_py_runtime import io_signature

MIN_IN = 1
MAX_IN = 1

class howto_square_ff(gr_pyblock):
    def __init__(self):
        gr_pyblock.__init__(self, "square_ff", io_signature(MIN_IN, MAX_IN, 
struct.calcsize('f')), io_signature(MIN_OUT, MAX_OUT, struct.calcsize('f')))

    def general_work(self, noutput_items, ninput_items, input_items, 
        raise RuntimeError()
        print "General work called"
        for item in enumerate(input_items):
            output_items[item[0]] = item[1] * item[1]
        return noutput_items
#!/usr/bin/env python

from gnuradio import gr, gr_unittest
from howto_square_ff import howto_square_ff

class qa_howto:
    def setUp(self):
        self.fg = gr.flow_graph()

    def tearDown(self):
        self.fg = None

    def test_001_square_ff(self):
        print "Starting test..."
        src_data = (-3, 4, -5.5, 2, 3)
        expected_result = (9, 16, 30.25, 4, 9)
        src = gr.vector_source_f(src_data)
        dst = gr.vector_sink_f()
        sqr = howto_square_ff()
        print src.unique_id()
        print dst.unique_id()
        print sqr.unique_id()

        self.fg.connect(src, sqr)
        self.fg.connect(sqr, dst)
        print "All connected, starting run..."
        print "Run complete"
        result_data = dst.data()
        print result_data
        self.assertFloatTuplesAlmostEqual(expected_result, result_data, 6)

if __name__ == '__main__':
#    gr_unittest.main()
    x = qa_howto()

reply via email to

[Prev in Thread] Current Thread [Next in Thread]