Browse Source

Fix async pause/resume with ReadableStreamStreamer (#490)

* Fix bug where pausing and resuming asynchronously caused the csv file to be parsed incorrectly
pull/496/head
Trevor Harwell 7 years ago committed by Sergi Almacellas Abellana
parent
commit
748b663d44
  1. 24
      papaparse.js
  2. 167
      tests/node-tests.js

24
papaparse.js

@ -753,6 +753,19 @@
var queue = []; var queue = [];
var parseOnData = true; var parseOnData = true;
var streamHasEnded = false;
this.pause = function()
{
ChunkStreamer.prototype.pause.apply(this, arguments);
this._input.pause();
};
this.resume = function()
{
ChunkStreamer.prototype.resume.apply(this, arguments);
this._input.resume();
};
this.stream = function(stream) this.stream = function(stream)
{ {
@ -763,8 +776,16 @@
this._input.on('error', this._streamError); this._input.on('error', this._streamError);
}; };
this._checkIsFinished = function()
{
if (streamHasEnded && queue.length === 1) {
this._finished = true;
}
};
this._nextChunk = function() this._nextChunk = function()
{ {
this._checkIsFinished();
if (queue.length) if (queue.length)
{ {
this.parseChunk(queue.shift()); this.parseChunk(queue.shift());
@ -784,6 +805,7 @@
if (parseOnData) if (parseOnData)
{ {
parseOnData = false; parseOnData = false;
this._checkIsFinished();
this.parseChunk(queue.shift()); this.parseChunk(queue.shift());
} }
} }
@ -802,7 +824,7 @@
this._streamEnd = bindFunction(function() this._streamEnd = bindFunction(function()
{ {
this._streamCleanUp(); this._streamCleanUp();
this._finished = true; streamHasEnded = true;
this._streamData(''); this._streamData('');
}, this); }, this);

167
tests/node-tests.js

@ -57,79 +57,116 @@ describe('PapaParse', function() {
done(); done();
}, },
}); });
});
it('should support pausing and resuming on same tick when streaming', function(done) {
var rows = [];
Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), {
chunk: function(results, parser) {
rows = rows.concat(results.data);
parser.pause();
parser.resume();
},
error: function(err) {
done(new Error(err));
},
complete: function() {
assert.deepEqual(rows[0], [
'Grant',
'Dyer',
'Donec.elementum@orciluctuset.example',
'2013-11-23T02:30:31-08:00',
'2014-05-31T01:06:56-07:00',
'Magna Ut Associates',
'ljenkins'
]);
assert.deepEqual(rows[7], [
'Talon',
'Salinas',
'posuere.vulputate.lacus@Donecsollicitudin.example',
'2015-01-31T09:19:02-08:00',
'2014-12-17T04:59:18-08:00',
'Aliquam Iaculis Incorporate',
'Phasellus@Quisquetincidunt.example'
]);
done();
}
});
});
it('should support pausing and resuming on same tick when streaming', function(done) { it('should support pausing and resuming asynchronously when streaming', function(done) {
var rows = []; var rows = [];
Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), { Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), {
chunk: function(results, parser) { chunk: function(results, parser) {
rows = rows.concat(results.data); rows = rows.concat(results.data);
parser.pause(); parser.pause();
setTimeout(function() {
parser.resume(); parser.resume();
}, }, 200);
error: function(err) { },
done(new Error(err)); error: function(err) {
}, done(new Error(err));
complete: function() { },
assert.deepEqual(rows[0], [ complete: function() {
'Grant', assert.deepEqual(rows[0], [
'Dyer', 'Grant',
'Donec.elementum@orciluctuset.example', 'Dyer',
'2013-11-23T02:30:31-08:00', 'Donec.elementum@orciluctuset.example',
'2014-05-31T01:06:56-07:00', '2013-11-23T02:30:31-08:00',
'Magna Ut Associates', '2014-05-31T01:06:56-07:00',
'ljenkins' 'Magna Ut Associates',
]); 'ljenkins'
assert.deepEqual(rows[7], [ ]);
'Talon', assert.deepEqual(rows[7], [
'Salinas', 'Talon',
'posuere.vulputate.lacus@Donecsollicitudin.example', 'Salinas',
'2015-01-31T09:19:02-08:00', 'posuere.vulputate.lacus@Donecsollicitudin.example',
'2014-12-17T04:59:18-08:00', '2015-01-31T09:19:02-08:00',
'Aliquam Iaculis Incorporate', '2014-12-17T04:59:18-08:00',
'Phasellus@Quisquetincidunt.example' 'Aliquam Iaculis Incorporate',
]); 'Phasellus@Quisquetincidunt.example'
done(); ]);
} done();
}); }
}); });
});
it('handles errors in beforeFirstChunk', function(done) { it('handles errors in beforeFirstChunk', function(done) {
var expectedError = new Error('test'); var expectedError = new Error('test');
Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), { Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), {
beforeFirstChunk: function() { beforeFirstChunk: function() {
throw expectedError; throw expectedError;
}, },
error: function(err) { error: function(err) {
assert.deepEqual(err, expectedError); assert.deepEqual(err, expectedError);
done(); done();
} }
});
}); });
});
it('handles errors in chunk', function(done) { it('handles errors in chunk', function(done) {
var expectedError = new Error('test'); var expectedError = new Error('test');
Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), { Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), {
chunk: function() { chunk: function() {
throw expectedError; throw expectedError;
}, },
error: function(err) { error: function(err) {
assert.deepEqual(err, expectedError); assert.deepEqual(err, expectedError);
done(); done();
} }
});
}); });
});
it('handles errors in step', function(done) { it('handles errors in step', function(done) {
var expectedError = new Error('test'); var expectedError = new Error('test');
Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), { Papa.parse(fs.createReadStream(__dirname + '/long-sample.csv', 'utf8'), {
step: function() { step: function() {
throw expectedError; throw expectedError;
}, },
error: function(err) { error: function(err) {
assert.deepEqual(err, expectedError); assert.deepEqual(err, expectedError);
done(); done();
} }
});
}); });
}); });
}); });

Loading…
Cancel
Save