Browse Source

chore: handle errors

pull/26/head
Antoine Lanoe 7 months ago
parent
commit
aaf5f3fa13
  1. 28
      src/lib/stream-list-diff/emitter.ts
  2. 118
      src/lib/stream-list-diff/index.ts

28
src/lib/stream-list-diff/emitter.ts

@ -1,3 +1,5 @@ @@ -1,3 +1,5 @@
import { StreamListsDiff } from "@models/stream";
type Listener<T extends unknown[]> = (...args: T) => void;
export enum StreamEvent {
@ -5,23 +7,27 @@ export enum StreamEvent { @@ -5,23 +7,27 @@ export enum StreamEvent {
Finish = "finish",
Error = "error",
}
export class EventEmitter {
export type Emitter<T extends Record<string, unknown>> = EventEmitter<{
data: [StreamListsDiff<T>[]];
error: [Error];
finish: [];
}>;
export class EventEmitter<Events extends Record<string, unknown[]>> {
private events: Record<string, Listener<unknown[]>[]> = {};
on<T extends unknown[]>(
event: `${StreamEvent}`,
listener: Listener<T>,
): this {
if (!this.events[event]) {
this.events[event] = [];
on<E extends keyof Events>(event: E, listener: Listener<Events[E]>): this {
if (!this.events[event as string]) {
this.events[event as string] = [];
}
this.events[event].push(listener as Listener<unknown[]>);
this.events[event as string].push(listener as Listener<unknown[]>);
return this;
}
emit<T extends unknown[]>(event: `${StreamEvent}`, ...args: T): void {
if (this.events[event]) {
this.events[event].forEach((listener) => listener(...args));
emit<E extends keyof Events>(event: E, ...args: Events[E]): void {
if (this.events[event as string]) {
this.events[event as string].forEach((listener) => listener(...args));
}
}
}

118
src/lib/stream-list-diff/index.ts

@ -6,11 +6,11 @@ import { @@ -6,11 +6,11 @@ import {
StreamReferences,
} from "@models/stream";
import { LIST_STATUS } from "@models/list";
import { isEqual } from "@lib/utils";
import { EventEmitter, StreamEvent } from "./emitter";
import { isEqual, isObject } from "@lib/utils";
import { Emitter, EventEmitter, StreamEvent } from "./emitter";
function outputDiffChunk<T extends Record<string, unknown>>(
emitter: EventEmitter,
emitter: Emitter<T>,
) {
let chunks: StreamListsDiff<T>[] = [];
@ -42,15 +42,27 @@ function formatSingleListStreamDiff<T extends Record<string, unknown>>( @@ -42,15 +42,27 @@ function formatSingleListStreamDiff<T extends Record<string, unknown>>(
isPrevious: boolean,
status: LIST_STATUS,
options: ListStreamOptions,
): StreamListsDiff<T>[] {
const diff: StreamListsDiff<T>[] = list.map((data, i) => ({
previousValue: isPrevious ? data : null,
currentValue: isPrevious ? null : data,
prevIndex: status === LIST_STATUS.ADDED ? null : i,
newIndex: status === LIST_STATUS.ADDED ? i : null,
indexDiff: null,
status,
}));
): StreamListsDiff<T>[] | null {
let isValid = true;
const diff: StreamListsDiff<T>[] = [];
for (let i = 0; i < list.length; i++) {
const data = list[i];
if (!isObject(data)) {
isValid = false;
break;
}
diff.push({
previousValue: isPrevious ? data : null,
currentValue: isPrevious ? null : data,
prevIndex: status === LIST_STATUS.ADDED ? null : i,
newIndex: status === LIST_STATUS.ADDED ? i : null,
indexDiff: null,
status,
});
}
if (!isValid) {
return null;
}
if (options.showOnly && options.showOnly.length > 0) {
return diff.filter((value) => options.showOnly?.includes(value.status));
}
@ -65,17 +77,46 @@ function isValidChunkSize( @@ -65,17 +77,46 @@ function isValidChunkSize(
return x !== "-1" && x !== "NaN";
}
function isDataValid<T extends Record<string, unknown>>(
data: T,
referenceProperty: ReferenceProperty<T>,
emitter: Emitter<T>,
listType: "prevList" | "nextList",
): boolean {
if (!isObject(data)) {
emitter.emit(
StreamEvent.Error,
new Error(
`Your ${listType} must only contain valid objects. Found ${data}`,
),
);
return false;
}
if (!Object.hasOwn(data, referenceProperty)) {
emitter.emit(
StreamEvent.Error,
new Error(
`The reference property ${String(referenceProperty)} is not available in all the objects of your ${listType}.`,
),
);
return false;
}
return true;
}
function getDiffChunks<T extends Record<string, unknown>>(
prevList: T[],
nextList: T[],
referenceProperty: ReferenceProperty<T>,
emitter: EventEmitter,
emitter: Emitter<T>,
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
) {
if (!isValidChunkSize(options?.chunksSize)) {
return emitter.emit(
StreamEvent.Error,
`The chunk size can't be negative. You entered the value ${options.chunksSize}`,
new Error(
`The chunk size can't be negative. You entered the value ${options.chunksSize}`,
),
);
}
if (!prevList && !nextList) {
@ -88,7 +129,14 @@ function getDiffChunks<T extends Record<string, unknown>>( @@ -88,7 +129,14 @@ function getDiffChunks<T extends Record<string, unknown>>(
LIST_STATUS.ADDED,
options,
);
return nextDiff.forEach((data, i) =>
if (!nextDiff) {
emitter.emit(
StreamEvent.Error,
new Error("Your nextList must only contain valid objects."),
);
emitter.emit(StreamEvent.Finish);
}
return nextDiff?.forEach((data, i) =>
handleDiffChunk(data, i === nextDiff.length - 1, options),
);
}
@ -99,23 +147,42 @@ function getDiffChunks<T extends Record<string, unknown>>( @@ -99,23 +147,42 @@ function getDiffChunks<T extends Record<string, unknown>>(
LIST_STATUS.DELETED,
options,
);
return prevDiff.forEach((data, i) =>
if (!prevDiff) {
emitter.emit(
StreamEvent.Error,
new Error("Your prevList must only contain valid objects."),
);
emitter.emit(StreamEvent.Finish);
}
return prevDiff?.forEach((data, i) =>
handleDiffChunk(data, i === prevDiff.length - 1, options),
);
}
const listsReferences: StreamReferences<T> = new Map();
const handleDiffChunk = outputDiffChunk<T>(emitter);
prevList.forEach((data, i) => {
for (let i = 0; i < prevList.length; i++) {
const data = prevList[i];
if (data) {
const isValid = isDataValid(data, referenceProperty, emitter, "prevList");
if (!isValid) {
emitter.emit(StreamEvent.Finish);
break;
}
listsReferences.set(String(data[referenceProperty]), {
prevIndex: i,
nextIndex: undefined,
});
}
});
}
nextList.forEach((data, i) => {
for (let i = 0; i < nextList.length; i++) {
const data = prevList[i];
if (data) {
const isValid = isDataValid(data, referenceProperty, emitter, "nextList");
if (!isValid) {
emitter.emit(StreamEvent.Finish);
break;
}
const listReference = listsReferences.get(
String(data[referenceProperty]),
);
@ -136,7 +203,8 @@ function getDiffChunks<T extends Record<string, unknown>>( @@ -136,7 +203,8 @@ function getDiffChunks<T extends Record<string, unknown>>(
);
}
}
});
}
let streamedChunks = 0;
const totalChunks = listsReferences.size;
for (const data of listsReferences.values()) {
@ -225,13 +293,17 @@ export function streamListsDiff<T extends Record<string, unknown>>( @@ -225,13 +293,17 @@ export function streamListsDiff<T extends Record<string, unknown>>(
nextList: T[],
referenceProperty: ReferenceProperty<T>,
options: ListStreamOptions = DEFAULT_LIST_STREAM_OPTIONS,
): EventEmitter {
const emitter = new EventEmitter();
): Emitter<T> {
const emitter = new EventEmitter<{
data: [StreamListsDiff<T>[]];
error: [Error];
finish: [];
}>();
setTimeout(() => {
try {
getDiffChunks(prevList, nextList, referenceProperty, emitter, options);
} catch (err) {
return emitter.emit(StreamEvent.Error, err);
return emitter.emit(StreamEvent.Error, err as Error);
}
}, 0);
return emitter;

Loading…
Cancel
Save