Browse Source
* feat: handle stream and file inputs * chore: update jsdoc * feat: v3.0.0 BREAKING CHANGE: streamListDiff is now imported from @donedeal0/superdiff/client or @donedeal/superdiff/server depending on your environmentpull/31/head v3.0.0
17 changed files with 2755 additions and 532 deletions
@ -0,0 +1 @@
@@ -0,0 +1 @@
|
||||
export { streamListDiff } from "./lib/stream-list-diff/client"; |
@ -1,7 +1,6 @@
@@ -1,7 +1,6 @@
|
||||
export { getObjectDiff } from "./lib/object-diff"; |
||||
export { getListDiff } from "./lib/list-diff"; |
||||
export { isEqual, isObject } from "./lib/utils"; |
||||
export { streamListDiff } from "./lib/stream-list-diff"; |
||||
export * from "./models/list"; |
||||
export * from "./models/object"; |
||||
export * from "./models/stream"; |
||||
|
@ -0,0 +1,283 @@
@@ -0,0 +1,283 @@
|
||||
import { |
||||
DataBuffer, |
||||
DEFAULT_LIST_STREAM_OPTIONS, |
||||
ListStreamOptions, |
||||
ReferenceProperty, |
||||
} from "@models/stream"; |
||||
import { LIST_STATUS } from "@models/list"; |
||||
import { |
||||
Emitter, |
||||
EmitterEvents, |
||||
EventEmitter, |
||||
StreamListener, |
||||
StreamEvent, |
||||
} from "../emitter"; |
||||
import { isDataValid, isValidChunkSize, outputDiffChunk } from "../utils"; |
||||
|
||||
async function getDiffChunks<T extends Record<string, unknown>>( |
||||
prevStream: ReadableStream<T>, |
||||
nextStream: ReadableStream<T>, |
||||
referenceProperty: ReferenceProperty<T>, |
||||
emitter: Emitter<T>, |
||||
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, |
||||
): Promise<void> { |
||||
if (!isValidChunkSize(options?.chunksSize)) { |
||||
return emitter.emit( |
||||
StreamEvent.Error, |
||||
new Error( |
||||
`The chunk size can't be negative. You entered the value '${options.chunksSize}'`, |
||||
), |
||||
); |
||||
} |
||||
|
||||
const prevList = prevStream.getReader(); |
||||
const nextList = nextStream.getReader(); |
||||
const { handleDiffChunk, releaseLastChunks } = outputDiffChunk<T>(emitter); |
||||
const prevDataBuffer: DataBuffer<T> = new Map(); |
||||
const nextDataBuffer: DataBuffer<T> = new Map(); |
||||
let currentPrevIndex = 0; |
||||
let currentNextIndex = 0; |
||||
|
||||
async function processPrevStreamChunk(chunk: T) { |
||||
const { isValid, message } = isDataValid( |
||||
chunk, |
||||
referenceProperty, |
||||
"prevList", |
||||
); |
||||
if (!isValid) { |
||||
emitter.emit(StreamEvent.Error, new Error(message)); |
||||
emitter.emit(StreamEvent.Finish); |
||||
return; |
||||
} |
||||
const ref = chunk[referenceProperty] as ReferenceProperty<T>; |
||||
const relatedChunk = nextDataBuffer.get(ref); |
||||
|
||||
if (relatedChunk) { |
||||
nextDataBuffer.delete(ref); |
||||
const isDataEqual = |
||||
JSON.stringify(chunk) === JSON.stringify(relatedChunk.data); |
||||
const indexDiff = (relatedChunk.index as number) - currentPrevIndex; |
||||
if (isDataEqual) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: chunk, |
||||
currentValue: relatedChunk.data, |
||||
prevIndex: currentPrevIndex, |
||||
newIndex: relatedChunk.index, |
||||
indexDiff, |
||||
status: |
||||
indexDiff === 0 |
||||
? LIST_STATUS.EQUAL |
||||
: options.considerMoveAsUpdate |
||||
? LIST_STATUS.UPDATED |
||||
: LIST_STATUS.MOVED, |
||||
}, |
||||
options, |
||||
); |
||||
} else { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: chunk, |
||||
currentValue: relatedChunk.data, |
||||
prevIndex: currentPrevIndex, |
||||
newIndex: relatedChunk.index, |
||||
indexDiff, |
||||
status: LIST_STATUS.UPDATED, |
||||
}, |
||||
options, |
||||
); |
||||
} |
||||
} else { |
||||
prevDataBuffer.set(ref, { data: chunk, index: currentPrevIndex }); |
||||
} |
||||
currentPrevIndex++; |
||||
} |
||||
|
||||
async function processNextStreamChunk(chunk: T) { |
||||
const { isValid, message } = isDataValid( |
||||
chunk, |
||||
referenceProperty, |
||||
"nextList", |
||||
); |
||||
if (!isValid) { |
||||
emitter.emit(StreamEvent.Error, new Error(message)); |
||||
emitter.emit(StreamEvent.Finish); |
||||
return; |
||||
} |
||||
const ref = chunk[referenceProperty] as ReferenceProperty<T>; |
||||
const relatedChunk = prevDataBuffer.get(ref); |
||||
|
||||
if (relatedChunk) { |
||||
prevDataBuffer.delete(ref); |
||||
const isDataEqual = |
||||
JSON.stringify(chunk) === JSON.stringify(relatedChunk.data); |
||||
const indexDiff = currentNextIndex - (relatedChunk.index as number); |
||||
if (isDataEqual) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: relatedChunk.data, |
||||
currentValue: chunk, |
||||
prevIndex: relatedChunk.index, |
||||
newIndex: currentNextIndex, |
||||
indexDiff, |
||||
status: |
||||
indexDiff === 0 |
||||
? LIST_STATUS.EQUAL |
||||
: options.considerMoveAsUpdate |
||||
? LIST_STATUS.UPDATED |
||||
: LIST_STATUS.MOVED, |
||||
}, |
||||
options, |
||||
); |
||||
} else { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: relatedChunk.data, |
||||
currentValue: chunk, |
||||
prevIndex: relatedChunk.index, |
||||
newIndex: currentNextIndex, |
||||
indexDiff, |
||||
status: LIST_STATUS.UPDATED, |
||||
}, |
||||
options, |
||||
); |
||||
} |
||||
} else { |
||||
nextDataBuffer.set(ref, { data: chunk, index: currentNextIndex }); |
||||
} |
||||
currentNextIndex++; |
||||
} |
||||
|
||||
const readStream = async ( |
||||
reader: ReadableStreamDefaultReader<T>, |
||||
processChunk: (chunk: T) => Promise<void>, |
||||
) => { |
||||
let result; |
||||
while (!(result = await reader.read()).done) { |
||||
await processChunk(result.value); |
||||
} |
||||
}; |
||||
|
||||
await Promise.all([ |
||||
readStream(prevList, async (chunk) => { |
||||
await processPrevStreamChunk(chunk); |
||||
}), |
||||
readStream(nextList, async (chunk) => { |
||||
await processNextStreamChunk(chunk); |
||||
}), |
||||
]); |
||||
|
||||
for (const [key, chunk] of prevDataBuffer.entries()) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: chunk.data, |
||||
currentValue: null, |
||||
prevIndex: chunk.index, |
||||
newIndex: null, |
||||
indexDiff: null, |
||||
status: LIST_STATUS.DELETED, |
||||
}, |
||||
options, |
||||
); |
||||
prevDataBuffer.delete(key); |
||||
} |
||||
for (const [key, chunk] of nextDataBuffer.entries()) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: null, |
||||
currentValue: chunk.data, |
||||
prevIndex: null, |
||||
newIndex: chunk.index, |
||||
indexDiff: null, |
||||
status: LIST_STATUS.ADDED, |
||||
}, |
||||
options, |
||||
); |
||||
nextDataBuffer.delete(key); |
||||
} |
||||
|
||||
releaseLastChunks(); |
||||
return emitter.emit(StreamEvent.Finish); |
||||
} |
||||
|
||||
async function getValidClientStream<T extends Record<string, unknown>>( |
||||
input: ReadableStream<T> | T[] | File, |
||||
listType: "prevList" | "nextList", |
||||
): Promise<ReadableStream<T>> { |
||||
if (Array.isArray(input)) { |
||||
return new ReadableStream({ |
||||
start(controller) { |
||||
input.forEach((item) => controller.enqueue(item)); |
||||
controller.close(); |
||||
}, |
||||
}); |
||||
} |
||||
|
||||
if (input instanceof ReadableStream) { |
||||
return input; |
||||
} |
||||
|
||||
if (input instanceof File) { |
||||
const fileText = await input.text(); |
||||
let jsonData: T[]; |
||||
try { |
||||
jsonData = JSON.parse(fileText); |
||||
// eslint-disable-next-line @typescript-eslint/no-unused-vars
|
||||
} catch (_: unknown) { |
||||
throw new Error(`Your ${listType} is not a valid JSON array.`); |
||||
} |
||||
|
||||
if (!Array.isArray(jsonData)) { |
||||
throw new Error(`Your ${listType} is not a JSON array.`); |
||||
} |
||||
return new ReadableStream({ |
||||
start(controller) { |
||||
jsonData.forEach((item) => controller.enqueue(item)); |
||||
controller.close(); |
||||
}, |
||||
}); |
||||
} |
||||
|
||||
throw new Error( |
||||
`Invalid ${listType}. Expected ReadableStream, Array, or File.`, |
||||
); |
||||
} |
||||
|
||||
/** |
||||
* Streams the diff of two object lists |
||||
* @param {ReadableStream | File | Record<string, unknown>[]} prevList - The original object list. |
||||
* @param {ReadableStream | File | Record<string, unknown>[]} nextList - The new object list. |
||||
* @param {string} referenceProperty - A common property in all the objects of your lists (e.g. `id`) |
||||
* @param {ListStreamOptions} options - Options to refine your output. |
||||
- `chunksSize`: the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff by chunk, `10` = 10 object diffs by chunk). |
||||
- `showOnly`: returns only the values whose status you are interested in. (e.g. `["added", "equal"]`) |
||||
- `considerMoveAsUpdate`: if set to `true` a `moved` object will be considered as `updated` |
||||
* @returns StreamListener |
||||
*/ |
||||
export function streamListDiff<T extends Record<string, unknown>>( |
||||
prevList: ReadableStream<T> | File | T[], |
||||
nextList: ReadableStream<T> | File | T[], |
||||
referenceProperty: ReferenceProperty<T>, |
||||
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, |
||||
): StreamListener<T> { |
||||
const emitter = new EventEmitter<EmitterEvents<T>>(); |
||||
setTimeout(async () => { |
||||
try { |
||||
const [prevStream, nextStream] = await Promise.all([ |
||||
getValidClientStream(prevList, "prevList"), |
||||
getValidClientStream(nextList, "nextList"), |
||||
]); |
||||
|
||||
getDiffChunks( |
||||
prevStream, |
||||
nextStream, |
||||
referenceProperty, |
||||
emitter, |
||||
options, |
||||
); |
||||
} catch (err) { |
||||
return emitter.emit(StreamEvent.Error, err as Error); |
||||
} |
||||
}, 0); |
||||
return emitter as StreamListener<T>; |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -1,327 +0,0 @@
@@ -1,327 +0,0 @@
|
||||
import { |
||||
DEFAULT_LIST_STREAM_OPTIONS, |
||||
ListStreamOptions, |
||||
ReferenceProperty, |
||||
StreamListDiff, |
||||
StreamReferences, |
||||
} from "@models/stream"; |
||||
import { LIST_STATUS } from "@models/list"; |
||||
import { isObject } from "@lib/utils"; |
||||
import { |
||||
Emitter, |
||||
EmitterEvents, |
||||
EventEmitter, |
||||
StreamListener, |
||||
StreamEvent, |
||||
} from "./emitter"; |
||||
|
||||
function outputDiffChunk<T extends Record<string, unknown>>( |
||||
emitter: Emitter<T>, |
||||
) { |
||||
let chunks: StreamListDiff<T>[] = []; |
||||
|
||||
return function handleDiffChunk( |
||||
chunk: StreamListDiff<T>, |
||||
isLastChunk: boolean, |
||||
options: ListStreamOptions, |
||||
): void { |
||||
const showChunk = options?.showOnly |
||||
? options?.showOnly.includes(chunk.status) |
||||
: true; |
||||
if (!showChunk) { |
||||
return; |
||||
} |
||||
if ((options.chunksSize as number) > 0) { |
||||
chunks.push(chunk); |
||||
if (chunks.length >= (options.chunksSize as number) || isLastChunk) { |
||||
const output = chunks; |
||||
chunks = []; |
||||
return emitter.emit(StreamEvent.Data, output); |
||||
} else { |
||||
return; |
||||
} |
||||
} |
||||
return emitter.emit(StreamEvent.Data, [chunk]); |
||||
}; |
||||
} |
||||
|
||||
function formatSingleListStreamDiff<T extends Record<string, unknown>>( |
||||
list: T[], |
||||
isPrevious: boolean, |
||||
status: LIST_STATUS, |
||||
options: ListStreamOptions, |
||||
): StreamListDiff<T>[] | null { |
||||
let isValid = true; |
||||
const diff: StreamListDiff<T>[] = []; |
||||
for (let i = 0; i < list.length; i++) { |
||||
const data = list[i]; |
||||
if (!isObject(data)) { |
||||
isValid = false; |
||||
break; |
||||
} |
||||
diff.push({ |
||||
previousValue: isPrevious ? data : null, |
||||
currentValue: isPrevious ? null : data, |
||||
prevIndex: status === LIST_STATUS.ADDED ? null : i, |
||||
newIndex: status === LIST_STATUS.ADDED ? i : null, |
||||
indexDiff: null, |
||||
status, |
||||
}); |
||||
} |
||||
if (!isValid) { |
||||
return null; |
||||
} |
||||
if (options.showOnly && options.showOnly.length > 0) { |
||||
return diff.filter((value) => options.showOnly?.includes(value.status)); |
||||
} |
||||
return diff; |
||||
} |
||||
|
||||
function isValidChunkSize( |
||||
chunksSize: ListStreamOptions["chunksSize"], |
||||
): boolean { |
||||
if (!chunksSize) return true; |
||||
const sign = String(Math.sign(chunksSize)); |
||||
return sign !== "-1" && sign !== "NaN"; |
||||
} |
||||
|
||||
function isDataValid<T extends Record<string, unknown>>( |
||||
data: T, |
||||
referenceProperty: ReferenceProperty<T>, |
||||
listType: "prevList" | "nextList", |
||||
): { isValid: boolean; message?: string } { |
||||
if (!isObject(data)) { |
||||
return { |
||||
isValid: false, |
||||
message: `Your ${listType} must only contain valid objects. Found '${data}'`, |
||||
}; |
||||
} |
||||
if (!Object.hasOwn(data, referenceProperty)) { |
||||
return { |
||||
isValid: false, |
||||
message: `The reference property '${String(referenceProperty)}' is not available in all the objects of your ${listType}.`, |
||||
}; |
||||
} |
||||
return { |
||||
isValid: true, |
||||
message: "", |
||||
}; |
||||
} |
||||
|
||||
function getDiffChunks<T extends Record<string, unknown>>( |
||||
prevList: T[] = [], |
||||
nextList: T[] = [], |
||||
referenceProperty: ReferenceProperty<T>, |
||||
emitter: Emitter<T>, |
||||
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, |
||||
): void { |
||||
if (!isValidChunkSize(options?.chunksSize)) { |
||||
return emitter.emit( |
||||
StreamEvent.Error, |
||||
new Error( |
||||
`The chunk size can't be negative. You entered the value '${options.chunksSize}'`, |
||||
), |
||||
); |
||||
} |
||||
if (prevList.length === 0 && nextList.length === 0) { |
||||
return emitter.emit(StreamEvent.Finish); |
||||
} |
||||
const handleDiffChunk = outputDiffChunk<T>(emitter); |
||||
if (prevList.length === 0) { |
||||
const nextDiff = formatSingleListStreamDiff( |
||||
nextList as T[], |
||||
false, |
||||
LIST_STATUS.ADDED, |
||||
options, |
||||
); |
||||
if (!nextDiff) { |
||||
emitter.emit( |
||||
StreamEvent.Error, |
||||
new Error("Your nextList must only contain valid objects."), |
||||
); |
||||
return emitter.emit(StreamEvent.Finish); |
||||
} |
||||
nextDiff?.forEach((data, i) => |
||||
handleDiffChunk(data, i === nextDiff.length - 1, options), |
||||
); |
||||
return emitter.emit(StreamEvent.Finish); |
||||
} |
||||
if (nextList.length === 0) { |
||||
const prevDiff = formatSingleListStreamDiff( |
||||
prevList as T[], |
||||
true, |
||||
LIST_STATUS.DELETED, |
||||
options, |
||||
); |
||||
if (!prevDiff) { |
||||
emitter.emit( |
||||
StreamEvent.Error, |
||||
new Error("Your prevList must only contain valid objects."), |
||||
); |
||||
return emitter.emit(StreamEvent.Finish); |
||||
} |
||||
prevDiff?.forEach((data, i) => |
||||
handleDiffChunk(data, i === prevDiff.length - 1, options), |
||||
); |
||||
return emitter.emit(StreamEvent.Finish); |
||||
} |
||||
const listsReferences: StreamReferences<T> = new Map(); |
||||
for (let i = 0; i < prevList.length; i++) { |
||||
const data = prevList[i]; |
||||
if (data) { |
||||
const { isValid, message } = isDataValid( |
||||
data, |
||||
referenceProperty, |
||||
"prevList", |
||||
); |
||||
if (!isValid) { |
||||
emitter.emit(StreamEvent.Error, new Error(message)); |
||||
emitter.emit(StreamEvent.Finish); |
||||
break; |
||||
} |
||||
listsReferences.set(String(data[referenceProperty]), { |
||||
prevIndex: i, |
||||
nextIndex: undefined, |
||||
}); |
||||
} |
||||
} |
||||
|
||||
const totalChunks = listsReferences.size; |
||||
|
||||
for (let i = 0; i < nextList.length; i++) { |
||||
const data = nextList[i]; |
||||
if (data) { |
||||
const { isValid, message } = isDataValid( |
||||
data, |
||||
referenceProperty, |
||||
"nextList", |
||||
); |
||||
if (!isValid) { |
||||
emitter.emit(StreamEvent.Error, new Error(message)); |
||||
emitter.emit(StreamEvent.Finish); |
||||
break; |
||||
} |
||||
const listReference = listsReferences.get( |
||||
String(data[referenceProperty]), |
||||
); |
||||
if (listReference) { |
||||
listReference.nextIndex = i; |
||||
} else { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: null, |
||||
currentValue: data, |
||||
prevIndex: null, |
||||
newIndex: i, |
||||
indexDiff: null, |
||||
status: LIST_STATUS.ADDED, |
||||
}, |
||||
totalChunks > 0 ? false : i === nextList.length - 1, |
||||
options, |
||||
); |
||||
} |
||||
} |
||||
} |
||||
|
||||
let streamedChunks = 0; |
||||
|
||||
for (const [key, data] of listsReferences.entries()) { |
||||
streamedChunks++; |
||||
const isLastChunk = totalChunks === streamedChunks; |
||||
|
||||
if (typeof data.nextIndex === "undefined") { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: prevList[data.prevIndex], |
||||
currentValue: null, |
||||
prevIndex: data.prevIndex, |
||||
newIndex: null, |
||||
indexDiff: null, |
||||
status: LIST_STATUS.DELETED, |
||||
}, |
||||
isLastChunk, |
||||
options, |
||||
); |
||||
} else { |
||||
const prevData = prevList[data.prevIndex]; |
||||
const nextData = nextList[data.nextIndex]; |
||||
const isDataEqual = JSON.stringify(prevData) === JSON.stringify(nextData); |
||||
const indexDiff = data.nextIndex - data.prevIndex; |
||||
if (isDataEqual) { |
||||
if (indexDiff === 0) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: prevList[data.prevIndex], |
||||
currentValue: nextList[data.nextIndex], |
||||
prevIndex: data.prevIndex, |
||||
newIndex: data.nextIndex, |
||||
indexDiff: 0, |
||||
status: LIST_STATUS.EQUAL, |
||||
}, |
||||
isLastChunk, |
||||
options, |
||||
); |
||||
} else { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: prevList[data.prevIndex], |
||||
currentValue: nextList[data.nextIndex], |
||||
prevIndex: data.prevIndex, |
||||
newIndex: data.nextIndex, |
||||
indexDiff, |
||||
status: options.considerMoveAsUpdate |
||||
? LIST_STATUS.UPDATED |
||||
: LIST_STATUS.MOVED, |
||||
}, |
||||
isLastChunk, |
||||
options, |
||||
); |
||||
} |
||||
} else { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: prevList[data.prevIndex], |
||||
currentValue: nextList[data.nextIndex], |
||||
prevIndex: data.prevIndex, |
||||
newIndex: data.nextIndex, |
||||
indexDiff, |
||||
status: LIST_STATUS.UPDATED, |
||||
}, |
||||
isLastChunk, |
||||
options, |
||||
); |
||||
} |
||||
} |
||||
listsReferences.delete(key); |
||||
} |
||||
|
||||
return emitter.emit(StreamEvent.Finish); |
||||
} |
||||
|
||||
/** |
||||
* Streams the diff of two object lists |
||||
* @param {Record<string, unknown>[]} prevList - The original object list. |
||||
* @param {Record<string, unknown>[]} nextList - The new object list. |
||||
* @param {ReferenceProperty<T>} referenceProperty - A common property in all the objects of your lists (e.g. `id`) |
||||
* @param {ListStreamOptions} options - Options to refine your output. |
||||
- `chunksSize`: the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff by chunk, `10` = 10 object diffs by chunk). |
||||
- `showOnly`: returns only the values whose status you are interested in. (e.g. `["added", "equal"]`) |
||||
- `considerMoveAsUpdate`: if set to `true` a `moved` object will be considered as `updated` |
||||
* @returns EventEmitter |
||||
*/ |
||||
export function streamListDiff<T extends Record<string, unknown>>( |
||||
prevList: T[], |
||||
nextList: T[], |
||||
referenceProperty: ReferenceProperty<T>, |
||||
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, |
||||
): StreamListener<T> { |
||||
const emitter = new EventEmitter<EmitterEvents<T>>(); |
||||
setTimeout(() => { |
||||
try { |
||||
getDiffChunks(prevList, nextList, referenceProperty, emitter, options); |
||||
} catch (err) { |
||||
return emitter.emit(StreamEvent.Error, err as Error); |
||||
} |
||||
}, 0); |
||||
return emitter as StreamListener<T>; |
||||
} |
@ -0,0 +1,266 @@
@@ -0,0 +1,266 @@
|
||||
import { createReadStream } from "fs"; |
||||
import { Readable, Transform } from "stream"; |
||||
import { LIST_STATUS } from "@models/list"; |
||||
import { |
||||
DataBuffer, |
||||
DEFAULT_LIST_STREAM_OPTIONS, |
||||
FilePath, |
||||
ListStreamOptions, |
||||
ReferenceProperty, |
||||
} from "@models/stream"; |
||||
import { |
||||
Emitter, |
||||
EmitterEvents, |
||||
EventEmitter, |
||||
StreamListener, |
||||
StreamEvent, |
||||
} from "../emitter"; |
||||
import { isDataValid, isValidChunkSize, outputDiffChunk } from "../utils"; |
||||
|
||||
async function getDiffChunks<T extends Record<string, unknown>>( |
||||
prevStream: Readable, |
||||
nextStream: Readable, |
||||
referenceProperty: ReferenceProperty<T>, |
||||
emitter: Emitter<T>, |
||||
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, |
||||
): Promise<void> { |
||||
if (!isValidChunkSize(options?.chunksSize)) { |
||||
return emitter.emit( |
||||
StreamEvent.Error, |
||||
new Error( |
||||
`The chunk size can't be negative. You entered the value '${options.chunksSize}'`, |
||||
), |
||||
); |
||||
} |
||||
const { handleDiffChunk, releaseLastChunks } = outputDiffChunk<T>(emitter); |
||||
const prevDataBuffer: DataBuffer<T> = new Map(); |
||||
const nextDataBuffer: DataBuffer<T> = new Map(); |
||||
let currentPrevIndex = 0; |
||||
let currentNextIndex = 0; |
||||
|
||||
async function processPrevStreamChunk(chunk: T) { |
||||
const { isValid, message } = isDataValid( |
||||
chunk, |
||||
referenceProperty, |
||||
"prevList", |
||||
); |
||||
if (!isValid) { |
||||
emitter.emit(StreamEvent.Error, new Error(message)); |
||||
emitter.emit(StreamEvent.Finish); |
||||
return; |
||||
} |
||||
const ref = chunk[referenceProperty] as ReferenceProperty<T>; |
||||
const relatedChunk = nextDataBuffer.get(ref); |
||||
|
||||
if (relatedChunk) { |
||||
nextDataBuffer.delete(ref); |
||||
const isDataEqual = |
||||
JSON.stringify(chunk) === JSON.stringify(relatedChunk.data); |
||||
const indexDiff = (relatedChunk.index as number) - currentPrevIndex; |
||||
if (isDataEqual) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: chunk, |
||||
currentValue: relatedChunk.data, |
||||
prevIndex: currentPrevIndex, |
||||
newIndex: relatedChunk.index, |
||||
indexDiff, |
||||
status: |
||||
indexDiff === 0 |
||||
? LIST_STATUS.EQUAL |
||||
: options.considerMoveAsUpdate |
||||
? LIST_STATUS.UPDATED |
||||
: LIST_STATUS.MOVED, |
||||
}, |
||||
options, |
||||
); |
||||
} else { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: chunk, |
||||
currentValue: relatedChunk.data, |
||||
prevIndex: currentPrevIndex, |
||||
newIndex: relatedChunk.index, |
||||
indexDiff, |
||||
status: LIST_STATUS.UPDATED, |
||||
}, |
||||
options, |
||||
); |
||||
} |
||||
} else { |
||||
prevDataBuffer.set(ref, { data: chunk, index: currentPrevIndex }); |
||||
} |
||||
currentPrevIndex++; |
||||
} |
||||
|
||||
async function processNextStreamChunk(chunk: T) { |
||||
const { isValid, message } = isDataValid( |
||||
chunk, |
||||
referenceProperty, |
||||
"nextList", |
||||
); |
||||
if (!isValid) { |
||||
emitter.emit(StreamEvent.Error, new Error(message)); |
||||
emitter.emit(StreamEvent.Finish); |
||||
return; |
||||
} |
||||
const ref = chunk[referenceProperty] as ReferenceProperty<T>; |
||||
const relatedChunk = prevDataBuffer.get(ref); |
||||
|
||||
if (relatedChunk) { |
||||
prevDataBuffer.delete(ref); |
||||
const isDataEqual = |
||||
JSON.stringify(chunk) === JSON.stringify(relatedChunk.data); |
||||
const indexDiff = currentNextIndex - (relatedChunk.index as number); |
||||
if (isDataEqual) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: relatedChunk.data, |
||||
currentValue: chunk, |
||||
prevIndex: relatedChunk.index, |
||||
newIndex: currentNextIndex, |
||||
indexDiff, |
||||
status: |
||||
indexDiff === 0 |
||||
? LIST_STATUS.EQUAL |
||||
: options.considerMoveAsUpdate |
||||
? LIST_STATUS.UPDATED |
||||
: LIST_STATUS.MOVED, |
||||
}, |
||||
options, |
||||
); |
||||
} else { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: relatedChunk.data, |
||||
currentValue: chunk, |
||||
prevIndex: relatedChunk.index, |
||||
newIndex: currentNextIndex, |
||||
indexDiff, |
||||
status: LIST_STATUS.UPDATED, |
||||
}, |
||||
options, |
||||
); |
||||
} |
||||
} else { |
||||
nextDataBuffer.set(ref, { data: chunk, index: currentNextIndex }); |
||||
} |
||||
currentNextIndex++; |
||||
} |
||||
|
||||
const prevStreamReader = async () => { |
||||
for await (const chunk of prevStream) { |
||||
await processPrevStreamChunk(chunk); |
||||
} |
||||
}; |
||||
|
||||
const nextStreamReader = async () => { |
||||
for await (const chunk of nextStream) { |
||||
await processNextStreamChunk(chunk); |
||||
} |
||||
}; |
||||
await Promise.all([prevStreamReader(), nextStreamReader()]); |
||||
|
||||
for (const [key, chunk] of prevDataBuffer.entries()) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: chunk.data, |
||||
currentValue: null, |
||||
prevIndex: chunk.index, |
||||
newIndex: null, |
||||
indexDiff: null, |
||||
status: LIST_STATUS.DELETED, |
||||
}, |
||||
options, |
||||
); |
||||
prevDataBuffer.delete(key); |
||||
} |
||||
for (const [key, chunk] of nextDataBuffer.entries()) { |
||||
handleDiffChunk( |
||||
{ |
||||
previousValue: null, |
||||
currentValue: chunk.data, |
||||
prevIndex: null, |
||||
newIndex: chunk.index, |
||||
indexDiff: null, |
||||
status: LIST_STATUS.ADDED, |
||||
}, |
||||
options, |
||||
); |
||||
nextDataBuffer.delete(key); |
||||
} |
||||
releaseLastChunks(); |
||||
return emitter.emit(StreamEvent.Finish); |
||||
} |
||||
|
||||
function getValidStream<T>( |
||||
input: Readable | FilePath | T[], |
||||
listType: "prevList" | "nextList", |
||||
): Readable { |
||||
if (input instanceof Readable) { |
||||
return input; |
||||
} |
||||
|
||||
if (Array.isArray(input)) { |
||||
return Readable.from(input, { objectMode: true }); |
||||
} |
||||
|
||||
if (typeof input === "string") { |
||||
return createReadStream(input, { encoding: "utf8" }).pipe( |
||||
new Transform({ |
||||
objectMode: true, |
||||
transform(chunk, _, callback) { |
||||
try { |
||||
const data: T = JSON.parse(chunk.toString()); |
||||
if (Array.isArray(data)) { |
||||
for (let i = 0; i < data.length; i++) { |
||||
this.push(data[i]); |
||||
} |
||||
} else { |
||||
this.push(data); |
||||
} |
||||
callback(); |
||||
} catch (err) { |
||||
callback(err as Error); |
||||
} |
||||
}, |
||||
}), |
||||
); |
||||
} |
||||
|
||||
throw new Error(`Invalid ${listType}. Expected Readable, Array, or File.`); |
||||
} |
||||
|
||||
/** |
||||
* Streams the diff of two object lists |
||||
* @param {Readable | FilePath | Record<string, unknown>[]} prevList - The original object list. |
||||
* @param {Readable | FilePath | Record<string, unknown>[]} nextList - The new object list. |
||||
* @param {string} referenceProperty - A common property in all the objects of your lists (e.g. `id`) |
||||
* @param {ListStreamOptions} options - Options to refine your output. |
||||
- `chunksSize`: the number of object diffs returned by each streamed chunk. (e.g. `0` = 1 object diff by chunk, `10` = 10 object diffs by chunk). |
||||
- `showOnly`: returns only the values whose status you are interested in. (e.g. `["added", "equal"]`) |
||||
- `considerMoveAsUpdate`: if set to `true` a `moved` object will be considered as `updated` |
||||
* @returns StreamListener |
||||
*/ |
||||
export function streamListDiff<T extends Record<string, unknown>>( |
||||
prevStream: Readable | FilePath | T[], |
||||
nextStream: Readable | FilePath | T[], |
||||
referenceProperty: ReferenceProperty<T>, |
||||
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS, |
||||
): StreamListener<T> { |
||||
const emitter = new EventEmitter<EmitterEvents<T>>(); |
||||
setTimeout(async () => { |
||||
try { |
||||
await getDiffChunks( |
||||
getValidStream(prevStream, "prevList"), |
||||
getValidStream(nextStream, "nextList"), |
||||
referenceProperty, |
||||
emitter, |
||||
options, |
||||
); |
||||
} catch (err) { |
||||
return emitter.emit(StreamEvent.Error, err as Error); |
||||
} |
||||
}, 0); |
||||
return emitter as StreamListener<T>; |
||||
} |
@ -0,0 +1,80 @@
@@ -0,0 +1,80 @@
|
||||
import { isObject } from "@lib/utils"; |
||||
import { |
||||
ListStreamOptions, |
||||
ReferenceProperty, |
||||
StreamListDiff, |
||||
} from "@models/stream"; |
||||
import { Emitter, StreamEvent } from "./emitter"; |
||||
|
||||
export function isValidChunkSize( |
||||
chunksSize: ListStreamOptions["chunksSize"], |
||||
): boolean { |
||||
if (!chunksSize) return true; |
||||
const sign = String(Math.sign(chunksSize)); |
||||
return sign !== "-1" && sign !== "NaN"; |
||||
} |
||||
|
||||
export function isDataValid<T extends Record<string, unknown>>( |
||||
data: T, |
||||
referenceProperty: ReferenceProperty<T>, |
||||
listType: "prevList" | "nextList", |
||||
): { isValid: boolean; message?: string } { |
||||
if (!isObject(data)) { |
||||
return { |
||||
isValid: false, |
||||
message: `Your ${listType} must only contain valid objects. Found '${data}'`, |
||||
}; |
||||
} |
||||
if (!Object.hasOwn(data, referenceProperty)) { |
||||
return { |
||||
isValid: false, |
||||
message: `The reference property '${String(referenceProperty)}' is not available in all the objects of your ${listType}.`, |
||||
}; |
||||
} |
||||
return { |
||||
isValid: true, |
||||
message: "", |
||||
}; |
||||
} |
||||
|
||||
export function outputDiffChunk<T extends Record<string, unknown>>( |
||||
emitter: Emitter<T>, |
||||
) { |
||||
let chunks: StreamListDiff<T>[] = []; |
||||
|
||||
function handleDiffChunk( |
||||
chunk: StreamListDiff<T>, |
||||
options: ListStreamOptions, |
||||
): void { |
||||
const showChunk = options?.showOnly |
||||
? options?.showOnly.includes(chunk.status) |
||||
: true; |
||||
if (!showChunk) { |
||||
return; |
||||
} |
||||
if ((options.chunksSize as number) > 0) { |
||||
chunks.push(chunk); |
||||
if (chunks.length >= (options.chunksSize as number)) { |
||||
const output = chunks; |
||||
chunks = []; |
||||
return emitter.emit(StreamEvent.Data, output); |
||||
} else { |
||||
return; |
||||
} |
||||
} |
||||
return emitter.emit(StreamEvent.Data, [chunk]); |
||||
} |
||||
|
||||
function releaseLastChunks() { |
||||
if (chunks.length > 0) { |
||||
const output = chunks; |
||||
chunks = []; |
||||
return emitter.emit(StreamEvent.Data, output); |
||||
} |
||||
} |
||||
|
||||
return { |
||||
handleDiffChunk, |
||||
releaseLastChunks, |
||||
}; |
||||
} |
@ -0,0 +1,6 @@
@@ -0,0 +1,6 @@
|
||||
[ |
||||
{ "id": 1, "name": "Item 1" }, |
||||
{ "id": 2, "name": "Item Two" }, |
||||
{ "id": 3, "name": "Item 3" }, |
||||
{ "id": 5, "name": "Item 5" } |
||||
] |
@ -0,0 +1,6 @@
@@ -0,0 +1,6 @@
|
||||
[ |
||||
{ "id": 1, "name": "Item 1" }, |
||||
{ "id": 2, "name": "Item 2" }, |
||||
{ "id": 3, "name": "Item 3" }, |
||||
{ "id": 4, "name": "Item 4" } |
||||
] |
@ -0,0 +1 @@
@@ -0,0 +1 @@
|
||||
export { streamListDiff } from "./lib/stream-list-diff/server"; |
@ -1,12 +1,31 @@
@@ -1,12 +1,31 @@
|
||||
import { defineConfig } from "tsup"; |
||||
import { defineConfig, Options } from "tsup"; |
||||
|
||||
export default defineConfig({ |
||||
entry: ["src/index.ts"], |
||||
format: ["cjs", "esm"], |
||||
const sharedConfig: Options = { |
||||
dts: true, |
||||
splitting: true, |
||||
clean: true, |
||||
treeshake: true, |
||||
shims: true, |
||||
minify: true, |
||||
}); |
||||
}; |
||||
|
||||
export default defineConfig([ |
||||
{ |
||||
entry: ["src/index.ts"], |
||||
format: ["cjs", "esm"], |
||||
...sharedConfig, |
||||
platform: "neutral", |
||||
}, |
||||
{ |
||||
entry: ["src/server.ts"], |
||||
format: ["cjs", "esm"], |
||||
...sharedConfig, |
||||
platform: "node", |
||||
}, |
||||
{ |
||||
entry: ["src/client.ts"], |
||||
format: ["cjs", "esm"], |
||||
...sharedConfig, |
||||
platform: "browser", |
||||
}, |
||||
]); |
||||
|
Loading…
Reference in new issue