I'm writing a Python block that, in general, does
the following:
Input: a stream of numbers (assume integers for the sake of
discussion).
Output: a stream of numbers
I have simplified the real problem somewhat to keep the
question short.
The block looks for sequences of positive integers and tags
the max integer in each sequence it finds.
Example (^ means a tag):
0 0 1 0 0 2 1 0 2 3 2 0 0 0 7 0 4 2 5 3 8 2 4 6 3 6 4 2 7 9
3 1 0 5 0 --> 1^ 2^ 3^ 7^ 9^ 5^
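For reference, the intended behaviour on a complete stream can be
sketched in plain Python (no GNU Radio involved). The function name
tag_run_maxima is made up for this illustration, and the "tag" is
represented simply by returning the tagged values:

```python
def tag_run_maxima(samples):
    """Return the maximum of every run of consecutive positive values."""
    maxima = []
    current_max = None  # None means we are not inside a run
    for value in samples:
        if value > 0:
            current_max = value if current_max is None else max(current_max, value)
        elif current_max is not None:
            # A non-positive value closes the run: emit its maximum.
            maxima.append(current_max)
            current_max = None
    if current_max is not None:
        # Flush a run that reaches the very end of the input.
        maxima.append(current_max)
    return maxima


stream = [0, 0, 1, 0, 0, 2, 1, 0, 2, 3, 2, 0, 0, 0, 7, 0, 4, 2, 5, 3, 8,
          2, 4, 6, 3, 6, 4, 2, 7, 9, 3, 1, 0, 5, 0]
print(tag_run_maxima(stream))  # [1, 2, 3, 7, 9, 5]
```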
The trivial implementation, in which each call to
(general_)work() processes only the current window, fails when
the scheduler cuts a sequence in the middle. For example, the
long sequence above might be delivered in two or more calls to
work(), which produces wrong results (more than one tagged
value). To clarify: if one call to work() gets 0 4 2 5, the
next gets 3 8 2 4, and the one after that gets
6 3 6 4 2 7 9 3 1, the output will be 5^ 8^ 9^ rather than
just 9^.
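The failure is easy to reproduce outside GNU Radio. The sketch below
treats each window in isolation, as the trivial implementation would,
and compares that with processing the same items as one piece. The
helper run_maxima is a hypothetical name used only here:

```python
from itertools import groupby


def run_maxima(window):
    """Maxima of the positive runs in one window, treated in isolation."""
    return [max(run)
            for positive, run in groupby(window, key=lambda v: v > 0)
            if positive]


# The three windows from the clarification above.
chunks = [[0, 4, 2, 5], [3, 8, 2, 4], [6, 3, 6, 4, 2, 7, 9, 3, 1]]

per_call = [m for chunk in chunks for m in run_maxima(chunk)]
whole = run_maxima([v for chunk in chunks for v in chunk])

print(per_call)  # [5, 8, 9]
print(whole)     # [9]
```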
Note: The sequences can be of arbitrary length. I'm OK
with a practical solution in which, above some very high
sequence length, the block ignores the data or gives wrong
results. I have no practical memory limit on my desktop
(the app can consume several GB on top of what it
consumes now).
I solved the issue by implementing a check: if the last
input value is positive, this call to work() can't know for
certain that it is the last value of the sequence, so it
returns 0 (no consumption) and asks for more data (I took this
idea from peak_detector2_fb_impl.cc). In effect, the block
insists on getting a window long enough to end with a 0.
For the sake of discussion, let's ignore the case where
there are multiple sequences in a single call to work().
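A minimal sketch of that check in plain Python follows. It is not the
real block: work() here is a hypothetical stand-in that returns
(items_consumed, tagged_maxima), and the toy driver simply assumes that
unconsumed items pile up and are offered again together with the new
ones, which is the behaviour expected from the scheduler rather than a
statement about what it actually does:

```python
def work(window):
    """Hypothetical stand-in for work(): (items_consumed, tagged_maxima)."""
    if not window or window[-1] > 0:
        # The trailing sequence may continue in the next call,
        # so consume nothing and wait for more data.
        return 0, []
    maxima, current = [], 0  # current == 0 means "not inside a sequence"
    for value in window:
        if value > 0:
            current = max(current, value)
        elif current > 0:
            maxima.append(current)
            current = 0
    return len(window), maxima


# Toy driver: deliveries accumulate in `pending` until work() consumes them.
pending, tagged = [], []
deliveries = ([0, 4, 2, 5], [3, 8, 2, 4], [6, 3, 6, 4, 2, 7, 9, 3, 1], [0, 5, 0])
for delivery in deliveries:
    pending += delivery
    consumed, maxima = work(pending)
    tagged += maxima
    pending = pending[consumed:]

print(tagged)  # [9, 5]
```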
I have several questions and issues with this solution:
- The only approach that worked for me is calling
self.set_output_multiple() when necessary (when the block
sees an unterminated sequence), and calling it again with a
lower value once the sequence has ended (to improve
latency). Since I don't know the sequence length, I keep
calling set_output_multiple(2 * self.output_multiple())
until I get what I need. The question is: why do I need
the output multiple to begin with? If the block doesn't
consume anything and the upstream block keeps feeding it
data, I expect the input buffer to fill up, so the
next call to work() should come with more items to
process. Instead, I noticed that this doesn't happen, and
only a larger output multiple makes the scheduler provide
more items in the next call to work().
- At some point I get the error "sched: <block my_block
(8)> is requesting more input data than we can provide.
ninput_items_required = 74. max_possible_items_available =
63. If this is a filter, consider reducing the number of
taps.". How do I increase
max_possible_items_available? I searched the source code
but couldn't find a method that works in Python (not all
methods are exposed to Python) and enlarges the input
buffer so that max_possible_items_available (input buffer
size - 1) goes up. Again, I can afford to make the input
buffer of this specific block much larger. I could also
increase the buffers application-wide, but really only this
block needs them to be bigger.
- How can I handle this error at runtime without
"crashing"? Ideally, I want to catch an exception and let
the block decide to ignore the data or give wrong
results, but block_executor::run_one_iteration() jumps to
were_done, which stops the block. In the spirit of
practicality, I can also live with workarounds such as
restarting the block.
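To make the doubling scheme and the desired "give up instead of
crashing" behaviour concrete, here is a plain-Python sketch of the
policy only. Everything in it is hypothetical: next_output_multiple is
an invented helper, and `limit` stands for max_possible_items_available
under the assumption that the number of required input items tracks the
output multiple one-to-one (which depends on the block's forecast()).
The value 63 is taken from the error message above:

```python
def next_output_multiple(current, sequence_open, limit):
    """Hypothetical policy: returns (new_multiple, give_up)."""
    if not sequence_open:
        # Sequence ended: drop back to 1 to keep latency low.
        return 1, False
    doubled = 2 * current
    if doubled > limit:
        # Asking for more would exceed what can be provided, so
        # abandon this sequence instead of requesting too much.
        return 1, True
    return doubled, False


# A sequence that never ends, with the limit from the error message.
multiple, history = 1, []
for _ in range(7):
    multiple, give_up = next_output_multiple(multiple, True, limit=63)
    history.append((multiple, give_up))

print(history)
# [(2, False), (4, False), (8, False), (16, False), (32, False),
#  (1, True), (2, False)]
```

In the real block the returned value would be passed to
self.set_output_multiple(), and a give_up of True would be the point
where the block discards or mis-tags the oversized sequence.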
Thanks,
Gilad