rxdart: Get buffered elements on stream subscription cancel

Issue

I’m using an rxdart ZipStream in my app to combine two streams of incoming Bluetooth data. Each stream is passed through "bufferCount" so that it collects 500 elements before emitting. This works fine so far, but if the stream subscription is cancelled at some point, the buffers may still hold elements that are never emitted and are therefore lost. I could wait for a "buffer cycle" to complete before cancelling the stream subscription, but that can take a while depending on the configured sample rate. Is there a way to get those buffers as they are, even if they contain fewer than 500 elements?

Here is some simplified code for explanation:

subscription = ZipStream.zip2(
  streamA.bufferCount(500),
  streamB.bufferCount(500),
  (streamABuffer, streamBBuffer) {
    return ...;
  },
).listen((data) {
  ...
});

Thanks in advance!

Solution

For anyone wondering: bufferCount is implemented with BufferCountStreamTransformer, which extends BackpressureStreamTransformer, and that class has a dispatchOnClose property that defaults to true. This means that when the underlying stream (the one whose elements are being buffered) is closed, the elements remaining in the buffer are emitted as a final, possibly shorter, list. This also applies to the example above. My mistake was to close the stream and cancel the stream subscription immediately afterwards, so the final buffers never reached the listener. If I wait for the stream to finish closing and only then cancel the subscription, everything works as expected.
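
To make the ordering concrete, here is a minimal sketch of the fix described above. It is not the original app code: controllerA and controllerB are hypothetical stand-ins for the Bluetooth sources, the buffer size is 3 instead of 500 to keep the demo short, and waiting on subscription.asFuture() is just one possible way to "await the stream’s closing".

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

Future<void> main() async {
  // Hypothetical stand-ins for the two Bluetooth data streams.
  final controllerA = StreamController<int>();
  final controllerB = StreamController<int>();

  final subscription = ZipStream.zip2(
    controllerA.stream.bufferCount(3), // 3 instead of 500 for the demo
    controllerB.stream.bufferCount(3),
    (List<int> a, List<int> b) => [a, b],
  ).listen(print);

  // Five elements per source: one full buffer of 3, plus 2 left over.
  for (var i = 1; i <= 5; i++) {
    controllerA.add(i);
    controllerB.add(i * 10);
  }

  // Close the sources so that bufferCount can flush what is left.
  // The close futures are deliberately not awaited one by one here;
  // the zipped subscription's done event is awaited instead.
  unawaited(controllerA.close());
  unawaited(controllerB.close());

  // Completes once the zipped stream has delivered its done event,
  // i.e. after any remaining buffers have reached the listener.
  await subscription.asFuture<void>();

  // Only now cancel the subscription.
  await subscription.cancel();
}
```

If the behavior described in the answer holds, the listener should receive the full buffers first and the shorter leftover buffers last. Calling subscription.cancel() right after close(), without the await in between, reproduces the original problem.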

Answered By – b4sti

Answer Checked By – Willingham (FlutterFixes Volunteer)
