import * as Sentry from '@sentry/react';
import camelcaseKeys from 'camelcase-keys';
import type { IStream, QoS } from 'mqtt';
import mqtt from 'mqtt';
import type { SagaIterator, Task } from 'redux-saga';
import { eventChannel } from 'redux-saga';
import type { SagaReturnType } from 'redux-saga/effects';
import { call, delay, fork, take } from 'redux-saga/effects';

import type { DropMqttConnectionInfo } from 'types/api/events';
import { cancelTask } from 'utils/cancelTask';

/**
 * Generic error reported when the underlying client stream emits `error`.
 * The stream listener in this file does not forward the original error object,
 * so this shared instance is emitted instead; callers can compare by identity.
 */
export const generalMqttConnectionError = new Error('MQTT connection error');

/**
 * Discriminants of the events pushed into the MQTT event channel
 */
enum MqttChannelEventType {
  // Emitted for client `error` events and for `error` events of the client stream
  Error = 'Error',
  // Emitted when the client fires `connect`
  Connected = 'Connected',
  // Emitted when the client fires `close`
  Disconnected = 'Disconnected',
  // Emitted for every received message
  Message = 'Message',
}

/**
 * Base shape shared by every event flowing through the MQTT event channel
 */
interface MqttChannelEvent<TEventType> {
  // Discriminant used to tell the event kinds apart
  type: TEventType;
}

/**
 * Connection established; carries no additional data
 */
type MqttChannelEventConnected =
  MqttChannelEvent<MqttChannelEventType.Connected>;

/**
 * Connection closed; carries no additional data
 */
type MqttChannelEventDisconnected =
  MqttChannelEvent<MqttChannelEventType.Disconnected>;

/**
 * Connection error together with the error object describing it
 */
interface MqttChannelEventError
  extends MqttChannelEvent<MqttChannelEventType.Error> {
  error: Error;
}

/**
 * Message received over MQTT
 */
interface MqttChannelEventMessage<TMessagePayload>
  extends MqttChannelEvent<MqttChannelEventType.Message> {
  // Topic the message was received on
  topic: string;
  // Message body parsed from JSON
  payload: TMessagePayload;
}

/**
 * Discriminated union (by `type`) of everything the MQTT event channel can emit
 */
type MqttChannelEvents<TMessagePayload> =
  | MqttChannelEventMessage<TMessagePayload>
  | MqttChannelEventError
  | MqttChannelEventConnected
  | MqttChannelEventDisconnected;

/**
 * Arguments accepted by {@link MqttPublishMessageFn}
 */
interface MqttPublishMessageArgs<TMessagePayload> {
  // Topic to publish to
  topic: string;
  // Optional quality of service level, passed through to the MQTT client
  qos?: QoS;
  // Payload; it is serialized with `JSON.stringify` before publishing
  message: TMessagePayload;
}

// Module augmentation that adds the `stream` property to the `MqttClient` typings
declare module 'mqtt' {
  export interface MqttClient {
    // This property is not declared by the library typings but is useful in the
    // browser: this file listens to its `error` event to detect connection failures
    stream: IStream;
  }
}

/**
 * Publishes a message over the current MQTT connection.
 * Returns nothing; the call does not report whether the publish succeeded.
 */
export type MqttPublishMessageFn = <TMessagePayload>(
  args: MqttPublishMessageArgs<TMessagePayload>
) => void;

/**
 * Everything needed to open an MQTT connection: the fields inherited from
 * `DropMqttConnectionInfo` (declared elsewhere; this file reads its `connection`
 * and `lastWill` fields) plus the topics to subscribe to.
 */
export interface MqttConnectionInfo extends DropMqttConnectionInfo {
  /**
   * List of topics to subscribe to when the connection is opened
   */
  subscriptionTopics: string[];
}

/**
 * Wraps MQTT connection in Redux-Saga eventChannel
 *
 * Connects to `{scheme}://{host}:{port}/mqtt`, subscribes to `subscriptionTopics`
 * and forwards client events into the channel as {@link MqttChannelEvents}:
 * - `connect` -> `Connected`
 * - `close` -> `Disconnected`
 * - `error` (client or underlying stream) -> `Error`
 * - `message` -> `Message` with the payload parsed as JSON; a payload that is
 *   not valid JSON is reported as an `Error` event instead
 *
 * Closing the channel ends the MQTT client.
 *
 * @returns the event channel and a `publishMessage` function bound to the created client
 */
function createMqttEventChannel<TMessagePayload>({
  connection,
  lastWill,
  subscriptionTopics,
}: MqttConnectionInfo) {
  let publishMessage: MqttPublishMessageFn | undefined;

  const channel = eventChannel<MqttChannelEvents<TMessagePayload>>(
    (emitter) => {
      const mqttEndpointUrl = `${connection.scheme}://${connection.host}:${connection.port}/mqtt`;
      const client = mqtt.connect(mqttEndpointUrl, {
        host: connection.host,
        port: connection.port,
        keepalive: connection.keepAlive,
        clientId: connection.clientId,
        username: connection.username,
        password: connection.password,
        protocolVersion: 5,
        // Disable automatic reconnect as it can't get fresh credentials
        reconnectPeriod: 0,
        will: lastWill
          ? {
              payload: lastWill.message,
              topic: lastWill.topic,
              qos: lastWill.qos as QoS,
              retain: false,
            }
          : undefined,
      });

      // Single place that turns any failure into an `Error` channel event
      function handleError(error: Error) {
        emitter({
          type: MqttChannelEventType.Error,
          error,
        });
      }

      client.subscribe(subscriptionTopics);

      client.on('message', (topic, payload) => {
        let parsedPayload: TMessagePayload;

        try {
          parsedPayload = JSON.parse(payload.toString());
        } catch (error) {
          // A malformed message must not throw inside the MQTT.js listener;
          // report it through the channel like any other error instead
          handleError(
            error instanceof Error ? error : new Error(String(error))
          );
          return;
        }

        emitter({
          type: MqttChannelEventType.Message,
          topic,
          payload: parsedPayload,
        });
      });

      client.on('connect', () => {
        emitter({ type: MqttChannelEventType.Connected });
      });

      client.on('close', () => {
        emitter({ type: MqttChannelEventType.Disconnected });
      });

      client.on('error', (error) => {
        handleError(error);
      });

      // Browser WebSocket is very limited in error reporting
      // in some cases MQTT.js doesn't emit `error`
      // this workaround helps to catch some
      client.stream.on('error', () => {
        handleError(generalMqttConnectionError);
      });

      publishMessage = ({ topic, qos, message }) => {
        client.publish(topic, JSON.stringify(message), {
          qos,
        });
      };

      // Unsubscribe callback: invoked when the channel is closed
      return () => client.end();
    }
  );

  // `eventChannel` invokes the subscribe callback synchronously,
  // so `publishMessage` is already assigned at this point
  // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
  return { channel, publishMessage: publishMessage! };
}

/**
 * Saga that provides connection info for the next connection attempt;
 * returning `undefined` means the info could not be obtained
 */
export type ConnectionInfoFactory = () => SagaIterator<
  MqttConnectionInfo | undefined
>;

/**
 * Saga called when the MQTT client reports a successful connection
 */
export type OnConnectHandler = () => SagaIterator<void>;

/**
 * Saga called when a connection attempt ends, including when the owning saga is terminated
 */
export type OnDisconnectHandler = () => SagaIterator<void>;

/**
 * Arguments passed to {@link OnErrorHandler}
 */
export interface OnErrorHandlerArgs {
  error: Error;
}

/**
 * Saga called for every error event received from the MQTT connection
 */
export type OnErrorHandler = (args: OnErrorHandlerArgs) => SagaIterator<void>;

/**
 * Arguments passed to {@link OnMessageHandler}
 */
export interface OnMessageHandlerArgs<TMessagePayload> {
  // Topic the message was received on
  topic: string;
  // Parsed message body
  payload: TMessagePayload;
  /**
   * Current MQTT connection info
   */
  connectionInfo: MqttConnectionInfo;
}

/**
 * Saga called for every message received over MQTT
 */
export type OnMessageHandler<TMessagePayload> = (
  args: OnMessageHandlerArgs<TMessagePayload>
) => SagaIterator<void>;

/**
 * Saga that sends MQTT messages using the provided `publishMessage` function
 */
export type PublishMessageSaga = (
  args: PublishMessageSagaArgs
) => SagaIterator<void>;

/**
 * Arguments passed to {@link PublishMessageSaga}
 */
export interface PublishMessageSagaArgs {
  // Function that publishes a message over the current connection
  publishMessage: MqttPublishMessageFn;
  /**
   * Current MQTT connection info
   */
  connectionInfo: MqttConnectionInfo;
}

/**
 * Options accepted by `openMqttConnection`; every callback is a saga
 */
interface OpenMqttConnectionArgs<TMessagePayload> {
  /**
   * Saga returning connection info as {@link MqttConnectionInfo}.
   * If `undefined` is returned, it is treated as an error and the attempt is retried after a delay
   */
  getConnectionInfo: ConnectionInfoFactory;
  /**
   * Saga called when the connection is established
   */
  onConnect?: OnConnectHandler;
  /**
   * Saga called when a connection attempt ends, including on saga termination
   */
  onDisconnect?: OnDisconnectHandler;
  /**
   * Saga called for every error event, after the error has been reported to Sentry
   */
  onError?: OnErrorHandler;
  /**
   * Saga that is called on every message received over MQTT, payload is message parsed as JSON
   */
  onMessage?: OnMessageHandler<TMessagePayload>;
  /**
   * This saga is used to send MQTT messages, it has access to `publishMessage` and is run with `fork`
   */
  publishMessageSaga?: PublishMessageSaga;
}

/**
 * Retry delay increases in geometric progression as described in
 * https://getdrop.atlassian.net/wiki/spaces/mobile/pages/389152769/Drop+Events+Socket+-+MQTT+Web#Reconnection-logic-(exponential-backoff)
 *
 * This constant declares the maximum reconnection delay, in seconds
 */
export const maxRetryDelay = 120;
/**
 * Reconnection delay, in seconds, used for the first retry
 * and restored after every successful connection
 */
export const baseRetryDelay = 1;

/**
 * Signature of the `openMqttConnection` saga
 */
export type OpenMqttConnection<TMessagePayload> = (
  args: OpenMqttConnectionArgs<TMessagePayload>
) => SagaIterator<void>;

/**
 * Opens MQTT over WS connection with retry logic
 * calls sagas callbacks on received events
 *
 * Runs until the saga is terminated. Every iteration of the outer loop is one
 * connection attempt:
 * 1. wait for the current retry delay (no delay before the very first attempt),
 * 2. obtain connection info via `getConnectionInfo` (retry if it is `undefined`),
 * 3. open the connection and fork `publishMessageSaga`,
 * 4. dispatch channel events to `onConnect` / `onError` / `onMessage`
 *    until the connection is closed,
 * 5. clean up: cancel the forked publish saga, close the channel
 *    (which ends the MQTT client) and call `onDisconnect`.
 *
 * Errors are additionally reported to Sentry. Message payloads are passed to
 * `onMessage` with keys deeply converted to camelCase.
 */
export function* openMqttConnection<TMessagePayload>({
  getConnectionInfo,
  onConnect,
  onDisconnect,
  onError,
  onMessage,
  publishMessageSaga,
}: OpenMqttConnectionArgs<TMessagePayload>): SagaIterator<void> {
  // There is no delay for first connection
  let currentRetryDelay = 0;

  while (true) {
    // Retry delays are kept in seconds, `delay` expects milliseconds
    yield delay(currentRetryDelay * 1000);
    /**
     * Retry delay increases in geometric progression as described in
     * https://getdrop.atlassian.net/wiki/spaces/mobile/pages/389152769/Drop+Events+Socket+-+MQTT+Web#Reconnection-logic-(exponential-backoff)
     */
    currentRetryDelay = Math.min(
      currentRetryDelay ? currentRetryDelay * 2 : baseRetryDelay,
      maxRetryDelay
    );

    const connectionInfo: SagaReturnType<typeof getConnectionInfo> = yield call(
      getConnectionInfo
    );

    if (!connectionInfo) {
      // Retry if couldn't get connection info
      continue;
    }

    const {
      channel,
      publishMessage,
    }: ReturnType<typeof createMqttEventChannel> = yield call(
      createMqttEventChannel,
      connectionInfo
    );

    // Task of the forked publish saga for this connection attempt only
    let publishMessageSagaTask: Task | undefined;

    try {
      // Forked inside `try`: from this point the MQTT client exists, so any
      // failure (including the forked saga failing right away) has to go
      // through the cleanup in `finally`, otherwise the client is never ended
      if (publishMessageSaga) {
        publishMessageSagaTask = yield fork(publishMessageSaga, {
          publishMessage,
          connectionInfo,
        });
      }

      while (true) {
        const message: MqttChannelEvents<TMessagePayload> = yield take(channel);

        // Can't `break` out of loop from `switch`
        if (message.type === MqttChannelEventType.Disconnected) {
          break;
        }

        switch (message.type) {
          case MqttChannelEventType.Connected:
            if (onConnect) {
              yield call(onConnect);
            }
            // Successful connection resets the exponential backoff
            currentRetryDelay = baseRetryDelay;
            break;
          case MqttChannelEventType.Error:
            yield call(Sentry.captureException, message.error);
            if (onError) {
              yield call(onError, { error: message.error });
            }
            break;
          default:
            // Only `Message` events are left at this point
            if (onMessage) {
              yield call(onMessage, {
                topic: message.topic,
                payload: camelcaseKeys(message.payload, { deep: true }),
                connectionInfo,
              });
            }
            break;
        }
      }
    } finally {
      // The publish saga is cancelled before the channel is closed,
      // i.e. while the MQTT client has not been ended yet
      if (publishMessageSagaTask) {
        yield call(cancelTask, publishMessageSagaTask);
        publishMessageSagaTask = undefined;
      }
      channel.close();
      // Calling onDisconnect in finally to handle saga termination
      if (onDisconnect) {
        yield call(onDisconnect);
      }
    }
  }
}
