ReplaySubject<T> constructor

ReplaySubject<T>({
  int? maxSize,
  void onListen()?,
  void onCancel()?,
  bool sync = false,
})

Constructs a ReplaySubject. Optionally, pass handlers for onListen and onCancel, and a flag to handle events synchronously.

See also StreamController.broadcast

Implementation

/// Creates a [ReplaySubject] backed by a broadcast [StreamController].
///
/// [onListen], [onCancel] and [sync] are forwarded unchanged to
/// [StreamController.broadcast]. [maxSize] is handed to the private
/// constructor together with the event queue (presumably as the upper bound
/// on buffered events; confirm against the private constructor).
factory ReplaySubject({
  int? maxSize,
  void Function()? onListen,
  void Function()? onCancel,
  bool sync = false,
}) {
  // ignore: close_sinks
  final controller = StreamController<T>.broadcast(
    onListen: onListen,
    onCancel: onCancel,
    sync: sync,
  );

  final queue = Queue<_Event<T>>();

  // Builds the stream handed out by the deferred factory: the controller's
  // stream with every currently queued event prepended to it.
  Stream<T> replayThenLive() {
    Stream<T> replayed = controller.stream;

    // Iterate over a fixed snapshot from newest to oldest. Each step wraps
    // the previous stream, so the oldest event ends up outermost and is
    // therefore delivered first.
    for (final event in queue.toList(growable: false).reversed) {
      final errorAndStackTrace = event.errorAndStackTrace;

      if (errorAndStackTrace == null) {
        replayed = replayed.transform(
          StartWithStreamTransformer(event.data as T),
        );
      } else {
        replayed = replayed.transform(
          StartWithErrorStreamTransformer(
            errorAndStackTrace.error,
            errorAndStackTrace.stackTrace,
          ),
        );
      }
    }

    return replayed;
  }

  return ReplaySubject<T>._(
    controller,
    Rx.defer<T>(replayThenLive, reusable: true),
    queue,
    maxSize,
  );
}