diff --git a/papaparse.js b/papaparse.js index b3f7bd7..09c89b5 100644 --- a/papaparse.js +++ b/papaparse.js @@ -1354,11 +1354,22 @@ { var msg = e.data; var worker = workers[msg.workerId]; + var aborted = false; if (msg.error) worker.userError(msg.error, msg.file); else if (msg.results && msg.results.data) { + var abort = function() { + aborted = true; + completeWorker(msg.workerId, { data: [], errors: [], meta: { aborted: true } }); + } + var handle = { + abort: abort, + pause: abort, + resume: notImplemented + }; + if (isFunction(worker.userStep)) { for (var i = 0; i < msg.results.data.length; i++) @@ -1367,7 +1378,9 @@ data: [msg.results.data[i]], errors: msg.results.errors, meta: msg.results.meta - }); + }, handle); + if (aborted) + break; } delete msg.results; // free memory ASAP } @@ -1378,15 +1391,24 @@ } } - if (msg.finished) + if (msg.finished && !aborted) { - if (isFunction(workers[msg.workerId].userComplete)) - workers[msg.workerId].userComplete(msg.results); - workers[msg.workerId].terminate(); - delete workers[msg.workerId]; + completeWorker(msg.workerId, msg.results); } } + function completeWorker(workerId, results) { + var worker = workers[workerId]; + if (isFunction(worker.userComplete)) + worker.userComplete(results); + worker.terminate(); + delete workers[workerId]; + } + + function notImplemented() { + throw "Not implemented."; + } + // Callback when worker thread receives a message function workerThreadReceivedMessage(e) { diff --git a/tests/test-cases.js b/tests/test-cases.js index 8497dc9..519f777 100644 --- a/tests/test-cases.js +++ b/tests/test-cases.js @@ -1197,5 +1197,78 @@ var CUSTOM_TESTS = [ } }); } + }, + { + description: "Step functions can abort parsing", + expected: [['A', 'b', 'c']], + run: function(callback) { + var updates = []; + Papa.parse('A,b,c\nd,E,f\nG,h,i', { + step: function(response, handle) { + updates.push(response.data[0]); + handle.abort(); + } + }); + setTimeout(function() { + callback(updates); + }, 100); + } + }, + { + description: "Step functions can pause parsing", + expected: [['A', 'b', 'c']], + run: function(callback) { + var updates = []; + Papa.parse('A,b,c\nd,E,f\nG,h,i', { + step: function(response, handle) { + updates.push(response.data[0]); + handle.pause(); + } + }); + setTimeout(function() { + callback(updates); + }, 100); + } + }, + { + description: "Step functions can resume parsing", + expected: [['A', 'b', 'c'], ['d', 'E', 'f'], ['G', 'h', 'i']], + run: function(callback) { + var updates = []; + var handle = null; + var first = true; + Papa.parse('A,b,c\nd,E,f\nG,h,i', { + step: function(response, h) { + updates.push(response.data[0]); + if (!first) return; + handle = h; + handle.pause(); + first = false; + }, complete: function() { + callback(updates); + } + }); + setTimeout(function() { + handle.resume(); + }, 100); + } + }, + { + description: "Step functions can abort workers", + expected: 1, + run: function(callback) { + var updates = 0; + Papa.parse("/tests/long-sample.csv", { + worker: true, + chunkSize: 500, + step: function(response, handle) { + updates++; + handle.abort(); + }, + complete: function() { + callback(updates); + } + }); + } } ];