import firebase from 'firebase/compat/app';
import 'firebase/compat/database';
import { getRef } from 'firebase-db';
import { buffers, EventChannel, eventChannel, Task } from 'redux-saga';
import { actionChannel, call, cancel, cancelled, put, select, spawn, take } from 'redux-saga/effects';
import { camelify } from 'shared/string-utils';
import { getPrimaryToken } from 'services/auth/selectors';
import {
  ISubscribeAction,
  IUnsubscribeAction,
  IUnsubscribeCollectionAction,
  cleanupPath,
  incrementListener,
  decrementListener,
  SUBSCRIBE,
  UNSUBSCRIBE,
  updateDocument,
  UNSUBSCRIBE_COLLECTION,
} from './actions';
import Reference = firebase.database.Reference;
import { dataMapper, isMapError } from 'mappers';
import type IState from 'services/state';
import { notify } from './bridge';

/**
 * Wraps a Firebase ref's `value` events in a redux-saga event channel.
 *
 * Every snapshot is emitted as `{ value }`, where `value` is the camelified
 * snapshot content, or `null` when the callback receives no snapshot.
 * Closing the channel detaches the Firebase listener.
 *
 * @param ref Firebase database reference to listen on
 * @returns an event channel that retains only the most recent unread event
 */
export const createDocumentChannel = (ref: Reference) => {
  // Keep just the latest event: Firebase fires synchronously in certain cases,
  // i.e. possibly before anything is taking from the channel
  const latestOnly = buffers.sliding<any>(1);

  return eventChannel((emit) => {
    const onValue = ref.on('value', (snapshot) => {
      const value = snapshot ? camelify(snapshot.val()) : null;
      emit({ value });
    });

    // Unsubscribe function, run when the channel is closed
    return () => {
      ref.off('value', onValue);
    };
  }, latestOnly);
};

/**
 * Maps a raw document value through `dataMapper`.
 *
 * The collection name is the first non-empty segment of `path`. The mapper is
 * called with that name, the value and the primary token read from the store.
 *
 * @param path  document path, e.g. `collection/id`
 * @param value raw value to map
 * @returns the result of `dataMapper`, or `value` unchanged when `path`
 *          contains no non-empty segment
 */
export const mapData = function* (path: string, value: any) {
  // The store is always read first, even if the path turns out to have no collection
  const token = getPrimaryToken(yield select());
  const [collection] = path.split('/').filter(Boolean);

  if (collection) {
    return yield call(dataMapper, collection, value, token);
  }

  return value;
};

/**
 * Keeps the store in sync with the document at `path`.
 *
 * Resolves the ref for `path`, opens a document channel on it and, for each
 * non-empty payload, maps the value with `mapData`. Unless the mapper reports
 * a map error, the result is dispatched via `updateDocument` and passed to
 * `notify`.
 *
 * The saga loops until its task is cancelled or an error is thrown. Errors are
 * logged, not rethrown. On both exit paths the channel is closed so that the
 * underlying listener is detached.
 *
 * @param path path of the document to watch
 */
export const syncDocument = function*(path: string) {
  let channel: EventChannel<any> | null = null;
  try {
    const ref = yield call(getRef, path);
    channel = yield call(createDocumentChannel, ref);
    while (true) {
      const { value } = yield take(channel!);
      // Nothing to update for an empty payload (the channel emits null for a missing snapshot).
      // NOTE(review): the check is falsy-based, so 0, false and '' are skipped as well — confirm intended.
      if (!value) {
        continue;
      }
      const mappedValue = yield call(mapData, path, value);
      if (!isMapError(mappedValue)) {
        yield put(updateDocument({ path, value: mappedValue }));
        notify(path, mappedValue);
      }
    }
  } catch (error) {
    // TODO: Error saga
    console.error(error); // tslint:disable-line no-console
    // The loop is dead after an error and the cancellation branch below will not run for it,
    // so detach the listener here; otherwise it would stay registered forever.
    channel?.close();
  } finally {
    if (yield cancelled()) {
      channel?.close();
    }
  }
};

// private selector: listener count recorded for `path`, 0 when there is no (truthy) entry
const getListenerCount = (state: IState, path: string): number => {
  const { listeners } = state.realtime;

  return listeners[path] || 0;
};

// Cancels the sync task of `path`, dispatches `cleanupPath` for it and drops the task from the map
const stopSync = function*(syncTasks: Map<string, Task>, path: string, task: Task) {
  yield cancel(task);
  yield put(cleanupPath(path));
  syncTasks.delete(path);
};

/**
 * Handles a single subscription action. Callers feed actions in one at a time,
 * in the order they were dispatched.
 *
 * - SUBSCRIBE: spawns a `syncDocument` task when the path has no listeners yet,
 *   then increments the listener count.
 * - UNSUBSCRIBE: with exactly one listener left, cancels the path's task and
 *   cleans the path up (only if a task is registered); with more than one,
 *   decrements the count; otherwise does nothing.
 * - UNSUBSCRIBE_COLLECTION: cancels and cleans up every registered path that
 *   starts with `<collection>/`.
 *
 * @param syncTasks running sync tasks keyed by path; mutated in place
 * @param action    the subscription action to process
 */
export const watcher = function*(
  syncTasks: Map<string, Task>,
  action: ISubscribeAction | IUnsubscribeAction | IUnsubscribeCollectionAction,
) {
  // Listener counts are read from the state as it is when the action is handled
  const state = yield select();

  switch (action.type) {
    case SUBSCRIBE: {
      const path = action.payload;
      const isFirstListener = getListenerCount(state, path) === 0;

      if (isFirstListener) {
        // nobody is listening yet: start syncing this path
        syncTasks.set(path, yield spawn(syncDocument, path));
      }
      yield put(incrementListener(path));
      break;
    }

    case UNSUBSCRIBE: {
      const path = action.payload;
      const remaining = getListenerCount(state, path);

      if (remaining > 1) {
        // other listeners remain, only the count changes
        yield put(decrementListener(path));
        break;
      }
      if (remaining !== 1) {
        break;
      }

      // the last listener is leaving: stop syncing and clean the path up
      const task = syncTasks.get(path);
      if (task) {
        yield* stopSync(syncTasks, path, task);
      }
      break;
    }

    case UNSUBSCRIBE_COLLECTION: {
      const prefix = action.payload + '/';

      for (const [path, task] of syncTasks) {
        if (task && path.startsWith(prefix)) {
          yield* stopSync(syncTasks, path, task);
        }
      }
      break;
    }
  }
};

/**
 * Root saga of the realtime module.
 *
 * Queues SUBSCRIBE / UNSUBSCRIBE / UNSUBSCRIBE_COLLECTION actions in an action
 * channel and hands them to `watcher` one by one, so each action is fully
 * processed before the next one is taken.
 */
const realtimeSaga = function*() {
  // Sync tasks by path, shared across all watcher invocations
  const tasksByPath = new Map<string, Task>();
  // The initial buffer size is arbitrary
  const subscriptionActions = yield actionChannel(
    [SUBSCRIBE, UNSUBSCRIBE, UNSUBSCRIBE_COLLECTION],
    buffers.expanding(64),
  );

  for (;;) {
    const nextAction = yield take(subscriptionActions);
    yield call(watcher, tasksByPath, nextAction);
  }
};

export default realtimeSaga;
