|
|
@ -72,7 +72,7 @@ |
|
|
|
Papa.FileStreamer = FileStreamer; |
|
|
|
Papa.FileStreamer = FileStreamer; |
|
|
|
Papa.StringStreamer = StringStreamer; |
|
|
|
Papa.StringStreamer = StringStreamer; |
|
|
|
Papa.ReadableStreamStreamer = ReadableStreamStreamer; |
|
|
|
Papa.ReadableStreamStreamer = ReadableStreamStreamer; |
|
|
|
Papa.createDuplexStream = createDuplexStream; |
|
|
|
Papa.DuplexStreamStreamer = DuplexStreamStreamer; |
|
|
|
|
|
|
|
|
|
|
|
if (global.jQuery) |
|
|
|
if (global.jQuery) |
|
|
|
{ |
|
|
|
{ |
|
|
@ -234,7 +234,8 @@ |
|
|
|
{ |
|
|
|
{ |
|
|
|
// create a node Duplex stream for use
|
|
|
|
// create a node Duplex stream for use
|
|
|
|
// with .pipe
|
|
|
|
// with .pipe
|
|
|
|
return createDuplexStream(_config); |
|
|
|
streamer = new DuplexStreamStreamer(_config); |
|
|
|
|
|
|
|
return streamer.getStream(); |
|
|
|
} |
|
|
|
} |
|
|
|
else if (typeof _input === 'string') |
|
|
|
else if (typeof _input === 'string') |
|
|
|
{ |
|
|
|
{ |
|
|
@ -846,47 +847,106 @@ |
|
|
|
ReadableStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); |
|
|
|
ReadableStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); |
|
|
|
ReadableStreamStreamer.prototype.constructor = ReadableStreamStreamer; |
|
|
|
ReadableStreamStreamer.prototype.constructor = ReadableStreamStreamer; |
|
|
|
|
|
|
|
|
|
|
|
function createDuplexStream(_config) { |
|
|
|
|
|
|
|
|
|
|
|
function DuplexStreamStreamer(_config) { |
|
|
|
|
|
|
|
var Duplex = require('stream').Duplex; |
|
|
|
|
|
|
|
var parseOnWrite = true; |
|
|
|
|
|
|
|
var writeStreamHasFinished = false; |
|
|
|
|
|
|
|
var writeQueue = []; |
|
|
|
|
|
|
|
var stream = null; |
|
|
|
var config = copy(_config); |
|
|
|
var config = copy(_config); |
|
|
|
config.step = function(results) { |
|
|
|
|
|
|
|
results.data.forEach(function(item) { |
|
|
|
this._onCsvData = function(results) |
|
|
|
duplexStream.push(item); |
|
|
|
{ |
|
|
|
}); |
|
|
|
var data = results.data; |
|
|
|
|
|
|
|
for (var i = 0; i < data.length; i++) { |
|
|
|
|
|
|
|
if (!stream.push(data[i]) && !this._handle.paused()) { |
|
|
|
|
|
|
|
// the writeable consumer buffer has filled up
|
|
|
|
|
|
|
|
// so we need to pause until more items
|
|
|
|
|
|
|
|
// can be processed
|
|
|
|
|
|
|
|
this._handle.pause(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
|
config.complete = function() { |
|
|
|
|
|
|
|
duplexStream.push(null); |
|
|
|
this._onCsvComplete = function() |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
stream.push(null); |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
var chunkStreamer = new ChunkStreamer(config); |
|
|
|
config.step = bindFunction(this._onCsvData, this); |
|
|
|
|
|
|
|
config.complete = bindFunction(this._onCsvComplete, this); |
|
|
|
|
|
|
|
ChunkStreamer.call(this, config); |
|
|
|
|
|
|
|
|
|
|
|
chunkStreamer._nextChunk = function() { |
|
|
|
this._nextChunk = function() |
|
|
|
// empty function since this
|
|
|
|
{ |
|
|
|
// logic is handled by the Duplex class
|
|
|
|
if (writeStreamHasFinished && writeQueue.length === 1) { |
|
|
|
|
|
|
|
this._finished = true; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (writeQueue.length) { |
|
|
|
|
|
|
|
writeQueue.shift()(); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
parseOnWrite = true; |
|
|
|
|
|
|
|
} |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
// stream module from node must exist
|
|
|
|
this._addToParseQueue = function(chunk, callback) { |
|
|
|
// for this to run
|
|
|
|
// add to queue so that we can indicate
|
|
|
|
var Duplex = require('stream').Duplex; |
|
|
|
// completion via callback
|
|
|
|
var duplexStream = new Duplex({ |
|
|
|
// node will automatically pause the incoming stream
|
|
|
|
|
|
|
|
// when too many items have been added without their
|
|
|
|
|
|
|
|
// callback being invoked
|
|
|
|
|
|
|
|
writeQueue.push(bindFunction(function() { |
|
|
|
|
|
|
|
this.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding)); |
|
|
|
|
|
|
|
if (isFunction(callback)) { |
|
|
|
|
|
|
|
return callback(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}, this)); |
|
|
|
|
|
|
|
if (parseOnWrite) { |
|
|
|
|
|
|
|
parseOnWrite = false; |
|
|
|
|
|
|
|
this._nextChunk(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
this._onRead = function() |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
if (this._handle.paused()) { |
|
|
|
|
|
|
|
// the writeable consumer can handle more data
|
|
|
|
|
|
|
|
// so resume the chunk parsing
|
|
|
|
|
|
|
|
this._handle.resume(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
this._onWrite = function(chunk, encoding, callback) |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
this._addToParseQueue(chunk, callback); |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
this._onWriteComplete = function(callback) |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
writeStreamHasFinished = true; |
|
|
|
|
|
|
|
// have to write empty string
|
|
|
|
|
|
|
|
// so parser knows its done
|
|
|
|
|
|
|
|
this._addToParseQueue(''); |
|
|
|
|
|
|
|
callback(); |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
this.getStream = function() |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
return stream; |
|
|
|
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
stream = new Duplex({ |
|
|
|
readableObjectMode: true, |
|
|
|
readableObjectMode: true, |
|
|
|
decodeStrings: false, |
|
|
|
decodeStrings: false, |
|
|
|
read: function(size) { |
|
|
|
read: bindFunction(this._onRead, this), |
|
|
|
// since pausing controls the input into the parser
|
|
|
|
write: bindFunction(this._onWrite, this), |
|
|
|
// we do not need to re-trigger the parser to continue
|
|
|
|
'final': bindFunction(this._onWriteComplete, this) |
|
|
|
}, |
|
|
|
|
|
|
|
write: function(chunk, encoding, callback) { |
|
|
|
|
|
|
|
chunkStreamer.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding)); |
|
|
|
|
|
|
|
callback(); |
|
|
|
|
|
|
|
}, |
|
|
|
|
|
|
|
'final': function(callback) { |
|
|
|
|
|
|
|
chunkStreamer._finished = true; |
|
|
|
|
|
|
|
chunkStreamer.parseChunk(''); |
|
|
|
|
|
|
|
callback(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
|
|
|
|
return duplexStream; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
DuplexStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); |
|
|
|
|
|
|
|
DuplexStreamStreamer.prototype.constructor = DuplexStreamStreamer; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Use one ParserHandle per entire CSV file or string
|
|
|
|
// Use one ParserHandle per entire CSV file or string
|
|
|
|
function ParserHandle(_config) |
|
|
|
function ParserHandle(_config) |
|
|
|