From 44b552104142f6d34d8e71d311a6baeebd56101e Mon Sep 17 00:00:00 2001 From: Trevor Harwell Date: Thu, 26 Apr 2018 14:42:41 -0400 Subject: [PATCH] Refactor to use class based approach --- papaparse.js | 124 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 92 insertions(+), 32 deletions(-) diff --git a/papaparse.js b/papaparse.js index a8bb39f..a7f102e 100755 --- a/papaparse.js +++ b/papaparse.js @@ -72,7 +72,7 @@ Papa.FileStreamer = FileStreamer; Papa.StringStreamer = StringStreamer; Papa.ReadableStreamStreamer = ReadableStreamStreamer; - Papa.createDuplexStream = createDuplexStream; + Papa.DuplexStreamStreamer = DuplexStreamStreamer; if (global.jQuery) { @@ -234,7 +234,8 @@ { // create a node Duplex stream for use // with .pipe - return createDuplexStream(_config); + streamer = new DuplexStreamStreamer(_config); + return streamer.getStream(); } else if (typeof _input === 'string') { @@ -846,47 +847,106 @@ ReadableStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); ReadableStreamStreamer.prototype.constructor = ReadableStreamStreamer; - function createDuplexStream(_config) { + + function DuplexStreamStreamer(_config) { + var Duplex = require('stream').Duplex; + var parseOnWrite = true; + var writeStreamHasFinished = false; + var writeQueue = []; + var stream = null; var config = copy(_config); - config.step = function(results) { - results.data.forEach(function(item) { - duplexStream.push(item); - }); + + this._onCsvData = function(results) + { + var data = results.data; + for (var i = 0; i < data.length; i++) { + if (!stream.push(data[i]) && !this._handle.paused()) { + // the writeable consumer buffer has filled up + // so we need to pause until more items + // can be processed + this._handle.pause(); + } + } }; - config.complete = function() { - duplexStream.push(null); + + this._onCsvComplete = function() + { + stream.push(null); }; - var chunkStreamer = new ChunkStreamer(config); + config.step = bindFunction(this._onCsvData, this); + config.complete = bindFunction(this._onCsvComplete, this); + ChunkStreamer.call(this, config); - chunkStreamer._nextChunk = function() { - // empty function since this - // logic is handled by the Duplex class + this._nextChunk = function() + { + if (writeStreamHasFinished && writeQueue.length === 1) { + this._finished = true; + } + if (writeQueue.length) { + writeQueue.shift()(); + } else { + parseOnWrite = true; + } }; - // stream module from node must exist - // for this to run - var Duplex = require('stream').Duplex; - var duplexStream = new Duplex({ + this._addToParseQueue = function(chunk, callback) { + // add to queue so that we can indicate + // completion via callback + // node will automatically pause the incoming stream + // when too many items have been added without their + // callback being invoked + writeQueue.push(bindFunction(function() { + this.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding)); + if (isFunction(callback)) { + return callback(); + } + }, this)); + if (parseOnWrite) { + parseOnWrite = false; + this._nextChunk(); + } + }; + + this._onRead = function() + { + if (this._handle.paused()) { + // the writeable consumer can handle more data + // so resume the chunk parsing + this._handle.resume(); + } + }; + + this._onWrite = function(chunk, encoding, callback) + { + this._addToParseQueue(chunk, callback); + }; + + this._onWriteComplete = function(callback) + { + writeStreamHasFinished = true; + // have to write empty string + // so parser knows its done + this._addToParseQueue(''); + callback(); + }; + + this.getStream = function() + { + return stream; + }; + + stream = new Duplex({ readableObjectMode: true, decodeStrings: false, - read: function(size) { - // since pausing controls the input into the parser - // we do not need to re-trigger the parser to continue - }, - write: function(chunk, encoding, callback) { - chunkStreamer.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding)); - callback(); - }, - 'final': function(callback) { - chunkStreamer._finished = true; - chunkStreamer.parseChunk(''); - callback(); - } + read: bindFunction(this._onRead, this), + write: bindFunction(this._onWrite, this), + 'final': bindFunction(this._onWriteComplete, this) }); - - return duplexStream; } + DuplexStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); + DuplexStreamStreamer.prototype.constructor = DuplexStreamStreamer; + // Use one ParserHandle per entire CSV file or string function ParserHandle(_config)