|
|
|
@ -852,7 +852,7 @@
@@ -852,7 +852,7 @@
|
|
|
|
|
var Duplex = require('stream').Duplex; |
|
|
|
|
var parseOnWrite = true; |
|
|
|
|
var writeStreamHasFinished = false; |
|
|
|
|
var writeQueue = []; |
|
|
|
|
var parseCallbackQueue = []; |
|
|
|
|
var stream = null; |
|
|
|
|
var config = copy(_config); |
|
|
|
|
|
|
|
|
@ -880,11 +880,11 @@
@@ -880,11 +880,11 @@
|
|
|
|
|
|
|
|
|
|
this._nextChunk = function() |
|
|
|
|
{ |
|
|
|
|
if (writeStreamHasFinished && writeQueue.length === 1) { |
|
|
|
|
if (writeStreamHasFinished && parseCallbackQueue.length === 1) { |
|
|
|
|
this._finished = true; |
|
|
|
|
} |
|
|
|
|
if (writeQueue.length) { |
|
|
|
|
writeQueue.shift()(); |
|
|
|
|
if (parseCallbackQueue.length) { |
|
|
|
|
parseCallbackQueue.shift()(); |
|
|
|
|
} else { |
|
|
|
|
parseOnWrite = true; |
|
|
|
|
} |
|
|
|
@ -896,7 +896,7 @@
@@ -896,7 +896,7 @@
|
|
|
|
|
// node will automatically pause the incoming stream
|
|
|
|
|
// when too many items have been added without their
|
|
|
|
|
// callback being invoked
|
|
|
|
|
writeQueue.push(bindFunction(function() { |
|
|
|
|
parseCallbackQueue.push(bindFunction(function() { |
|
|
|
|
this.parseChunk(typeof chunk === 'string' ? chunk : chunk.toString(config.encoding)); |
|
|
|
|
if (isFunction(callback)) { |
|
|
|
|
return callback(); |
|
|
|
|