import { tagged } from '@mirage/service-logging';
import { namespace } from '@mirage/service-operational-metrics';
import * as scoring from '@mirage/service-typeahead-search/service/scoring';
import {
  relevant,
  TYPEAHEAD_SEARCH_SOURCES,
} from '@mirage/service-typeahead-search/service/sources';
import {
  CacheKey,
  TypeaheadCache,
} from '@mirage/service-typeahead-search/service/typeahead-cache';
import {
  ResultType,
  SourceId,
} from '@mirage/service-typeahead-search/service/types';
import instrument from '@mirage/service-typeahead-search/service/utils/instrument';
import { managedShortcuts } from '@mirage/service-typeahead-search/service/utils/managed-shortcuts';
import { PreviousQuery } from '@mirage/shared/search/cache-result';
import * as rx from 'rxjs';
import * as op from 'rxjs/operators';

import type { typeahead } from '@mirage/service-typeahead-search/service/types';
import type { Observable } from 'rxjs';

// Logger for this module, tagged with the module's service path.
const logger = tagged('service-typeahead-search/search');
// Operational-metrics namespace under which all of the `search/*` stats and
// counters recorded below are reported.
const metrics = namespace('typeahead-search');

/**
 * Returns an observable of arrays of typeahead search results.
 * It handles collecting, deduplication, scoring, grouping, limiting, and
 * sorting of the results in each emitted array.
 *
 * There's a diagram of the data flow in confluence:
 * https://dropbox.atlassian.net/wiki/spaces/Dash/pages/1569784549/service-typeahead-search+-+search+function+diagram
 *
 * @param query_ raw user query; it is trimmed before being passed to sources.
 * @param cache typeahead cache; analyzed up front to decide which sources are
 *   relevant, and handed to every source's `search`.
 * @param auxiliarySources extra sources supplied by the consuming context,
 *   considered alongside `TYPEAHEAD_SEARCH_SOURCES`.
 * @param config typeahead configuration, used for source relevance,
 *   dropboxer-only URL-shortcut filtering and event filtering.
 * @param limit maximum number of shortcut/recent-query/other results per
 *   emission (default 5). Queries starting with '/' use
 *   `managedShortcuts.length` instead. At most one suggested query is added
 *   on top of this limit.
 * @returns a stream of score-sorted result arrays. Emissions are throttled
 *   with `auditTime` (50ms until the 50ms cutoff, 100ms afterwards) and the
 *   stream is terminated after 1s. Emits `[]` when the pipeline completes
 *   without producing any candidate.
 */
export default function search(
  query_: string,
  cache: TypeaheadCache,
  auxiliarySources: typeahead.Source[], // sources from consuming context
  config: typeahead.Config,
  limit: number = 5,
): Observable<typeahead.ScoredResult[]> {
  // sanitize query
  const query = query_.trim();

  // result grouping parameters
  const RECENT_QUERIES_LIMIT = query.length > 0 ? 1 : limit;
  // '/'-prefixed queries may show up to managedShortcuts.length results
  // (presumably so every managed shortcut can be listed)
  const LIMIT = query.startsWith('/') ? managedShortcuts.length : limit;

  // TODO: We should just make this a hard limit like it is in SERP. Too
  // dangerous to have it just warn. No source should be producing more than
  // 100 candidates, there's no reason for that.
  //
  // warn if there are too many items returned from the sources
  const WARN_TOO_MANY_RESULTS_THRESHOLD = 100;

  // time-based cutoffs aren't likely to be immediately relevant, but this gives us
  // future optionality for async sources that take time to emit.
  // `cutoff` (50ms) switches the segmentation cadence; `end` (1s) terminates
  // the stream.
  const [cutoff, end] = intervals([50, 1_000]);

  // analyze cache (should be quick!)
  const t0 = performance.now();
  const cacheAnalysis = getCacheAnalysis(cache);
  const cacheAnalysisMs = performance.now() - t0;
  if (cacheAnalysisMs > 200) {
    const rounded = Math.round(cacheAnalysisMs);
    logger.warn(
      `typeahead cache analysis time is unexpectedly high: ${rounded}ms`,
    );
  }
  metrics.stats('search/cacheAnalysis', cacheAnalysisMs, {
    name: 'cacheAnalysis',
  });

  // find relevant sources
  const callable = relevant(
    query,
    [...TYPEAHEAD_SEARCH_SOURCES, ...auxiliarySources],
    config,
    cacheAnalysis,
  );

  // Runs a single source on the async scheduler. Its errors are logged and
  // swallowed so one failing source cannot fail the whole search.
  function runner(
    source: typeahead.Source,
  ): Observable<typeahead.TaggedResult> {
    const perSourceUUIDCountSet = new Set<string>();
    let warned = false;
    return (
      source
        .search(query, cache)
        .pipe(op.subscribeOn(rx.asyncScheduler))
        // swallow source-level failures
        .pipe(
          op.catchError((e) => {
            logger.error(`search source error`, e);
            return rx.EMPTY;
          }),
        )
        // warn if there are too many results returned, but don't do anything
        // to stop it as nondeterministically missing data is worse than a
        // slow response
        .pipe(
          op.tap((item) => {
            // We only warn once, so after that there is nothing left to
            // track; stop growing the set for runaway sources.
            if (warned) return;

            perSourceUUIDCountSet.add(item?.uuid);
            if (perSourceUUIDCountSet.size > WARN_TOO_MANY_RESULTS_THRESHOLD) {
              warned = true;
              logger.warn(
                `More than ${WARN_TOO_MANY_RESULTS_THRESHOLD} items emitted from the source ${source.id}!`,
              );
            }
          }),
        )
      // uncomment this line to add some delay for easier development
      // .pipe(rx.delay(500))
    );
  }

  const sources$ = rx.from(callable).pipe(op.mergeMap(runner));

  // instrument latency of source aggregation
  const isources$ = instrument(sources$, (stats) => {
    const DEBUG = false;
    if (DEBUG) logger.debug(`sources`, stats);

    metrics.stats('search/sources', stats.duration, {
      name: 'sources',
    });
  });

  // filter out dropboxer-only url-shortcuts for non-dropboxers
  const filtered$ = isources$.pipe(
    op.filter(
      (result) =>
        result.type !== ResultType.URLShortcut ||
        !result.result.dropboxersOnly ||
        config.isDropboxer,
    ),
  );

  // deduplicate any results across all sources
  const deduped$ = filtered$.pipe(rx.distinct(({ uuid }) => uuid));

  // instrument latency of dedupe
  const ideduped$ = instrument(deduped$, (stats) => {
    const DEBUG = false;
    if (DEBUG) logger.debug(`dedupe`, stats);

    metrics.stats('search/dedupe', stats.duration, {
      name: 'dedupe',
    });
  });

  // apply scores to candidates (titleMatchScore, lastClickedScore, etc.)
  let scored$ = ideduped$.pipe(op.map(scoring.score(query)));

  const DEBUG = false;
  const DEBUG_LINES = 3;
  if (DEBUG) {
    let i = 0;
    scored$ = scored$.pipe(
      op.tap((scoredResult: typeahead.ScoredResult) => {
        if (i < DEBUG_LINES) {
          logger.debug(`scoredResult ${i + 1}:`, scoredResult);
          i++;
        }
      }),
    );
  }

  // instrument latency of scoring
  const iscored$ = instrument(scored$, (stats) => {
    const DEBUG = false;
    if (DEBUG) logger.debug(`scored`, stats);

    metrics.stats('search/scoring', stats.duration, {
      name: 'scoring',
    });
  });

  /**
   * sort results by score and convert from Observable<typeahead.ScoredResult>
   * to Observable<typeahead.ScoredResult[]>
   */
  const sorted$ = iscored$.pipe(
    op.scan((acc, value) => {
      return [...acc, value].sort(sortScoredResults);
    }, [] as typeahead.ScoredResult[]),
  );

  // instrument latency of sorting
  const isorted$ = instrument(sorted$, (stats) => {
    const DEBUG = false;
    if (DEBUG) logger.debug(`sorted`, stats);

    metrics.stats('search/sorting', stats.duration, {
      name: 'sorting',
    });
  });

  // group results by type into tiers
  const grouped$ = isorted$
    .pipe(
      op.map((sortedResults) => {
        // suggested-search
        const suggested = sortedResults.filter(isSuggestedQuery).slice(0, 1);

        // shortcuts
        const shortcuts = sortedResults.filter(isURLShortcut);

        // recent queries
        const queries = sortedResults
          .filter(isRecentQuery)
          .slice(0, RECENT_QUERIES_LIMIT);

        // all other sources
        const rest = sortedResults.filter(
          (result) =>
            !isSuggestedQuery(result) &&
            !isURLShortcut(result) &&
            !isRecentQuery(result) &&
            !isEventResult(result, config.isFilterOutEvents),
        );

        // combine all but the suggested-search
        const combinedAndLimited = [...shortcuts, ...queries, ...rest].slice(
          0,
          LIMIT,
        );

        // sort by score again to ensure they are all in the correct order post merge
        return [...suggested, ...combinedAndLimited].sort(sortScoredResults);
      }),
    )
    // if the search produces no results, send that to the UI
    .pipe(op.defaultIfEmpty([]));

  // instrument latency of grouping/tiering logic
  const igrouped$ = instrument(grouped$, (stats) => {
    const DEBUG = false;
    if (DEBUG) logger.debug(`grouped`, stats);

    metrics.stats('search/grouping', stats.duration, {
      name: 'grouping',
    });
  });

  // define intervals for emitting results
  // TODO: consider using P95-99 latency of slowest emitting search source
  // for defining these values to prevent "jumpy" typeahead results
  const segmentation$ = igrouped$
    // perform result segmentation to prevent killing the UI with too many
    // intermediate result states and re-renders
    .pipe(
      op.connect((shared$) => {
        // `cutoff` is cold (see `intervals`), so subscribing to it from both
        // takeUntil and skipUntil would start two independent timers. A value
        // arriving after the first fired but before the second (e.g. a
        // promise-resolved emission in the gap) would be dropped by BOTH
        // branches, losing the final state if it was the last emission.
        // Sharing a single timer makes both branches switch over within the
        // same synchronous notification.
        const sharedCutoff$ = cutoff.pipe(op.share());
        return rx.merge(
          shared$.pipe(op.takeUntil(sharedCutoff$), op.auditTime(50)),
          shared$.pipe(op.skipUntil(sharedCutoff$), op.auditTime(100)),
        );
      }),
    )
    .pipe(op.takeUntil(end));

  // instrument latency of segmentation logic
  const isegmentation$ = instrument(segmentation$, (stats) => {
    const DEBUG = false;
    if (DEBUG) logger.debug(`segmented`, stats);

    metrics.stats('search/segmentation', stats.duration, {
      name: 'segmentation',
    });
  });

  // instrument aggregate search stream metrics
  return instrument(isegmentation$, (stats) => {
    const DEBUG = false;
    if (DEBUG) logger.debug(`typeahead search`, stats);

    metrics.stats('search/ttfr', stats.first);
    metrics.stats('search/duration', stats.duration);
    metrics.counter('search/status', 1, {
      status: stats.failed ? 'error' : 'success',
    });
  });
}

// Some sources decide, in their relevance predicates, whether to participate
// based on what is already in the local cache. For example, with an empty
// query: if the user has recent queries, the null state should show them;
// if the user has none, the null state should show "recommendations".
//
// WARN:
// Keep this analysis to a minimum! If more is needed, we probably want a
// better pattern for letting one source's inclusion depend on the state of
// another source.
export function getCacheAnalysis(
  cache: TypeaheadCache,
): typeahead.CacheAnalysis {
  // Fetching every recent query just to test for emptiness is <1ms today.
  // Implement a cache.some()-style early exit if perf becomes a concern.
  const { length: recentQueryCount } = cache.all(
    CacheKey.RecentQueries,
  ) as PreviousQuery[];

  return { hasRecentQueries: recentQueryCount > 0 };
}

/** True when the result's metadata names the suggested-queries source. */
function isSuggestedQuery(result: typeahead.ScoredResult) {
  const sourceId = result?.metadata?.source;
  return sourceId === SourceId.SuggestedQueries;
}

/** True when the result's metadata names the recent-queries source. */
function isRecentQuery(result: typeahead.ScoredResult) {
  const sourceId = result?.metadata?.source;
  return sourceId === SourceId.RecentQueries;
}

/** True when the result's metadata names the URL-shortcuts source. */
function isURLShortcut(result: typeahead.ScoredResult) {
  const sourceId = result?.metadata?.source;
  return sourceId === SourceId.URLShortcuts;
}

/**
 * True when `filter` is enabled and `result` is a search result or
 * recommendation whose record type tag is `'event'`. When `filter` is
 * false, nothing is treated as an event.
 */
function isEventResult(result: typeahead.ScoredResult, filter: boolean) {
  if (filter) {
    return (
      (result?.type === ResultType.Recommendation ||
        result?.type === ResultType.SearchResult) &&
      result?.result?.recordType?.['.tag'] === 'event'
    );
  }
  return false;
}

/**
 * `Array.prototype.sort` comparator that orders results by descending score.
 */
function sortScoredResults(
  left: typeahead.ScoredResult,
  right: typeahead.ScoredResult,
): number {
  const { score: leftScore } = left;
  const { score: rightScore } = right;
  // positive => `right` sorts before `left`, i.e. higher scores come first
  return rightScore - leftScore;
}

/**
 * Builds one single-shot timer per entry in `periods`. Each returned
 * observable emits `undefined` once, `period` ms after it is subscribed,
 * and then completes. The observables are cold: every subscription starts
 * its own timer.
 */
function intervals(periods: Array<number>): Array<Observable<void>> {
  const toTimer = (period: number): Observable<void> =>
    rx.timer(period).pipe(op.map(() => undefined));
  return periods.map(toTimer);
}
