diff --git a/src/models/stream.ts b/src/models/stream.ts new file mode 100644 index 0000000..5b874f6 --- /dev/null +++ b/src/models/stream.ts @@ -0,0 +1,27 @@ +import { LIST_STATUS } from "./list"; + +export type StreamListsDiff> = { + currentValue: T | null; + previousValue: T | null; + prevIndex: number | null; + newIndex: number | null; + indexDiff: number | null; + status: LIST_STATUS; +}; + +export type ReferenceProperty> = keyof T; + +export type StreamReferences> = Map< + ReferenceProperty, + { prevIndex: number; nextIndex?: number } +>; + +export type ListStreamOptions = { + chunksSize?: number; // 0 by default. If 0, stream will be live + showOnly?: `${LIST_STATUS}`[]; + considerMoveAsUpdate?: boolean; +}; + +export const DEFAULT_LIST_STREAM_OPTIONS: ListStreamOptions = { + chunksSize: 0, +}; diff --git a/src/stream/emitter.ts b/src/stream/emitter.ts new file mode 100644 index 0000000..4554c32 --- /dev/null +++ b/src/stream/emitter.ts @@ -0,0 +1,27 @@ +type Listener = (...args: T) => void; + +export enum StreamEvent { + Data = "data", + Finish = "finish", + Error = "error", +} +export class EventEmitter { + private events: Record[]> = {}; + + on( + event: `${StreamEvent}`, + listener: Listener, + ): this { + if (!this.events[event]) { + this.events[event] = []; + } + this.events[event].push(listener as Listener); + return this; + } + + emit(event: `${StreamEvent}`, ...args: T): void { + if (this.events[event]) { + this.events[event].forEach((listener) => listener(...args)); + } + } +} diff --git a/src/stream/stream-list-diff.ts b/src/stream/stream-list-diff.ts new file mode 100644 index 0000000..46eac36 --- /dev/null +++ b/src/stream/stream-list-diff.ts @@ -0,0 +1,202 @@ +import { + DEFAULT_LIST_STREAM_OPTIONS, + ListStreamOptions, + ReferenceProperty, + StreamListsDiff, + StreamReferences, +} from "../models/stream"; +import { LIST_STATUS } from "../models/list"; +import { isEqual } from "../utils"; +import { EventEmitter, StreamEvent } from "./emitter"; + +function outputDiffChunk>( + emitter: EventEmitter, +) { + let chunks: StreamListsDiff[] = []; + + return function handleDiffChunk( + chunk: StreamListsDiff, + options: ListStreamOptions, + ): void { + const showChunk = options?.showOnly + ? options?.showOnly.includes(chunk.status) + : true; + if (!showChunk) { + return; + } + if ((options.chunksSize as number) > 0) { + chunks.push(chunk); + if (chunks.length >= (options.chunksSize as number)) { + const output = chunks; + chunks = []; + return emitter.emit(StreamEvent.Data, output); + } + } + return emitter.emit(StreamEvent.Data, [chunk]); + }; +} + +function formatSingleListStreamDiff>( + list: T[], + isPrevious: boolean, + status: LIST_STATUS, + options: ListStreamOptions, +): StreamListsDiff[] { + const diff: StreamListsDiff[] = list.map((data, i) => ({ + previousValue: isPrevious ? data : null, + currentValue: isPrevious ? null : data, + prevIndex: status === LIST_STATUS.ADDED ? null : i, + newIndex: status === LIST_STATUS.ADDED ? i : null, + indexDiff: null, + status, + })); + if (options.showOnly && options.showOnly.length > 0) { + return diff.filter((value) => options.showOnly?.includes(value.status)); + } + return diff; +} + +function getDiffChunks>( + prevList: T[], + nextList: T[], + referenceProperty: ReferenceProperty, + emitter: EventEmitter, + options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, +) { + if (!prevList && !nextList) { + return []; + } + if (!prevList) { + const nextDiff = formatSingleListStreamDiff( + nextList as T[], + false, + LIST_STATUS.ADDED, + options, + ); + return nextDiff.forEach((data) => handleDiffChunk(data, options)); + } + if (!nextList) { + const prevDiff = formatSingleListStreamDiff( + prevList as T[], + true, + LIST_STATUS.DELETED, + options, + ); + return prevDiff.forEach((data) => handleDiffChunk(data, options)); + } + const listsReferences: StreamReferences = new Map(); + const handleDiffChunk = outputDiffChunk(emitter); + prevList.forEach((data, i) => { + if (data) { + listsReferences.set(String(data[referenceProperty]), { + prevIndex: i, + nextIndex: undefined, + }); + } + }); + + nextList.forEach((data, i) => { + if (data) { + const listReference = listsReferences.get( + String(data[referenceProperty]), + ); + if (listReference) { + listReference.nextIndex = i; + } else { + handleDiffChunk( + { + previousValue: null, + currentValue: data, + prevIndex: null, + newIndex: i, + indexDiff: null, + status: LIST_STATUS.ADDED, + }, + options, + ); + } + } + }); + + for (const data of listsReferences.values()) { + if (!data.nextIndex) { + handleDiffChunk( + { + previousValue: prevList[data.prevIndex], + currentValue: null, + prevIndex: data.prevIndex, + newIndex: null, + indexDiff: null, + status: LIST_STATUS.DELETED, + }, + options, + ); + } else { + const prevData = prevList[data.prevIndex]; + const nextData = nextList[data.nextIndex]; + const isDataEqual = isEqual(prevData, nextData); + const indexDiff = data.prevIndex - data.nextIndex; + if (isDataEqual) { + if (indexDiff === 0) { + handleDiffChunk( + { + previousValue: prevList[data.prevIndex], + currentValue: nextList[data.nextIndex], + prevIndex: null, + newIndex: data.nextIndex, + indexDiff: null, + status: LIST_STATUS.EQUAL, + }, + options, + ); + } else { + handleDiffChunk( + { + previousValue: prevList[data.prevIndex], + currentValue: nextList[data.nextIndex], + prevIndex: data.prevIndex, + newIndex: data.nextIndex, + indexDiff, + status: options.considerMoveAsUpdate + ? LIST_STATUS.UPDATED + : LIST_STATUS.MOVED, + }, + options, + ); + } + } else { + handleDiffChunk( + { + previousValue: prevList[data.prevIndex], + currentValue: nextList[data.nextIndex], + prevIndex: data.prevIndex, + newIndex: data.nextIndex, + indexDiff, + status: LIST_STATUS.UPDATED, + }, + options, + ); + } + } + } + emitter.emit(StreamEvent.Finish); +} + +export function streamListsDiff>( + prevList: T[], + nextList: T[], + referenceProperty: ReferenceProperty, + options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, +) { + const emitter = new EventEmitter(); + try { + setTimeout( + () => + getDiffChunks(prevList, nextList, referenceProperty, emitter, options), + 0, + ); + return emitter; + } catch (err) { + emitter.emit(StreamEvent.Error, err); + } +}