import { MonoTypeOperatorFunction, Observable, Subscription } from "rxjs";
import { filter, tap } from "rxjs/operators";

/**
 * A conditional buffer whose behavior is controlled by an `Observable` condition.
 *
 * Terminology: the buffer is "open" while values are being held back and "closed" while
 * values pass straight through.
 *
 * - When the condition emits a truthy value the buffer opens: every source value is stored.
 * - When the condition emits a falsy value the buffer closes: stored values are emitted in
 *   arrival order and later source values are emitted immediately. Source values that arrive
 *   while stored values are still being emitted are appended to the buffer so ordering is kept.
 *
 * If the source completes while the buffer is open (or still holds values), the completion is
 * not signaled until the buffer is closed and fully drained. If the source errors, the error is
 * passed on immediately regardless of the buffer state.
 *
 * If the condition completes, a completion is passed immediately to the subscriber; if the
 * buffer is open when this happens, any buffered values are lost. Errors on the condition are
 * ignored, which leaves the buffer in whatever state it last had.
 *
 * The source is subscribed before the condition, so values the source emits synchronously on
 * subscription are handled according to `initialBufferState`.
 *
 * @param condition - Observable whose emissions open (truthy) or close (falsy) the buffer.
 * @param initialBufferState - Whether the buffer starts open; defaults to `false`.
 * @returns An operator that emits the values of the source Observable, delayed while the
 * buffer is open.
 */
export function bufferIf<T>(condition: Observable<boolean>, initialBufferState = false): MonoTypeOperatorFunction<T> {
    return (source: Observable<T>) => {
        return new Observable<T>(subscriber => {
            const subscriptions: Subscription[] = [];
            const buffer: T[] = [];
            let isBufferOpen = initialBufferState;
            let isSourceComplete = false;

            // Forwards the source's completion only once nothing can be emitted anymore:
            // the source is done, the buffer is closed and every stored value has been flushed.
            // Used by both the source and the condition handlers so they cannot disagree.
            const completeIfDrained = (): void => {
                if (isSourceComplete && !isBufferOpen && buffer.length === 0) {
                    subscriber.complete();
                }
            };

            subscriptions.push(
                // handle source events
                source.subscribe({
                    next: value => {
                        // Store the value while the buffer is open, and also while a flush is
                        // still in progress (closed but not yet empty) to preserve ordering.
                        if (isBufferOpen || buffer.length > 0) {
                            buffer.push(value);
                        } else {
                            subscriber.next(value);
                        }
                    },
                    error: err => {
                        subscriber.error(err);
                    },
                    complete: () => {
                        isSourceComplete = true;
                        completeIfDrained();
                    },
                }),

                // handle condition events
                condition.pipe(
                    tap({
                        next: con => {
                            isBufferOpen = !!con;
                        },
                    }),
                    // only "close" emissions trigger a flush
                    filter(() => !isBufferOpen)
                ).subscribe({
                    next: () => {
                        // Emitting a value may re-enter this operator: the consumer can reopen
                        // the buffer or unsubscribe, so both are re-checked on every iteration.
                        while (buffer.length > 0 && !isBufferOpen && !subscriber.closed) {
                            // length was checked above, so shift() returns a stored element
                            subscriber.next(buffer.shift() as T);
                        }
                        // If the flush was interrupted by the buffer reopening, the remaining
                        // values must not be dropped by completing early.
                        completeIfDrained();
                    },
                    error: () => {
                        // Errors on the condition are deliberately ignored and silenced.
                    },
                    complete: () => {
                        // Buffered values (if any) are discarded; see the teardown below.
                        subscriber.complete();
                    },
                })
            );

            // on unsubscribe: stop listening and release any values still held
            return () => {
                subscriptions.forEach(sub => sub.unsubscribe());
                buffer.length = 0;
            };
        });
    };
}
