flatMapBatchesSingle<R> method

Single<List<R>> flatMapBatchesSingle<R>(
  1. Single<R> transform(
    1. T value
    ),
  2. int size
)

Similar to flatMapBatches, but collect all output result batches into a List, and emit final result as a Single when source Stream emits done event.

If source Stream is empty, returns a Single that emits a empty list. Errors will be forwarded downstream, the first will cause the returned Single completes with that error.

input     : --|
transform : a -> a| (Single.value)
size      : 2
output    : --[]|

input     : --a---b---c----d--e--|
transform : a -> a| (Single.value)
size      : 3
output    : --------------[a,b,c,d,e]|

input     : --a---b---c--x--d--e--|
transform : a -> a| (Single.value)
size      : 3
output    : ----------x|

input     : --a---b---c--x--d--e--|
transform : a -> x| (Single.error)
size      : 3
output    : --x|

NOTE: x is error event

Implementation

Single<List<R>> flatMapBatchesSingle<R>(
  Single<R> Function(T value) transform,
  int size,
) {
  Stream<List<R>> convert(List<T> streams) =>
      Rx.forkJoinList(streams.map(transform).toList(growable: false));

  final seed = <R>[];
  return bufferCount(size)
      .asyncExpand(convert)
      .startWith(seed)
      .scan<List<R>>((acc, value, _) => acc..addAll(value), seed)
      .takeLast(1)
      .map((value) => List<R>.unmodifiable(value))
      .doneOnError()
      .singleOrError();
}