Browse Source

Added sendWithStream method in MessageHandler.

Adds functionality to accept Queueing Strategy in
sendWithStream method. Using Queueing Strategy we
can control the data that is enqueued into the sink,
and hence regulated the flow of chunks from worker
to main thread.

Adds capability in pull and cancel methods.
Adds ready and desiredSize property in streamSink.

Adds unit test for ReadableStream and sendWithStream.
Mukul Mishra 8 years ago
parent
commit
bbd9968f76
  1. 3
      external/streams/streams-lib.js
  2. 275
      src/shared/util.js
  3. 3
      test/unit/clitests.json
  4. 18
      test/unit/jasmine-boot.js
  5. 16
      test/unit/util_spec.js
  6. 378
      test/unit/util_stream_spec.js

3
external/streams/streams-lib.js vendored

@ -1960,7 +1960,8 @@ function ReadableStreamClose(stream) { @@ -1960,7 +1960,8 @@ function ReadableStreamClose(stream) {
if (IsReadableStreamDefaultReader(reader) === true) {
for (var i = 0; i < reader._readRequests.length; i++) {
var _resolve = reader._readRequests[i];
var _resolve = reader._readRequests[i]._resolve;
_resolve(CreateIterResultObject(undefined, true));
}
reader._readRequests = [];

275
src/shared/util.js

@ -1214,24 +1214,50 @@ var createObjectURL = (function createObjectURLClosure() { @@ -1214,24 +1214,50 @@ var createObjectURL = (function createObjectURLClosure() {
};
})();
function resolveCall(fn, args, thisArg = null) {
if (!fn) {
return Promise.resolve(undefined);
}
return new Promise((resolve, reject) => {
resolve(fn.apply(thisArg, args));
});
}
function resolveOrReject(capability, success, reason) {
if (success) {
capability.resolve();
} else {
capability.reject(reason);
}
}
function finalize(promise) {
return Promise.resolve(promise).catch(() => {});
}
function MessageHandler(sourceName, targetName, comObj) {
this.sourceName = sourceName;
this.targetName = targetName;
this.comObj = comObj;
this.callbackIndex = 1;
this.callbackId = 1;
this.streamId = 1;
this.postMessageTransfers = true;
var callbacksCapabilities = this.callbacksCapabilities = Object.create(null);
var ah = this.actionHandler = Object.create(null);
this.streamSinks = Object.create(null);
this.streamControllers = Object.create(null);
let callbacksCapabilities = this.callbacksCapabilities = Object.create(null);
let ah = this.actionHandler = Object.create(null);
this._onComObjOnMessage = (event) => {
var data = event.data;
let data = event.data;
if (data.targetName !== this.sourceName) {
return;
}
if (data.isReply) {
var callbackId = data.callbackId;
if (data.stream) {
this._processStreamMessage(data);
} else if (data.isReply) {
let callbackId = data.callbackId;
if (data.callbackId in callbacksCapabilities) {
var callback = callbacksCapabilities[callbackId];
let callback = callbacksCapabilities[callbackId];
delete callbacksCapabilities[callbackId];
if ('error' in data) {
callback.reject(data.error);
@ -1242,13 +1268,13 @@ function MessageHandler(sourceName, targetName, comObj) { @@ -1242,13 +1268,13 @@ function MessageHandler(sourceName, targetName, comObj) {
error('Cannot resolve callback ' + callbackId);
}
} else if (data.action in ah) {
var action = ah[data.action];
let action = ah[data.action];
if (data.callbackId) {
var sourceName = this.sourceName;
var targetName = data.sourceName;
let sourceName = this.sourceName;
let targetName = data.sourceName;
Promise.resolve().then(function () {
return action[0].call(action[1], data.data);
}).then(function (result) {
}).then((result) => {
comObj.postMessage({
sourceName,
targetName,
@ -1256,7 +1282,7 @@ function MessageHandler(sourceName, targetName, comObj) { @@ -1256,7 +1282,7 @@ function MessageHandler(sourceName, targetName, comObj) {
callbackId: data.callbackId,
data: result,
});
}, function (reason) {
}, (reason) => {
if (reason instanceof Error) {
// Serialize error to avoid "DataCloneError"
reason = reason + '';
@ -1269,6 +1295,8 @@ function MessageHandler(sourceName, targetName, comObj) { @@ -1269,6 +1295,8 @@ function MessageHandler(sourceName, targetName, comObj) {
error: reason,
});
});
} else if (data.streamId) {
this._createStreamSink(data);
} else {
action[0].call(action[1], data.data);
}
@ -1289,9 +1317,9 @@ MessageHandler.prototype = { @@ -1289,9 +1317,9 @@ MessageHandler.prototype = {
},
/**
* Sends a message to the comObj to invoke the action with the supplied data.
* @param {String} actionName Action to call.
* @param {JSON} data JSON data to send.
* @param {Array} [transfers] Optional list of transfers/ArrayBuffers
* @param {String} actionName - Action to call.
* @param {JSON} data - JSON data to send.
* @param {Array} [transfers] - Optional list of transfers/ArrayBuffers
*/
send(actionName, data, transfers) {
var message = {
@ -1304,14 +1332,14 @@ MessageHandler.prototype = { @@ -1304,14 +1332,14 @@ MessageHandler.prototype = {
},
/**
* Sends a message to the comObj to invoke the action with the supplied data.
* Expects that other side will callback with the response.
* @param {String} actionName Action to call.
* @param {JSON} data JSON data to send.
* @param {Array} [transfers] Optional list of transfers/ArrayBuffers.
* Expects that the other side will callback with the response.
* @param {String} actionName - Action to call.
* @param {JSON} data - JSON data to send.
* @param {Array} [transfers] - Optional list of transfers/ArrayBuffers.
* @returns {Promise} Promise to be resolved with response data.
*/
sendWithPromise(actionName, data, transfers) {
var callbackId = this.callbackIndex++;
var callbackId = this.callbackId++;
var message = {
sourceName: this.sourceName,
targetName: this.targetName,
@ -1328,10 +1356,215 @@ MessageHandler.prototype = { @@ -1328,10 +1356,215 @@ MessageHandler.prototype = {
}
return capability.promise;
},
/**
* Sends a message to the comObj to invoke the action with the supplied data.
* Expect that the other side will callback to signal 'start_complete'.
* @param {String} actionName - Action to call.
* @param {JSON} data - JSON data to send.
* @param {Object} queueingStrategy - strategy to signal backpressure based on
* internal queue.
* @param {Array} [transfers] - Optional list of transfers/ArrayBuffers.
* @return {ReadableStream} ReadableStream to read data in chunks.
*/
sendWithStream(actionName, data, queueingStrategy, transfers) {
let streamId = this.streamId++;
let sourceName = this.sourceName;
let targetName = this.targetName;
return new ReadableStream({
start: (controller) => {
let startCapability = createPromiseCapability();
this.streamControllers[streamId] = {
controller,
startCall: startCapability,
};
this.postMessage({
sourceName,
targetName,
action: actionName,
streamId,
data,
desiredSize: controller.desiredSize,
});
// Return Promise for Async process, to signal success/failure.
return startCapability.promise;
},
pull: (controller) => {
let pullCapability = createPromiseCapability();
this.streamControllers[streamId].pullCall = pullCapability;
this.postMessage({
sourceName,
targetName,
stream: 'pull',
streamId,
desiredSize: controller.desiredSize,
});
// Returning Promise will not call "pull"
// again until current pull is resolved.
return pullCapability.promise;
},
cancel: (reason) => {
let cancelCapability = createPromiseCapability();
this.streamControllers[streamId].cancelCall = cancelCapability;
this.postMessage({
sourceName,
targetName,
stream: 'cancel',
reason,
streamId,
});
// Return Promise to signal success or failure.
return cancelCapability.promise;
},
}, queueingStrategy);
},
_createStreamSink(data) {
let self = this;
let action = this.actionHandler[data.action];
let streamId = data.streamId;
let desiredSize = data.desiredSize;
let sourceName = this.sourceName;
let targetName = data.sourceName;
let capability = createPromiseCapability();
let sendStreamRequest = ({ stream, chunk, success, reason, }) => {
this.comObj.postMessage({ sourceName, targetName, stream, streamId,
chunk, success, reason, });
};
let streamSink = {
enqueue(chunk, size = 1) {
let lastDesiredSize = this.desiredSize;
this.desiredSize -= size;
// Enqueue decreases the desiredSize property of sink,
// so when it changes from positive to negative,
// set ready as unresolved promise.
if (lastDesiredSize > 0 && this.desiredSize <= 0) {
this.sinkCapability = createPromiseCapability();
this.ready = this.sinkCapability.promise;
}
sendStreamRequest({ stream: 'enqueue', chunk, });
},
close() {
sendStreamRequest({ stream: 'close', });
delete self.streamSinks[streamId];
},
error(reason) {
sendStreamRequest({ stream: 'error', reason, });
},
sinkCapability: capability,
onPull: null,
onCancel: null,
desiredSize,
ready: null,
};
streamSink.sinkCapability.resolve();
streamSink.ready = streamSink.sinkCapability.promise;
this.streamSinks[streamId] = streamSink;
resolveCall(action[0], [data.data, streamSink], action[1]).then(() => {
sendStreamRequest({ stream: 'start_complete', success: true, });
}, (reason) => {
sendStreamRequest({ stream: 'start_complete', success: false, reason, });
});
},
_processStreamMessage(data) {
let sourceName = this.sourceName;
let targetName = data.sourceName;
let streamId = data.streamId;
let sendStreamResponse = ({ stream, success, reason, }) => {
this.comObj.postMessage({ sourceName, targetName, stream,
success, streamId, reason, });
};
let deleteStreamController = () => {
// Delete streamController only when start, pull and
// cancel callbacks are resolved, to avoid "TypeError".
Promise.all([
this.streamControllers[data.streamId].startCall,
this.streamControllers[data.streamId].pullCall,
this.streamControllers[data.streamId].cancelCall
].map(function(capability) {
return capability && finalize(capability.promise);
})).then(() => {
delete this.streamControllers[data.streamId];
});
};
switch (data.stream) {
case 'start_complete':
resolveOrReject(this.streamControllers[data.streamId].startCall,
data.success, data.reason);
break;
case 'pull_complete':
resolveOrReject(this.streamControllers[data.streamId].pullCall,
data.success, data.reason);
break;
case 'pull':
// Ignore any pull after close is called.
if (!this.streamSinks[data.streamId]) {
sendStreamResponse({ stream: 'pull_complete', success: true, });
break;
}
// Pull increases the desiredSize property of sink,
// so when it changes from negative to positive,
// set ready property as resolved promise.
if (this.streamSinks[data.streamId].desiredSize <= 0 &&
data.desiredSize > 0) {
this.streamSinks[data.streamId].sinkCapability.resolve();
}
// Reset desiredSize property of sink on every pull.
this.streamSinks[data.streamId].desiredSize = data.desiredSize;
resolveCall(this.streamSinks[data.streamId].onPull).then(() => {
sendStreamResponse({ stream: 'pull_complete', success: true, });
}, (reason) => {
sendStreamResponse({ stream: 'pull_complete',
success: false, reason, });
});
break;
case 'enqueue':
this.streamControllers[data.streamId].controller.enqueue(data.chunk);
break;
case 'close':
this.streamControllers[data.streamId].controller.close();
deleteStreamController();
break;
case 'error':
this.streamControllers[data.streamId].controller.error(data.reason);
deleteStreamController();
break;
case 'cancel_complete':
resolveOrReject(this.streamControllers[data.streamId].cancelCall,
data.success, data.reason);
deleteStreamController();
break;
case 'cancel':
resolveCall(this.streamSinks[data.streamId].onCancel,
[data.reason]).then(() => {
sendStreamResponse({ stream: 'cancel_complete', success: true, });
}, (reason) => {
sendStreamResponse({ stream: 'cancel_complete',
success: false, reason, });
});
delete this.streamSinks[data.streamId];
break;
default:
throw new Error('Unexpected stream case');
}
},
/**
* Sends raw message to the comObj.
* @private
* @param message {Object} Raw message.
* @param {Object} message - Raw message.
* @param transfers List of transfers/ArrayBuffers, or undefined.
*/
postMessage(message, transfers) {

3
test/unit/clitests.json

@ -19,6 +19,7 @@ @@ -19,6 +19,7 @@
"type1_parser_spec.js",
"ui_utils_spec.js",
"unicode_spec.js",
"util_spec.js"
"util_spec.js",
"util_stream_spec.js"
]
}

18
test/unit/jasmine-boot.js

@ -45,15 +45,15 @@ function initializePDFJS(callback) { @@ -45,15 +45,15 @@ function initializePDFJS(callback) {
'pdfjs/display/global', 'pdfjs-test/unit/annotation_spec',
'pdfjs-test/unit/api_spec', 'pdfjs-test/unit/bidi_spec',
'pdfjs-test/unit/cff_parser_spec', 'pdfjs-test/unit/cmap_spec',
'pdfjs-test/unit/crypto_spec', 'pdfjs-test/unit/document_spec',
'pdfjs-test/unit/dom_utils_spec', 'pdfjs-test/unit/evaluator_spec',
'pdfjs-test/unit/fonts_spec', 'pdfjs-test/unit/function_spec',
'pdfjs-test/unit/metadata_spec', 'pdfjs-test/unit/murmurhash3_spec',
'pdfjs-test/unit/network_spec', 'pdfjs-test/unit/parser_spec',
'pdfjs-test/unit/primitives_spec', 'pdfjs-test/unit/stream_spec',
'pdfjs-test/unit/type1_parser_spec', 'pdfjs-test/unit/ui_utils_spec',
'pdfjs-test/unit/unicode_spec', 'pdfjs-test/unit/util_spec',
'pdfjs-test/unit/custom_spec'
'pdfjs-test/unit/crypto_spec', 'pdfjs-test/unit/custom_spec',
'pdfjs-test/unit/document_spec', 'pdfjs-test/unit/dom_utils_spec',
'pdfjs-test/unit/evaluator_spec', 'pdfjs-test/unit/fonts_spec',
'pdfjs-test/unit/function_spec', 'pdfjs-test/unit/metadata_spec',
'pdfjs-test/unit/murmurhash3_spec', 'pdfjs-test/unit/network_spec',
'pdfjs-test/unit/parser_spec', 'pdfjs-test/unit/primitives_spec',
'pdfjs-test/unit/stream_spec', 'pdfjs-test/unit/type1_parser_spec',
'pdfjs-test/unit/ui_utils_spec', 'pdfjs-test/unit/unicode_spec',
'pdfjs-test/unit/util_spec', 'pdfjs-test/unit/util_stream_spec'
].map(function (moduleName) {
return SystemJS.import(moduleName);
})).then(function (modules) {

16
test/unit/util_spec.js

@ -20,46 +20,46 @@ import { @@ -20,46 +20,46 @@ import {
describe('util', function() {
describe('stringToPDFString', function() {
it('handles ISO Latin 1 strings', function() {
var str = '\x8Dstring\x8E';
let str = '\x8Dstring\x8E';
expect(stringToPDFString(str)).toEqual('\u201Cstring\u201D');
});
it('handles UTF-16BE strings', function() {
var str = '\xFE\xFF\x00\x73\x00\x74\x00\x72\x00\x69\x00\x6E\x00\x67';
let str = '\xFE\xFF\x00\x73\x00\x74\x00\x72\x00\x69\x00\x6E\x00\x67';
expect(stringToPDFString(str)).toEqual('string');
});
it('handles empty strings', function() {
// ISO Latin 1
var str1 = '';
let str1 = '';
expect(stringToPDFString(str1)).toEqual('');
// UTF-16BE
var str2 = '\xFE\xFF';
let str2 = '\xFE\xFF';
expect(stringToPDFString(str2)).toEqual('');
});
});
describe('removeNullCharacters', function() {
it('should not modify string without null characters', function() {
var str = 'string without null chars';
let str = 'string without null chars';
expect(removeNullCharacters(str)).toEqual('string without null chars');
});
it('should modify string with null characters', function() {
var str = 'string\x00With\x00Null\x00Chars';
let str = 'string\x00With\x00Null\x00Chars';
expect(removeNullCharacters(str)).toEqual('stringWithNullChars');
});
});
describe('ReadableStream', function() {
it('should return an Object', function () {
var readable = new ReadableStream();
let readable = new ReadableStream();
expect(typeof readable).toEqual('object');
});
it('should have property getReader', function () {
var readable = new ReadableStream();
let readable = new ReadableStream();
expect(typeof readable.getReader).toEqual('function');
});
});

378
test/unit/util_stream_spec.js

@ -0,0 +1,378 @@ @@ -0,0 +1,378 @@
/* Copyright 2017 Mozilla Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { createPromiseCapability, MessageHandler } from '../../src/shared/util';
describe('util_stream', function () {
// Temporary fake port for sending messages between main and worker.
class FakePort {
constructor() {
this._listeners = [];
this._deferred = Promise.resolve(undefined);
}
postMessage(obj) {
let event = { data: obj, };
this._deferred.then(() => {
this._listeners.forEach(function (listener) {
listener.call(this, event);
}, this);
});
}
addEventListener(name, listener) {
this._listeners.push(listener);
}
removeEventListener(name, listener) {
let i = this._listeners.indexOf(listener);
this._listeners.splice(i, 1);
}
terminate() {
this._listeners = [];
}
}
// Sleep function to wait for sometime, similar to setTimeout but faster.
function sleep(ticks) {
return Promise.resolve().then(() => {
return (ticks && sleep(ticks - 1));
});
}
describe('sendWithStream', function () {
it('should return a ReadableStream', function () {
let port = new FakePort();
let messageHandler1 = new MessageHandler('main', 'worker', port);
let readable = messageHandler1.sendWithStream('fakeHandler');
// Check if readable is an instance of ReadableStream.
expect(typeof readable).toEqual('object');
expect(typeof readable.getReader).toEqual('function');
});
it('should read using a reader', function (done) {
let log = '';
let port = new FakePort();
let messageHandler1 = new MessageHandler('main', 'worker', port);
let messageHandler2 = new MessageHandler('worker', 'main', port);
messageHandler2.on('fakeHandler', (data, sink) => {
sink.onPull = function () {
log += 'p';
};
sink.onCancel = function (reason) {
log += 'c';
};
sink.ready.then(() => {
sink.enqueue('hi');
return sink.ready;
}).then(() => {
sink.close();
});
return sleep(5);
});
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
highWaterMark: 1,
size() {
return 1;
},
});
let reader = readable.getReader();
sleep(10).then(() => {
expect(log).toEqual('');
return reader.read();
}).then((result) => {
expect(log).toEqual('p');
expect(result.value).toEqual('hi');
expect(result.done).toEqual(false);
return sleep(10);
}).then(() => {
return reader.read();
}).then((result) => {
expect(result.value).toEqual(undefined);
expect(result.done).toEqual(true);
done();
});
});
it('should not read any data when cancelled', function (done) {
let log = '';
let port = new FakePort();
let messageHandler2 = new MessageHandler('worker', 'main', port);
messageHandler2.on('fakeHandler', (data, sink) => {
sink.onPull = function () {
log += 'p';
};
sink.onCancel = function (reason) {
log += 'c';
};
log += '0';
sink.ready.then(() => {
log += '1';
sink.enqueue([1, 2, 3, 4], 4);
return sink.ready;
}).then(() => {
log += '2';
sink.enqueue([5, 6, 7, 8], 4);
return sink.ready;
}).then(() => {
log += '3';
sink.close();
}, () => {
log += '4';
});
});
let messageHandler1 = new MessageHandler('main', 'worker', port);
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
highWaterMark: 4,
size(arr) {
return arr.length;
},
});
let reader = readable.getReader();
sleep(10).then(() => {
expect(log).toEqual('01');
return reader.read();
}).then((result) => {
expect(result.value).toEqual([1, 2, 3, 4]);
expect(result.done).toEqual(false);
return sleep(10);
}).then(() => {
expect(log).toEqual('01p2');
return reader.cancel();
}).then(() => {
expect(log).toEqual('01p2c');
done();
});
});
it('should not read when errored', function(done) {
let log = '';
let port = new FakePort();
let messageHandler2 = new MessageHandler('worker', 'main', port);
messageHandler2.on('fakeHandler', (data, sink) => {
sink.onPull = function () {
log += 'p';
};
sink.onCancel = function (reason) {
log += 'c';
};
sink.ready.then(() => {
sink.enqueue([1, 2, 3, 4], 4);
return sink.ready;
}).then(() => {
log += 'error';
sink.error('error');
});
});
let messageHandler1 = new MessageHandler('main', 'worker', port);
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
highWaterMark: 4,
size(arr) {
return arr.length;
},
});
let reader = readable.getReader();
sleep(10).then(() => {
expect(log).toEqual('');
return reader.read();
}).then((result) => {
expect(result.value).toEqual([1, 2, 3, 4]);
expect(result.done).toEqual(false);
return reader.read();
}).then(() => {
}, (reason) => {
expect(reason).toEqual('error');
done();
});
});
it('should read data with blocking promise', function (done) {
let log = '';
let port = new FakePort();
let messageHandler2 = new MessageHandler('worker', 'main', port);
messageHandler2.on('fakeHandler', (data, sink) => {
sink.onPull = function () {
log += 'p';
};
sink.onCancel = function (reason) {
log += 'c';
};
log += '0';
sink.ready.then(() => {
log += '1';
sink.enqueue([1, 2, 3, 4], 4);
return sink.ready;
}).then(() => {
log += '2';
sink.enqueue([5, 6, 7, 8], 4);
return sink.ready;
}).then(() => {
sink.close();
});
});
let messageHandler1 = new MessageHandler('main', 'worker', port);
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
highWaterMark: 4,
size(arr) {
return arr.length;
},
});
let reader = readable.getReader();
// Sleep for 10ms, so that read() is not unblocking the ready promise.
// Chain all read() to stream in sequence.
sleep(10).then(() => {
expect(log).toEqual('01');
return reader.read();
}).then((result) => {
expect(result.value).toEqual([1, 2, 3, 4]);
expect(result.done).toEqual(false);
return sleep(10);
}).then(() => {
expect(log).toEqual('01p2');
return reader.read();
}).then((result) => {
expect(result.value).toEqual([5, 6, 7, 8]);
expect(result.done).toEqual(false);
return sleep(10);
}).then(() => {
expect(log).toEqual('01p2p');
return reader.read();
}).then((result) => {
expect(result.value).toEqual(undefined);
expect(result.done).toEqual(true);
done();
});
});
it('should read data with blocking promise and buffer whole data' +
' into stream', function (done) {
let log = '';
let port = new FakePort();
let messageHandler2 = new MessageHandler('worker', 'main', port);
messageHandler2.on('fakeHandler', (data, sink) => {
sink.onPull = function () {
log += 'p';
};
sink.onCancel = function (reason) {
log += 'c';
};
log += '0';
sink.ready.then(() => {
log += '1';
sink.enqueue([1, 2, 3, 4], 4);
return sink.ready;
}).then(() => {
log += '2';
sink.enqueue([5, 6, 7, 8], 4);
return sink.ready;
}).then(() => {
sink.close();
});
return sleep(10);
});
let messageHandler1 = new MessageHandler('main', 'worker', port);
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
highWaterMark: 8,
size(arr) {
return arr.length;
},
});
let reader = readable.getReader();
sleep(10).then(() => {
expect(log).toEqual('012');
return reader.read();
}).then((result) => {
expect(result.value).toEqual([1, 2, 3, 4]);
expect(result.done).toEqual(false);
return sleep(10);
}).then(() => {
expect(log).toEqual('012p');
return reader.read();
}).then((result) => {
expect(result.value).toEqual([5, 6, 7, 8]);
expect(result.done).toEqual(false);
return sleep(10);
}).then(() => {
expect(log).toEqual('012p');
return reader.read();
}).then((result) => {
expect(result.value).toEqual(undefined);
expect(result.done).toEqual(true);
done();
});
});
it('should ignore any pull after close is called', function (done) {
let log = '';
let port = new FakePort();
let capability = createPromiseCapability();
let messageHandler2 = new MessageHandler('worker', 'main', port);
messageHandler2.on('fakeHandler', (data, sink) => {
sink.onPull = function () {
log += 'p';
};
sink.onCancel = function (reason) {
log += 'c';
};
log += '0';
sink.ready.then(() => {
log += '1';
sink.enqueue([1, 2, 3, 4], 4);
});
return capability.promise.then(() => {
sink.close();
});
});
let messageHandler1 = new MessageHandler('main', 'worker', port);
let readable = messageHandler1.sendWithStream('fakeHandler', {}, {
highWaterMark: 10,
size(arr) {
return arr.length;
},
});
let reader = readable.getReader();
sleep(10).then(() => {
expect(log).toEqual('01');
capability.resolve();
return capability.promise.then(() => {
return reader.read();
});
}).then((result) => {
expect(result.value).toEqual([1, 2, 3, 4]);
expect(result.done).toEqual(false);
return sleep(10);
}).then(() => {
expect(log).toEqual('01');
return reader.read();
}).then((result) => {
expect(result.value).toEqual(undefined);
expect(result.done).toEqual(true);
done();
});
});
});
});
Loading…
Cancel
Save