diff --git a/README.md b/README.md
index 98bfc6d..9e00358 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,8 @@ Papa Parse for Node
 
 Papa Parse can parse a [Readable Stream](https://nodejs.org/api/stream.html#stream_readable_streams) instead of a [File](https://www.w3.org/TR/FileAPI/) when used in Node.js environments (in addition to plain strings). In this mode, `encoding` must, if specified, be a Node-supported character encoding. The `Papa.LocalChunkSize`, `Papa.RemoteChunkSize` , `download`, `withCredentials` and `worker` config options are unavailable.
 
+Papa Parse can also parse in a Node streaming style, which makes `.pipe` available. Simply pipe the [Readable Stream](https://nodejs.org/api/stream.html#stream_readable_streams) into the stream returned from `Papa.parse(Papa.NODE_STREAM_INPUT, options)`. The `Papa.LocalChunkSize`, `Papa.RemoteChunkSize`, `download`, `withCredentials`, `worker`, `step`, and `complete` config options are unavailable. To process parsed rows, register a callback for the stream's `data` event: `stream.on('data', callback)`. To be notified when the stream has ended, register a callback for the `end` event: `stream.on('end', callback)`.
+
 Get Started
 -----------
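For illustration, a minimal sketch of the piping mode described in the README change above. This is not part of the diff: the file name `example.csv` is hypothetical, and with the default config each `data` event carries one row as an array of field strings, matching the test expectations further below.

```js
var fs = require('fs');
var Papa = require('papaparse');

var rows = [];
fs.createReadStream('example.csv', 'utf8')    // hypothetical input file
	.pipe(Papa.parse(Papa.NODE_STREAM_INPUT)) // returns the Duplex stream
	.on('data', function(row) {
		rows.push(row);                       // one array of fields per row
	})
	.on('end', function() {
		console.log('parsed ' + rows.length + ' rows');
	});
```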
diff --git a/papaparse.js b/papaparse.js
index fa8611d..eb1a6ed 100755
--- a/papaparse.js
+++ b/papaparse.js
@@ -58,6 +58,7 @@
 	Papa.BAD_DELIMITERS = ['\r', '\n', '"', Papa.BYTE_ORDER_MARK];
 	Papa.WORKERS_SUPPORTED = !IS_WORKER && !!global.Worker;
 	Papa.SCRIPT_PATH = null;	// Must be set by your code if you use workers and this lib is loaded asynchronously
+	Papa.NODE_STREAM_INPUT = 1;
 
 	// Configurable chunk sizes for local and remote files, respectively
 	Papa.LocalChunkSize = 1024 * 1024 * 10;	// 10 MB
@@ -71,6 +72,7 @@
 	Papa.FileStreamer = FileStreamer;
 	Papa.StringStreamer = StringStreamer;
 	Papa.ReadableStreamStreamer = ReadableStreamStreamer;
+	Papa.DuplexStreamStreamer = DuplexStreamStreamer;
 
 	if (global.jQuery)
 	{
@@ -228,7 +230,14 @@
 		}
 
 		var streamer = null;
-		if (typeof _input === 'string')
+		if (_input === Papa.NODE_STREAM_INPUT)
+		{
+			// create a node Duplex stream for use
+			// with .pipe
+			streamer = new DuplexStreamStreamer(_config);
+			return streamer.getStream();
+		}
+		else if (typeof _input === 'string')
 		{
 			if (_config.download)
 				streamer = new NetworkStreamer(_config);
@@ -839,6 +848,107 @@
 	ReadableStreamStreamer.prototype.constructor = ReadableStreamStreamer;
 
 
+	function DuplexStreamStreamer(_config) {
+		var Duplex = require('stream').Duplex;
+		var config = copy(_config);
+		var parseOnWrite = true;
+		var writeStreamHasFinished = false;
+		var parseCallbackQueue = [];
+		var stream = null;
+
+		this._onCsvData = function(results)
+		{
+			var data = results.data;
+			for (var i = 0; i < data.length; i++) {
+				if (!stream.push(data[i]) && !this._handle.paused()) {
+					// the readable buffer has filled up, so we
+					// need to pause parsing until the consumer
+					// can catch up
+					this._handle.pause();
+				}
+			}
+		};
+
+		this._onCsvComplete = function()
+		{
+			// node ends the readable side of the stream
+			// when null is pushed
+			stream.push(null);
+		};
+
+		config.step = bindFunction(this._onCsvData, this);
+		config.complete = bindFunction(this._onCsvComplete, this);
+		ChunkStreamer.call(this, config);
+
+		this._nextChunk = function()
+		{
+			if (writeStreamHasFinished && parseCallbackQueue.length === 1) {
+				this._finished = true;
+			}
+			if (parseCallbackQueue.length) {
+				parseCallbackQueue.shift()();
+			} else {
+				parseOnWrite = true;
+			}
+		};
+
+		this._addToParseQueue = function(chunk, callback)
+		{
+			// add to the queue so that we can indicate
+			// completion via the callback;
+			// node will automatically pause the incoming stream
+			// when too many items have been added without their
+			// callback being invoked
+			parseCallbackQueue.push(bindFunction(function() {
+				this.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding));
+				if (isFunction(callback)) {
+					return callback();
+				}
+			}, this));
+			if (parseOnWrite) {
+				parseOnWrite = false;
+				this._nextChunk();
+			}
+		};
+
+		this._onRead = function()
+		{
+			if (this._handle.paused()) {
+				// the readable consumer can accept more data,
+				// so resume the chunk parsing
+				this._handle.resume();
+			}
+		};
+
+		this._onWrite = function(chunk, encoding, callback)
+		{
+			this._addToParseQueue(chunk, callback);
+		};
+
+		this._onWriteComplete = function()
+		{
+			writeStreamHasFinished = true;
+			// have to write an empty string
+			// so the parser knows it's done
+			this._addToParseQueue('');
+		};
+
+		this.getStream = function()
+		{
+			return stream;
+		};
+		stream = new Duplex({
+			readableObjectMode: true,
+			decodeStrings: false,
+			read: bindFunction(this._onRead, this),
+			write: bindFunction(this._onWrite, this)
+		});
+		stream.once('finish', bindFunction(this._onWriteComplete, this));
+	}
+	DuplexStreamStreamer.prototype = Object.create(ChunkStreamer.prototype);
+	DuplexStreamStreamer.prototype.constructor = DuplexStreamStreamer;
+
+
 	// Use one ParserHandle per entire CSV file or string
 	function ParserHandle(_config)
 	{
diff --git a/tests/node-tests.js b/tests/node-tests.js
index 44752ff..98acc9b 100644
--- a/tests/node-tests.js
+++ b/tests/node-tests.js
@@ -59,6 +59,36 @@ describe('PapaParse', function() {
 		});
 	});
 
+	it('piped streaming CSV should be correctly parsed', function(done) {
+		var data = [];
+		var readStream = fs.createReadStream(__dirname + '/long-sample.csv', 'utf8');
+		var csvStream = readStream.pipe(Papa.parse(Papa.NODE_STREAM_INPUT));
+		csvStream.on('data', function(item) {
+			data.push(item);
+		});
+		csvStream.on('end', function() {
+			assert.deepEqual(data[0], [
+				'Grant',
+				'Dyer',
+				'Donec.elementum@orciluctuset.example',
+				'2013-11-23T02:30:31-08:00',
+				'2014-05-31T01:06:56-07:00',
+				'Magna Ut Associates',
+				'ljenkins'
+			]);
+			assert.deepEqual(data[7], [
+				'Talon',
+				'Salinas',
+				'posuere.vulputate.lacus@Donecsollicitudin.example',
+				'2015-01-31T09:19:02-08:00',
+				'2014-12-17T04:59:18-08:00',
+				'Aliquam Iaculis Incorporate',
+				'Phasellus@Quisquetincidunt.example'
+			]);
+			done();
+		});
+	});
+
 	it('should support pausing and resuming on same tick when streaming', function(done) {
 		var rows = [];
 		Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), {
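A further illustrative sketch, also not part of the diff: the returned Duplex can be driven by manual writes rather than `.pipe`. Because `_addToParseQueue` buffers incoming chunks, their boundaries need not align with row boundaries. The `delimiter` option is shown only as an example of a config option that remains available in this mode.

```js
var Papa = require('papaparse');

var stream = Papa.parse(Papa.NODE_STREAM_INPUT, { delimiter: ',' });

stream.on('data', function(row) {
	console.log('row:', row);
});
stream.on('end', function() {
	console.log('done');
});

// chunk boundaries may split a row; the streamer holds the
// partial line until a later chunk completes it
stream.write('a,b,c\n1,2');
stream.write(',3\n');
stream.end(); // fires 'finish', which flushes the parser with an empty chunk
```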