|
|
@ -81,7 +81,9 @@ if (!Array.isArray) |
|
|
|
Papa.FileStreamer = FileStreamer; |
|
|
|
Papa.FileStreamer = FileStreamer; |
|
|
|
Papa.StringStreamer = StringStreamer; |
|
|
|
Papa.StringStreamer = StringStreamer; |
|
|
|
Papa.ReadableStreamStreamer = ReadableStreamStreamer; |
|
|
|
Papa.ReadableStreamStreamer = ReadableStreamStreamer; |
|
|
|
Papa.DuplexStreamStreamer = DuplexStreamStreamer; |
|
|
|
if (typeof PAPA_BROWSER_CONTEXT === 'undefined') { |
|
|
|
|
|
|
|
Papa.DuplexStreamStreamer = DuplexStreamStreamer; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (global.jQuery) |
|
|
|
if (global.jQuery) |
|
|
|
{ |
|
|
|
{ |
|
|
@ -241,7 +243,7 @@ if (!Array.isArray) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
var streamer = null; |
|
|
|
var streamer = null; |
|
|
|
if (_input === Papa.NODE_STREAM_INPUT) |
|
|
|
if (_input === Papa.NODE_STREAM_INPUT && typeof PAPA_BROWSER_CONTEXT === 'undefined') |
|
|
|
{ |
|
|
|
{ |
|
|
|
// create a node Duplex stream for use
|
|
|
|
// create a node Duplex stream for use
|
|
|
|
// with .pipe
|
|
|
|
// with .pipe
|
|
|
@ -871,107 +873,106 @@ if (!Array.isArray) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
function DuplexStreamStreamer(_config) { |
|
|
|
function DuplexStreamStreamer(_config) { |
|
|
|
if (typeof PAPA_BROWSER_CONTEXT === 'undefined') { |
|
|
|
var Duplex = require('stream').Duplex; |
|
|
|
var Duplex = require('stream').Duplex; |
|
|
|
var config = copy(_config); |
|
|
|
|
|
|
|
var parseOnWrite = true; |
|
|
|
var config = copy(_config); |
|
|
|
var writeStreamHasFinished = false; |
|
|
|
var parseOnWrite = true; |
|
|
|
var parseCallbackQueue = []; |
|
|
|
var writeStreamHasFinished = false; |
|
|
|
var stream = null; |
|
|
|
var parseCallbackQueue = []; |
|
|
|
|
|
|
|
var stream = null; |
|
|
|
this._onCsvData = function(results) |
|
|
|
|
|
|
|
{ |
|
|
|
this._onCsvData = function(results) |
|
|
|
var data = results.data; |
|
|
|
{ |
|
|
|
for (var i = 0; i < data.length; i++) { |
|
|
|
var data = results.data; |
|
|
|
if (!stream.push(data[i]) && !this._handle.paused()) { |
|
|
|
for (var i = 0; i < data.length; i++) { |
|
|
|
// the writeable consumer buffer has filled up
|
|
|
|
if (!stream.push(data[i]) && !this._handle.paused()) { |
|
|
|
// so we need to pause until more items
|
|
|
|
// the writeable consumer buffer has filled up
|
|
|
|
// can be processed
|
|
|
|
// so we need to pause until more items
|
|
|
|
this._handle.pause(); |
|
|
|
// can be processed
|
|
|
|
|
|
|
|
this._handle.pause(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
this._onCsvComplete = function() |
|
|
|
this._onCsvComplete = function() |
|
|
|
{ |
|
|
|
{ |
|
|
|
// node will finish the read stream when
|
|
|
|
// node will finish the read stream when
|
|
|
|
// null is pushed
|
|
|
|
// null is pushed
|
|
|
|
stream.push(null); |
|
|
|
stream.push(null); |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
config.step = bindFunction(this._onCsvData, this); |
|
|
|
config.step = bindFunction(this._onCsvData, this); |
|
|
|
config.complete = bindFunction(this._onCsvComplete, this); |
|
|
|
config.complete = bindFunction(this._onCsvComplete, this); |
|
|
|
ChunkStreamer.call(this, config); |
|
|
|
ChunkStreamer.call(this, config); |
|
|
|
|
|
|
|
|
|
|
|
this._nextChunk = function() |
|
|
|
this._nextChunk = function() |
|
|
|
{ |
|
|
|
{ |
|
|
|
if (writeStreamHasFinished && parseCallbackQueue.length === 1) { |
|
|
|
if (writeStreamHasFinished && parseCallbackQueue.length === 1) { |
|
|
|
this._finished = true; |
|
|
|
this._finished = true; |
|
|
|
} |
|
|
|
} |
|
|
|
if (parseCallbackQueue.length) { |
|
|
|
if (parseCallbackQueue.length) { |
|
|
|
parseCallbackQueue.shift()(); |
|
|
|
parseCallbackQueue.shift()(); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
parseOnWrite = true; |
|
|
|
parseOnWrite = true; |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
this._addToParseQueue = function(chunk, callback) |
|
|
|
this._addToParseQueue = function(chunk, callback) |
|
|
|
{ |
|
|
|
{ |
|
|
|
// add to queue so that we can indicate
|
|
|
|
// add to queue so that we can indicate
|
|
|
|
// completion via callback
|
|
|
|
// completion via callback
|
|
|
|
// node will automatically pause the incoming stream
|
|
|
|
// node will automatically pause the incoming stream
|
|
|
|
// when too many items have been added without their
|
|
|
|
// when too many items have been added without their
|
|
|
|
// callback being invoked
|
|
|
|
// callback being invoked
|
|
|
|
parseCallbackQueue.push(bindFunction(function() { |
|
|
|
parseCallbackQueue.push(bindFunction(function() { |
|
|
|
this.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding)); |
|
|
|
this.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding)); |
|
|
|
if (isFunction(callback)) { |
|
|
|
if (isFunction(callback)) { |
|
|
|
return callback(); |
|
|
|
return callback(); |
|
|
|
} |
|
|
|
|
|
|
|
}, this)); |
|
|
|
|
|
|
|
if (parseOnWrite) { |
|
|
|
|
|
|
|
parseOnWrite = false; |
|
|
|
|
|
|
|
this._nextChunk(); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
}, this)); |
|
|
|
|
|
|
|
if (parseOnWrite) { |
|
|
|
|
|
|
|
parseOnWrite = false; |
|
|
|
|
|
|
|
this._nextChunk(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
this._onRead = function() |
|
|
|
this._onRead = function() |
|
|
|
{ |
|
|
|
{ |
|
|
|
if (this._handle.paused()) { |
|
|
|
if (this._handle.paused()) { |
|
|
|
// the writeable consumer can handle more data
|
|
|
|
// the writeable consumer can handle more data
|
|
|
|
// so resume the chunk parsing
|
|
|
|
// so resume the chunk parsing
|
|
|
|
this._handle.resume(); |
|
|
|
this._handle.resume(); |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
this._onWrite = function(chunk, encoding, callback) |
|
|
|
this._onWrite = function(chunk, encoding, callback) |
|
|
|
{ |
|
|
|
{ |
|
|
|
this._addToParseQueue(chunk, callback); |
|
|
|
this._addToParseQueue(chunk, callback); |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
this._onWriteComplete = function() |
|
|
|
this._onWriteComplete = function() |
|
|
|
{ |
|
|
|
{ |
|
|
|
writeStreamHasFinished = true; |
|
|
|
writeStreamHasFinished = true; |
|
|
|
// have to write empty string
|
|
|
|
// have to write empty string
|
|
|
|
// so parser knows its done
|
|
|
|
// so parser knows its done
|
|
|
|
this._addToParseQueue(''); |
|
|
|
this._addToParseQueue(''); |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
this.getStream = function() |
|
|
|
this.getStream = function() |
|
|
|
{ |
|
|
|
{ |
|
|
|
return stream; |
|
|
|
return stream; |
|
|
|
}; |
|
|
|
}; |
|
|
|
stream = new Duplex({ |
|
|
|
stream = new Duplex({ |
|
|
|
readableObjectMode: true, |
|
|
|
readableObjectMode: true, |
|
|
|
decodeStrings: false, |
|
|
|
decodeStrings: false, |
|
|
|
read: bindFunction(this._onRead, this), |
|
|
|
read: bindFunction(this._onRead, this), |
|
|
|
write: bindFunction(this._onWrite, this) |
|
|
|
write: bindFunction(this._onWrite, this) |
|
|
|
}); |
|
|
|
}); |
|
|
|
stream.once('finish', bindFunction(this._onWriteComplete, this)); |
|
|
|
stream.once('finish', bindFunction(this._onWriteComplete, this)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if (typeof PAPA_BROWSER_CONTEXT === 'undefined') { |
|
|
|
|
|
|
|
DuplexStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); |
|
|
|
|
|
|
|
DuplexStreamStreamer.prototype.constructor = DuplexStreamStreamer; |
|
|
|
} |
|
|
|
} |
|
|
|
DuplexStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); |
|
|
|
|
|
|
|
DuplexStreamStreamer.prototype.constructor = DuplexStreamStreamer; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Use one ParserHandle per entire CSV file or string
|
|
|
|
// Use one ParserHandle per entire CSV file or string
|
|
|
|