|
|
@ -850,6 +850,7 @@ |
|
|
|
|
|
|
|
|
|
|
|
function DuplexStreamStreamer(_config) { |
|
|
|
function DuplexStreamStreamer(_config) { |
|
|
|
var Duplex = require('stream').Duplex; |
|
|
|
var Duplex = require('stream').Duplex; |
|
|
|
|
|
|
|
// var self = this;
|
|
|
|
var config = copy(_config); |
|
|
|
var config = copy(_config); |
|
|
|
var parseOnWrite = true; |
|
|
|
var parseOnWrite = true; |
|
|
|
var writeStreamHasFinished = false; |
|
|
|
var writeStreamHasFinished = false; |
|
|
@ -920,30 +921,31 @@ |
|
|
|
|
|
|
|
|
|
|
|
this._onWrite = function(chunk, encoding, callback) |
|
|
|
this._onWrite = function(chunk, encoding, callback) |
|
|
|
{ |
|
|
|
{ |
|
|
|
|
|
|
|
if (chunk === null) { |
|
|
|
|
|
|
|
console.log('final'); |
|
|
|
|
|
|
|
} |
|
|
|
this._addToParseQueue(chunk, callback); |
|
|
|
this._addToParseQueue(chunk, callback); |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
this._onWriteComplete = function(callback) |
|
|
|
this._onWriteComplete = function() |
|
|
|
{ |
|
|
|
{ |
|
|
|
writeStreamHasFinished = true; |
|
|
|
writeStreamHasFinished = true; |
|
|
|
// have to write empty string
|
|
|
|
// have to write empty string
|
|
|
|
// so parser knows its done
|
|
|
|
// so parser knows its done
|
|
|
|
this._addToParseQueue(''); |
|
|
|
this._addToParseQueue(''); |
|
|
|
callback(); |
|
|
|
|
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
this.getStream = function() |
|
|
|
this.getStream = function() |
|
|
|
{ |
|
|
|
{ |
|
|
|
return stream; |
|
|
|
return stream; |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
stream = new Duplex({ |
|
|
|
stream = new Duplex({ |
|
|
|
readableObjectMode: true, |
|
|
|
readableObjectMode: true, |
|
|
|
decodeStrings: false, |
|
|
|
decodeStrings: false, |
|
|
|
read: bindFunction(this._onRead, this), |
|
|
|
read: bindFunction(this._onRead, this), |
|
|
|
write: bindFunction(this._onWrite, this), |
|
|
|
write: bindFunction(this._onWrite, this) |
|
|
|
'final': bindFunction(this._onWriteComplete, this) |
|
|
|
|
|
|
|
}); |
|
|
|
}); |
|
|
|
|
|
|
|
stream.once('finish', bindFunction(this._onWriteComplete, this)); |
|
|
|
} |
|
|
|
} |
|
|
|
DuplexStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); |
|
|
|
DuplexStreamStreamer.prototype = Object.create(ChunkStreamer.prototype); |
|
|
|
DuplexStreamStreamer.prototype.constructor = DuplexStreamStreamer; |
|
|
|
DuplexStreamStreamer.prototype.constructor = DuplexStreamStreamer; |
|
|
|