commit-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Commit-gnuradio] [gnuradio] 02/06: filter: Adding hierarchical pfb channelizer


From: git
Subject: [Commit-gnuradio] [gnuradio] 02/06: filter: Adding hierarchical pfb channelizer. Useful if pfb_channelizer is bottle-neck since it splits it into multiple blocks.
Date: Mon, 31 Mar 2014 20:27:42 +0000 (UTC)

This is an automated email from the git hooks/post-receive script.

jcorgan pushed a commit to branch master
in repository gnuradio.

commit 96977d9d6b3aebe9cc8a099c9c024c919d9f7f51
Author: Ben Reynwar <address@hidden>
Date:   Thu Mar 27 10:50:32 2014 -0700

    filter: Adding hierarchical pfb channelizer.  Useful if pfb_channelizer is a
bottleneck, since it splits the channelizer into multiple blocks.
---
 gr-filter/include/gnuradio/filter/CMakeLists.txt |   2 +-
 gr-filter/python/filter/pfb.py                   |  83 ++++++++++++-
 gr-filter/python/filter/qa_pfb_channelizer.py    | 144 ++++++++++++++---------
 3 files changed, 173 insertions(+), 56 deletions(-)

diff --git a/gr-filter/include/gnuradio/filter/CMakeLists.txt 
b/gr-filter/include/gnuradio/filter/CMakeLists.txt
index f9cb933..1af91a6 100644
--- a/gr-filter/include/gnuradio/filter/CMakeLists.txt
+++ b/gr-filter/include/gnuradio/filter/CMakeLists.txt
@@ -90,7 +90,7 @@ install(FILES
     pm_remez.h
     polyphase_filterbank.h
     filterbank.h
-    filterbank_ccf.h
+    filterbank_vcvcf.h
     single_pole_iir.h
     dc_blocker_cc.h
     dc_blocker_ff.h
diff --git a/gr-filter/python/filter/pfb.py b/gr-filter/python/filter/pfb.py
index 732812e..0fb1fe9 100644
--- a/gr-filter/python/filter/pfb.py
+++ b/gr-filter/python/filter/pfb.py
@@ -20,9 +20,10 @@
 # Boston, MA 02110-1301, USA.
 #
 
-from gnuradio import gr
+import optfir, math
+
+from gnuradio import gr, fft
 import filter_swig as filter
-import optfir
 
 try:
     from gnuradio import blocks
@@ -341,3 +342,81 @@ class arb_resampler_ccc(gr.hier_block2):
 
     def set_rate(self, rate):
         self.pfb.set_rate(rate)
+
+
+class channelizer_hier_ccf(gr.hier_block2):
+    """
+    Make a Polyphase Filter channelizer (complex in, complex out, 
floating-point taps)
+
+    Args:
+        n_chans - The number of channels to split into.
+        n_filterbanks - The number of filterbank blocks to use (default=1).
+        taps: The taps to use.  If this is `None` then taps are generated 
using optfir.low_pass.
+        outchans - Which channels to output streams for (a list of integers) 
(default is all channels).
+        atten: Stop band attenuation.
+        bw: The fraction of the channel you want to keep.
+        tb: Transition band width as a fraction of channel width.
+        ripple: Pass band ripple in dB.
+    """
+
+    def __init__(self, n_chans, n_filterbanks=1, taps=None, outchans=None,
+                 atten=100, bw=1.0, tb=0.2, ripple=0.1):
+        if outchans is None:
+            outchans = range(n_chans)
+        gr.hier_block2.__init__(
+            self, "pfb_channelizer_hier_ccf",
+            gr.io_signature(1, 1, gr.sizeof_gr_complex),
+            gr.io_signature(len(outchans), len(outchans), 
gr.sizeof_gr_complex))
+        if taps is None:
+            taps = optfir.low_pass(1, n_chans, bw, bw+tb, ripple, atten)
+        taps = list(taps)
+        extra_taps = int(math.ceil(1.0*len(taps)/n_chans)*n_chans - len(taps))
+        taps = taps + [0] * extra_taps
+        # Make taps for each channel
+        chantaps = [list(reversed(taps[i: len(taps): n_chans])) for i in 
range(0, n_chans)]
+        # Convert the input stream into a stream of vectors.
+        self.s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, n_chans)
+        # Create a mapping to separate out each filterbank (a group of 
channels to be processed together)
+        # And a list of sets of taps for each filterbank.
+        low_cpp = int(n_chans/n_filterbanks)
+        extra = n_chans - low_cpp*n_filterbanks
+        cpps = [low_cpp+1]*extra + [low_cpp]*(n_filterbanks-extra)
+        splitter_mapping = []
+        filterbanktaps = []
+        total = 0
+        for cpp in cpps:
+            splitter_mapping.append([(0, i) for i in range(total, total+cpp)])
+            filterbanktaps.append(chantaps[total: total+cpp])
+            total += cpp
+        assert(total == n_chans)
+        # Split the stream of vectors into n_filterbanks streams of vectors.
+        self.splitter = blocks.vector_map(gr.sizeof_gr_complex, [n_chans], 
splitter_mapping)
+        # Create the filterbanks
+        self.fbs = [filter.filterbank_vcvcf(taps) for taps in filterbanktaps]
+        # Combine the streams of vectors back into a single stream of vectors.
+        combiner_mapping = [[]]
+        for i, cpp in enumerate(cpps):
+            for j in range(cpp):
+                combiner_mapping[0].append((i, j))
+        self.combiner = blocks.vector_map(gr.sizeof_gr_complex, cpps, 
combiner_mapping)
+        self.prefft_snk = blocks.vector_sink_c(n_chans)
+        self.connect(self.combiner, self.prefft_snk)
+        # Add the final FFT to the channelizer.
+        self.fft = fft.fft_vcc(n_chans, forward=True, window=[1.0]*n_chans)
+        # Select the desired channels
+        if outchans != range(n_chans):
+            selector_mapping = [[(0, i) for i in outchans]]
+            self.selector = blocks.vector_map(gr.sizeof_gr_complex, [n_chans], 
selector_mapping)
+        # Convert stream of vectors to a normal stream.
+        self.v2ss = blocks.vector_to_streams(gr.sizeof_gr_complex, 
len(outchans))
+        self.connect(self, self.s2v, self.splitter)
+        for i in range(0, n_filterbanks):
+            self.connect((self.splitter, i), self.fbs[i], (self.combiner, i))
+        self.connect(self.combiner, self.fft)
+        if outchans != range(n_chans):
+            self.connect(self.fft, self.selector, self.v2ss)
+        else:
+            self.connect(self.fft, self.v2ss)
+        for i in range(0, len(outchans)):
+            self.connect((self.v2ss, i), (self, i))
+            
diff --git a/gr-filter/python/filter/qa_pfb_channelizer.py 
b/gr-filter/python/filter/qa_pfb_channelizer.py
index 46c6e7b..549e41d 100755
--- a/gr-filter/python/filter/qa_pfb_channelizer.py
+++ b/gr-filter/python/filter/qa_pfb_channelizer.py
@@ -20,8 +20,8 @@
 # Boston, MA 02110-1301, USA.
 #
 
-from gnuradio import gr, gr_unittest, filter, blocks
-import math
+from gnuradio import gr, gr_unittest, filter, blocks, analog
+import math, cmath
 
 def sig_source_c(samp_rate, freq, amp, N):
     t = map(lambda x: float(x)/samp_rate, xrange(N))
@@ -29,88 +29,126 @@ def sig_source_c(samp_rate, freq, amp, N):
                 1j*math.sin(2.*math.pi*freq*x), t)
     return y
 
+
 class test_pfb_channelizer(gr_unittest.TestCase):
 
     def setUp(self):
         self.tb = gr.top_block()
+        self.freqs = [110., -513., 203., -230, 121]
+        # Number of channels to channelize.
+        self.M = len(self.freqs)
+        # Number of samples to use.
+        self.N = 1000
+        # Baseband sampling rate.
+        self.fs = 5000
+        # Input samp rate to channelizer.
+        self.ifs = self.M*self.fs
+        
+        self.taps = filter.firdes.low_pass_2(
+            1, self.ifs, self.fs/2, self.fs/10,
+            attenuation_dB=80,
+            window=filter.firdes.WIN_BLACKMAN_hARRIS)
+
+        self.Ntest = 50
+
 
     def tearDown(self):
         self.tb = None
 
-    def test_000(self):
-        N = 1000         # number of samples to use
-        M = 5            # Number of channels to channelize
-        fs = 5000        # baseband sampling rate
-        ifs = M*fs       # input samp rate to channelizer
-
-        taps = filter.firdes.low_pass_2(1, ifs, fs/2, fs/10,
-                                        attenuation_dB=80,
-                                        
window=filter.firdes.WIN_BLACKMAN_hARRIS)
-
+    def test_0000(self):
+        self.check_channelizer(filter.pfb.channelizer_ccf(
+            self.M, taps=self.taps, oversample_rate=1))
+
+    def test_0001(self):
+        self.check_channelizer(filter.pfb.channelizer_hier_ccf(
+            self.M, n_filterbanks=1, taps=self.taps))
+
+    def get_input_data(self):
+        """
+        Get the raw data generated by addition of sinusoids.
+        Useful for debugging.
+        """
+        tb = gr.top_block()
+        signals = []
+        add = blocks.add_cc()
+        for i in xrange(len(self.freqs)):
+            f = self.freqs[i] + i*self.fs
+            signals.append(analog.sig_source_c(self.ifs, analog.GR_SIN_WAVE, 
f, 1))
+            tb.connect(signals[i], (add,i))
+        head = blocks.head(gr.sizeof_gr_complex, self.N)
+        snk = blocks.vector_sink_c()
+        tb.connect(add, head, snk)
+        tb.run()
+        input_data = snk.data()
+        return input_data
+
+    def check_channelizer(self, channelizer_block):
         signals = list()
         add = blocks.add_cc()
-        freqs = [-230., 121., 110., -513., 203.]
-        for i in xrange(len(freqs)):
-            f = freqs[i] + (M/2-M+i+1)*fs
-            data = sig_source_c(ifs, f, 1, N)
+        for i in xrange(len(self.freqs)):
+            f = self.freqs[i] + i*self.fs
+            data = sig_source_c(self.ifs, f, 1, self.N)
             signals.append(blocks.vector_source_c(data))
             self.tb.connect(signals[i], (add,i))
 
-        s2ss = blocks.stream_to_streams(gr.sizeof_gr_complex, M)
-        pfb = filter.pfb_channelizer_ccf(M, taps, 1)
+        #s2ss = blocks.stream_to_streams(gr.sizeof_gr_complex, self.M)
 
-        self.tb.connect(add, s2ss)
+        #self.tb.connect(add, s2ss)
+        self.tb.connect(add, channelizer_block)
 
         snks = list()
-        for i in xrange(M):
+        for i in xrange(self.M):
             snks.append(blocks.vector_sink_c())
-            self.tb.connect((s2ss,i), (pfb,i))
-            self.tb.connect((pfb, i), snks[i])
+            #self.tb.connect((s2ss,i), (channelizer_block,i))
+            self.tb.connect((channelizer_block, i), snks[i])
 
         self.tb.run()
 
-        Ntest = 50
         L = len(snks[0].data())
 
-        # Adjusted phase rotations for data
-        p0 = 0.11058379158914133
-        p1 = 4.5108246571401693
-        p2 = 3.9739891674564594
-        p3 = 2.2820531095511924
-        p4 = 1.3782797467397869
+        expected_data = self.get_expected_data(L)
+        received_data = [snk.data() for snk in snks]
+        
+        for expected, received in zip(expected_data, received_data):
+            self.compare_data(expected, received)
+
+    def compare_data(self, expected, received):
+        Ntest = 50
+        expected = expected[-Ntest:]
+        received = received[-Ntest:]
+        expected = [x/expected[0] for x in expected]
+        received = [x/received[0] for x in received]
+        self.assertComplexTuplesAlmostEqual(expected, received, 3)
+        
+
+    def get_freq(self, data):
+        freqs = []
+        for r1, r2 in zip(data[:-1], data[1:]):
+            diff = cmath.phase(r1) - cmath.phase(r2) 
+            if diff > math.pi:
+                diff -= 2*math.pi
+            if diff < -math.pi:
+                diff += 2*math.pi
+            freqs.append(diff)
+        freq = float(sum(freqs))/len(freqs)
+        freq /= 2*math.pi
+        return freq
+
+    def get_expected_data(self, L):
 
         # Filter delay is the normal delay of each arm
-        tpf = math.ceil(len(taps) / float(M))
+        tpf = math.ceil(len(self.taps) / float(self.M))
         delay = -(tpf - 1.0) / 2.0
         delay = int(delay)
 
         # Create a time scale that's delayed to match the filter delay
-        t = map(lambda x: float(x)/fs, xrange(delay, L+delay))
+        t = map(lambda x: float(x)/self.fs, xrange(delay, L+delay))
 
         # Create known data as complex sinusoids at the different baseband 
freqs
         # the different channel numbering is due to channelizer output order.
-        expected0_data = map(lambda x: math.cos(2.*math.pi*freqs[2]*x+p0) + \
-                                       1j*math.sin(2.*math.pi*freqs[2]*x+p0), 
t)
-        expected1_data = map(lambda x: math.cos(2.*math.pi*freqs[3]*x+p1) + \
-                                       1j*math.sin(2.*math.pi*freqs[3]*x+p1), 
t)
-        expected2_data = map(lambda x: math.cos(2.*math.pi*freqs[4]*x+p2) + \
-                                       1j*math.sin(2.*math.pi*freqs[4]*x+p2), 
t)
-        expected3_data = map(lambda x: math.cos(2.*math.pi*freqs[0]*x+p3) + \
-                                       1j*math.sin(2.*math.pi*freqs[0]*x+p3), 
t)
-        expected4_data = map(lambda x: math.cos(2.*math.pi*freqs[1]*x+p4) + \
-                                       1j*math.sin(2.*math.pi*freqs[1]*x+p4), 
t)
-
-        dst0_data = snks[0].data()
-        dst1_data = snks[1].data()
-        dst2_data = snks[2].data()
-        dst3_data = snks[3].data()
-        dst4_data = snks[4].data()
-
-        self.assertComplexTuplesAlmostEqual(expected0_data[-Ntest:], 
dst0_data[-Ntest:], 3)
-        self.assertComplexTuplesAlmostEqual(expected1_data[-Ntest:], 
dst1_data[-Ntest:], 3)
-        self.assertComplexTuplesAlmostEqual(expected2_data[-Ntest:], 
dst2_data[-Ntest:], 3)
-        self.assertComplexTuplesAlmostEqual(expected3_data[-Ntest:], 
dst3_data[-Ntest:], 3)
-        self.assertComplexTuplesAlmostEqual(expected4_data[-Ntest:], 
dst4_data[-Ntest:], 3)
+        expected_data = [map(lambda x: math.cos(2.*math.pi*f*x) +
+                             1j*math.sin(2.*math.pi*f*x), t) for f in 
self.freqs]
+        return expected_data
 
 if __name__ == '__main__':
     gr_unittest.run(test_pfb_channelizer, "test_pfb_channelizer.xml")



reply via email to

[Prev in Thread] Current Thread [Next in Thread]