Multiple Access to the Same Stream

Issue

Context

The code below is a simplified abstraction in which MyClass acts as a kind of download manager.

import 'dart:async';

Future<void> main() async {
  MyClass().test().listen((v) => print('t1: $v')).onError(print);
  final commonClass = MyClass();
  commonClass.test().listen((v) => print('t2: $v')).onError(print);
  commonClass.test().listen((v) => print('t3: $v')).onError(print);
}

class MyClass {
  bool _isDownloadInProgress = false;
  int _i = 0;
  StreamController<int> _sc;
  
  Stream<int> test() async* {
    if (_isDownloadInProgress) {
      throw Exception('Download already in progress');
    } else {
      _sc = StreamController<int>();
    }
    
    Timer.periodic(
      const Duration(seconds: 1),
      (t) {
        if (_i == 4) {
          _isDownloadInProgress = false;
          _i = 0;
          _sc.close();
          t.cancel();
        } else {
          _sc.add(_i++);
        }
      },
    );

    yield* _sc.stream;
  }
}

Question

I expected this code to print the values for t1 and t2, and to print ‘Download already in progress’ exactly once for t3. For example:

t1: 0
t2: 0
t3: Download already in progress
t1: 1
t2: 1
t1: 2
t2: 2
t1: 3
t2: 3

Instead, it prints all four t1 values, eight t3 values, no t2 values, and no ‘Download already in progress’ message:

t1: 0
t3: 0
t3: 1
t1: 1
t3: 2
t3: 3
t1: 2
t3: 0
t1: 3
t3: 1
t3: 2
t3: 3

I expected t1 and t2 to print their values normally, and t3 to fail with the ‘Download already in progress’ message: since everything runs asynchronously, t3 would be trying to ‘download’ something that is already being downloaded (test() was called twice on the same instance of MyClass).

What am I missing?

Solution

For starters, your code never sets _isDownloadInProgress to true, so there is no reason that "Download already in progress" would ever appear.

This is also what causes the second problem. Because _isDownloadInProgress is always false, the t3 call to test() overwrites _sc with a new controller and starts a second Timer.periodic. Both timers read _sc when they fire, and by then it refers to the controller that t3 is listening to. You end up with two timers pushing events into the same stream controller, which is why you see double the t3 events. The controller that t2 is listening to never receives anything, which is why no t2 values appear.
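
The overwrite described above can be reproduced in isolation. The following is a minimal sketch, not the original code: Holder, open() and emit() are hypothetical names, the timers are replaced by direct calls, and it is written for null-safe Dart. It only illustrates that once a field is reassigned, everything that reads the field afterwards targets the newest controller.

```dart
import 'dart:async';

// Hypothetical reduction of MyClass: each call to open() overwrites _sc.
class Holder {
  late StreamController<int> _sc;

  Stream<int> open() {
    _sc = StreamController<int>(); // replaces the controller from any earlier call
    return _sc.stream;
  }

  // Stands in for the timer callback: it always reads the current value of _sc.
  void emit(int value) => _sc.add(value);
}

Future<List<String>> demo() async {
  final log = <String>[];
  final holder = Holder();
  holder.open().listen((v) => log.add('first: $v'));
  holder.open().listen((v) => log.add('second: $v'));

  holder.emit(0);
  holder.emit(1);

  // Stream events are delivered asynchronously, so wait one event-loop turn.
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await demo()); // [second: 0, second: 1]
}
```

The first listener stays subscribed to a controller that nothing writes to any more, which corresponds to the missing t2 output in the question.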

Setting _isDownloadInProgress = true before the timer is created is enough to get the expected results:

class MyClass {
  bool _isDownloadInProgress = false;
  int _i = 0;
  StreamController<int> _sc;
  
  Stream<int> test() async* {
    if (_isDownloadInProgress) {
      throw Exception('Download already in progress');
    } else {
      _sc = StreamController<int>();
    }
    
    _isDownloadInProgress = true; // Add this line
    Timer.periodic(
      const Duration(seconds: 1),
      (t) {
        if (_i == 4) {
          _isDownloadInProgress = false;
          _i = 0;
          _sc.close();
          t.cancel();
        } else {
          _sc.add(_i++);
        }
      },
    );

    yield* _sc.stream;
  }
}

Results:

Exception: Download already in progress
t1: 0
t2: 0
t1: 1
t2: 1
t1: 2
t2: 2
t1: 3
t2: 3
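
For readers on null-safe Dart, where the declaration StreamController<int> _sc; no longer compiles as written, here is a hedged sketch of the same fix. It is one possible adaptation, not the answer's code: the interval parameter and the demo() helper are assumptions added so that the example runs quickly, and the controller and counter are moved into local variables so that a later call cannot replace them. It does not handle a listener that cancels early.

```dart
import 'dart:async';

class MyClass {
  // Assumption: interval is configurable only so that the sketch finishes fast.
  MyClass({this.interval = const Duration(seconds: 1)});

  final Duration interval;
  bool _isDownloadInProgress = false;

  Stream<int> test() async* {
    if (_isDownloadInProgress) {
      throw Exception('Download already in progress');
    }
    _isDownloadInProgress = true; // the line the original code was missing

    // Local state: each running download owns its controller and counter.
    final sc = StreamController<int>();
    var i = 0;
    Timer.periodic(interval, (t) {
      if (i == 4) {
        _isDownloadInProgress = false;
        sc.close();
        t.cancel();
      } else {
        sc.add(i++);
      }
    });

    yield* sc.stream;
  }
}

// Hypothetical helper: runs the t2/t3 scenario and collects what each one sees.
Future<List<String>> demo() async {
  final log = <String>[];
  final manager = MyClass(interval: const Duration(milliseconds: 10));
  final t2Done = Completer<void>();

  manager.test().listen(
    (v) => log.add('t2: $v'),
    onDone: () => t2Done.complete(),
  );
  manager.test().listen(
    (v) => log.add('t3: $v'),
    onError: (Object e) => log.add('t3: $e'),
  );

  await t2Done.future;
  return log;
}

Future<void> main() async {
  (await demo()).forEach(print);
}
```

As in the results above, the second call reports its exception through the stream's error channel before the first timer tick, and the t2 values 0 to 3 follow.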

Answered By – Abion47

Answer Checked By – Robin (FlutterFixes Admin)
