flutter How to process bloc events in parallel?

Issue

Consider an app for counting colors.

  • A server provides a list of colors.
  • The user can click on a color in the app UI
  • The clicks per color are counted and each click is stored on the server.

I have build a BLoC to manage the "color-counters".

class ColorsBloc extends Bloc<ColorsEvent, ColorsState> {
  final ColorRepository colorRepository;

  ColorsBloc({required this.colorRepository}) : super(ColorsState.initial());

  @override
  Stream<ColorsState> mapEventToState(
      ColorsEvent event,
      ) async* {
    if (event is ColorsFetchRequested) {
      yield ColorsState.loading();
      try {
        final colors = await colorRepository.getColors();
        yield ColorsState.success(colors);
      } catch (e) {
        yield ColorsState.error();
      }
    } else if (event is ColorCounted) {
      yield* _mapColorCountedToState(event);
    }
  }

  Stream<ColorsState> _mapColorCountedToState(ColorCounted event) async* {
    yield state.copyWith(
      sendingByColorId: {...state.sendingByColorId, event.colorId},
    );
    await colorRepository.storeColor(Color(
      colorId: event.colorId,
      timestamp: DateTime.now().millisecondsSinceEpoch,
    ));
    final colors = await colorRepository.getColors();
    yield state.copyWith(
      status: Status.success,
      colors: colors,
      sendingByColorId: {...state.sendingByColorId}..remove(event.colorId),
    );
  }
}

Sending a color-click takes some time (let’s say 1 second on a slow network). The user may not click a color again before it is stored to the server (what the sendingByColorId set keeps track of).

PROBLEM

The user however may click on different colors very fast. The counters are working in that case, but they lag behind because events are processed FIFO (including the await colorRepository.storeColor(...) and the await to get the updated colors list).

I want the sending state to update immediately after any click even if there are previous clicks which are currently in the process of storing it to the repository.

How can I enable the BLoC to keep on processing new events while another one is awaiting the API response?

Solution

Just to show a solution based on @kohjakob ‘s proposal but with:

  • no static methods
  • complete error handling routines

The idea is basically to wrap the repository call into an async method (_sendClick(...)) and call it non-blocking (i.e. without await) while the status update on the sending state is done synchronously.

The _sendClick(...) waits for the repository and adds a ColorSendSuccess or ColorSendFailed event to the bloc once it’s done. These events are then handle in their own run of the mapEventToState(...) routine.

class ColorsBloc extends Bloc<ColorsEvent, ColorsState> {
  final ColorRepository colorRepository;

  ColorsBloc({required this.colorRepository}) : super(ColorsState.initial());

  @override
  Stream<ColorsState> mapEventToState(
      ColorsEvent event,
      ) async* {
    if (event is ColorsFetchRequested) {
      yield ColorsState.loading();
      try {
        final colors = await colorRepository.getColors();
        yield ColorsState.success(colors);
      } catch (e) {
        yield ColorsState.error();
      }
    } else if (event is ColorCounted) {
      yield* _mapColorCountedToState(event);
    } else if (event is ColorSendSuccess) {
      yield _mapColorSendSuccessToState(event);
    } else if (event is ColorSendFailed) {
      yield _mapColorSendFailedToState(event);
    }
  }

  Stream<ColorsState> _mapColorCountedToState(ColorCounted event) async* {
    yield state.copyWith(
      sendingByColorId: {...state.sendingByColorId, event.colorId},
    );
    // non-blocking <----------------
    _sendClick(Color(
      colorId: event.colorId,
      timestamp: DateTime.now().millisecondsSinceEpoch,
    ));
    final colors = await colorRepository.getColors();
    yield state.copyWith(
      status: Status.success,
      colors: colors,
      sendingByColorId: {...state.sendingByColorId}..remove(event.colorId),
    );
  }

  Future<void> _sendClick(Color color) async {
    try {
     int newId = await colorRepository.storeColor(color);
     Color storedColor = color.copyWith(id: () => newId);
     add(ColorSendSuccess(color: storedColor));
    } on StoreColorClickException catch (_) {
      add(ColorSendFailed(color: color));
    }
  }

  ColorsState _mapColorSendSuccessToState(ColorCounted event) async* {
    return state.copyWith(
      colors: [...state.colors]
        // replace the local color-click with the stored one
        ..removeWhere((element) => element.localId == event.color.localId)
        ..add(event.color.copyWith(localId: () => null)),
      sendingByColorId: {...state.sendingByColorId}..remove(event.color.id),
    );
  }

  ColorsState _mapColorSendFailedToState(ColorCounted event) async* {
    return state.copyWith(
      colors: [...state.colors]
        // remove the color that has not been stored
        ..removeWhere((element) => element.localId == event.color.localId),
      sendingByColorId: {...state.sendingByColorId}..remove(event.color.localId),
      // mark the color as failed
      errorByColorId: {...state.errorByColorId, event.color.localId},
    );
  }
}

Answered By – Stuck

Answer Checked By – Katrina (FlutterFixes Volunteer)

Leave a Reply

Your email address will not be published.