
Add support for node-style streaming (#494)

Authored by Trevor Harwell, committed by Sergi Almacellas Abellana
commit 392408edcc
1. README.md (2 changed lines)
2. papaparse.js (112 changed lines)
3. tests/node-tests.js (30 changed lines)

README.md

@@ -49,6 +49,8 @@ Papa Parse for Node
Papa Parse can parse a [Readable Stream](https://nodejs.org/api/stream.html#stream_readable_streams) instead of a [File](https://www.w3.org/TR/FileAPI/) when used in Node.js environments (in addition to plain strings). In this mode, `encoding` must, if specified, be a Node-supported character encoding. The `Papa.LocalChunkSize`, `Papa.RemoteChunkSize`, `download`, `withCredentials` and `worker` config options are unavailable.
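
For instance, a minimal sketch of this stream-input mode (the file name `data.csv` and the logging callbacks are illustrative, not part of the library):

```js
var fs = require('fs');
var Papa = require('papaparse');

// Hand Papa.parse a Readable Stream where a File would normally go;
// rows arrive through the usual step/complete callbacks.
Papa.parse(fs.createReadStream('data.csv', 'utf8'), {
    step: function(results) {
        console.log('Row:', results.data);
    },
    complete: function() {
        console.log('Stream fully parsed');
    }
});
```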
Papa Parse can also parse in a Node.js streaming style, which makes `.pipe` available. Simply pipe the [Readable Stream](https://nodejs.org/api/stream.html#stream_readable_streams) to the stream returned from `Papa.parse(Papa.NODE_STREAM_INPUT, options)`. The `Papa.LocalChunkSize`, `Papa.RemoteChunkSize`, `download`, `withCredentials`, `worker`, `step`, and `complete` config options are unavailable. To register a callback with the stream to process data, use the `data` event like so: `stream.on('data', callback)`; to signal the end of the stream, use the `end` event: `stream.on('end', callback)`.
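
A minimal sketch of the piped style (again, `data.csv` is just an illustrative input file):

```js
var fs = require('fs');
var Papa = require('papaparse');

// Papa.parse(Papa.NODE_STREAM_INPUT) returns a Duplex stream:
// write raw CSV in, read parsed rows out.
var csvStream = fs.createReadStream('data.csv', 'utf8')
    .pipe(Papa.parse(Papa.NODE_STREAM_INPUT));

// each 'data' event delivers one parsed row
csvStream.on('data', function(row) {
    console.log(row);
});

// 'end' fires when the input has been fully parsed
csvStream.on('end', function() {
    console.log('Done');
});
```

Because the readable side runs in object mode, each `data` event delivers one parsed record rather than a Buffer.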
Get Started
-----------

papaparse.js

@@ -58,6 +58,7 @@
Papa.BAD_DELIMITERS = ['\r', '\n', '"', Papa.BYTE_ORDER_MARK];
Papa.WORKERS_SUPPORTED = !IS_WORKER && !!global.Worker;
Papa.SCRIPT_PATH = null; // Must be set by your code if you use workers and this lib is loaded asynchronously
Papa.NODE_STREAM_INPUT = 1;
// Configurable chunk sizes for local and remote files, respectively
Papa.LocalChunkSize = 1024 * 1024 * 10; // 10 MB
@@ -71,6 +72,7 @@
Papa.FileStreamer = FileStreamer;
Papa.StringStreamer = StringStreamer;
Papa.ReadableStreamStreamer = ReadableStreamStreamer;
Papa.DuplexStreamStreamer = DuplexStreamStreamer;
if (global.jQuery)
{
@@ -228,7 +230,14 @@
}

var streamer = null;

if (_input === Papa.NODE_STREAM_INPUT)
{
    // create a node Duplex stream for use
    // with .pipe
    streamer = new DuplexStreamStreamer(_config);
    return streamer.getStream();
}
else if (typeof _input === 'string')
{
    if (_config.download)
        streamer = new NetworkStreamer(_config);
@@ -839,6 +848,107 @@
ReadableStreamStreamer.prototype.constructor = ReadableStreamStreamer;

function DuplexStreamStreamer(_config) {
    var Duplex = require('stream').Duplex;
    var config = copy(_config);
    var parseOnWrite = true;
    var writeStreamHasFinished = false;
    var parseCallbackQueue = [];
    var stream = null;

    this._onCsvData = function(results)
    {
        var data = results.data;
        for (var i = 0; i < data.length; i++) {
            if (!stream.push(data[i]) && !this._handle.paused()) {
                // the writeable consumer buffer has filled up
                // so we need to pause until more items
                // can be processed
                this._handle.pause();
            }
        }
    };

    this._onCsvComplete = function()
    {
        // node will finish the read stream when
        // null is pushed
        stream.push(null);
    };

    config.step = bindFunction(this._onCsvData, this);
    config.complete = bindFunction(this._onCsvComplete, this);
    ChunkStreamer.call(this, config);

    this._nextChunk = function()
    {
        if (writeStreamHasFinished && parseCallbackQueue.length === 1) {
            this._finished = true;
        }
        if (parseCallbackQueue.length) {
            parseCallbackQueue.shift()();
        } else {
            parseOnWrite = true;
        }
    };

    this._addToParseQueue = function(chunk, callback)
    {
        // add to queue so that we can indicate
        // completion via callback
        // node will automatically pause the incoming stream
        // when too many items have been added without their
        // callback being invoked
        parseCallbackQueue.push(bindFunction(function() {
            this.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding));
            if (isFunction(callback)) {
                return callback();
            }
        }, this));
        if (parseOnWrite) {
            parseOnWrite = false;
            this._nextChunk();
        }
    };

    this._onRead = function()
    {
        if (this._handle.paused()) {
            // the writeable consumer can handle more data
            // so resume the chunk parsing
            this._handle.resume();
        }
    };

    this._onWrite = function(chunk, encoding, callback)
    {
        this._addToParseQueue(chunk, callback);
    };

    this._onWriteComplete = function()
    {
        writeStreamHasFinished = true;
        // have to write empty string
        // so parser knows it's done
        this._addToParseQueue('');
    };

    this.getStream = function()
    {
        return stream;
    };

    stream = new Duplex({
        readableObjectMode: true,
        decodeStrings: false,
        read: bindFunction(this._onRead, this),
        write: bindFunction(this._onWrite, this)
    });
    stream.once('finish', bindFunction(this._onWriteComplete, this));
}
DuplexStreamStreamer.prototype = Object.create(ChunkStreamer.prototype);
DuplexStreamStreamer.prototype.constructor = DuplexStreamStreamer;

// Use one ParserHandle per entire CSV file or string
function ParserHandle(_config)
{

tests/node-tests.js

@@ -59,6 +59,36 @@ describe('PapaParse', function() {
    });
});

it('piped streaming CSV should be correctly parsed', function(done) {
    var data = [];
    var readStream = fs.createReadStream(__dirname + '/long-sample.csv', 'utf8');
    var csvStream = readStream.pipe(Papa.parse(Papa.NODE_STREAM_INPUT));
    csvStream.on('data', function(item) {
        data.push(item);
    });
    csvStream.on('end', function() {
        assert.deepEqual(data[0], [
            'Grant',
            'Dyer',
            'Donec.elementum@orciluctuset.example',
            '2013-11-23T02:30:31-08:00',
            '2014-05-31T01:06:56-07:00',
            'Magna Ut Associates',
            'ljenkins'
        ]);
        assert.deepEqual(data[7], [
            'Talon',
            'Salinas',
            'posuere.vulputate.lacus@Donecsollicitudin.example',
            '2015-01-31T09:19:02-08:00',
            '2014-12-17T04:59:18-08:00',
            'Aliquam Iaculis Incorporate',
            'Phasellus@Quisquetincidunt.example'
        ]);
        done();
    });
});

it('should support pausing and resuming on same tick when streaming', function(done) {
    var rows = [];
    Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), {
