From 40a93e9d17dd3bea1afaa9e21c89689fe1219588 Mon Sep 17 00:00:00 2001
From: Antoine Lanoe <antoine.lanoe@meltwater.com>
Date: Fri, 4 Oct 2024 14:38:58 +0200
Subject: [PATCH] chore: stream large lists diff

---
 src/models/stream.ts           |  27 +++++
 src/stream/emitter.ts          |  27 +++++
 src/stream/stream-list-diff.ts | 202 +++++++++++++++++++++++++++++++++
 3 files changed, 256 insertions(+)
 create mode 100644 src/models/stream.ts
 create mode 100644 src/stream/emitter.ts
 create mode 100644 src/stream/stream-list-diff.ts

diff --git a/src/models/stream.ts b/src/models/stream.ts
new file mode 100644
index 0000000..5b874f6
--- /dev/null
+++ b/src/models/stream.ts
@@ -0,0 +1,27 @@
+import { LIST_STATUS } from "./list";
+
+export type StreamListsDiff<T extends Record<string, unknown>> = {
+  currentValue: T | null;
+  previousValue: T | null;
+  prevIndex: number | null;
+  newIndex: number | null;
+  indexDiff: number | null;
+  status: LIST_STATUS;
+};
+
+export type ReferenceProperty<T extends Record<string, unknown>> = keyof T;
+
+export type StreamReferences<T extends Record<string, unknown>> = Map<
+  ReferenceProperty<T>,
+  { prevIndex: number; nextIndex?: number }
+>;
+
+export type ListStreamOptions = {
+  chunksSize?: number; // 0 by default. If 0, stream will be live
+  showOnly?: `${LIST_STATUS}`[];
+  considerMoveAsUpdate?: boolean;
+};
+
+export const DEFAULT_LIST_STREAM_OPTIONS: ListStreamOptions = {
+  chunksSize: 0,
+};
diff --git a/src/stream/emitter.ts b/src/stream/emitter.ts
new file mode 100644
index 0000000..4554c32
--- /dev/null
+++ b/src/stream/emitter.ts
@@ -0,0 +1,27 @@
+type Listener<T extends unknown[]> = (...args: T) => void;
+
+export enum StreamEvent {
+  Data = "data",
+  Finish = "finish",
+  Error = "error",
+}
+export class EventEmitter {
+  private events: Record<string, Listener<unknown[]>[]> = {};
+
+  on<T extends unknown[]>(
+    event: `${StreamEvent}`,
+    listener: Listener<T>,
+  ): this {
+    if (!this.events[event]) {
+      this.events[event] = [];
+    }
+    this.events[event].push(listener as Listener<unknown[]>);
+    return this;
+  }
+
+  emit<T extends unknown[]>(event: `${StreamEvent}`, ...args: T): void {
+    if (this.events[event]) {
+      this.events[event].forEach((listener) => listener(...args));
+    }
+  }
+}
diff --git a/src/stream/stream-list-diff.ts b/src/stream/stream-list-diff.ts
new file mode 100644
index 0000000..46eac36
--- /dev/null
+++ b/src/stream/stream-list-diff.ts
@@ -0,0 +1,202 @@
+import {
+  DEFAULT_LIST_STREAM_OPTIONS,
+  ListStreamOptions,
+  ReferenceProperty,
+  StreamListsDiff,
+  StreamReferences,
+} from "../models/stream";
+import { LIST_STATUS } from "../models/list";
+import { isEqual } from "../utils";
+import { EventEmitter, StreamEvent } from "./emitter";
+
+function outputDiffChunk<T extends Record<string, unknown>>(
+  emitter: EventEmitter,
+) {
+  let chunks: StreamListsDiff<T>[] = [];
+
+  return function handleDiffChunk(
+    chunk: StreamListsDiff<T>,
+    options: ListStreamOptions,
+  ): void {
+    const showChunk = options?.showOnly
+      ? options?.showOnly.includes(chunk.status)
+      : true;
+    if (!showChunk) {
+      return;
+    }
+    if ((options.chunksSize as number) > 0) {
+      chunks.push(chunk);
+      if (chunks.length >= (options.chunksSize as number)) {
+        const output = chunks;
+        chunks = [];
+        return emitter.emit(StreamEvent.Data, output);
+      }
+    }
+    return emitter.emit(StreamEvent.Data, [chunk]);
+  };
+}
+
+function formatSingleListStreamDiff<T extends Record<string, unknown>>(
+  list: T[],
+  isPrevious: boolean,
+  status: LIST_STATUS,
+  options: ListStreamOptions,
+): StreamListsDiff<T>[] {
+  const diff: StreamListsDiff<T>[] = list.map((data, i) => ({
+    previousValue: isPrevious ? data : null,
+    currentValue: isPrevious ? null : data,
+    prevIndex: status === LIST_STATUS.ADDED ? null : i,
+    newIndex: status === LIST_STATUS.ADDED ? i : null,
+    indexDiff: null,
+    status,
+  }));
+  if (options.showOnly && options.showOnly.length > 0) {
+    return diff.filter((value) => options.showOnly?.includes(value.status));
+  }
+  return diff;
+}
+
+function getDiffChunks<T extends Record<string, unknown>>(
+  prevList: T[],
+  nextList: T[],
+  referenceProperty: ReferenceProperty<T>,
+  emitter: EventEmitter,
+  options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
+) {
+  if (!prevList && !nextList) {
+    return [];
+  }
+  if (!prevList) {
+    const nextDiff = formatSingleListStreamDiff(
+      nextList as T[],
+      false,
+      LIST_STATUS.ADDED,
+      options,
+    );
+    return nextDiff.forEach((data) => handleDiffChunk(data, options));
+  }
+  if (!nextList) {
+    const prevDiff = formatSingleListStreamDiff(
+      prevList as T[],
+      true,
+      LIST_STATUS.DELETED,
+      options,
+    );
+    return prevDiff.forEach((data) => handleDiffChunk(data, options));
+  }
+  const listsReferences: StreamReferences<T> = new Map();
+  const handleDiffChunk = outputDiffChunk<T>(emitter);
+  prevList.forEach((data, i) => {
+    if (data) {
+      listsReferences.set(String(data[referenceProperty]), {
+        prevIndex: i,
+        nextIndex: undefined,
+      });
+    }
+  });
+
+  nextList.forEach((data, i) => {
+    if (data) {
+      const listReference = listsReferences.get(
+        String(data[referenceProperty]),
+      );
+      if (listReference) {
+        listReference.nextIndex = i;
+      } else {
+        handleDiffChunk(
+          {
+            previousValue: null,
+            currentValue: data,
+            prevIndex: null,
+            newIndex: i,
+            indexDiff: null,
+            status: LIST_STATUS.ADDED,
+          },
+          options,
+        );
+      }
+    }
+  });
+
+  for (const data of listsReferences.values()) {
+    if (!data.nextIndex) {
+      handleDiffChunk(
+        {
+          previousValue: prevList[data.prevIndex],
+          currentValue: null,
+          prevIndex: data.prevIndex,
+          newIndex: null,
+          indexDiff: null,
+          status: LIST_STATUS.DELETED,
+        },
+        options,
+      );
+    } else {
+      const prevData = prevList[data.prevIndex];
+      const nextData = nextList[data.nextIndex];
+      const isDataEqual = isEqual(prevData, nextData);
+      const indexDiff = data.prevIndex - data.nextIndex;
+      if (isDataEqual) {
+        if (indexDiff === 0) {
+          handleDiffChunk(
+            {
+              previousValue: prevList[data.prevIndex],
+              currentValue: nextList[data.nextIndex],
+              prevIndex: null,
+              newIndex: data.nextIndex,
+              indexDiff: null,
+              status: LIST_STATUS.EQUAL,
+            },
+            options,
+          );
+        } else {
+          handleDiffChunk(
+            {
+              previousValue: prevList[data.prevIndex],
+              currentValue: nextList[data.nextIndex],
+              prevIndex: data.prevIndex,
+              newIndex: data.nextIndex,
+              indexDiff,
+              status: options.considerMoveAsUpdate
+                ? LIST_STATUS.UPDATED
+                : LIST_STATUS.MOVED,
+            },
+            options,
+          );
+        }
+      } else {
+        handleDiffChunk(
+          {
+            previousValue: prevList[data.prevIndex],
+            currentValue: nextList[data.nextIndex],
+            prevIndex: data.prevIndex,
+            newIndex: data.nextIndex,
+            indexDiff,
+            status: LIST_STATUS.UPDATED,
+          },
+          options,
+        );
+      }
+    }
+  }
+  emitter.emit(StreamEvent.Finish);
+}
+
+export function streamListsDiff<T extends Record<string, unknown>>(
+  prevList: T[],
+  nextList: T[],
+  referenceProperty: ReferenceProperty<T>,
+  options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
+) {
+  const emitter = new EventEmitter();
+  try {
+    setTimeout(
+      () =>
+        getDiffChunks(prevList, nextList, referenceProperty, emitter, options),
+      0,
+    );
+    return emitter;
+  } catch (err) {
+    emitter.emit(StreamEvent.Error, err);
+  }
+}