From 896186c7cf35e7f0ab408fc1ce821bf10d033903 Mon Sep 17 00:00:00 2001
From: Antoine Lanoe <antoine.lanoe@meltwater.com>
Date: Sat, 5 Oct 2024 12:01:04 +0200
Subject: [PATCH] chore: handle edgecases streamlistsdiff

---
 src/lib/object-diff/index.ts      |  2 +-
 src/lib/stream-list-diff/index.ts | 68 +++++++++++++++++++++++--------
 2 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/src/lib/object-diff/index.ts b/src/lib/object-diff/index.ts
index b5068d8..524e9cd 100644
--- a/src/lib/object-diff/index.ts
+++ b/src/lib/object-diff/index.ts
@@ -209,7 +209,7 @@ function getSubPropertiesDiff(
  * Returns the diff between two objects
  * @param {ObjectData} prevData - The original object.
  * @param {ObjectData} nextData - The new object.
- *  * @param {ObjectOptions} options - Options to refine your output.
+ * @param {ObjectOptions} options - Options to refine your output.
     - `showOnly`: returns only the values whose status you are interested in. It takes two parameters: `statuses` and `granularity`
        `statuses` are the status you want to see in the output (e.g. `["added", "equal"]`)
       `granularity` can be either `basic` (to return only the main properties whose status matches your query) or `deep` (to return the main properties if some of their subproperties' status match your request. The subproperties are filtered accordingly).
diff --git a/src/lib/stream-list-diff/index.ts b/src/lib/stream-list-diff/index.ts
index ed009c9..1b27896 100644
--- a/src/lib/stream-list-diff/index.ts
+++ b/src/lib/stream-list-diff/index.ts
@@ -16,6 +16,7 @@ function outputDiffChunk<T extends Record<string, unknown>>(
 
   return function handleDiffChunk(
     chunk: StreamListsDiff<T>,
+    isLastChunk: boolean,
     options: ListStreamOptions,
   ): void {
     const showChunk = options?.showOnly
@@ -26,7 +27,7 @@ function outputDiffChunk<T extends Record<string, unknown>>(
     }
     if ((options.chunksSize as number) > 0) {
       chunks.push(chunk);
-      if (chunks.length >= (options.chunksSize as number)) {
+      if (chunks.length >= (options.chunksSize as number) || isLastChunk) {
         const output = chunks;
         chunks = [];
         return emitter.emit(StreamEvent.Data, output);
@@ -56,6 +57,14 @@ function formatSingleListStreamDiff<T extends Record<string, unknown>>(
   return diff;
 }
 
+function isValidChunkSize(
+  chunksSize: ListStreamOptions["chunksSize"],
+): boolean {
+  if (!chunksSize) return true;
+  const x = String(Math.sign(chunksSize));
+  return x !== "-1" && x !== "NaN";
+}
+
 function getDiffChunks<T extends Record<string, unknown>>(
   prevList: T[],
   nextList: T[],
@@ -63,6 +72,12 @@ function getDiffChunks<T extends Record<string, unknown>>(
   emitter: EventEmitter,
   options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
 ) {
+  if (!isValidChunkSize(options?.chunksSize)) {
+    return emitter.emit(
+      StreamEvent.Error,
+      `The chunk size can't be negative. You entered the value ${options.chunksSize}`,
+    );
+  }
   if (!prevList && !nextList) {
     return [];
   }
@@ -73,7 +88,9 @@ function getDiffChunks<T extends Record<string, unknown>>(
       LIST_STATUS.ADDED,
       options,
     );
-    return nextDiff.forEach((data) => handleDiffChunk(data, options));
+    return nextDiff.forEach((data, i) =>
+      handleDiffChunk(data, i === nextDiff.length - 1, options),
+    );
   }
   if (!nextList) {
     const prevDiff = formatSingleListStreamDiff(
@@ -82,7 +99,9 @@ function getDiffChunks<T extends Record<string, unknown>>(
       LIST_STATUS.DELETED,
       options,
     );
-    return prevDiff.forEach((data) => handleDiffChunk(data, options));
+    return prevDiff.forEach((data, i) =>
+      handleDiffChunk(data, i === prevDiff.length - 1, options),
+    );
   }
   const listsReferences: StreamReferences<T> = new Map();
   const handleDiffChunk = outputDiffChunk<T>(emitter);
@@ -112,13 +131,17 @@ function getDiffChunks<T extends Record<string, unknown>>(
             indexDiff: null,
             status: LIST_STATUS.ADDED,
           },
+          i === nextList.length - 1,
           options,
         );
       }
     }
   });
-
+  let streamedChunks = 0;
+  const totalChunks = listsReferences.size;
   for (const data of listsReferences.values()) {
+    streamedChunks++;
+    const isLastChunk = totalChunks === streamedChunks;
     if (!data.nextIndex) {
       handleDiffChunk(
         {
@@ -129,6 +152,7 @@ function getDiffChunks<T extends Record<string, unknown>>(
           indexDiff: null,
           status: LIST_STATUS.DELETED,
         },
+        isLastChunk,
         options,
       );
     } else {
@@ -147,6 +171,7 @@ function getDiffChunks<T extends Record<string, unknown>>(
               indexDiff: null,
               status: LIST_STATUS.EQUAL,
             },
+            isLastChunk,
             options,
           );
         } else {
@@ -161,6 +186,7 @@ function getDiffChunks<T extends Record<string, unknown>>(
                 ? LIST_STATUS.UPDATED
                 : LIST_STATUS.MOVED,
             },
+            isLastChunk,
             options,
           );
         }
@@ -174,29 +200,39 @@ function getDiffChunks<T extends Record<string, unknown>>(
             indexDiff,
             status: LIST_STATUS.UPDATED,
           },
+          isLastChunk,
           options,
         );
       }
     }
   }
-  emitter.emit(StreamEvent.Finish);
+  return emitter.emit(StreamEvent.Finish);
 }
 
+/**
+ * Streams the diff of two object lists
+ * @param {Record<string, unknown>[]} prevList - The original object list.
+ * @param {Record<string, unknown>[]} nextList - The new object list.
+ * @param {ReferenceProperty<T>} referenceProperty - A common property in all the objects of your lists (e.g. `id`)
+ * @param {ListStreamOptions} options - Options to refine your output.
+    - `chunksSize`: the number of object diffs returned by each stream chunk. If set to `0`, each stream will return a single object diff. If set to `10` each stream will return 10 object diffs. (default is `0`)
+    - `showOnly`: returns only the values whose status you are interested in. (e.g. `["added", "equal"]`)
+    - `considerMoveAsUpdate`: if set to `true` a `moved` object will be considered as `updated`
+ * @returns EventEmitter
+ */
 export function streamListsDiff<T extends Record<string, unknown>>(
   prevList: T[],
   nextList: T[],
   referenceProperty: ReferenceProperty<T>,
   options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
-) {
+): EventEmitter {
   const emitter = new EventEmitter();
-  try {
-    setTimeout(
-      () =>
-        getDiffChunks(prevList, nextList, referenceProperty, emitter, options),
-      0,
-    );
-    return emitter;
-  } catch (err) {
-    emitter.emit(StreamEvent.Error, err);
-  }
+  setTimeout(() => {
+    try {
+      getDiffChunks(prevList, nextList, referenceProperty, emitter, options);
+    } catch (err) {
+      return emitter.emit(StreamEvent.Error, err);
+    }
+  }, 0);
+  return emitter;
 }