import { Injectable } from '@angular/core';
import { interval, Observable, Observer, Subject, Subscription } from 'rxjs';
import { webSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';
import { distinctUntilChanged, share, takeWhile } from 'rxjs/operators';

/**
 * A `Subject` that relays every message received from a WebSocket and tries to
 * re-establish the connection when it is lost.
 *
 * Inbound messages are pushed to subscribers of this subject; outbound messages
 * are sent with {@link send}. The boolean connection state (true = open,
 * false = closed) is published through {@link connectionStatus}.
 */
export class RxWebsocketSubject<T> extends Subject<T> {
    /// timer stream driving the current reconnect loop; null/undefined when no loop is running
    private reconnectionObservable: Observable<number>;
    /// subscription to the reconnect timer, kept so that close() can cancel it
    private reconnectionSubscription: Subscription;

    private wsSubjectConfig: WebSocketSubjectConfig<any>;
    private socket: WebSocketSubject<any>;
    private socketSubscription: Subscription;
    private connectionObserver: Observer<boolean>;
    public connectionStatus: Observable<any>;
    public closingConnection: boolean = false;

    /// by default, when a message is received from the server, we try to decode it as JSON;
    /// this can be overridden through the `resultSelector` constructor argument
    defaultResultSelector = (e: MessageEvent) => {
        return JSON.parse(e.data);
    };

    /**
     * Creates the subject and immediately starts connecting.
     *
     * @param url WebSocket endpoint to connect to.
     * @param multiplex Optional `{ subMsg, unsubMsg, messageFilter }` descriptor forwarded to
     *   `WebSocketSubject.multiplex`. When it is missing (or has no `subMsg` function) the socket
     *   is consumed directly; when `messageFilter` is missing every message is accepted.
     * @param reconnectInterval Period passed to `interval()` between reconnect attempts.
     * @param reconnectAttempts Maximum number of timer ticks in one reconnect loop.
     * @param resultSelector Converts a raw `MessageEvent` into the value emitted to subscribers;
     *   defaults to {@link defaultResultSelector}.
     */
    constructor(
        private url: string,
        private multiplex: any = {
            subMsg: () => {
                return { a: 'b' };
            },
            unsubMsg: null,
            messageFilter: null,
        },
        private reconnectInterval: number = 3000, /// pause between connections
        private reconnectAttempts: number = 100, /// number of connection attempts
        private resultSelector?: (e: MessageEvent) => any,
    ) {
        super();
        /// connection status: the observer is captured so socket callbacks can push values into it
        this.connectionStatus = new Observable((observer) => {
            this.connectionObserver = observer;
        }).pipe(share(), distinctUntilChanged());

        if (!resultSelector) {
            this.resultSelector = this.defaultResultSelector;
        }

        /// config for WebSocketSubject:
        /// besides the url it carries the deserializer (so the selected resultSelector is
        /// actually applied) and the open/close observers that update the connection status
        this.wsSubjectConfig = {
            url: url,
            deserializer: (e: MessageEvent) => this.resultSelector(e),
            closeObserver: {
                next: (e: CloseEvent) => {
                    this.socket = null;
                    this.connectionObserver.next(false);
                },
            },
            openObserver: {
                next: (e: Event) => {
                    this.connectionObserver.next(true);
                },
            },
        };

        /// follow the connection status and start reconnecting when the connection is lost;
        /// subscribing before connect() guarantees connectionObserver exists before any socket does
        this.connectionStatus.subscribe((isConnected) => {
            if (isConnected === false) {
                this.reconnect();
            }
        });

        /// we connect
        this.connect();
    }

    /**
     * Permanently shuts the subject down: stops any running reconnect loop, releases the socket
     * subscription and completes this subject.
     *
     * Unlike a transient connection loss, this also works while no socket is currently open
     * (for example in the middle of a reconnect loop). `connectionStatus` is not completed here;
     * a close event delivered afterwards is still reported through it as `false`.
     */
    close(): void {
        /// set the flag first so callbacks fired during teardown cannot start a reconnect
        this.closingConnection = true;

        if (this.reconnectionSubscription) {
            /// unsubscribing does not invoke the `complete` callback, so reset the state by hand
            this.reconnectionSubscription.unsubscribe();
            this.reconnectionSubscription = null;
            this.reconnectionObservable = null;
        }

        if (this.socketSubscription) {
            this.socketSubscription.unsubscribe();
            this.socketSubscription = null;
        }

        this.socket = null;
        this.complete();
    }

    /**
     * Creates a new `WebSocketSubject` from the stored config and forwards its messages to this
     * subject. A stream error is logged and triggers {@link reconnect}.
     */
    connect(): void {
        this.socket = new WebSocketSubject(this.wsSubjectConfig);
        this.socketSubscription = this.messageStream(this.socket).subscribe({
            next: (m) => {
                this.next(m); /// when receiving a message, we just send it to our Subject
            },
            error: (error: Event) => {
                console.log(error);
                /// in case of an error with a loss of connection, we restore it
                this.reconnect();
            },
        });
    }

    /**
     * WebSocket reconnect handling: starts a timer that calls {@link connect} on every tick while
     * there is no socket and the attempt limit has not been reached.
     *
     * Does nothing when the subject is being closed or when a reconnect loop is already running,
     * so the error handler and the status subscription cannot start overlapping loops.
     */
    reconnect(): void {
        if (this.closingConnection || this.reconnectionObservable) {
            return;
        }
        this.reconnectionObservable = interval(this.reconnectInterval).pipe(
            takeWhile((v, index) => {
                return index < this.reconnectAttempts && !this.socket;
            }),
        );
        this.reconnectionSubscription = this.reconnectionObservable.subscribe({
            next: () => {
                this.connect();
            },
            complete: () => {
                this.reconnectionObservable = null;
                this.reconnectionSubscription = null;
                /// if the reconnection attempts have failed, complete both our Subject and the status
                if (!this.socket) {
                    this.complete();
                    this.connectionObserver.complete();
                }
            },
        });
    }

    /**
     * Sends a message through the current socket. The message is dropped when there is no socket.
     *
     * @param data Payload handed to the underlying `WebSocketSubject`.
     */
    send(data: any): void {
        if (this.socket) {
            this.socket.next(data);
        }
    }

    /**
     * Builds the inbound message stream for a socket, tolerating an absent or partial
     * `multiplex` descriptor.
     */
    private messageStream(socket: WebSocketSubject<any>): Observable<any> {
        const mux = this.multiplex;
        if (!mux || typeof mux.subMsg !== 'function') {
            /// nothing to multiplex with: consume the raw socket stream
            return socket;
        }
        /// multiplex() calls the filter for every message, so a missing filter must accept all
        const messageFilter = typeof mux.messageFilter === 'function' ? mux.messageFilter : () => true;
        /// unsubMsg is forwarded unchanged
        return socket.multiplex(mux.subMsg, mux.unsubMsg, messageFilter);
    }
}
