[Discuss-gnuradio] message passing: strobe or _post?


From: Jared Dulmage
Subject: [Discuss-gnuradio] message passing: strobe or _post?
Date: Thu, 2 Jul 2015 02:55:41 +0000

I am testing whether messages can be posted to a Python block from Python.  I 
have included below a simple example in which the Python block just prints 
each message it receives.  In one case, I send messages to this block from the 
built-in gnuradio.blocks.message_strobe block.  In the other case, I call 
_post on the block directly from Python.  The former case works, but the 
latter does not.  I've tried to delve into the code, but so far have not found 
an answer.  Any advice would be appreciated.

Jared.

Ubuntu 14.04
Python 2.7.6
GR 3.7.8

#!/usr/bin/env python
##################################################
# Message passing example
# Jared Dulmage
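#
# Usage: pass "strobe" as an argument to send messages with
# message_strobe; with no argument, _post is called directly.
#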
##################################################

from gnuradio import gr, blocks
import pmt
import json
import time

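class message_print(gr.sync_block):
    """Message-only block that prints every message received on "in_port"."""
    def __init__(self):
        # No stream ports: input and output signatures are both None.
        gr.sync_block.__init__(self, "message_print", None, None)

        # Register the message input port and attach the handler to it.
        self.msg_port = mport = "in_port"
        self.message_port_register_in(pmt.intern(mport))
        self.set_msg_handler(pmt.intern(mport), self.print_msg)

    def print_msg(self, msg):
        # Convert the PMT to a Python object and print it as JSON.
        print "RECEIVED: ",
        print json.dumps(pmt.to_python(msg))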

def main(*args):
    # "strobe" selects the message_strobe case; otherwise _post is used.
    if "strobe" in args:
        main_strobe()
    else:
        main_no_strobe()

def main_no_strobe():
    # Post messages directly to the block's input port from Python
    # (this is the case that does not work).
    tb = gr.top_block()
    msg_printer = message_print()
    tb.connect(msg_printer)
    tb.start()
    for n in range(10):
        time.sleep(0.1)
        msg_printer.to_basic_block()._post(pmt.intern(msg_printer.msg_port),
                                           pmt.to_pmt({'NUM': n}))
    tb.stop()
    tb.wait()

def main_strobe():
    # Let message_strobe send the message every 100 ms (this case works).
    tb = gr.top_block()
    msg_printer = message_print()
    msg_strobe = blocks.message_strobe(pmt.to_pmt({'NUM': -1}), 100)
    tb.msg_connect(msg_strobe, "strobe", msg_printer, msg_printer.msg_port)
    tb.start()
    time.sleep(1)
    tb.stop()
    tb.wait()


if __name__ == '__main__':
    import sys
    sys.exit(main(*sys.argv[1:]))


------------------------------------------------------
Jared Dulmage
Engineering Specialist
Digital Comm. and Implementation Dept.
Aerospace Corporation
310-336-3140


