Browse Source

feat: use workers for streamListDiff

pull/31/head
Antoine Lanoe 6 months ago
parent
commit
2f204b373b
  1. 2
      .gitignore
  2. 31
      README.md
  3. 2
      eslint.config.mjs
  4. 0
      index.ts
  5. 12
      jest.config.ts
  6. 5
      jest.setup.ts
  7. 1564
      package-lock.json
  8. 41
      package.json
  9. 17
      scripts/transpile-node-worker.js
  10. 36
      src/lib/list-diff/index.ts
  11. 8
      src/lib/list-diff/list-diff.test.ts
  12. 52
      src/lib/object-diff/index.ts
  13. 26
      src/lib/object-diff/object-diff.test.ts
  14. 93
      src/lib/stream-list-diff/client/index.ts
  15. 153
      src/lib/stream-list-diff/client/stream-list-diff-client.test.ts
  16. 1116
      src/lib/stream-list-diff/client/stream-list-diff-client.worker.test.ts
  17. 78
      src/lib/stream-list-diff/client/worker/utils.ts
  18. 30
      src/lib/stream-list-diff/client/worker/web-worker.ts
  19. 93
      src/lib/stream-list-diff/server/index.ts
  20. 149
      src/lib/stream-list-diff/server/stream-list-diff.test.ts
  21. 1048
      src/lib/stream-list-diff/server/stream-list-diff.worker.test.ts
  22. 38
      src/lib/stream-list-diff/server/worker/node-worker.ts
  23. 78
      src/lib/stream-list-diff/server/worker/utils.ts
  24. 8
      src/lib/stream-list-diff/utils.ts
  25. 27
      src/models/emitter/index.ts
  26. 27
      src/models/list/index.ts
  27. 14
      src/models/object/index.ts
  28. 36
      src/models/stream/index.ts
  29. 22
      src/models/worker/index.ts
  30. 5
      tsconfig.json
  31. 28
      tsup.config.ts

2
.gitignore vendored

@ -1,3 +1,5 @@
/node_modules /node_modules
dist dist
.eslintcache .eslintcache
# Ignore generated worker files
src/lib/stream-list-diff/server/worker/node-worker.cjs

31
README.md

@ -104,12 +104,12 @@ type ObjectDiff = {
diff: Diff[]; diff: Diff[];
}; };
/** recursive diff in case of subproperties */
type Diff = { type Diff = {
property: string; property: string;
previousValue: unknown; previousValue: unknown;
currentValue: unknown; currentValue: unknown;
status: "added" | "deleted" | "equal" | "updated"; status: "added" | "deleted" | "equal" | "updated";
// recursive diff in case of subproperties
diff?: Diff[]; diff?: Diff[];
}; };
``` ```
@ -307,13 +307,15 @@ getListDiff(
```js ```js
// If you are in a server environment // If you are in a server environment
import { streamListDiff } from "@donedeal0/superdiff/server"; import { streamListDiff } from "@donedeal0/superdiff/server.cjs";
// If you are in a browser environment // If you are in a browser environment
import { streamListDiff } from "@donedeal0/superdiff/client"; import { streamListDiff } from "@donedeal0/superdiff/client";
``` ```
Streams the diff of two object lists, ideal for large lists and maximum performance. Streams the diff of two object lists, ideal for large lists and maximum performance.
`streamListDiff` requires ESM support for browser usage. It will work out of the box if you use a modern bundler (Webpack, Rollup) or JavaScript framework (Next.js, Vue.js).
#### FORMAT #### FORMAT
**Input** **Input**
@ -330,6 +332,8 @@ Streams the diff of two object lists, ideal for large lists and maximum performa
showOnly?: ("added" | "deleted" | "moved" | "updated" | "equal")[], // [] by default showOnly?: ("added" | "deleted" | "moved" | "updated" | "equal")[], // [] by default
chunksSize?: number, // 0 by default chunksSize?: number, // 0 by default
considerMoveAsUpdate?: boolean; // false by default considerMoveAsUpdate?: boolean; // false by default
useWorker?: boolean; // true by default
showWarnings?: boolean; // true by default
} }
``` ```
@ -345,6 +349,9 @@ Streams the diff of two object lists, ideal for large lists and maximum performa
showOnly?: ("added" | "deleted" | "moved" | "updated" | "equal")[], // [] by default showOnly?: ("added" | "deleted" | "moved" | "updated" | "equal")[], // [] by default
chunksSize?: number, // 0 by default chunksSize?: number, // 0 by default
considerMoveAsUpdate?: boolean; // false by default considerMoveAsUpdate?: boolean; // false by default
useWorker?: boolean; // true by default
showWarnings?: boolean; // true by default
} }
``` ```
@ -355,6 +362,10 @@ Streams the diff of two object lists, ideal for large lists and maximum performa
- `chunksSize` the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff per chunk, `10` = 10 object diffs per chunk). - `chunksSize` the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff per chunk, `10` = 10 object diffs per chunk).
- `showOnly` gives you the option to return only the values whose status you are interested in (e.g. `["added", "equal"]`). - `showOnly` gives you the option to return only the values whose status you are interested in (e.g. `["added", "equal"]`).
- `considerMoveAsUpdate`: if set to `true` a `moved` value will be considered as `updated`. - `considerMoveAsUpdate`: if set to `true` a `moved` value will be considered as `updated`.
- `useWorker`: if set to `true`, the diff will be run in a worker for maximum performance. Only recommended for large lists (e.g. +100,000 items).
- `showWarnings`: if set to `true`, potential warnings will be displayed in the console.
> ⚠ Warning: using Readable streams may impact workers' performance since they need to be converted to arrays. Consider using arrays or files for optimal performance. Alternatively, you can turn the `useWorker` option off.
**Output** **Output**
@ -364,20 +375,12 @@ The objects diff are grouped into arrays - called `chunks` - and are consumed th
- `error`: to be notified if an error occurs during the stream. - `error`: to be notified if an error occurs during the stream.
```ts ```ts
interface StreamListener<T extends Record<string, unknown>> { interface StreamListener<T> {
on<E extends keyof EmitterEvents<T>>( on(event: "data", listener: (chunk: StreamListDiff<T>[]) => void);
event: E, on(event: "finish", listener: () => void);
listener: Listener<EmitterEvents<T>[E]>, on(event: "error", listener: (error: Error) => void);
): this;
} }
type EmitterEvents<T extends Record<string, unknown>> = {
data: [StreamListDiff<T>[]];
error: [Error];
finish: [];
};
type StreamListDiff<T extends Record<string, unknown>> = { type StreamListDiff<T extends Record<string, unknown>> = {
currentValue: T | null; currentValue: T | null;
previousValue: T | null; previousValue: T | null;

2
eslint.config.mjs

@ -3,7 +3,7 @@ import tseslint from "typescript-eslint";
export default [ export default [
{ files: ["**/*.{js,mjs,cjs,ts}"] }, { files: ["**/*.{js,mjs,cjs,ts}"] },
{ ignores: ["dist", "jest.config.js"] }, { ignores: ["dist", "jest.config.js", "src/lib/stream-list-diff/server/worker/node-worker.cjs"] },
{ settings: { react: { version: "detect" } } }, { settings: { react: { version: "detect" } } },
pluginJs.configs.recommended, pluginJs.configs.recommended,
...tseslint.configs.recommended, ...tseslint.configs.recommended,

12
jest.config.js → jest.config.ts

@ -1,6 +1,8 @@
module.exports = { import type { Config } from "jest";
const config: Config = {
transform: { transform: {
"^.+\\.(ts|js)$": [ "^.+\\.(ts|js)$": [
"@swc/jest", "@swc/jest",
{ {
jsc: { jsc: {
@ -11,13 +13,17 @@ module.exports = {
dynamicImport: true, dynamicImport: true,
}, },
paths: { paths: {
"@mocks/*": ["./src/mocks/*"],
"@models/*": ["./src/models/*"], "@models/*": ["./src/models/*"],
"@lib/*": ["./src/lib/*"], "@lib/*": ["./src/lib/*"],
}, },
target: "esnext", target: "esnext",
}, },
}, },
], ],
}, },
testEnvironment: "node",
setupFilesAfterEnv: ["<rootDir>/jest.setup.ts"],
}; };
export default config;

5
jest.setup.ts

@ -0,0 +1,5 @@
import { TextEncoder, TextDecoder } from "util";
global.TextEncoder = TextEncoder;
//@ts-expect-error - the TextDecoder is valid
global.TextDecoder = TextDecoder;

1564
package-lock.json generated

File diff suppressed because it is too large Load Diff

41
package.json

@ -1,6 +1,7 @@
{ {
"name": "@donedeal0/superdiff", "name": "@donedeal0/superdiff",
"version": "3.0.0", "version": "3.1.0",
"type": "module",
"description": "SuperDiff compares two arrays or objects and returns a full diff of their differences", "description": "SuperDiff compares two arrays or objects and returns a full diff of their differences",
"main": "dist/index.js", "main": "dist/index.js",
"module": "dist/index.mjs", "module": "dist/index.mjs",
@ -9,6 +10,18 @@
"files": [ "files": [
"dist" "dist"
], ],
"exports": {
".": {
"import": "./dist/index.mjs",
"require": "./dist/index.js"
},
"./client": {
"default": "./dist/client.mjs"
},
"./server": {
"default": "./dist/server.cjs"
}
},
"author": "DoneDeal0", "author": "DoneDeal0",
"license": "ISC", "license": "ISC",
"repository": { "repository": {
@ -60,6 +73,11 @@
"object", "object",
"diff", "diff",
"deep-diff", "deep-diff",
"json-diff",
"files diff",
"json",
"file",
"isobject",
"comparison", "comparison",
"compare", "compare",
"stream", "stream",
@ -73,28 +91,31 @@
"lint:dead-code": "npx -p typescript@latest -p knip knip", "lint:dead-code": "npx -p typescript@latest -p knip knip",
"lint": "eslint --cache --max-warnings=0 --fix", "lint": "eslint --cache --max-warnings=0 --fix",
"prepare": "husky", "prepare": "husky",
"test": "jest", "transpile": "node scripts/transpile-node-worker.js",
"test": "npm run transpile && jest",
"tsc": "tsc --noEmit --incremental" "tsc": "tsc --noEmit --incremental"
}, },
"devDependencies": { "devDependencies": {
"@eslint/js": "^9.11.1", "@eslint/js": "^9.14.0",
"@semantic-release/exec": "^6.0.3", "@semantic-release/exec": "^6.0.3",
"@semantic-release/git": "^10.0.1", "@semantic-release/git": "^10.0.1",
"@semantic-release/github": "^11.0.0", "@semantic-release/github": "^11.0.0",
"@semantic-release/npm": "^12.0.1", "@semantic-release/npm": "^12.0.1",
"@swc/core": "^1.7.26", "@swc/core": "^1.7.42",
"@swc/jest": "^0.2.36", "@swc/jest": "^0.2.37",
"@types/jest": "^29.5.13", "@types/jest": "^29.5.14",
"blob-polyfill": "^9.0.20240710", "blob-polyfill": "^9.0.20240710",
"eslint": "^9.11.1", "eslint": "^9.14.0",
"husky": "^9.1.6", "husky": "^9.1.6",
"jest": "^29.7.0", "jest": "^29.7.0",
"jest-environment-jsdom": "^29.7.0", "jest-environment-jsdom": "^29.7.0",
"jsdom": "^25.0.1",
"prettier": "^3.3.3", "prettier": "^3.3.3",
"swc-loader": "^0.2.6", "swc-loader": "^0.2.6",
"tsup": "^8.3.0", "ts-node": "^10.9.2",
"typescript": "^5.6.2", "tsup": "^8.3.5",
"typescript-eslint": "^8.7.0", "typescript": "^5.6.3",
"typescript-eslint": "^8.12.2",
"web-streams-polyfill": "^4.0.0" "web-streams-polyfill": "^4.0.0"
} }
} }

17
scripts/transpile-node-worker.js

@ -0,0 +1,17 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable no-undef */
import { execSync } from "child_process";
import { existsSync } from "fs"
// The src/lib/stream-list-diff/server/node-worker.ts file needs to be transpiled to a .cjs file to be used in the tests.
const workerFile = "src/lib/stream-list-diff/server/worker/node-worker"
try {
if(!existsSync(`${workerFile}.cjs`)){
execSync(`npx esbuild ${workerFile}.ts --bundle --platform=node --format=cjs --outfile=${workerFile}.cjs`, {
stdio: "inherit",
});
}
} catch (_) {
process.exit(1);
}

36
src/lib/list-diff/index.ts

@ -1,10 +1,10 @@
import { isEqual, isObject } from "@lib/utils";
import { import {
DEFAULT_LIST_DIFF_OPTIONS, DEFAULT_LIST_DIFF_OPTIONS,
LIST_STATUS, ListStatus,
ListDiff, ListDiff,
ListDiffOptions, ListDiffOptions,
} from "@models/list"; } from "@models/list";
import { isEqual, isObject } from "@lib/utils";
function getLeanDiff( function getLeanDiff(
diff: ListDiff["diff"], diff: ListDiff["diff"],
@ -15,13 +15,13 @@ function getLeanDiff(
function formatSingleListDiff<T>( function formatSingleListDiff<T>(
listData: T[], listData: T[],
status: LIST_STATUS, status: ListStatus,
options: ListDiffOptions = { showOnly: [] }, options: ListDiffOptions = { showOnly: [] },
): ListDiff { ): ListDiff {
const diff = listData.map((data, i) => ({ const diff = listData.map((data, i) => ({
value: data, value: data,
prevIndex: status === LIST_STATUS.ADDED ? null : i, prevIndex: status === ListStatus.ADDED ? null : i,
newIndex: status === LIST_STATUS.ADDED ? i : null, newIndex: status === ListStatus.ADDED ? i : null,
indexDiff: null, indexDiff: null,
status, status,
})); }));
@ -39,10 +39,10 @@ function formatSingleListDiff<T>(
}; };
} }
function getListStatus(listDiff: ListDiff["diff"]): LIST_STATUS { function getListStatus(listDiff: ListDiff["diff"]): ListStatus {
return listDiff.some((value) => value.status !== LIST_STATUS.EQUAL) return listDiff.some((value) => value.status !== ListStatus.EQUAL)
? LIST_STATUS.UPDATED ? ListStatus.UPDATED
: LIST_STATUS.EQUAL; : ListStatus.EQUAL;
} }
function isReferencedObject( function isReferencedObject(
@ -72,15 +72,15 @@ export const getListDiff = <T>(
if (!prevList && !nextList) { if (!prevList && !nextList) {
return { return {
type: "list", type: "list",
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
diff: [], diff: [],
}; };
} }
if (!prevList) { if (!prevList) {
return formatSingleListDiff(nextList as T[], LIST_STATUS.ADDED, options); return formatSingleListDiff(nextList as T[], ListStatus.ADDED, options);
} }
if (!nextList) { if (!nextList) {
return formatSingleListDiff(prevList as T[], LIST_STATUS.DELETED, options); return formatSingleListDiff(prevList as T[], ListStatus.DELETED, options);
} }
const diff: ListDiff["diff"] = []; const diff: ListDiff["diff"] = [];
const prevIndexMatches = new Set<number>(); const prevIndexMatches = new Set<number>();
@ -106,10 +106,10 @@ export const getListDiff = <T>(
} }
const indexDiff = prevIndex === -1 ? null : i - prevIndex; const indexDiff = prevIndex === -1 ? null : i - prevIndex;
if (indexDiff === 0 || options.ignoreArrayOrder) { if (indexDiff === 0 || options.ignoreArrayOrder) {
let nextStatus = LIST_STATUS.EQUAL; let nextStatus = ListStatus.EQUAL;
if (isReferencedObject(nextValue, options.referenceProperty)) { if (isReferencedObject(nextValue, options.referenceProperty)) {
if (!isEqual(prevList[prevIndex], nextValue)) { if (!isEqual(prevList[prevIndex], nextValue)) {
nextStatus = LIST_STATUS.UPDATED; nextStatus = ListStatus.UPDATED;
} }
} }
return diff.push({ return diff.push({
@ -126,7 +126,7 @@ export const getListDiff = <T>(
prevIndex: null, prevIndex: null,
newIndex: i, newIndex: i,
indexDiff, indexDiff,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}); });
} }
return diff.push({ return diff.push({
@ -135,8 +135,8 @@ export const getListDiff = <T>(
newIndex: i, newIndex: i,
indexDiff, indexDiff,
status: options.considerMoveAsUpdate status: options.considerMoveAsUpdate
? LIST_STATUS.UPDATED ? ListStatus.UPDATED
: LIST_STATUS.MOVED, : ListStatus.MOVED,
}); });
}); });
@ -147,7 +147,7 @@ export const getListDiff = <T>(
prevIndex: i, prevIndex: i,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}); });
} }
}); });

8
src/lib/list-diff/list-diff.test.ts

@ -1,5 +1,5 @@
import { ListStatus } from "@models/list";
import { getListDiff } from "."; import { getListDiff } from ".";
import { LIST_STATUS } from "@models/list";
describe("getListDiff", () => { describe("getListDiff", () => {
it("returns an empty diff if no lists are provided", () => { it("returns an empty diff if no lists are provided", () => {
@ -418,7 +418,7 @@ describe("getListDiff", () => {
false, false,
{ name: "joe", age: 88 }, { name: "joe", age: 88 },
], ],
{ showOnly: [LIST_STATUS.ADDED, LIST_STATUS.DELETED] }, { showOnly: [ListStatus.ADDED, ListStatus.DELETED] },
), ),
).toStrictEqual({ ).toStrictEqual({
type: "list", type: "list",
@ -463,7 +463,7 @@ describe("getListDiff", () => {
}); });
expect( expect(
getListDiff(["mbappe", "mendes", "verratti", "ruiz"], null, { getListDiff(["mbappe", "mendes", "verratti", "ruiz"], null, {
showOnly: [LIST_STATUS.MOVED, LIST_STATUS.UPDATED], showOnly: [ListStatus.MOVED, ListStatus.UPDATED],
}), }),
).toStrictEqual({ ).toStrictEqual({
type: "list", type: "list",
@ -474,7 +474,7 @@ describe("getListDiff", () => {
it("returns all values if their status match the required statuses", () => { it("returns all values if their status match the required statuses", () => {
expect( expect(
getListDiff(null, ["mbappe", "mendes", "verratti", "ruiz"], { getListDiff(null, ["mbappe", "mendes", "verratti", "ruiz"], {
showOnly: [LIST_STATUS.ADDED], showOnly: [ListStatus.ADDED],
}), }),
).toStrictEqual({ ).toStrictEqual({
type: "list", type: "list",

52
src/lib/object-diff/index.ts

@ -1,13 +1,13 @@
import { isEqual, isObject } from "@lib/utils";
import { import {
GRANULARITY, Granularity,
OBJECT_STATUS, ObjectStatus,
ObjectData, ObjectData,
ObjectDiff, ObjectDiff,
ObjectDiffOptions, ObjectDiffOptions,
Diff, Diff,
DEFAULT_OBJECT_DIFF_OPTIONS, DEFAULT_OBJECT_DIFF_OPTIONS,
} from "@models/object"; } from "@models/object";
import { isEqual, isObject } from "@lib/utils";
function getLeanDiff( function getLeanDiff(
diff: ObjectDiff["diff"], diff: ObjectDiff["diff"],
@ -17,7 +17,7 @@ function getLeanDiff(
const res: ObjectDiff["diff"] = []; const res: ObjectDiff["diff"] = [];
for (let i = 0; i < diff.length; i++) { for (let i = 0; i < diff.length; i++) {
const value = diff[i]; const value = diff[i];
if (granularity === GRANULARITY.DEEP && value.diff) { if (granularity === Granularity.DEEP && value.diff) {
const leanDiff = getLeanDiff(value.diff, showOnly); const leanDiff = getLeanDiff(value.diff, showOnly);
if (leanDiff.length > 0) { if (leanDiff.length > 0) {
res.push({ ...value, diff: leanDiff }); res.push({ ...value, diff: leanDiff });
@ -29,21 +29,21 @@ function getLeanDiff(
return res; return res;
} }
function getObjectStatus(diff: ObjectDiff["diff"]): OBJECT_STATUS { function getObjectStatus(diff: ObjectDiff["diff"]): ObjectStatus {
return diff.some((property) => property.status !== OBJECT_STATUS.EQUAL) return diff.some((property) => property.status !== ObjectStatus.EQUAL)
? OBJECT_STATUS.UPDATED ? ObjectStatus.UPDATED
: OBJECT_STATUS.EQUAL; : ObjectStatus.EQUAL;
} }
function formatSingleObjectDiff( function formatSingleObjectDiff(
data: ObjectData, data: ObjectData,
status: OBJECT_STATUS, status: ObjectStatus,
options: ObjectDiffOptions = DEFAULT_OBJECT_DIFF_OPTIONS, options: ObjectDiffOptions = DEFAULT_OBJECT_DIFF_OPTIONS,
): ObjectDiff { ): ObjectDiff {
if (!data) { if (!data) {
return { return {
type: "object", type: "object",
status: OBJECT_STATUS.EQUAL, status: ObjectStatus.EQUAL,
diff: [], diff: [],
}; };
} }
@ -55,16 +55,16 @@ function formatSingleObjectDiff(
for (const [subProperty, subValue] of Object.entries(value)) { for (const [subProperty, subValue] of Object.entries(value)) {
subPropertiesDiff.push({ subPropertiesDiff.push({
property: subProperty, property: subProperty,
previousValue: status === OBJECT_STATUS.ADDED ? undefined : subValue, previousValue: status === ObjectStatus.ADDED ? undefined : subValue,
currentValue: status === OBJECT_STATUS.ADDED ? subValue : undefined, currentValue: status === ObjectStatus.ADDED ? subValue : undefined,
status, status,
}); });
} }
diff.push({ diff.push({
property, property,
previousValue: previousValue:
status === OBJECT_STATUS.ADDED ? undefined : data[property], status === ObjectStatus.ADDED ? undefined : data[property],
currentValue: status === OBJECT_STATUS.ADDED ? value : undefined, currentValue: status === ObjectStatus.ADDED ? value : undefined,
status, status,
diff: subPropertiesDiff, diff: subPropertiesDiff,
}); });
@ -72,8 +72,8 @@ function formatSingleObjectDiff(
diff.push({ diff.push({
property, property,
previousValue: previousValue:
status === OBJECT_STATUS.ADDED ? undefined : data[property], status === ObjectStatus.ADDED ? undefined : data[property],
currentValue: status === OBJECT_STATUS.ADDED ? value : undefined, currentValue: status === ObjectStatus.ADDED ? value : undefined,
status, status,
}); });
} }
@ -97,11 +97,11 @@ function getValueStatus(
previousValue: unknown, previousValue: unknown,
nextValue: unknown, nextValue: unknown,
options?: ObjectDiffOptions, options?: ObjectDiffOptions,
): OBJECT_STATUS { ): ObjectStatus {
if (isEqual(previousValue, nextValue, options)) { if (isEqual(previousValue, nextValue, options)) {
return OBJECT_STATUS.EQUAL; return ObjectStatus.EQUAL;
} }
return OBJECT_STATUS.UPDATED; return ObjectStatus.UPDATED;
} }
function getDiff( function getDiff(
@ -123,7 +123,7 @@ function getDiff(
property, property,
previousValue: prevSubValue, previousValue: prevSubValue,
currentValue: undefined, currentValue: undefined,
status: OBJECT_STATUS.DELETED, status: ObjectStatus.DELETED,
}); });
continue; continue;
} }
@ -132,20 +132,20 @@ function getDiff(
property, property,
previousValue: undefined, previousValue: undefined,
currentValue: nextSubValue, currentValue: nextSubValue,
status: OBJECT_STATUS.ADDED, status: ObjectStatus.ADDED,
}); });
continue; continue;
} }
if (isObject(nextSubValue) && isObject(prevSubValue)) { if (isObject(nextSubValue) && isObject(prevSubValue)) {
const subDiff = getDiff(prevSubValue, nextSubValue, options); const subDiff = getDiff(prevSubValue, nextSubValue, options);
const isUpdated = subDiff.some( const isUpdated = subDiff.some(
(entry) => entry.status !== OBJECT_STATUS.EQUAL, (entry) => entry.status !== ObjectStatus.EQUAL,
); );
diff.push({ diff.push({
property, property,
previousValue: prevSubValue, previousValue: prevSubValue,
currentValue: nextSubValue, currentValue: nextSubValue,
status: isUpdated ? OBJECT_STATUS.UPDATED : OBJECT_STATUS.EQUAL, status: isUpdated ? ObjectStatus.UPDATED : ObjectStatus.EQUAL,
...(isUpdated && { diff: subDiff }), ...(isUpdated && { diff: subDiff }),
}); });
} else { } else {
@ -180,15 +180,15 @@ export function getObjectDiff(
if (!prevData && !nextData) { if (!prevData && !nextData) {
return { return {
type: "object", type: "object",
status: OBJECT_STATUS.EQUAL, status: ObjectStatus.EQUAL,
diff: [], diff: [],
}; };
} }
if (!prevData) { if (!prevData) {
return formatSingleObjectDiff(nextData, OBJECT_STATUS.ADDED, options); return formatSingleObjectDiff(nextData, ObjectStatus.ADDED, options);
} }
if (!nextData) { if (!nextData) {
return formatSingleObjectDiff(prevData, OBJECT_STATUS.DELETED, options); return formatSingleObjectDiff(prevData, ObjectStatus.DELETED, options);
} }
const diff: ObjectDiff["diff"] = getDiff(prevData, nextData, options); const diff: ObjectDiff["diff"] = getDiff(prevData, nextData, options);
const status = getObjectStatus(diff); const status = getObjectStatus(diff);

26
src/lib/object-diff/object-diff.test.ts

@ -1,4 +1,4 @@
import { GRANULARITY, OBJECT_STATUS } from "@models/object"; import { Granularity, ObjectStatus } from "@models/object";
import { getObjectDiff } from "."; import { getObjectDiff } from ".";
describe("getObjectDiff", () => { describe("getObjectDiff", () => {
@ -51,8 +51,8 @@ describe("getObjectDiff", () => {
null, null,
{ {
showOnly: { showOnly: {
statuses: [OBJECT_STATUS.ADDED], statuses: [ObjectStatus.ADDED],
granularity: GRANULARITY.DEEP, granularity: Granularity.DEEP,
}, },
}, },
), ),
@ -509,7 +509,7 @@ describe("getObjectDiff", () => {
nickname: "super joe", nickname: "super joe",
}, },
}, },
{ showOnly: { statuses: [OBJECT_STATUS.ADDED] } }, { showOnly: { statuses: [ObjectStatus.ADDED] } },
), ),
).toStrictEqual({ ).toStrictEqual({
type: "object", type: "object",
@ -549,8 +549,8 @@ describe("getObjectDiff", () => {
}, },
{ {
showOnly: { showOnly: {
statuses: [OBJECT_STATUS.ADDED, OBJECT_STATUS.DELETED], statuses: [ObjectStatus.ADDED, ObjectStatus.DELETED],
granularity: GRANULARITY.DEEP, granularity: Granularity.DEEP,
}, },
}, },
), ),
@ -634,8 +634,8 @@ describe("getObjectDiff", () => {
}, },
{ {
showOnly: { showOnly: {
statuses: [OBJECT_STATUS.UPDATED], statuses: [ObjectStatus.UPDATED],
granularity: GRANULARITY.DEEP, granularity: Granularity.DEEP,
}, },
}, },
), ),
@ -742,8 +742,8 @@ describe("getObjectDiff", () => {
}, },
{ {
showOnly: { showOnly: {
statuses: [OBJECT_STATUS.ADDED], statuses: [ObjectStatus.ADDED],
granularity: GRANULARITY.DEEP, granularity: Granularity.DEEP,
}, },
}, },
), ),
@ -834,8 +834,8 @@ describe("getObjectDiff", () => {
}, },
{ {
showOnly: { showOnly: {
statuses: [OBJECT_STATUS.DELETED], statuses: [ObjectStatus.DELETED],
granularity: GRANULARITY.DEEP, granularity: Granularity.DEEP,
}, },
}, },
), ),
@ -850,7 +850,7 @@ describe("getObjectDiff", () => {
getObjectDiff( getObjectDiff(
{ name: "joe", age: 54, hobbies: ["golf", "football"] }, { name: "joe", age: 54, hobbies: ["golf", "football"] },
null, null,
{ showOnly: { statuses: [OBJECT_STATUS.DELETED] } }, { showOnly: { statuses: [ObjectStatus.DELETED] } },
), ),
).toStrictEqual({ ).toStrictEqual({
type: "object", type: "object",

93
src/lib/stream-list-diff/client/index.ts

@ -1,24 +1,21 @@
import { IEmitter, EmitterEvents, EventEmitter } from "@models/emitter";
import { import {
DataBuffer, DataBuffer,
DEFAULT_LIST_STREAM_OPTIONS, DEFAULT_LIST_STREAM_OPTIONS,
ListStreamOptions, ListStreamOptions,
ReferenceProperty, ReferenceProperty,
} from "@models/stream";
import { LIST_STATUS } from "@models/list";
import {
Emitter,
EmitterEvents,
EventEmitter,
StreamListener,
StreamEvent, StreamEvent,
} from "../emitter"; StreamListener,
} from "@models/stream";
import { ListStatus, ListType } from "@models/list";
import { isDataValid, isValidChunkSize, outputDiffChunk } from "../utils"; import { isDataValid, isValidChunkSize, outputDiffChunk } from "../utils";
import { generateWorker } from "./worker/utils";
async function getDiffChunks<T extends Record<string, unknown>>( async function getDiffChunks<T extends Record<string, unknown>>(
prevStream: ReadableStream<T>, prevStream: ReadableStream<T>,
nextStream: ReadableStream<T>, nextStream: ReadableStream<T>,
referenceProperty: ReferenceProperty<T>, referenceProperty: ReferenceProperty<T>,
emitter: Emitter<T>, emitter: IEmitter<T>,
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
): Promise<void> { ): Promise<void> {
if (!isValidChunkSize(options?.chunksSize)) { if (!isValidChunkSize(options?.chunksSize)) {
@ -42,7 +39,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
const { isValid, message } = isDataValid( const { isValid, message } = isDataValid(
chunk, chunk,
referenceProperty, referenceProperty,
"prevList", ListType.PREV,
); );
if (!isValid) { if (!isValid) {
emitter.emit(StreamEvent.Error, new Error(message)); emitter.emit(StreamEvent.Error, new Error(message));
@ -67,10 +64,10 @@ async function getDiffChunks<T extends Record<string, unknown>>(
indexDiff, indexDiff,
status: status:
indexDiff === 0 indexDiff === 0
? LIST_STATUS.EQUAL ? ListStatus.EQUAL
: options.considerMoveAsUpdate : options.considerMoveAsUpdate
? LIST_STATUS.UPDATED ? ListStatus.UPDATED
: LIST_STATUS.MOVED, : ListStatus.MOVED,
}, },
options, options,
); );
@ -82,7 +79,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
prevIndex: currentPrevIndex, prevIndex: currentPrevIndex,
newIndex: relatedChunk.index, newIndex: relatedChunk.index,
indexDiff, indexDiff,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
options, options,
); );
@ -97,7 +94,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
const { isValid, message } = isDataValid( const { isValid, message } = isDataValid(
chunk, chunk,
referenceProperty, referenceProperty,
"nextList", ListType.NEXT,
); );
if (!isValid) { if (!isValid) {
emitter.emit(StreamEvent.Error, new Error(message)); emitter.emit(StreamEvent.Error, new Error(message));
@ -122,10 +119,10 @@ async function getDiffChunks<T extends Record<string, unknown>>(
indexDiff, indexDiff,
status: status:
indexDiff === 0 indexDiff === 0
? LIST_STATUS.EQUAL ? ListStatus.EQUAL
: options.considerMoveAsUpdate : options.considerMoveAsUpdate
? LIST_STATUS.UPDATED ? ListStatus.UPDATED
: LIST_STATUS.MOVED, : ListStatus.MOVED,
}, },
options, options,
); );
@ -137,7 +134,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
prevIndex: relatedChunk.index, prevIndex: relatedChunk.index,
newIndex: currentNextIndex, newIndex: currentNextIndex,
indexDiff, indexDiff,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
options, options,
); );
@ -175,7 +172,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
prevIndex: chunk.index, prevIndex: chunk.index,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
options, options,
); );
@ -189,7 +186,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
prevIndex: null, prevIndex: null,
newIndex: chunk.index, newIndex: chunk.index,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
options, options,
); );
@ -202,7 +199,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
async function getValidClientStream<T extends Record<string, unknown>>( async function getValidClientStream<T extends Record<string, unknown>>(
input: ReadableStream<T> | T[] | File, input: ReadableStream<T> | T[] | File,
listType: "prevList" | "nextList", listType: ListType,
): Promise<ReadableStream<T>> { ): Promise<ReadableStream<T>> {
if (Array.isArray(input)) { if (Array.isArray(input)) {
return new ReadableStream({ return new ReadableStream({
@ -243,6 +240,25 @@ async function getValidClientStream<T extends Record<string, unknown>>(
); );
} }
export async function generateStream<T extends Record<string, unknown>>(
prevList: ReadableStream<T> | File | T[],
nextList: ReadableStream<T> | File | T[],
referenceProperty: ReferenceProperty<T>,
options: ListStreamOptions,
emitter: IEmitter<T>,
): Promise<void> {
try {
const [prevStream, nextStream] = await Promise.all([
getValidClientStream(prevList, ListType.PREV),
getValidClientStream(nextList, ListType.NEXT),
]);
getDiffChunks(prevStream, nextStream, referenceProperty, emitter, options);
} catch (err) {
return emitter.emit(StreamEvent.Error, err as Error);
}
}
/** /**
* Streams the diff of two object lists * Streams the diff of two object lists
* @param {ReadableStream | File | Record<string, unknown>[]} prevList - The original object list. * @param {ReadableStream | File | Record<string, unknown>[]} prevList - The original object list.
@ -250,8 +266,10 @@ async function getValidClientStream<T extends Record<string, unknown>>(
* @param {string} referenceProperty - A common property in all the objects of your lists (e.g. `id`) * @param {string} referenceProperty - A common property in all the objects of your lists (e.g. `id`)
* @param {ListStreamOptions} options - Options to refine your output. * @param {ListStreamOptions} options - Options to refine your output.
- `chunksSize`: the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff by chunk, `10` = 10 object diffs by chunk). - `chunksSize`: the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff by chunk, `10` = 10 object diffs by chunk).
- `showOnly`: returns only the values whose status you are interested in. (e.g. `["added", "equal"]`) - `showOnly`: returns only the values whose status you are interested in. (e.g. `["added", "equal"]`).
- `considerMoveAsUpdate`: if set to `true` a `moved` object will be considered as `updated` - `considerMoveAsUpdate`: if set to `true` a `moved` object will be considered as `updated`.
- `useWorker`: if set to `true`, the diff will be run in a worker. Recommended for maximum performance, `true` by default.
- `showWarnings`: if set to `true`, potential warnings will be displayed in the console.
* @returns StreamListener * @returns StreamListener
*/ */
export function streamListDiff<T extends Record<string, unknown>>( export function streamListDiff<T extends Record<string, unknown>>(
@ -261,23 +279,16 @@ export function streamListDiff<T extends Record<string, unknown>>(
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
): StreamListener<T> { ): StreamListener<T> {
const emitter = new EventEmitter<EmitterEvents<T>>(); const emitter = new EventEmitter<EmitterEvents<T>>();
setTimeout(async () => {
try {
const [prevStream, nextStream] = await Promise.all([
getValidClientStream(prevList, "prevList"),
getValidClientStream(nextList, "nextList"),
]);
getDiffChunks( if (typeof Worker === "undefined" || !options.useWorker) {
prevStream, setTimeout(
nextStream, () =>
referenceProperty, generateStream(prevList, nextList, referenceProperty, options, emitter),
emitter, 0,
options, );
); } else {
} catch (err) { generateWorker(prevList, nextList, referenceProperty, options, emitter);
return emitter.emit(StreamEvent.Error, err as Error); }
}
}, 0);
return emitter as StreamListener<T>; return emitter as StreamListener<T>;
} }

153
src/lib/stream-list-diff/client/stream-list-diff-client.test.ts

@ -3,13 +3,13 @@
*/ */
import "blob-polyfill"; import "blob-polyfill";
import { ReadableStream } from "web-streams-polyfill"; import { ReadableStream } from "web-streams-polyfill";
import { LIST_STATUS } from "@models/list"; import prevListFile from "@mocks/prevList.json";
import nextListFile from "@mocks/nextList.json";
import { ListStatus } from "@models/list";
import { StreamListDiff } from "@models/stream"; import { StreamListDiff } from "@models/stream";
import { streamListDiff } from "."; import { streamListDiff } from ".";
import prevListFile from "../../../mocks/prevList.json";
import nextListFile from "../../../mocks/nextList.json";
//@ts-expect-error - the ReadableStream polyfill is necessary to test ReadableStream in a Node environment. // @ts-expect-error - the ReadableStream polyfill is necessary to test ReadableStream in a Node environment.
global.ReadableStream = ReadableStream; global.ReadableStream = ReadableStream;
describe("data emission", () => { describe("data emission", () => {
@ -18,7 +18,10 @@ describe("data emission", () => {
{ id: 1, name: "Item 1" }, { id: 1, name: "Item 1" },
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
]; ];
const diff = streamListDiff([], nextList, "id", { chunksSize: 2 }); const diff = streamListDiff([], nextList, "id", {
chunksSize: 2,
useWorker: false,
});
const expectedChunks = [ const expectedChunks = [
{ {
@ -27,7 +30,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 0, newIndex: 0,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
{ {
previousValue: null, previousValue: null,
@ -35,7 +38,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 1, newIndex: 1,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
let chunkCount = 0; let chunkCount = 0;
@ -53,7 +56,10 @@ describe("data emission", () => {
{ id: 1, name: "Item 1" }, { id: 1, name: "Item 1" },
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
]; ];
const diff = streamListDiff(prevList, [], "id", { chunksSize: 2 }); const diff = streamListDiff(prevList, [], "id", {
chunksSize: 2,
useWorker: false,
});
const expectedChunks = [ const expectedChunks = [
{ {
@ -62,7 +68,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -70,7 +76,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
]; ];
let chunkCount = 0; let chunkCount = 0;
@ -78,7 +84,7 @@ describe("data emission", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("shiiiite", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -93,7 +99,7 @@ describe("data emission", () => {
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
{ id: 3, name: "Item 3" }, { id: 3, name: "Item 3" },
]; ];
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
const expectedChunks = [ const expectedChunks = [
[ [
@ -103,7 +109,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 0, newIndex: 0,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
], ],
[ [
@ -113,7 +119,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
], ],
[ [
@ -123,7 +129,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 1, newIndex: 1,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
], ],
]; ];
@ -166,6 +172,7 @@ describe("data emission", () => {
]; ];
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -176,7 +183,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 0, newIndex: 0,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -184,7 +191,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 1, newIndex: 1,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 3, name: "Item 3" }, previousValue: { id: 3, name: "Item 3" },
@ -192,7 +199,7 @@ describe("data emission", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 5, name: "Item 5" }, previousValue: { id: 5, name: "Item 5" },
@ -200,7 +207,7 @@ describe("data emission", () => {
prevIndex: 4, prevIndex: 4,
newIndex: 3, newIndex: 3,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 6, name: "Item 6" }, previousValue: { id: 6, name: "Item 6" },
@ -208,7 +215,7 @@ describe("data emission", () => {
prevIndex: 5, prevIndex: 5,
newIndex: 4, newIndex: 4,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
], ],
[ [
@ -218,7 +225,7 @@ describe("data emission", () => {
prevIndex: 6, prevIndex: 6,
newIndex: 5, newIndex: 5,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 9, name: "Item 9" }, previousValue: { id: 9, name: "Item 9" },
@ -226,7 +233,7 @@ describe("data emission", () => {
prevIndex: 8, prevIndex: 8,
newIndex: 8, newIndex: 8,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 10, name: "Item 10" }, previousValue: { id: 10, name: "Item 10" },
@ -234,7 +241,7 @@ describe("data emission", () => {
prevIndex: 9, prevIndex: 9,
newIndex: 6, newIndex: 6,
indexDiff: -3, indexDiff: -3,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 8, name: "Item 8" }, previousValue: { id: 8, name: "Item 8" },
@ -242,7 +249,7 @@ describe("data emission", () => {
prevIndex: 7, prevIndex: 7,
newIndex: 9, newIndex: 9,
indexDiff: 2, indexDiff: 2,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 4, name: "Item 4" }, previousValue: { id: 4, name: "Item 4" },
@ -250,7 +257,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
], ],
[ [
@ -260,7 +267,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 7, newIndex: 7,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
], ],
]; ];
@ -293,6 +300,7 @@ describe("data emission", () => {
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 150, chunksSize: 150,
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -302,7 +310,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 0, newIndex: 0,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -310,7 +318,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 1, newIndex: 1,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 3, name: "Item 3" }, previousValue: { id: 3, name: "Item 3" },
@ -318,7 +326,7 @@ describe("data emission", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 4, name: "Item 4" }, previousValue: { id: 4, name: "Item 4" },
@ -326,7 +334,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: null, previousValue: null,
@ -334,7 +342,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 3, newIndex: 3,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
@ -365,6 +373,7 @@ describe("data emission", () => {
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
considerMoveAsUpdate: true, considerMoveAsUpdate: true,
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -374,7 +383,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 0, newIndex: 0,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 1, name: "Item 1" }, previousValue: { id: 1, name: "Item 1" },
@ -382,7 +391,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 1, newIndex: 1,
indexDiff: 1, indexDiff: 1,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 3, name: "Item 3" }, previousValue: { id: 3, name: "Item 3" },
@ -390,7 +399,7 @@ describe("data emission", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 4, name: "Item 4" }, previousValue: { id: 4, name: "Item 4" },
@ -398,7 +407,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: null, previousValue: null,
@ -406,7 +415,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 3, newIndex: 3,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
@ -437,6 +446,7 @@ describe("data emission", () => {
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
showOnly: ["added", "deleted"], showOnly: ["added", "deleted"],
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -446,7 +456,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: null, previousValue: null,
@ -454,7 +464,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 3, newIndex: 3,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
@ -524,6 +534,7 @@ describe("data emission", () => {
]; ];
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -542,7 +553,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 0, newIndex: 0,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -550,7 +561,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 1, newIndex: 1,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { previousValue: {
@ -566,7 +577,7 @@ describe("data emission", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 5, name: "Item 5" }, previousValue: { id: 5, name: "Item 5" },
@ -574,7 +585,7 @@ describe("data emission", () => {
prevIndex: 4, prevIndex: 4,
newIndex: 3, newIndex: 3,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { previousValue: {
@ -590,7 +601,7 @@ describe("data emission", () => {
prevIndex: 5, prevIndex: 5,
newIndex: 4, newIndex: 4,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
], ],
[ [
@ -600,7 +611,7 @@ describe("data emission", () => {
prevIndex: 6, prevIndex: 6,
newIndex: 5, newIndex: 5,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 9, name: "Item 9" }, previousValue: { id: 9, name: "Item 9" },
@ -608,7 +619,7 @@ describe("data emission", () => {
prevIndex: 8, prevIndex: 8,
newIndex: 8, newIndex: 8,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { previousValue: {
@ -632,7 +643,7 @@ describe("data emission", () => {
prevIndex: 9, prevIndex: 9,
newIndex: 6, newIndex: 6,
indexDiff: -3, indexDiff: -3,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 8, name: "Item 8" }, previousValue: { id: 8, name: "Item 8" },
@ -640,7 +651,7 @@ describe("data emission", () => {
prevIndex: 7, prevIndex: 7,
newIndex: 9, newIndex: 9,
indexDiff: 2, indexDiff: 2,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { previousValue: {
@ -652,7 +663,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
], ],
[ [
@ -662,7 +673,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 7, newIndex: 7,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
], ],
]; ];
@ -701,7 +712,7 @@ describe("input handling", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 0, newIndex: 0,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -709,7 +720,7 @@ describe("input handling", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 1, newIndex: 1,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 3, name: "Item 3" }, previousValue: { id: 3, name: "Item 3" },
@ -717,7 +728,7 @@ describe("input handling", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 4, name: "Item 4" }, previousValue: { id: 4, name: "Item 4" },
@ -725,7 +736,7 @@ describe("input handling", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: null, previousValue: null,
@ -733,7 +744,7 @@ describe("input handling", () => {
prevIndex: null, prevIndex: null,
newIndex: 3, newIndex: 3,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
@ -753,6 +764,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevStream, nextStream, "id", { const diff = streamListDiff(prevStream, nextStream, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -760,7 +772,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -777,6 +789,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevFile, nextFile, "id", { const diff = streamListDiff(prevFile, nextFile, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -784,7 +797,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -803,6 +816,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevStream, nextFile, "id", { const diff = streamListDiff(prevStream, nextFile, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -810,7 +824,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -826,6 +840,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevStream, nextList, "id", { const diff = streamListDiff(prevStream, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -833,7 +848,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -846,6 +861,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevFile, nextList, "id", { const diff = streamListDiff(prevFile, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -853,7 +869,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -863,7 +879,7 @@ describe("input handling", () => {
describe("finish event", () => { describe("finish event", () => {
it("emits 'finish' event if no prevList nor nextList is provided", (done) => { it("emits 'finish' event if no prevList nor nextList is provided", (done) => {
const diff = streamListDiff([], [], "id"); const diff = streamListDiff([], [], "id", { useWorker: false });
diff.on("finish", () => done()); diff.on("finish", () => done());
}); });
it("emits 'finish' event when all the chunks have been processed", (done) => { it("emits 'finish' event when all the chunks have been processed", (done) => {
@ -875,7 +891,7 @@ describe("finish event", () => {
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
{ id: 3, name: "Item 3" }, { id: 3, name: "Item 3" },
]; ];
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("finish", () => done()); diff.on("finish", () => done());
}); });
}); });
@ -893,7 +909,7 @@ describe("error event", () => {
]; ];
// @ts-expect-error prevList is invalid by design for the test // @ts-expect-error prevList is invalid by design for the test
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -915,7 +931,7 @@ describe("error event", () => {
]; ];
// @ts-expect-error nextList is invalid by design for the test // @ts-expect-error nextList is invalid by design for the test
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -932,7 +948,7 @@ describe("error event", () => {
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
]; ];
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -949,7 +965,7 @@ describe("error event", () => {
]; ];
const nextList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }]; const nextList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }];
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -968,6 +984,7 @@ describe("error event", () => {
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: -3, chunksSize: -3,
useWorker: false,
}); });
diff.on("error", (err) => { diff.on("error", (err) => {
@ -982,7 +999,9 @@ describe("error event", () => {
const nextList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }]; const nextList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }];
// @ts-expect-error - prevList is invalid by design for the test // @ts-expect-error - prevList is invalid by design for the test
const diff = streamListDiff({ name: "hello" }, nextList, "id"); const diff = streamListDiff({ name: "hello" }, nextList, "id", {
useWorker: false,
});
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -995,7 +1014,7 @@ describe("error event", () => {
const prevList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }]; const prevList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }];
// @ts-expect-error - nextList is invalid by design for the test // @ts-expect-error - nextList is invalid by design for the test
const diff = streamListDiff(prevList, null, "id"); const diff = streamListDiff(prevList, null, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(

1116
src/lib/stream-list-diff/client/stream-list-diff-client.worker.test.ts

File diff suppressed because it is too large Load Diff

78
src/lib/stream-list-diff/client/worker/utils.ts

@ -0,0 +1,78 @@
import { IEmitter, EmitterEvents, EventEmitter } from "@models/emitter";
import {
ListStreamOptions,
READABLE_STREAM_ALERT,
ReferenceProperty,
StreamEvent,
StreamListener,
} from "@models/stream";
import { WebWorkerMessage } from "@models/worker";
import { generateStream } from "..";
export function workerDiff<T extends Record<string, unknown>>(
prevList: File | T[],
nextList: File | T[],
referenceProperty: ReferenceProperty<T>,
options: ListStreamOptions,
): StreamListener<T> {
const emitter = new EventEmitter<EmitterEvents<T>>();
setTimeout(
() =>
generateStream(prevList, nextList, referenceProperty, options, emitter),
0,
);
return emitter as StreamListener<T>;
}
async function getArrayFromStream<T>(
readableStream: ReadableStream<T>,
showWarnings: boolean = true,
): Promise<T[]> {
if (showWarnings) {
console.warn(READABLE_STREAM_ALERT);
}
const reader = readableStream.getReader();
const chunks: T[] = [];
let result;
while (!(result = await reader.read()).done) {
chunks.push(result.value);
}
return chunks;
}
export async function generateWorker<T extends Record<string, unknown>>(
prevList: ReadableStream<T> | File | T[],
nextList: ReadableStream<T> | File | T[],
referenceProperty: ReferenceProperty<T>,
options: ListStreamOptions,
emitter: IEmitter<T>,
) {
try {
if (prevList instanceof ReadableStream) {
prevList = await getArrayFromStream(prevList, options?.showWarnings);
}
if (nextList instanceof ReadableStream) {
nextList = await getArrayFromStream(nextList, options?.showWarnings);
}
const worker = new Worker(new URL("./web-worker.js", import.meta.url), {
type: "module",
});
worker.postMessage({ prevList, nextList, referenceProperty, options });
worker.onmessage = (e: WebWorkerMessage<T>) => {
const { event, chunk, error } = e.data;
if (event === StreamEvent.Data) {
emitter.emit(StreamEvent.Data, chunk);
} else if (event === StreamEvent.Finish) {
emitter.emit(StreamEvent.Finish);
worker.terminate();
} else if (event === StreamEvent.Error) {
emitter.emit(StreamEvent.Error, new Error(error));
worker.terminate();
}
};
worker.onerror = (err: ErrorEvent) =>
emitter.emit(StreamEvent.Error, new Error(err.message));
} catch (err) {
return emitter.emit(StreamEvent.Error, err as Error);
}
}

30
src/lib/stream-list-diff/client/worker/web-worker.ts

@ -0,0 +1,30 @@
import {
ListStreamOptions,
ReferenceProperty,
StreamEvent,
} from "@models/stream";
import { workerDiff } from "./utils";
self.onmessage = async <T extends Record<string, unknown>>(
event: MessageEvent<{
prevList: File | T[];
nextList: File | T[];
referenceProperty: ReferenceProperty<T>;
options: ListStreamOptions;
}>,
) => {
const { prevList, nextList, referenceProperty, options } = event.data;
const listener = workerDiff(prevList, nextList, referenceProperty, options);
listener.on(StreamEvent.Data, (chunk) => {
self.postMessage({ event: StreamEvent.Data, chunk });
});
listener.on(StreamEvent.Finish, () => {
self.postMessage({ event: StreamEvent.Finish });
});
listener.on(StreamEvent.Error, (error) => {
self.postMessage({ event: StreamEvent.Error, error: error.message });
});
};

93
src/lib/stream-list-diff/server/index.ts

@ -1,27 +1,25 @@
import { createReadStream } from "fs"; import { createReadStream } from "fs";
import { Readable, Transform } from "stream"; import { Readable, Transform } from "stream";
import { LIST_STATUS } from "@models/list"; import { Worker } from "worker_threads";
import { IEmitter, EmitterEvents, EventEmitter } from "@models/emitter";
import { ListStatus, ListType } from "@models/list";
import { import {
DataBuffer, DataBuffer,
DEFAULT_LIST_STREAM_OPTIONS, DEFAULT_LIST_STREAM_OPTIONS,
FilePath, FilePath,
ListStreamOptions, ListStreamOptions,
ReferenceProperty, ReferenceProperty,
} from "@models/stream";
import {
Emitter,
EmitterEvents,
EventEmitter,
StreamListener,
StreamEvent, StreamEvent,
} from "../emitter"; StreamListener,
} from "@models/stream";
import { isDataValid, isValidChunkSize, outputDiffChunk } from "../utils"; import { isDataValid, isValidChunkSize, outputDiffChunk } from "../utils";
import { generateWorker } from "./worker/utils";
async function getDiffChunks<T extends Record<string, unknown>>( async function getDiffChunks<T extends Record<string, unknown>>(
prevStream: Readable, prevStream: Readable,
nextStream: Readable, nextStream: Readable,
referenceProperty: ReferenceProperty<T>, referenceProperty: ReferenceProperty<T>,
emitter: Emitter<T>, emitter: IEmitter<T>,
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
): Promise<void> { ): Promise<void> {
if (!isValidChunkSize(options?.chunksSize)) { if (!isValidChunkSize(options?.chunksSize)) {
@ -42,7 +40,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
const { isValid, message } = isDataValid( const { isValid, message } = isDataValid(
chunk, chunk,
referenceProperty, referenceProperty,
"prevList", ListType.PREV,
); );
if (!isValid) { if (!isValid) {
emitter.emit(StreamEvent.Error, new Error(message)); emitter.emit(StreamEvent.Error, new Error(message));
@ -67,10 +65,10 @@ async function getDiffChunks<T extends Record<string, unknown>>(
indexDiff, indexDiff,
status: status:
indexDiff === 0 indexDiff === 0
? LIST_STATUS.EQUAL ? ListStatus.EQUAL
: options.considerMoveAsUpdate : options.considerMoveAsUpdate
? LIST_STATUS.UPDATED ? ListStatus.UPDATED
: LIST_STATUS.MOVED, : ListStatus.MOVED,
}, },
options, options,
); );
@ -82,7 +80,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
prevIndex: currentPrevIndex, prevIndex: currentPrevIndex,
newIndex: relatedChunk.index, newIndex: relatedChunk.index,
indexDiff, indexDiff,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
options, options,
); );
@ -97,7 +95,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
const { isValid, message } = isDataValid( const { isValid, message } = isDataValid(
chunk, chunk,
referenceProperty, referenceProperty,
"nextList", ListType.NEXT,
); );
if (!isValid) { if (!isValid) {
emitter.emit(StreamEvent.Error, new Error(message)); emitter.emit(StreamEvent.Error, new Error(message));
@ -122,10 +120,10 @@ async function getDiffChunks<T extends Record<string, unknown>>(
indexDiff, indexDiff,
status: status:
indexDiff === 0 indexDiff === 0
? LIST_STATUS.EQUAL ? ListStatus.EQUAL
: options.considerMoveAsUpdate : options.considerMoveAsUpdate
? LIST_STATUS.UPDATED ? ListStatus.UPDATED
: LIST_STATUS.MOVED, : ListStatus.MOVED,
}, },
options, options,
); );
@ -137,7 +135,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
prevIndex: relatedChunk.index, prevIndex: relatedChunk.index,
newIndex: currentNextIndex, newIndex: currentNextIndex,
indexDiff, indexDiff,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
options, options,
); );
@ -169,7 +167,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
prevIndex: chunk.index, prevIndex: chunk.index,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
options, options,
); );
@ -183,7 +181,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
prevIndex: null, prevIndex: null,
newIndex: chunk.index, newIndex: chunk.index,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
options, options,
); );
@ -195,7 +193,7 @@ async function getDiffChunks<T extends Record<string, unknown>>(
function getValidStream<T>( function getValidStream<T>(
input: Readable | FilePath | T[], input: Readable | FilePath | T[],
listType: "prevList" | "nextList", listType: ListType,
): Readable { ): Readable {
if (input instanceof Readable) { if (input instanceof Readable) {
return input; return input;
@ -227,10 +225,27 @@ function getValidStream<T>(
}), }),
); );
} }
throw new Error(`Invalid ${listType}. Expected Readable, Array, or File.`); throw new Error(`Invalid ${listType}. Expected Readable, Array, or File.`);
} }
export async function generateStream<T extends Record<string, unknown>>(
prevList: Readable | FilePath | T[],
nextList: Readable | FilePath | T[],
referenceProperty: ReferenceProperty<T>,
options: ListStreamOptions,
emitter: IEmitter<T>,
): Promise<void> {
try {
const [prevStream, nextStream] = await Promise.all([
getValidStream(prevList, ListType.PREV),
getValidStream(nextList, ListType.NEXT),
]);
getDiffChunks(prevStream, nextStream, referenceProperty, emitter, options);
} catch (err) {
return emitter.emit(StreamEvent.Error, err as Error);
}
}
/** /**
* Streams the diff of two object lists * Streams the diff of two object lists
* @param {Readable | FilePath | Record<string, unknown>[]} prevList - The original object list. * @param {Readable | FilePath | Record<string, unknown>[]} prevList - The original object list.
@ -238,29 +253,27 @@ function getValidStream<T>(
* @param {string} referenceProperty - A common property in all the objects of your lists (e.g. `id`) * @param {string} referenceProperty - A common property in all the objects of your lists (e.g. `id`)
* @param {ListStreamOptions} options - Options to refine your output. * @param {ListStreamOptions} options - Options to refine your output.
- `chunksSize`: the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff by chunk, `10` = 10 object diffs by chunk). - `chunksSize`: the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff by chunk, `10` = 10 object diffs by chunk).
- `showOnly`: returns only the values whose status you are interested in. (e.g. `["added", "equal"]`) - `showOnly`: returns only the values whose status you are interested in. (e.g. `["added", "equal"]`).
- `considerMoveAsUpdate`: if set to `true` a `moved` object will be considered as `updated` - `considerMoveAsUpdate`: if set to `true` a `moved` object will be considered as `updated`.
- `useWorker`: if set to `true`, the diff will be run in a worker. Recommended for maximum performance, `true` by default.
- `showWarnings`: if set to `true`, potential warnings will be displayed in the console.
* @returns StreamListener * @returns StreamListener
*/ */
export function streamListDiff<T extends Record<string, unknown>>( export function streamListDiff<T extends Record<string, unknown>>(
prevStream: Readable | FilePath | T[], prevList: Readable | FilePath | T[],
nextStream: Readable | FilePath | T[], nextList: Readable | FilePath | T[],
referenceProperty: ReferenceProperty<T>, referenceProperty: ReferenceProperty<T>,
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
): StreamListener<T> { ): StreamListener<T> {
const emitter = new EventEmitter<EmitterEvents<T>>(); const emitter = new EventEmitter<EmitterEvents<T>>();
setTimeout(async () => { if (typeof Worker === "undefined" || !options.useWorker) {
try { setTimeout(
await getDiffChunks( () =>
getValidStream(prevStream, "prevList"), generateStream(prevList, nextList, referenceProperty, options, emitter),
getValidStream(nextStream, "nextList"), 0,
referenceProperty, );
emitter, } else {
options, generateWorker(prevList, nextList, referenceProperty, options, emitter);
); }
} catch (err) {
return emitter.emit(StreamEvent.Error, err as Error);
}
}, 0);
return emitter as StreamListener<T>; return emitter as StreamListener<T>;
} }

149
src/lib/stream-list-diff/server/stream-list-diff.test.ts

@ -1,6 +1,6 @@
import path from "path"; import path from "path";
import { Readable } from "stream"; import { Readable } from "stream";
import { LIST_STATUS } from "@models/list"; import { ListStatus } from "@models/list";
import { StreamListDiff } from "@models/stream"; import { StreamListDiff } from "@models/stream";
import { streamListDiff } from "."; import { streamListDiff } from ".";
@ -10,7 +10,10 @@ describe("data emission", () => {
{ id: 1, name: "Item 1" }, { id: 1, name: "Item 1" },
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
]; ];
const diff = streamListDiff([], nextList, "id", { chunksSize: 2 }); const diff = streamListDiff([], nextList, "id", {
chunksSize: 2,
useWorker: false,
});
const expectedChunks = [ const expectedChunks = [
{ {
@ -19,7 +22,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 0, newIndex: 0,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
{ {
previousValue: null, previousValue: null,
@ -27,7 +30,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 1, newIndex: 1,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
let chunkCount = 0; let chunkCount = 0;
@ -45,7 +48,10 @@ describe("data emission", () => {
{ id: 1, name: "Item 1" }, { id: 1, name: "Item 1" },
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
]; ];
const diff = streamListDiff(prevList, [], "id", { chunksSize: 2 }); const diff = streamListDiff(prevList, [], "id", {
chunksSize: 2,
useWorker: false,
});
const expectedChunks = [ const expectedChunks = [
{ {
@ -54,7 +60,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -62,7 +68,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
]; ];
let chunkCount = 0; let chunkCount = 0;
@ -70,7 +76,7 @@ describe("data emission", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("shiiiite", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -85,7 +91,7 @@ describe("data emission", () => {
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
{ id: 3, name: "Item 3" }, { id: 3, name: "Item 3" },
]; ];
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
const expectedChunks = [ const expectedChunks = [
[ [
@ -95,7 +101,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 0, newIndex: 0,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
], ],
[ [
@ -105,7 +111,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
], ],
[ [
@ -115,7 +121,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 1, newIndex: 1,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
], ],
]; ];
@ -158,6 +164,7 @@ describe("data emission", () => {
]; ];
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -168,7 +175,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 0, newIndex: 0,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -176,7 +183,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 1, newIndex: 1,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 3, name: "Item 3" }, previousValue: { id: 3, name: "Item 3" },
@ -184,7 +191,7 @@ describe("data emission", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 5, name: "Item 5" }, previousValue: { id: 5, name: "Item 5" },
@ -192,7 +199,7 @@ describe("data emission", () => {
prevIndex: 4, prevIndex: 4,
newIndex: 3, newIndex: 3,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 6, name: "Item 6" }, previousValue: { id: 6, name: "Item 6" },
@ -200,7 +207,7 @@ describe("data emission", () => {
prevIndex: 5, prevIndex: 5,
newIndex: 4, newIndex: 4,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
], ],
[ [
@ -210,7 +217,7 @@ describe("data emission", () => {
prevIndex: 6, prevIndex: 6,
newIndex: 5, newIndex: 5,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 9, name: "Item 9" }, previousValue: { id: 9, name: "Item 9" },
@ -218,7 +225,7 @@ describe("data emission", () => {
prevIndex: 8, prevIndex: 8,
newIndex: 8, newIndex: 8,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 10, name: "Item 10" }, previousValue: { id: 10, name: "Item 10" },
@ -226,7 +233,7 @@ describe("data emission", () => {
prevIndex: 9, prevIndex: 9,
newIndex: 6, newIndex: 6,
indexDiff: -3, indexDiff: -3,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 8, name: "Item 8" }, previousValue: { id: 8, name: "Item 8" },
@ -234,7 +241,7 @@ describe("data emission", () => {
prevIndex: 7, prevIndex: 7,
newIndex: 9, newIndex: 9,
indexDiff: 2, indexDiff: 2,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 4, name: "Item 4" }, previousValue: { id: 4, name: "Item 4" },
@ -242,7 +249,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
], ],
[ [
@ -252,7 +259,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 7, newIndex: 7,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
], ],
]; ];
@ -286,6 +293,7 @@ describe("data emission", () => {
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -295,7 +303,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 0, newIndex: 0,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -303,7 +311,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 1, newIndex: 1,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 3, name: "Item 3" }, previousValue: { id: 3, name: "Item 3" },
@ -311,7 +319,7 @@ describe("data emission", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 4, name: "Item 4" }, previousValue: { id: 4, name: "Item 4" },
@ -319,7 +327,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: null, previousValue: null,
@ -327,7 +335,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 3, newIndex: 3,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
@ -336,7 +344,7 @@ describe("data emission", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -358,6 +366,7 @@ describe("data emission", () => {
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
considerMoveAsUpdate: true, considerMoveAsUpdate: true,
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -367,7 +376,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 0, newIndex: 0,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 1, name: "Item 1" }, previousValue: { id: 1, name: "Item 1" },
@ -375,7 +384,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 1, newIndex: 1,
indexDiff: 1, indexDiff: 1,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 3, name: "Item 3" }, previousValue: { id: 3, name: "Item 3" },
@ -383,7 +392,7 @@ describe("data emission", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 4, name: "Item 4" }, previousValue: { id: 4, name: "Item 4" },
@ -391,7 +400,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: null, previousValue: null,
@ -399,7 +408,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 3, newIndex: 3,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
@ -430,6 +439,7 @@ describe("data emission", () => {
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
showOnly: ["added", "deleted"], showOnly: ["added", "deleted"],
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -439,7 +449,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: null, previousValue: null,
@ -447,7 +457,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 3, newIndex: 3,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
@ -517,6 +527,7 @@ describe("data emission", () => {
]; ];
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
const expectedChunks = [ const expectedChunks = [
@ -535,7 +546,7 @@ describe("data emission", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 0, newIndex: 0,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -543,7 +554,7 @@ describe("data emission", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 1, newIndex: 1,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { previousValue: {
@ -559,7 +570,7 @@ describe("data emission", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 5, name: "Item 5" }, previousValue: { id: 5, name: "Item 5" },
@ -567,7 +578,7 @@ describe("data emission", () => {
prevIndex: 4, prevIndex: 4,
newIndex: 3, newIndex: 3,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { previousValue: {
@ -583,7 +594,7 @@ describe("data emission", () => {
prevIndex: 5, prevIndex: 5,
newIndex: 4, newIndex: 4,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
], ],
[ [
@ -593,7 +604,7 @@ describe("data emission", () => {
prevIndex: 6, prevIndex: 6,
newIndex: 5, newIndex: 5,
indexDiff: -1, indexDiff: -1,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 9, name: "Item 9" }, previousValue: { id: 9, name: "Item 9" },
@ -601,7 +612,7 @@ describe("data emission", () => {
prevIndex: 8, prevIndex: 8,
newIndex: 8, newIndex: 8,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { previousValue: {
@ -625,7 +636,7 @@ describe("data emission", () => {
prevIndex: 9, prevIndex: 9,
newIndex: 6, newIndex: 6,
indexDiff: -3, indexDiff: -3,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { id: 8, name: "Item 8" }, previousValue: { id: 8, name: "Item 8" },
@ -633,7 +644,7 @@ describe("data emission", () => {
prevIndex: 7, prevIndex: 7,
newIndex: 9, newIndex: 9,
indexDiff: 2, indexDiff: 2,
status: LIST_STATUS.MOVED, status: ListStatus.MOVED,
}, },
{ {
previousValue: { previousValue: {
@ -645,7 +656,7 @@ describe("data emission", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
], ],
[ [
@ -655,7 +666,7 @@ describe("data emission", () => {
prevIndex: null, prevIndex: null,
newIndex: 7, newIndex: 7,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
], ],
]; ];
@ -694,7 +705,7 @@ describe("input handling", () => {
prevIndex: 0, prevIndex: 0,
newIndex: 0, newIndex: 0,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 2, name: "Item 2" }, previousValue: { id: 2, name: "Item 2" },
@ -702,7 +713,7 @@ describe("input handling", () => {
prevIndex: 1, prevIndex: 1,
newIndex: 1, newIndex: 1,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.UPDATED, status: ListStatus.UPDATED,
}, },
{ {
previousValue: { id: 3, name: "Item 3" }, previousValue: { id: 3, name: "Item 3" },
@ -710,7 +721,7 @@ describe("input handling", () => {
prevIndex: 2, prevIndex: 2,
newIndex: 2, newIndex: 2,
indexDiff: 0, indexDiff: 0,
status: LIST_STATUS.EQUAL, status: ListStatus.EQUAL,
}, },
{ {
previousValue: { id: 4, name: "Item 4" }, previousValue: { id: 4, name: "Item 4" },
@ -718,7 +729,7 @@ describe("input handling", () => {
prevIndex: 3, prevIndex: 3,
newIndex: null, newIndex: null,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.DELETED, status: ListStatus.DELETED,
}, },
{ {
previousValue: null, previousValue: null,
@ -726,7 +737,7 @@ describe("input handling", () => {
prevIndex: null, prevIndex: null,
newIndex: 3, newIndex: 3,
indexDiff: null, indexDiff: null,
status: LIST_STATUS.ADDED, status: ListStatus.ADDED,
}, },
]; ];
@ -736,6 +747,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevStream, nextStream, "id", { const diff = streamListDiff(prevStream, nextStream, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -743,7 +755,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -755,6 +767,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevFile, nextFile, "id", { const diff = streamListDiff(prevFile, nextFile, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -762,7 +775,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -774,6 +787,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevStream, nextFile, "id", { const diff = streamListDiff(prevStream, nextFile, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -781,7 +795,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -792,6 +806,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevStream, nextList, "id", { const diff = streamListDiff(prevStream, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -799,7 +814,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -810,6 +825,7 @@ describe("input handling", () => {
const diff = streamListDiff(prevFile, nextList, "id", { const diff = streamListDiff(prevFile, nextList, "id", {
chunksSize: 5, chunksSize: 5,
useWorker: false,
}); });
let chunkCount = 0; let chunkCount = 0;
@ -817,7 +833,7 @@ describe("input handling", () => {
expect(chunk).toStrictEqual(expectedChunks); expect(chunk).toStrictEqual(expectedChunks);
chunkCount++; chunkCount++;
}); });
diff.on("error", (err) => console.error("sheeeet", err)); diff.on("error", (err) => console.error(err));
diff.on("finish", () => { diff.on("finish", () => {
expect(chunkCount).toBe(1); expect(chunkCount).toBe(1);
done(); done();
@ -827,7 +843,7 @@ describe("input handling", () => {
describe("finish event", () => { describe("finish event", () => {
it("emits 'finish' event if no prevList nor nextList is provided", (done) => { it("emits 'finish' event if no prevList nor nextList is provided", (done) => {
const diff = streamListDiff([], [], "id"); const diff = streamListDiff([], [], "id", { useWorker: false });
diff.on("finish", () => done()); diff.on("finish", () => done());
}); });
it("emits 'finish' event when all the chunks have been processed", (done) => { it("emits 'finish' event when all the chunks have been processed", (done) => {
@ -839,7 +855,7 @@ describe("finish event", () => {
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
{ id: 3, name: "Item 3" }, { id: 3, name: "Item 3" },
]; ];
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("finish", () => done()); diff.on("finish", () => done());
}); });
}); });
@ -857,7 +873,7 @@ describe("error event", () => {
]; ];
// @ts-expect-error prevList is invalid by design for the test // @ts-expect-error prevList is invalid by design for the test
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -879,7 +895,7 @@ describe("error event", () => {
]; ];
// @ts-expect-error nextList is invalid by design for the test // @ts-expect-error nextList is invalid by design for the test
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -896,7 +912,7 @@ describe("error event", () => {
{ id: 2, name: "Item 2" }, { id: 2, name: "Item 2" },
]; ];
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -913,7 +929,7 @@ describe("error event", () => {
]; ];
const nextList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }]; const nextList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }];
const diff = streamListDiff(prevList, nextList, "id"); const diff = streamListDiff(prevList, nextList, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -932,6 +948,7 @@ describe("error event", () => {
const diff = streamListDiff(prevList, nextList, "id", { const diff = streamListDiff(prevList, nextList, "id", {
chunksSize: -3, chunksSize: -3,
useWorker: false,
}); });
diff.on("error", (err) => { diff.on("error", (err) => {
@ -946,7 +963,9 @@ describe("error event", () => {
const nextList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }]; const nextList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }];
// @ts-expect-error - prevList is invalid by design for the test // @ts-expect-error - prevList is invalid by design for the test
const diff = streamListDiff({ name: "hello" }, nextList, "id"); const diff = streamListDiff({ name: "hello" }, nextList, "id", {
useWorker: false,
});
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(
@ -959,7 +978,7 @@ describe("error event", () => {
const prevList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }]; const prevList = [{ id: 1, name: "Item 1" }, { name: "Item 2" }];
// @ts-expect-error - nextList is invalid by design for the test // @ts-expect-error - nextList is invalid by design for the test
const diff = streamListDiff(prevList, null, "id"); const diff = streamListDiff(prevList, null, "id", { useWorker: false });
diff.on("error", (err) => { diff.on("error", (err) => {
expect(err["message"]).toEqual( expect(err["message"]).toEqual(

1048
src/lib/stream-list-diff/server/stream-list-diff.worker.test.ts

File diff suppressed because it is too large Load Diff

38
src/lib/stream-list-diff/server/worker/node-worker.ts

@ -0,0 +1,38 @@
import { parentPort } from "worker_threads";
import {
FilePath,
ListStreamOptions,
ReferenceProperty,
StreamEvent,
} from "@models/stream";
import { WorkerEvent } from "@models/worker";
import { workerDiff } from "./utils";
parentPort?.on(
WorkerEvent.Message,
async <T extends Record<string, unknown>>(event: {
prevList: FilePath | T[];
nextList: FilePath | T[];
referenceProperty: ReferenceProperty<T>;
options: ListStreamOptions;
}) => {
const { prevList, nextList, referenceProperty, options } = event;
const listener = workerDiff(prevList, nextList, referenceProperty, options);
listener.on(StreamEvent.Data, (chunk) => {
parentPort?.postMessage({ event: StreamEvent.Data, chunk });
});
listener.on(StreamEvent.Finish, () => {
parentPort?.postMessage({ event: StreamEvent.Finish });
});
listener.on(StreamEvent.Error, (error) => {
parentPort?.postMessage({
event: StreamEvent.Error,
error: error.message,
});
});
},
);

78
src/lib/stream-list-diff/server/worker/utils.ts

@ -0,0 +1,78 @@
import path from "path";
import { once, Readable } from "stream";
import { Worker } from "worker_threads";
import { IEmitter, EmitterEvents, EventEmitter } from "@models/emitter";
import {
FilePath,
ListStreamOptions,
READABLE_STREAM_ALERT,
ReferenceProperty,
StreamEvent,
StreamListener,
} from "@models/stream";
import { NodeWorkerMessage, WorkerEvent } from "@models/worker";
import { generateStream } from "..";
async function getArrayFromStream<T>(
stream: Readable,
showWarnings: boolean = true,
): Promise<T[]> {
if (showWarnings) {
console.warn(READABLE_STREAM_ALERT);
}
const data: T[] = [];
stream.on(StreamEvent.Data, (chunk) => data.push(chunk));
await once(stream, "end");
return data;
}
export async function generateWorker<T extends Record<string, unknown>>(
prevList: Readable | FilePath | T[],
nextList: Readable | FilePath | T[],
referenceProperty: ReferenceProperty<T>,
options: ListStreamOptions,
emitter: IEmitter<T>,
) {
try {
if (prevList instanceof Readable) {
prevList = await getArrayFromStream(prevList, options?.showWarnings);
}
if (nextList instanceof Readable) {
nextList = await getArrayFromStream(nextList, options?.showWarnings);
}
const worker = new Worker(path.resolve(__dirname, "./node-worker.cjs"));
worker.postMessage({ prevList, nextList, referenceProperty, options });
worker.on(WorkerEvent.Message, (e: NodeWorkerMessage<T>) => {
const { event, chunk, error } = e;
if (event === StreamEvent.Data) {
emitter.emit(StreamEvent.Data, chunk);
} else if (event === StreamEvent.Finish) {
emitter.emit(StreamEvent.Finish);
worker.terminate();
} else if (event === StreamEvent.Error) {
emitter.emit(StreamEvent.Error, new Error(error));
worker.terminate();
}
});
worker.on(WorkerEvent.Error, (err) =>
emitter.emit(StreamEvent.Error, new Error(err.message)),
);
} catch (err) {
return emitter.emit(StreamEvent.Error, err as Error);
}
}
export function workerDiff<T extends Record<string, unknown>>(
prevList: FilePath | T[],
nextList: FilePath | T[],
referenceProperty: ReferenceProperty<T>,
options: ListStreamOptions,
): StreamListener<T> {
const emitter = new EventEmitter<EmitterEvents<T>>();
setTimeout(
() =>
generateStream(prevList, nextList, referenceProperty, options, emitter),
0,
);
return emitter as StreamListener<T>;
}

8
src/lib/stream-list-diff/utils.ts

@ -1,10 +1,12 @@
import { isObject } from "@lib/utils"; import { isObject } from "@lib/utils";
import { IEmitter } from "@models/emitter";
import { ListType } from "@models/list";
import { import {
ListStreamOptions, ListStreamOptions,
ReferenceProperty, ReferenceProperty,
StreamEvent,
StreamListDiff, StreamListDiff,
} from "@models/stream"; } from "@models/stream";
import { Emitter, StreamEvent } from "./emitter";
export function isValidChunkSize( export function isValidChunkSize(
chunksSize: ListStreamOptions["chunksSize"], chunksSize: ListStreamOptions["chunksSize"],
@ -17,7 +19,7 @@ export function isValidChunkSize(
export function isDataValid<T extends Record<string, unknown>>( export function isDataValid<T extends Record<string, unknown>>(
data: T, data: T,
referenceProperty: ReferenceProperty<T>, referenceProperty: ReferenceProperty<T>,
listType: "prevList" | "nextList", listType: ListType,
): { isValid: boolean; message?: string } { ): { isValid: boolean; message?: string } {
if (!isObject(data)) { if (!isObject(data)) {
return { return {
@ -38,7 +40,7 @@ export function isDataValid<T extends Record<string, unknown>>(
} }
export function outputDiffChunk<T extends Record<string, unknown>>( export function outputDiffChunk<T extends Record<string, unknown>>(
emitter: Emitter<T>, emitter: IEmitter<T>,
) { ) {
let chunks: StreamListDiff<T>[] = []; let chunks: StreamListDiff<T>[] = [];

27
src/lib/stream-list-diff/emitter.ts → src/models/emitter/index.ts

@ -1,14 +1,14 @@
import { StreamListDiff } from "@models/stream"; import { StreamListDiff } from "@models/stream";
type Listener<T extends unknown[]> = (...args: T) => void; export type Listener<T extends unknown[]> = (...args: T) => void;
export enum StreamEvent { export type EmitterEvents<T extends Record<string, unknown>> = {
Data = "data", data: [StreamListDiff<T>[]];
Finish = "finish", error: [Error];
Error = "error", finish: [];
} };
export type Emitter<T extends Record<string, unknown>> = EventEmitter<{ export type IEmitter<T extends Record<string, unknown>> = EventEmitter<{
data: [StreamListDiff<T>[]]; data: [StreamListDiff<T>[]];
error: [Error]; error: [Error];
finish: []; finish: [];
@ -31,16 +31,3 @@ export class EventEmitter<Events extends Record<string, unknown[]>> {
} }
} }
} }
export type EmitterEvents<T extends Record<string, unknown>> = {
data: [StreamListDiff<T>[]];
error: [Error];
finish: [];
};
export interface StreamListener<T extends Record<string, unknown>> {
on<E extends keyof EmitterEvents<T>>(
event: E,
listener: Listener<EmitterEvents<T>[E]>,
): this;
}

27
src/models/list/index.ts

@ -1,4 +1,11 @@
export enum LIST_STATUS { export const DEFAULT_LIST_DIFF_OPTIONS = {
showOnly: [],
referenceProperty: undefined,
considerMoveAsUpdate: false,
ignoreArrayOrder: false,
};
export enum ListStatus {
ADDED = "added", ADDED = "added",
EQUAL = "equal", EQUAL = "equal",
DELETED = "deleted", DELETED = "deleted",
@ -6,28 +13,26 @@ export enum LIST_STATUS {
MOVED = "moved", MOVED = "moved",
} }
export enum ListType {
PREV = "prevList",
NEXT = "nextList",
}
export type ListDiffOptions = { export type ListDiffOptions = {
showOnly?: `${LIST_STATUS}`[]; showOnly?: `${ListStatus}`[];
referenceProperty?: string; referenceProperty?: string;
considerMoveAsUpdate?: boolean; considerMoveAsUpdate?: boolean;
ignoreArrayOrder?: boolean; ignoreArrayOrder?: boolean;
}; };
export const DEFAULT_LIST_DIFF_OPTIONS = {
showOnly: [],
referenceProperty: undefined,
considerMoveAsUpdate: false,
ignoreArrayOrder: false,
};
export type ListDiff = { export type ListDiff = {
type: "list"; type: "list";
status: LIST_STATUS; status: `${ListStatus}`;
diff: { diff: {
value: unknown; value: unknown;
prevIndex: number | null; prevIndex: number | null;
newIndex: number | null; newIndex: number | null;
indexDiff: number | null; indexDiff: number | null;
status: LIST_STATUS; status: ListStatus;
}[]; }[];
}; };

14
src/models/object/index.ts

@ -1,11 +1,11 @@
export enum OBJECT_STATUS { export enum ObjectStatus {
ADDED = "added", ADDED = "added",
EQUAL = "equal", EQUAL = "equal",
DELETED = "deleted", DELETED = "deleted",
UPDATED = "updated", UPDATED = "updated",
} }
export enum GRANULARITY { export enum Granularity {
BASIC = "basic", BASIC = "basic",
DEEP = "deep", DEEP = "deep",
} }
@ -15,14 +15,14 @@ export type ObjectData = Record<string, unknown> | undefined | null;
export type ObjectDiffOptions = { export type ObjectDiffOptions = {
ignoreArrayOrder?: boolean; ignoreArrayOrder?: boolean;
showOnly?: { showOnly?: {
statuses: `${OBJECT_STATUS}`[]; statuses: `${ObjectStatus}`[];
granularity?: `${GRANULARITY}`; granularity?: `${Granularity}`;
}; };
}; };
export const DEFAULT_OBJECT_DIFF_OPTIONS = { export const DEFAULT_OBJECT_DIFF_OPTIONS = {
ignoreArrayOrder: false, ignoreArrayOrder: false,
showOnly: { statuses: [], granularity: GRANULARITY.BASIC }, showOnly: { statuses: [], granularity: Granularity.BASIC },
}; };
/** recursive diff in case of subproperties */ /** recursive diff in case of subproperties */
@ -30,12 +30,12 @@ export type Diff = {
property: string; property: string;
previousValue: unknown; previousValue: unknown;
currentValue: unknown; currentValue: unknown;
status: OBJECT_STATUS; status: `${ObjectStatus}`;
diff?: Diff[]; diff?: Diff[];
}; };
export type ObjectDiff = { export type ObjectDiff = {
type: "object"; type: "object";
status: OBJECT_STATUS; status: `${ObjectStatus}`;
diff: Diff[]; diff: Diff[];
}; };

36
src/models/stream/index.ts

@ -1,4 +1,21 @@
import { LIST_STATUS } from "@models/list"; import { EmitterEvents, Listener } from "@models/emitter";
import { ListStatus } from "@models/list";
export const READABLE_STREAM_ALERT = `Warning: using Readable streams may impact workers' performance since they need to be converted to arrays.
Consider using arrays or files for optimal performance. Alternatively, you can turn the 'useWorker' option off.
To disable this warning, set 'showWarnings' to false in production.`;
export const DEFAULT_LIST_STREAM_OPTIONS: ListStreamOptions = {
chunksSize: 0,
useWorker: true,
showWarnings: true,
};
export enum StreamEvent {
Data = "data",
Finish = "finish",
Error = "error",
}
export type StreamListDiff<T extends Record<string, unknown>> = { export type StreamListDiff<T extends Record<string, unknown>> = {
currentValue: T | null; currentValue: T | null;
@ -6,7 +23,7 @@ export type StreamListDiff<T extends Record<string, unknown>> = {
prevIndex: number | null; prevIndex: number | null;
newIndex: number | null; newIndex: number | null;
indexDiff: number | null; indexDiff: number | null;
status: LIST_STATUS; status: `${ListStatus}`;
}; };
export type ReferenceProperty<T extends Record<string, unknown>> = keyof T; export type ReferenceProperty<T extends Record<string, unknown>> = keyof T;
@ -26,12 +43,17 @@ export type DataBuffer<T extends Record<string, unknown>> = Map<
export type ListStreamOptions = { export type ListStreamOptions = {
chunksSize?: number; // 0 by default. chunksSize?: number; // 0 by default.
showOnly?: `${LIST_STATUS}`[]; showOnly?: `${ListStatus}`[];
considerMoveAsUpdate?: boolean; considerMoveAsUpdate?: boolean;
}; useWorker?: boolean; // true by default
showWarnings?: boolean; // true by default
export const DEFAULT_LIST_STREAM_OPTIONS: ListStreamOptions = {
chunksSize: 0,
}; };
export type FilePath = string; export type FilePath = string;
export interface StreamListener<T extends Record<string, unknown>> {
on<E extends keyof EmitterEvents<T>>(
event: E,
listener: Listener<EmitterEvents<T>[E]>,
): this;
}

22
src/models/worker/index.ts

@ -0,0 +1,22 @@
import { StreamEvent, StreamListDiff } from "@models/stream";
export enum WorkerEvent {
Message = "message",
Error = "error",
}
type WorkerData<T extends Record<string, unknown>> = {
chunk: StreamListDiff<T>[];
error: string;
event: StreamEvent;
};
type WorkerMessage<T extends Record<string, unknown>> = {
data: WorkerData<T>;
};
export type WebWorkerMessage<T extends Record<string, unknown>> =
WorkerMessage<T>;
export type NodeWorkerMessage<T extends Record<string, unknown>> =
WorkerData<T>;

5
tsconfig.json

@ -18,10 +18,9 @@
"skipLibCheck": true , "skipLibCheck": true ,
"baseUrl": ".", "baseUrl": ".",
"paths": { "paths": {
"@models/*": ["./src/models/*"],
"@lib/*": ["./src/lib/*"], "@lib/*": ["./src/lib/*"],
"@mocks/*": ["./src/mocks/*"],
"@models/*": ["./src/models/*"]
} }
}, },
} }

28
tsup.config.ts

@ -15,17 +15,37 @@ export default defineConfig([
format: ["cjs", "esm"], format: ["cjs", "esm"],
...sharedConfig, ...sharedConfig,
platform: "neutral", platform: "neutral",
name: "MAIN",
},
{
entry: ["src/client.ts"],
format: ["esm"],
...sharedConfig,
platform: "browser",
name: "CLIENT",
},
{
entry: ["src/lib/stream-list-diff/client/worker/web-worker.ts"],
format: ["esm"],
...sharedConfig,
splitting: false,
platform: "browser",
name: "WEB WORKER",
}, },
{ {
entry: ["src/server.ts"], entry: ["src/server.ts"],
format: ["cjs", "esm"], format: ["cjs"],
...sharedConfig, ...sharedConfig,
platform: "node", platform: "node",
name: "SERVER",
}, },
{ {
entry: ["src/client.ts"], entry: ["src/lib/stream-list-diff/server/worker/node-worker.ts"],
format: ["cjs", "esm"], format: ["cjs"],
...sharedConfig, ...sharedConfig,
platform: "browser", splitting: false,
shims: false,
platform: "node",
name: "NODEJS WORKER",
}, },
]); ]);

Loading…
Cancel
Save