diff --git a/src/lib/stream-list-diff/emitter.ts b/src/lib/stream-list-diff/emitter.ts index 91dff58..4d27b30 100644 --- a/src/lib/stream-list-diff/emitter.ts +++ b/src/lib/stream-list-diff/emitter.ts @@ -31,3 +31,16 @@ export class EventEmitter> { } } } + +export type EmitterEvents> = { + data: [StreamListsDiff[]]; + error: [Error]; + finish: []; +}; + +export interface ReadOnlyEmitter> { + on>( + event: E, + listener: Listener[E]>, + ): this; +} diff --git a/src/lib/stream-list-diff/index.ts b/src/lib/stream-list-diff/index.ts index 90e1d65..d47c1bc 100644 --- a/src/lib/stream-list-diff/index.ts +++ b/src/lib/stream-list-diff/index.ts @@ -7,7 +7,13 @@ import { } from "@models/stream"; import { LIST_STATUS } from "@models/list"; import { isObject } from "@lib/utils"; -import { Emitter, EventEmitter, StreamEvent } from "./emitter"; +import { + Emitter, + EmitterEvents, + EventEmitter, + ReadOnlyEmitter, + StreamEvent, +} from "./emitter"; function outputDiffChunk>( emitter: Emitter, @@ -133,7 +139,7 @@ function getDiffChunks>( StreamEvent.Error, new Error("Your nextList must only contain valid objects."), ); - emitter.emit(StreamEvent.Finish); + return emitter.emit(StreamEvent.Finish); } nextDiff?.forEach((data, i) => handleDiffChunk(data, i === nextDiff.length - 1, options), @@ -152,7 +158,7 @@ function getDiffChunks>( StreamEvent.Error, new Error("Your prevList must only contain valid objects."), ); - emitter.emit(StreamEvent.Finish); + return emitter.emit(StreamEvent.Finish); } prevDiff?.forEach((data, i) => handleDiffChunk(data, i === prevDiff.length - 1, options), @@ -180,6 +186,8 @@ function getDiffChunks>( } } + const totalChunks = listsReferences.size; + for (let i = 0; i < nextList.length; i++) { const data = nextList[i]; if (data) { @@ -208,7 +216,7 @@ function getDiffChunks>( indexDiff: null, status: LIST_STATUS.ADDED, }, - i === nextList.length - 1, + totalChunks > 0 ? false : i === nextList.length - 1, options, ); } @@ -216,11 +224,11 @@ function getDiffChunks>( } let streamedChunks = 0; - const totalChunks = listsReferences.size; - for (const data of listsReferences.values()) { + for (const [key, data] of listsReferences.entries()) { streamedChunks++; const isLastChunk = totalChunks === streamedChunks; + if (typeof data.nextIndex === "undefined") { handleDiffChunk( { @@ -284,6 +292,7 @@ function getDiffChunks>( ); } } + listsReferences.delete(key); // to free up memory } return emitter.emit(StreamEvent.Finish); @@ -305,12 +314,8 @@ export function streamListsDiff>( nextList: T[], referenceProperty: ReferenceProperty, options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, -): Emitter { - const emitter = new EventEmitter<{ - data: [StreamListsDiff[]]; - error: [Error]; - finish: []; - }>(); +): ReadOnlyEmitter { + const emitter = new EventEmitter>(); setTimeout(() => { try { getDiffChunks(prevList, nextList, referenceProperty, emitter, options); @@ -318,5 +323,5 @@ export function streamListsDiff>( return emitter.emit(StreamEvent.Error, err as Error); } }, 0); - return emitter; + return emitter as ReadOnlyEmitter; } diff --git a/src/lib/stream-list-diff/stream-list-diff.test.ts b/src/lib/stream-list-diff/stream-list-diff.test.ts index 1a47732..2a171a7 100644 --- a/src/lib/stream-list-diff/stream-list-diff.test.ts +++ b/src/lib/stream-list-diff/stream-list-diff.test.ts @@ -1,5 +1,6 @@ import { LIST_STATUS } from "@models/list"; import { streamListsDiff } from "."; +import { StreamListsDiff } from "@models/stream"; describe("streamListsDiff data", () => { it("emits 'data' event and consider the all the nextList added if no prevList is provided", (done) => { @@ -27,8 +28,15 @@ describe("streamListsDiff data", () => { status: LIST_STATUS.ADDED, }, ]; - diff.on("data", (chunk) => expect(chunk).toStrictEqual(expectedChunks)); - diff.on("finish", () => done()); + let chunkCount = 0; + diff.on("data", (chunk) => { + expect(chunk).toStrictEqual(expectedChunks); + chunkCount++; + }); + diff.on("finish", () => { + expect(chunkCount).toBe(1); + done(); + }); }); it("emits 'data' event and consider the all the prevList deleted if no nextList is provided", (done) => { const prevList = [ @@ -55,8 +63,15 @@ describe("streamListsDiff data", () => { status: LIST_STATUS.DELETED, }, ]; - diff.on("data", (chunk) => expect(chunk).toStrictEqual(expectedChunks)); - diff.on("finish", () => done()); + let chunkCount = 0; + diff.on("data", (chunk) => { + expect(chunk).toStrictEqual(expectedChunks); + chunkCount++; + }); + diff.on("finish", () => { + expect(chunkCount).toBe(1); + done(); + }); }); it("emits 'data' event with one object diff by chunk if chunkSize is 0 or undefined", (done) => { const prevList = [ @@ -108,9 +123,12 @@ describe("streamListsDiff data", () => { expect(chunk).toStrictEqual(expectedChunks[chunkCount]); chunkCount++; }); - diff.on("finish", () => done()); + diff.on("finish", () => { + expect(chunkCount).toBe(3); + done(); + }); }); - it("emits 'data' event with 5 object diff by chunk", (done) => { + it("emits 'data' event with 5 object diff by chunk and return the last object diff in a one entry chunk at the end", (done) => { const prevList = [ { id: 1, name: "Item 1" }, { id: 2, name: "Item 2" }, @@ -221,6 +239,8 @@ describe("streamListsDiff data", () => { indexDiff: 0, status: LIST_STATUS.EQUAL, }, + ], + [ { previousValue: { id: 10, name: "Item 10" }, currentValue: { id: 10, name: "Item 10" }, @@ -230,10 +250,397 @@ describe("streamListsDiff data", () => { status: LIST_STATUS.MOVED, }, ], + ]; + + let chunkCount = 0; + + diff.on("data", (chunk) => { + expect(chunk).toStrictEqual(expectedChunks[chunkCount]); + chunkCount++; + }); + + diff.on("finish", () => { + expect(chunkCount).toBe(3); + done(); + }); + }); + it("emits 'data' event with all the objects diff in a single chunk if the chunkSize is bigger than the provided lists ", (done) => { + const prevList = [ + { id: 1, name: "Item 1" }, + { id: 2, name: "Item 2" }, + { id: 3, name: "Item 3" }, + { id: 4, name: "Item 4" }, + ]; + const nextList = [ + { id: 1, name: "Item 1" }, + { id: 2, name: "Item Two" }, + { id: 3, name: "Item 3" }, + { id: 5, name: "Item 5" }, + ]; + const diff = streamListsDiff(prevList, nextList, "id", { chunksSize: 150 }); + + const expectedChunks = [ + { + previousValue: null, + currentValue: { id: 5, name: "Item 5" }, + prevIndex: null, + newIndex: 3, + indexDiff: null, + status: LIST_STATUS.ADDED, + }, + { + previousValue: { id: 1, name: "Item 1" }, + currentValue: { id: 1, name: "Item 1" }, + prevIndex: 0, + newIndex: 0, + indexDiff: 0, + status: LIST_STATUS.EQUAL, + }, + { + previousValue: { id: 2, name: "Item 2" }, + currentValue: { id: 2, name: "Item Two" }, + prevIndex: 1, + newIndex: 1, + indexDiff: 0, + status: LIST_STATUS.UPDATED, + }, + { + previousValue: { id: 3, name: "Item 3" }, + currentValue: { id: 3, name: "Item 3" }, + prevIndex: 2, + newIndex: 2, + indexDiff: 0, + status: LIST_STATUS.EQUAL, + }, + { + previousValue: { id: 4, name: "Item 4" }, + currentValue: null, + prevIndex: 3, + newIndex: null, + indexDiff: null, + status: LIST_STATUS.DELETED, + }, + ]; + + let chunkCount = 0; + diff.on("data", (chunk) => { + expect(chunk).toStrictEqual(expectedChunks); + chunkCount++; + }); + + diff.on("finish", () => { + expect(chunkCount).toBe(1); + done(); + }); + }); + it("emits 'data' event with moved objects considered as updated if considerMoveAsUpdate is true", (done) => { + const prevList = [ + { id: 1, name: "Item 1" }, + { id: 2, name: "Item 2" }, + { id: 3, name: "Item 3" }, + { id: 4, name: "Item 4" }, + ]; + const nextList = [ + { id: 2, name: "Item Two" }, + { id: 1, name: "Item 1" }, + { id: 3, name: "Item 3" }, + { id: 5, name: "Item 5" }, + ]; + const diff = streamListsDiff(prevList, nextList, "id", { + chunksSize: 5, + considerMoveAsUpdate: true, + }); + + const expectedChunks = [ + { + previousValue: null, + currentValue: { id: 5, name: "Item 5" }, + prevIndex: null, + newIndex: 3, + indexDiff: null, + status: LIST_STATUS.ADDED, + }, + { + previousValue: { id: 1, name: "Item 1" }, + currentValue: { id: 1, name: "Item 1" }, + prevIndex: 0, + newIndex: 1, + indexDiff: 1, + status: LIST_STATUS.UPDATED, + }, + { + previousValue: { id: 2, name: "Item 2" }, + currentValue: { id: 2, name: "Item Two" }, + prevIndex: 1, + newIndex: 0, + indexDiff: -1, + status: LIST_STATUS.UPDATED, + }, + { + previousValue: { id: 3, name: "Item 3" }, + currentValue: { id: 3, name: "Item 3" }, + prevIndex: 2, + newIndex: 2, + indexDiff: 0, + status: LIST_STATUS.EQUAL, + }, + { + previousValue: { id: 4, name: "Item 4" }, + currentValue: null, + prevIndex: 3, + newIndex: null, + indexDiff: null, + status: LIST_STATUS.DELETED, + }, + ]; + + let chunkCount = 0; + diff.on("data", (chunk) => { + expect(chunk).toStrictEqual(expectedChunks); + chunkCount++; + }); + + diff.on("finish", () => { + expect(chunkCount).toBe(1); + done(); + }); + }); + it("emits 'data' event only with objects diff whose status match with showOnly's", (done) => { + const prevList = [ + { id: 1, name: "Item 1" }, + { id: 2, name: "Item 2" }, + { id: 3, name: "Item 3" }, + { id: 4, name: "Item 4" }, + ]; + const nextList = [ + { id: 2, name: "Item Two" }, + { id: 1, name: "Item 1" }, + { id: 3, name: "Item 3" }, + { id: 5, name: "Item 5" }, + ]; + const diff = streamListsDiff(prevList, nextList, "id", { + chunksSize: 5, + showOnly: ["added", "deleted"], + }); + + const expectedChunks = [ + { + previousValue: null, + currentValue: { id: 5, name: "Item 5" }, + prevIndex: null, + newIndex: 3, + indexDiff: null, + status: LIST_STATUS.ADDED, + }, + { + previousValue: { id: 4, name: "Item 4" }, + currentValue: null, + prevIndex: 3, + newIndex: null, + indexDiff: null, + status: LIST_STATUS.DELETED, + }, + ]; + + let chunkCount = 0; + diff.on("data", (chunk) => { + expect(chunk).toStrictEqual(expectedChunks); + chunkCount++; + }); + + diff.on("finish", () => { + expect(chunkCount).toBe(1); + done(); + }); + }); + it("emits 'data' event with deep nested objects diff", (done) => { + const prevList = [ + { + id: 1, + name: "Item 1", + user: { role: "admin", hobbies: ["golf", "football"] }, + }, + { id: 2, name: "Item 2" }, + { id: 3, name: "Item 3", user: { role: "admin", hobbies: ["rugby"] } }, + { + id: 4, + name: "Item 4", + user: { role: "reader", hobbies: ["video games", "fishing"] }, + }, + { id: 5, name: "Item 5" }, + { id: 6, name: "Item 6", user: { role: "root", hobbies: ["coding"] } }, + { id: 7, name: "Item 7" }, + { id: 8, name: "Item 8" }, + { id: 9, name: "Item 9" }, + { + id: 10, + name: "Item 10", + user: { + role: "root", + hobbies: ["coding"], + skills: { driving: true, diving: false }, + }, + }, + ]; + const nextList = [ + { + id: 1, + name: "Item 1", + user: { role: "admin", hobbies: ["golf", "football"] }, + }, + { id: 2, name: "Item Two" }, + { id: 3, name: "Item 3", user: { role: "admin", hobbies: ["rugby"] } }, + { id: 5, name: "Item 5" }, + { id: 6, name: "Item 6", user: { role: "root", hobbies: ["farming"] } }, + { id: 7, name: "Item 7" }, + { + id: 10, + name: "Item 10", + user: { + role: "root", + hobbies: ["coding"], + skills: { driving: true, diving: false }, + }, + }, + { id: 11, name: "Item 11" }, + { id: 9, name: "Item 9" }, + { id: 8, name: "Item 8" }, + ]; + const diff = streamListsDiff(prevList, nextList, "id", { chunksSize: 5 }); + + const expectedChunks = [ + [ + { + previousValue: null, + currentValue: { id: 11, name: "Item 11" }, + prevIndex: null, + newIndex: 7, + indexDiff: null, + status: LIST_STATUS.ADDED, + }, + { + previousValue: { + id: 1, + name: "Item 1", + user: { role: "admin", hobbies: ["golf", "football"] }, + }, + currentValue: { + id: 1, + name: "Item 1", + user: { role: "admin", hobbies: ["golf", "football"] }, + }, + prevIndex: 0, + newIndex: 0, + indexDiff: 0, + status: LIST_STATUS.EQUAL, + }, + { + previousValue: { id: 2, name: "Item 2" }, + currentValue: { id: 2, name: "Item Two" }, + prevIndex: 1, + newIndex: 1, + indexDiff: 0, + status: LIST_STATUS.UPDATED, + }, + { + previousValue: { + id: 3, + name: "Item 3", + user: { role: "admin", hobbies: ["rugby"] }, + }, + currentValue: { + id: 3, + name: "Item 3", + user: { role: "admin", hobbies: ["rugby"] }, + }, + prevIndex: 2, + newIndex: 2, + indexDiff: 0, + status: LIST_STATUS.EQUAL, + }, + { + previousValue: { + id: 4, + name: "Item 4", + user: { role: "reader", hobbies: ["video games", "fishing"] }, + }, + currentValue: null, + prevIndex: 3, + newIndex: null, + indexDiff: null, + status: LIST_STATUS.DELETED, + }, + ], + [ + { + previousValue: { id: 5, name: "Item 5" }, + currentValue: { id: 5, name: "Item 5" }, + prevIndex: 4, + newIndex: 3, + indexDiff: -1, + status: LIST_STATUS.MOVED, + }, + { + previousValue: { + id: 6, + name: "Item 6", + user: { role: "root", hobbies: ["coding"] }, + }, + currentValue: { + id: 6, + name: "Item 6", + user: { role: "root", hobbies: ["farming"] }, + }, + prevIndex: 5, + newIndex: 4, + indexDiff: -1, + status: LIST_STATUS.UPDATED, + }, + { + previousValue: { id: 7, name: "Item 7" }, + currentValue: { id: 7, name: "Item 7" }, + prevIndex: 6, + newIndex: 5, + indexDiff: -1, + status: LIST_STATUS.MOVED, + }, + { + previousValue: { id: 8, name: "Item 8" }, + currentValue: { id: 8, name: "Item 8" }, + prevIndex: 7, + newIndex: 9, + indexDiff: 2, + status: LIST_STATUS.MOVED, + }, + { + previousValue: { id: 9, name: "Item 9" }, + currentValue: { id: 9, name: "Item 9" }, + prevIndex: 8, + newIndex: 8, + indexDiff: 0, + status: LIST_STATUS.EQUAL, + }, + ], [ { - previousValue: { id: 10, name: "Item 10" }, - currentValue: { id: 10, name: "Item 10" }, + previousValue: { + id: 10, + name: "Item 10", + user: { + role: "root", + hobbies: ["coding"], + skills: { driving: true, diving: false }, + }, + }, + currentValue: { + id: 10, + name: "Item 10", + user: { + role: "root", + hobbies: ["coding"], + skills: { driving: true, diving: false }, + }, + }, prevIndex: 9, newIndex: 6, indexDiff: -3, @@ -245,11 +652,10 @@ describe("streamListsDiff data", () => { let chunkCount = 0; diff.on("data", (chunk) => { - // console.log("chunks received", chunk); - // console.log("expected chunk", expectedChunks[chunkCount]); - //expect(chunk).toStrictEqual(expectedChunks[chunkCount]); + expect(chunk).toStrictEqual(expectedChunks[chunkCount]); chunkCount++; }); + diff.on("finish", () => { expect(chunkCount).toBe(3); done(); @@ -372,3 +778,132 @@ describe("streamListsDiff error", () => { }); }); }); + +describe("Performance", () => { + it("should correctly stream diff for 10.000 entries", (done) => { + const generateLargeList = (size: number, idPrefix: string) => { + return Array.from({ length: size }, (_, i) => ({ + id: `${idPrefix}-${i}`, + value: i, + })); + }; + const prevList = generateLargeList(10_000, "prev"); + const nextList = [ + ...generateLargeList(5000, "prev"), + ...generateLargeList(5000, "next"), + ]; + + const receivedChunks: StreamListsDiff<{ id: string; value: number }>[] = []; + let chunkCount = 0; + const diffStream = streamListsDiff(prevList, nextList, "id", { + chunksSize: 1000, + }); + + diffStream.on("data", (chunk) => { + receivedChunks.push(...chunk); + chunkCount++; + }); + + diffStream.on("finish", () => { + const deletions = receivedChunks.filter( + (diff) => diff.status === LIST_STATUS.DELETED, + ); + const additions = receivedChunks.filter( + (diff) => diff.status === LIST_STATUS.ADDED, + ); + const updates = receivedChunks.filter( + (diff) => diff.status === LIST_STATUS.EQUAL, + ); + expect(receivedChunks.length).toBe(15_000); // 5000 deletions + 5000 equal + 5000 additions + expect(chunkCount).toBe(15); + expect(deletions.length).toBe(5000); + expect(additions.length).toBe(5000); + expect(updates.length).toBe(5000); + done(); + }); + }); + it("should correctly stream diff for 100.000 entries", (done) => { + const generateLargeList = (size: number, idPrefix: string) => { + return Array.from({ length: size }, (_, i) => ({ + id: `${idPrefix}-${i}`, + value: i, + })); + }; + const prevList = generateLargeList(100_000, "prev"); + const nextList = [ + ...generateLargeList(50000, "prev"), + ...generateLargeList(50000, "next"), + ]; + + const receivedChunks: StreamListsDiff<{ id: string; value: number }>[] = []; + let chunkCount = 0; + const diffStream = streamListsDiff(prevList, nextList, "id", { + chunksSize: 10_000, + }); + + diffStream.on("data", (chunk) => { + receivedChunks.push(...chunk); + chunkCount++; + }); + + diffStream.on("finish", () => { + const deletions = receivedChunks.filter( + (diff) => diff.status === LIST_STATUS.DELETED, + ); + const additions = receivedChunks.filter( + (diff) => diff.status === LIST_STATUS.ADDED, + ); + const updates = receivedChunks.filter( + (diff) => diff.status === LIST_STATUS.EQUAL, + ); + expect(receivedChunks.length).toBe(150_000); // 50.000 deletions + 50.000 equal + 50.000 additions + expect(chunkCount).toBe(15); + expect(deletions.length).toBe(50000); + expect(additions.length).toBe(50000); + expect(updates.length).toBe(50000); + done(); + }); + }); + // it("should correctly stream diff for 1.000.000 entries", (done) => { + // const generateLargeList = (size: number, idPrefix: string) => { + // return Array.from({ length: size }, (_, i) => ({ + // id: `${idPrefix}-${i}`, + // value: i, + // })); + // }; + // const prevList = generateLargeList(1_000_000, "prev"); + // const nextList = [ + // ...generateLargeList(500_000, "prev"), + // ...generateLargeList(500_000, "next"), + // ]; + + // const receivedChunks: StreamListsDiff<{ id: string; value: number }>[] = []; + // let chunkCount = 0; + // const diffStream = streamListsDiff(prevList, nextList, "id", { + // chunksSize: 100_000, + // }); + + // diffStream.on("data", (chunk) => { + // receivedChunks.push(...chunk); + // chunkCount++; + // }); + + // diffStream.on("finish", () => { + // const deletions = receivedChunks.filter( + // (diff) => diff.status === LIST_STATUS.DELETED, + // ); + // const additions = receivedChunks.filter( + // (diff) => diff.status === LIST_STATUS.ADDED, + // ); + // const updates = receivedChunks.filter( + // (diff) => diff.status === LIST_STATUS.EQUAL, + // ); + // expect(receivedChunks.length).toBe(1_500_000); // 50.000 deletions + 50.000 equal + 50.000 additions + // expect(chunkCount).toBe(15); + // expect(deletions.length).toBe(500000); + // expect(additions.length).toBe(500000); + // expect(updates.length).toBe(500000); + // done(); + // }); + // }); +});