From 03eaf76ed5b4fcdfb5b1b20310e5ceeefdeb2ebc Mon Sep 17 00:00:00 2001 From: Martin Blom Date: Thu, 9 Mar 2017 11:30:25 +0100 Subject: [PATCH] Add support for Node Streams (#370) * Node Readable Stream support. * Updated README with info regarding the Node support. * Added a simple test for Node Readable Stream parsing. --- README.md | 4 ++- papaparse.js | 76 +++++++++++++++++++++++++++++++++++++++++++++ tests/node-tests.js | 9 ++++++ 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 12752b2..7f5091a 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,9 @@ To learn how to use Papa Parse: Papa Parse for Node -------------------- -[Rich Harris](https://github.com/Rich-Harris) forked this project to make **[Baby Parse](https://github.com/Rich-Harris/BabyParse)** which runs in Node.js environments. +Papa Parse can parse a [Readable Stream](https://nodejs.org/api/stream.html#stream_readable_streams) instead of a [File](https://www.w3.org/TR/FileAPI/) when used in Node.js environments (in addition to plain strings). In this mode, `encoding` must, if specified, be a Node-supported character encoding. The `Papa.LocalChunkSize`, `Papa.RemoteChunkSize` , `download`, `withCredentials` and `worker` config options are unavailable. + +Additionally, [Rich Harris](https://github.com/Rich-Harris) forked this project to make **[Baby Parse](https://github.com/Rich-Harris/BabyParse)** which runs in Node.js environments. ```bash $ npm install babyparse diff --git a/papaparse.js b/papaparse.js index ba228b1..4a80f50 100644 --- a/papaparse.js +++ b/papaparse.js @@ -68,6 +68,7 @@ Papa.NetworkStreamer = NetworkStreamer; Papa.FileStreamer = FileStreamer; Papa.StringStreamer = StringStreamer; + Papa.ReadableStreamStreamer = ReadableStreamStreamer; if (global.jQuery) { @@ -226,6 +227,10 @@ else streamer = new StringStreamer(_config); } + else if (_input.readable === true && typeof _input.read === 'function' && typeof _input.on === 'function') + { + streamer = new ReadableStreamStreamer(_config); + } else if ((global.File && _input instanceof File) || _input instanceof Object) // ...Safari. (see issue #106) streamer = new FileStreamer(_config); @@ -725,6 +730,77 @@ StringStreamer.prototype.constructor = StringStreamer; + function ReadableStreamStreamer(config) + { + config = config || {}; + + ChunkStreamer.call(this, config); + + var queue = []; + var parseOnData = true; + + this.stream = function(stream) + { + this._input = stream; + + this._input.on('data', this._streamData); + this._input.on('end', this._streamEnd); + this._input.on('error', this._streamError); + } + + this._nextChunk = function() + { + if (queue.length) + { + this.parseChunk(queue.shift()); + } + else + { + parseOnData = true; + } + } + + this._streamData = bindFunction(function(chunk) + { + try + { + queue.push(typeof chunk === 'string' ? chunk : chunk.toString(this._config.encoding)); + + if (parseOnData) + { + parseOnData = false; + this.parseChunk(queue.shift()); + } + } + catch (error) + { + this._streamError(error); + } + }, this); + + this._streamError = bindFunction(function(error) + { + this._streamCleanUp(); + this._sendError(error.message); + }, this); + + this._streamEnd = bindFunction(function() + { + this._streamCleanUp(); + this._finished = true; + this._streamData(''); + }, this); + + this._streamCleanUp = bindFunction(function() + { + this._input.removeListener('data', this._streamData); + this._input.removeListener('end', this._streamEnd); + this._input.removeListener('error', this._streamError); + }, this); + } + ReadableStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); + ReadableStreamStreamer.prototype.constructor = ReadableStreamStreamer; + // Use one ParserHandle per entire CSV file or string function ParserHandle(_config) diff --git a/tests/node-tests.js b/tests/node-tests.js index 7a8d3e5..42ecaa6 100644 --- a/tests/node-tests.js +++ b/tests/node-tests.js @@ -50,6 +50,15 @@ }, }); }); + + it('asynchronously parsed streaming CSV should be correctly parsed', function(done) { + Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), { + complete: function(parsedCsv) { + assertLongSampleParsedCorrectly(parsedCsv); + done(); + }, + }); + }); }); })();