import { useCallback, useEffect, useMemo, useRef } from "react";

import { useMutation } from "@tanstack/react-query";
import { mercurePush } from "api/mercure";
import useCallbackRef from "hooks/callback_ref/useCallbackRef";
import { MERCURE_HUB_URL } from "util/const";
import { getTokens } from "util/token";

/**
 * Value returned by `useMercureTopic`.
 */
type Return<T> = {
  /** Publishes `data` to the hook's current topic(s) through the push mutation. */
  push: (data: T) => void;
  /** Shared `EventSource` registered for the selected source, when one exists. */
  eventSource?: EventSource;
  /** Error reported by the push mutation, if any. */
  error?: Error;
};
/**
 * Optional settings accepted by `useMercureTopic`.
 */
type Options = {
  /** Token used when no `mercureAuthorization` cookie is present; takes precedence over the token from `getTokens()`. */
  publishToken?: string;
  /** When truthy, the hook does not subscribe. */
  disable?: boolean;
  /** Sent to the hub as the `lastEventID` query parameter when set. */
  lastMessageId?: string;
  /** Key of the shared connection to use; falls back to `DEFAULT_SOURCE`. */
  source?: string;
  /** Called with the error when a push fails. */
  onError?: (error: Error) => void;
  /** Called when the underlying `EventSource` reports that it is open. */
  onOpen?: () => void;
};

/**
 * Looks up a cookie by name in `document.cookie`.
 *
 * @param name - Cookie name to look for.
 * @returns The cookie value, or `undefined` when the name does not occur
 *   exactly once in the cookie string.
 */
const getCookie = (name: string) => {
  // Prefixing "; " lets the first cookie be matched like every other one.
  const segments = `; ${document.cookie}`.split(`; ${name}=`);

  if (segments.length !== 2) {
    return undefined;
  }

  // Everything after the marker up to the next ";" is the value.
  const [, tail = ""] = segments;

  return tail.split(";")[0];
};

/**
 * Type guard telling whether a list contains strings only.
 *
 * @param value - List whose entries may be `undefined`.
 * @returns `true` when no entry is a non-string (an empty list qualifies).
 */
const isStringArray = (value: (string | undefined)[]): value is string[] => {
  const hasNonString = value.some((entry) => typeof entry !== "string");

  return !hasNonString;
};

/**
 * Tells whether two topic values differ.
 *
 * Two arrays are compared element by element, in order; any other
 * combination is compared by identity.
 *
 * @param a - Previous topic value.
 * @param b - Next topic value.
 * @returns `true` when the topics differ, `false` otherwise.
 */
const topicChanged = (
  a: string | string[] | undefined,
  b: string | string[] | undefined
) => {
  if (a === b) {
    return false;
  }

  // Identity already failed, so unless both are arrays they differ.
  if (!Array.isArray(a) || !Array.isArray(b)) {
    return true;
  }

  if (a.length !== b.length) {
    return true;
  }

  const sameEntries = a.every((entry, index) => entry === b[index]);

  return !sameEntries;
};

/** Connection key used when `options.source` is not provided. */
const DEFAULT_SOURCE = "master";

/**
 * Per-source promise created when a connection attempt starts and resolved
 * once the new connection has been registered in `sources`.
 */
const pending: Record<string, Promise<void>> = {};
/**
 * Shared connection per source, together with the topics it was opened for.
 * `null` marks a source whose connection is currently being established.
 */
const sources: Record<
  string,
  { source: EventSource; topics: string[] } | null
> = {};
/** Per-source callbacks invoked for every message received on that source. */
const handlers: Record<string, ((data: { data: string }) => void)[]> = {};

/**
 * React hook that subscribes to one or more Mercure topics through a
 * connection shared per `options.source`, and exposes `push` to publish to
 * the same topic(s).
 *
 * All hook instances using the same source share one `EventSource`. A new
 * subscriber re-opens it with the union of the already known topics and its
 * own. Every incoming message is handed to every subscriber registered for
 * that source; there is no per-topic dispatch in this hook.
 *
 * @param topic - Topic or list of topics. Nothing is subscribed while it is
 *   `undefined`, empty, or while `options.disable` is set.
 * @param onData - Receives decoded message payloads and the result of a
 *   successful push.
 * @param options - Optional settings (token, source key, callbacks, ...).
 * @returns `push`, the shared `EventSource` registered for the source at the
 *   time the result was memoized, and the push mutation error, if any.
 */
const useMercureTopic = <T, R = T>(
  topic: string | string[] | undefined,
  onData: (data: R) => void,
  options: Options = {}
): Return<T> => {
  const { 2: mercureToken } = getTokens();
  // Presumably a stable function forwarding to the latest `onData` — confirm
  // against `useCallbackRef`.
  const onDataRef = useCallbackRef(onData);
  const lastTopic = useRef(topic);

  // Hand out the previous reference until the topic content really changes,
  // so a fresh array with equal content does not re-trigger the effects.
  const changedTopic = useMemo(() => {
    if (topicChanged(lastTopic.current, topic)) {
      lastTopic.current = topic;
      return topic;
    }

    return lastTopic.current;
  }, [topic]);

  useEffect(() => {
    lastTopic.current = changedTopic;
  }, [changedTopic]);

  const onOpenRef = useCallbackRef(() => {
    options.onOpen?.();
  });

  // Precedence: cookie, then the explicit publish token, then the stored token.
  const token = useMemo(
    () =>
      getCookie("mercureAuthorization") ??
      options.publishToken ??
      mercureToken?.token,
    [mercureToken, options.publishToken]
  );

  useEffect(() => {
    type SharedSource = NonNullable<(typeof sources)[string]>;

    const topic = changedTopic;

    if (
      options.disable ||
      !topic ||
      (Array.isArray(topic) && topic.length === 0)
    ) {
      return;
    }

    const sourceStr = options.source ?? DEFAULT_SOURCE;
    let ignore = false;

    const resolvedTopic: (string | undefined)[] = (
      Array.isArray(topic) ? (topic as string[]) : [topic]
    ).filter((t) => !!t);

    // Declared at effect scope so the cleanup removes the very function that
    // was registered.
    // NOTE(review): the source's `onmessage` already JSON-parses the event
    // data before calling handlers, and this handler then parses the `data`
    // field of that result again. Kept as-is; confirm the payload is really
    // wrapped that way.
    const onMessage = ({ data }: { data: string }) =>
      onDataRef(JSON.parse(data));

    // Gives up this subscriber's share of the connection: closes it when
    // nobody listens anymore, otherwise only forgets this hook's topics.
    // NOTE(review): topics are not reference-counted, so a topic shared with
    // another subscriber is dropped from the list as well.
    const release = (shared: SharedSource) => {
      if ((handlers[sourceStr] ?? []).length === 0) {
        console.log("useMercureTopic", "closing connection", sourceStr);
        shared.source.close();
        delete sources[sourceStr];
      } else {
        shared.topics = shared.topics.filter(
          (t) => !resolvedTopic.includes(t)
        );
      }
    };

    const delayed = async () => {
      if (ignore) {
        return;
      }

      if (!isStringArray(resolvedTopic) || resolvedTopic.length === 0) {
        return;
      }

      // `null` means another subscriber is connecting right now; wait for it
      // so its topics can be merged instead of racing it.
      while (sources[sourceStr] === null) {
        console.info(
          "useMercureTopic",
          "waiting for connection to be established",
          sourceStr
        );
        await pending[sourceStr];
      }

      // Torn down while waiting: nothing has been touched yet, so just stop.
      if (ignore) {
        return;
      }

      const previous = sources[sourceStr];
      const url = new URL(MERCURE_HUB_URL);

      const next = new Set([...(previous?.topics ?? []), ...resolvedTopic]);

      console.info("useMercureTopic", "subscribing to topics", next);

      for (const t of next) {
        url.searchParams.append("topic", t);
      }

      if (options.lastMessageId) {
        url.searchParams.append("lastEventID", options.lastMessageId);
      }

      console.info("connecting to mercure", url.toString());

      // Close the old connection whatever its state; one that is still
      // connecting would otherwise stay alive next to the new one.
      previous?.source.close();

      let resolve = () => {};

      pending[sourceStr] = new Promise((r) => (resolve = r));
      sources[sourceStr] = null; // mark as connecting, so we don't try to connect again

      let evSource: SharedSource;

      try {
        if (process.env.NODE_ENV === "development") {
          // SSSP 4 claims to support EventSource but does not.
          // This polyfill is far from ideal, but there is no other way.
          const polyfill = await import("event-source-polyfill");
          const { EventSourcePolyfill } = polyfill;

          evSource = {
            source: new EventSourcePolyfill(url.toString(), {
              withCredentials: true,
              headers: {
                Authorization: `Bearer ${token}`,
              },
            }),
            topics: [...next],
          };
        } else {
          evSource = {
            source: new EventSource(url.toString(), {
              withCredentials: true,
            }),
            topics: [...next],
          };
        }
      } catch (error) {
        // Drop the "connecting" marker, otherwise every later subscriber
        // would wait on a promise that never settles. The previous entry is
        // restored so its topics are merged again on the next attempt.
        if (previous) {
          sources[sourceStr] = previous;
        } else {
          delete sources[sourceStr];
        }

        resolve();
        throw error;
      }

      evSource.source.onopen = () => {
        console.debug("useMercureTopic", "connected to mercure hub", sourceStr);
        onOpenRef();
      };

      evSource.source.onmessage = ({ data }: { data: string }) => {
        for (const handler of handlers[sourceStr]) {
          handler(JSON.parse(data));
        }
      };

      // Keep the handlers of subscribers that were already registered; the
      // connection is shared, so re-opening it must not silence them.
      handlers[sourceStr] = handlers[sourceStr] ?? [];
      sources[sourceStr] = evSource;
      resolve();

      // Torn down while the connection was being created: do not register,
      // and give the share back right away.
      if (ignore) {
        release(evSource);
        return;
      }

      handlers[sourceStr].push(onMessage);
    };

    delayed().catch((error: unknown) => {
      console.warn("useMercureTopic", "could not connect to mercure hub", error);
    });

    return () => {
      ignore = true;

      // Unregister first, even if another subscriber is reconnecting at the
      // moment, so this handler never outlives the hook.
      if (handlers[sourceStr]) {
        handlers[sourceStr] = handlers[sourceStr].filter(
          (handler) => handler !== onMessage
        );
      }

      const evSource = sources[sourceStr];

      if (!evSource) {
        return;
      }

      release(evSource);
    };
  }, [
    token,
    changedTopic,
    onDataRef,
    options.source,
    options.lastMessageId,
    onOpenRef,
    options.disable,
  ]);

  const mutation = useMutation({
    mutationFn: mercurePush,
    onSuccess: onDataRef,
    // onError: options.onError ?? ((error: Error) => notifyError(notify, t`labels.could_not_push|Could not push`, error))
    onError: (error: Error) => {
      console.warn("mercure push error", error);
      options.onError?.(error);
    },
  });

  // Depend on `mutate` only: the mutation result object is a new object on
  // every render, which would make `push` (and the returned value) unstable.
  const { mutate } = mutation;

  const push = useCallback(
    async (data: T) => {
      const topic = changedTopic;

      if (!token) {
        throw new Error("no token present");
      }

      if (!topic) {
        throw new Error("no topic present");
      }

      const body = new URLSearchParams();

      // A set so a topic listed twice is only sent once.
      const resolvedTopic = new Set(
        Array.isArray(topic) ? (topic as string[]) : [topic]
      );

      for (const t of resolvedTopic) {
        body.append("topic", t);
      }

      body.append(
        "data",
        JSON.stringify({
          ...data,
          origin: "front",
        })
      );

      mutate({ body, token });
    },
    [mutate, token, changedTopic]
  );

  return useMemo(
    () => ({
      push,
      eventSource:
        sources[options.source ?? DEFAULT_SOURCE]?.source ?? undefined,
      error: mutation.error || undefined,
    }),
    [mutation.error, push, options.source]
  );
};

export default useMercureTopic;
