Browse Source

Merge pull request #137 from bluej100/chunkstreamer

refactor common functionality of chunk streaming classes
pull/138/head
Matt Holt 10 years ago
parent
commit
7307708420
  1. 399
      papaparse.js

399
papaparse.js

@ -403,73 +403,158 @@
} }
} }
function bindFunction(f, self)
// TODO: Many of the functions of NetworkStreamer and FileStreamer are similar or the same. Consolidate?
function NetworkStreamer(config)
{ {
config = config || {}; return function() {
if (!config.chunkSize) f.apply(self, arguments);
config.chunkSize = Papa.RemoteChunkSize; }
}
var start = 0, baseIndex = 0, fileSize = 0, rowCount = 0; function ChunkStreamer(config)
var aggregate = ""; {
var partialLine = ""; this._handle = null;
var self = this; this._paused = false;
var xhr, url, nextChunk, finishedWithEntireFile; this._finished = false;
var userComplete, handle, configCopy; this._userComplete = null;
replaceConfig(config); this._input = null;
this._baseIndex = 0;
this._partialLine = "";
this._rowCount = 0;
this._start = 0;
this._config = replaceConfig.call(this, config);
this.resume = function() this.resume = function()
{ {
paused = false; this._paused = false;
nextChunk(); this._nextChunk();
}; };
this.finished = function() this.finished = function()
{ {
return finishedWithEntireFile; return this._finished;
}; };
this.pause = function() this.pause = function()
{ {
paused = true; this._paused = true;
}; };
this.abort = function() this.abort = function()
{ {
finishedWithEntireFile = true; this._finished = true;
if (isFunction(userComplete)) if (isFunction(this._userComplete))
userComplete({ data: [], errors: [], meta: { aborted: true } }); this._userComplete({ data: [], errors: [], meta: { aborted: true } });
}; };
this.stream = function(u) this._nextChunk = null;
{
url = u; this._parseChunk = function(chunk) {
// Rejoin the line we likely just split in two by chunking the file
var aggregate = this._partialLine + chunk;
this._partialLine = "";
var results = this._handle.parse(aggregate, this._baseIndex, !this._finished);
var lastIndex = results.meta.cursor;
if (!this._finished)
{
this._partialLine = aggregate.substring(lastIndex - this._baseIndex);
this._baseIndex = lastIndex;
}
if (results && results.data)
this._rowCount += results.data.length;
var finishedIncludingPreview = this._finished || (this._config.preview && this._rowCount >= this._config.preview);
if (IS_WORKER) if (IS_WORKER)
{ {
nextChunk = function() global.postMessage({
{ results: results,
readChunk(); workerId: Papa.WORKER_ID,
chunkLoaded(); finished: finishedIncludingPreview
}; });
} }
else else if (isFunction(this._config.chunk))
{ {
nextChunk = function() this._config.chunk(results, this._handle);
{ if (this._paused)
readChunk(); return;
}; results = undefined;
}
if (isFunction(this._userComplete) && finishedIncludingPreview)
this._userComplete(results);
if (!finishedIncludingPreview && (!results || !results.meta.paused))
this._nextChunk();
};
this._sendError = function(error)
{
if (isFunction(this._config.error))
this._config.error(error);
else if (IS_WORKER && this._config.error)
{
global.postMessage({
workerId: Papa.WORKER_ID,
error: error,
finished: false
});
} }
};
nextChunk(); // Starts streaming function replaceConfig(config)
{
// Deep-copy the config so we can edit it; we need
// to call the complete function if we are to ensure
// that the last chunk callback, if any, will be called
// BEFORE the complete function.
var configCopy = copy(config);
this._userComplete = configCopy.complete;
configCopy.complete = undefined;
configCopy.chunkSize = parseInt(configCopy.chunkSize); // VERY important so we don't concatenate strings!
this._handle = new ParserHandle(configCopy);
this._handle.streamer = this;
return configCopy;
}
}
function NetworkStreamer(config)
{
config = config || {};
if (!config.chunkSize)
config.chunkSize = Papa.RemoteChunkSize;
ChunkStreamer.call(this, config);
var xhr;
if (IS_WORKER)
{
this._nextChunk = function()
{
this._readChunk();
this._chunkLoaded();
};
}
else
{
this._nextChunk = function()
{
this._readChunk();
};
}
this.stream = function(url)
{
this._input = url;
this._nextChunk(); // Starts streaming
}; };
function readChunk() this._readChunk = function()
{ {
if (finishedWithEntireFile) if (this._finished)
{ {
chunkLoaded(); this._chunkLoaded();
return; return;
} }
@ -477,18 +562,16 @@
if (!IS_WORKER) if (!IS_WORKER)
{ {
xhr.onload = chunkLoaded; xhr.onload = bindFunction(this._chunkLoaded, this);
xhr.onerror = chunkError; xhr.onerror = bindFunction(this._chunkError, this);
} }
xhr.open("GET", url, !IS_WORKER); xhr.open("GET", this._input, !IS_WORKER);
if (config.step || config.chunk) if (this._config.step || this._config.chunk)
{ {
var end = start + configCopy.chunkSize - 1; // minus one because byte range is inclusive var end = this._start + this._config.chunkSize - 1; // minus one because byte range is inclusive
if (fileSize && end > fileSize) // Hack around a Chrome bug: http://stackoverflow.com/q/24745095/1048862 xhr.setRequestHeader("Range", "bytes="+this._start+"-"+end);
end = fileSize;
xhr.setRequestHeader("Range", "bytes="+start+"-"+end);
xhr.setRequestHeader("If-None-Match", "webkit-no-cache"); // https://bugs.webkit.org/show_bug.cgi?id=82672 xhr.setRequestHeader("If-None-Match", "webkit-no-cache"); // https://bugs.webkit.org/show_bug.cgi?id=82672
} }
@ -496,79 +579,34 @@
xhr.send(); xhr.send();
} }
catch (err) { catch (err) {
chunkError(err.message); this._chunkError(err.message);
} }
if (IS_WORKER && xhr.status == 0) if (IS_WORKER && xhr.status == 0)
chunkError(); this._chunkError();
else else
start += configCopy.chunkSize; this._start += this._config.chunkSize;
} }
function chunkLoaded() this._chunkLoaded = function()
{ {
if (xhr.readyState != 4) if (xhr.readyState != 4)
return; return;
if (xhr.status < 200 || xhr.status >= 400) if (xhr.status < 200 || xhr.status >= 400)
{ {
chunkError(); this._chunkError();
return; return;
} }
// Rejoin the line we likely just split in two by chunking the file this._finished = (!this._config.step && !this._config.chunk) || this._start > getFileSize(xhr);
aggregate += partialLine + xhr.responseText; this._parseChunk(xhr.responseText);
partialLine = "";
finishedWithEntireFile = (!config.step && !config.chunk) || start > getFileSize(xhr);
var results = handle.parse(aggregate, baseIndex, !finishedWithEntireFile);
var lastIndex = results.meta.cursor;
if (!finishedWithEntireFile)
{
partialLine = aggregate.substring(lastIndex - baseIndex);
baseIndex = lastIndex;
}
if (results && results.data)
rowCount += results.data.length;
aggregate = "";
var finishedIncludingPreview = finishedWithEntireFile || (configCopy.preview && rowCount >= configCopy.preview);
if (IS_WORKER)
{
global.postMessage({
results: results,
workerId: Papa.WORKER_ID,
finished: finishedIncludingPreview
});
}
else if (isFunction(config.chunk))
{
config.chunk(results, handle);
results = undefined;
}
if (isFunction(userComplete) && finishedIncludingPreview)
userComplete(results);
if (!finishedIncludingPreview && (!results || !results.meta.paused))
nextChunk();
} }
function chunkError(errorMessage) this._chunkError = function(errorMessage)
{ {
var errorText = xhr.statusText || errorMessage; var errorText = xhr.statusText || errorMessage;
if (isFunction(config.error)) this._sendError(errorText);
config.error(errorText);
else if (IS_WORKER && config.error)
{
global.postMessage({
workerId: Papa.WORKER_ID,
error: errorText,
finished: false
});
}
} }
function getFileSize(xhr) function getFileSize(xhr)
@ -576,28 +614,9 @@
var contentRange = xhr.getResponseHeader("Content-Range"); var contentRange = xhr.getResponseHeader("Content-Range");
return parseInt(contentRange.substr(contentRange.lastIndexOf("/") + 1)); return parseInt(contentRange.substr(contentRange.lastIndexOf("/") + 1));
} }
function replaceConfig(config)
{
// Deep-copy the config so we can edit it; we need
// to call the complete function if we are to ensure
// that the last chunk callback, if any, will be called
// BEFORE the complete function.
configCopy = copy(config);
userComplete = configCopy.complete;
configCopy.complete = undefined;
configCopy.chunkSize = parseInt(configCopy.chunkSize); // VERY important so we don't concatenate strings!
handle = new ParserHandle(configCopy);
handle.streamer = self;
}
} }
NetworkStreamer.prototype = Object.create(ChunkStreamer.prototype);
NetworkStreamer.prototype.constructor = NetworkStreamer;
function FileStreamer(config) function FileStreamer(config)
@ -605,154 +624,62 @@
config = config || {}; config = config || {};
if (!config.chunkSize) if (!config.chunkSize)
config.chunkSize = Papa.LocalChunkSize; config.chunkSize = Papa.LocalChunkSize;
ChunkStreamer.call(this, config);
var start = 0, baseIndex = 0;
var file; var reader, slice;
var slice;
var aggregate = "";
var partialLine = "";
var rowCount = 0;
var paused = false;
var self = this;
var reader, nextChunk, slice, finishedWithEntireFile;
var userComplete, handle, configCopy;
replaceConfig(config);
// FileReader is better than FileReaderSync (even in worker) - see http://stackoverflow.com/q/24708649/1048862 // FileReader is better than FileReaderSync (even in worker) - see http://stackoverflow.com/q/24708649/1048862
// But Firefox is a pill, too - see issue #76: https://github.com/mholt/PapaParse/issues/76 // But Firefox is a pill, too - see issue #76: https://github.com/mholt/PapaParse/issues/76
var usingAsyncReader = typeof FileReader !== 'undefined'; // Safari doesn't consider it a function - see issue #105 var usingAsyncReader = typeof FileReader !== 'undefined'; // Safari doesn't consider it a function - see issue #105
this.stream = function(f) this.stream = function(file)
{ {
file = f; this._input = file;
slice = file.slice || file.webkitSlice || file.mozSlice; slice = file.slice || file.webkitSlice || file.mozSlice;
if (usingAsyncReader) if (usingAsyncReader)
{ {
reader = new FileReader(); // Preferred method of reading files, even in workers reader = new FileReader(); // Preferred method of reading files, even in workers
reader.onload = chunkLoaded; reader.onload = bindFunction(this._chunkLoaded, this);
reader.onerror = chunkError; reader.onerror = bindFunction(this._chunkError, this);
} }
else else
reader = new FileReaderSync(); // Hack for running in a web worker in Firefox reader = new FileReaderSync(); // Hack for running in a web worker in Firefox
nextChunk(); // Starts streaming this._nextChunk(); // Starts streaming
};
this.finished = function()
{
return finishedWithEntireFile;
};
this.pause = function()
{
paused = true;
};
this.resume = function()
{
paused = false;
nextChunk();
};
this.abort = function()
{
finishedWithEntireFile = true;
if (isFunction(userComplete))
userComplete({ data: [], errors: [], meta: { aborted: true } });
}; };
function nextChunk() this._nextChunk = function()
{ {
if (!finishedWithEntireFile && (!configCopy.preview || rowCount < configCopy.preview)) if (!this._finished && (!this._config.preview || this._rowCount < this._config.preview))
readChunk(); this._readChunk();
} }
function readChunk() this._readChunk = function()
{ {
var end = Math.min(start + configCopy.chunkSize, file.size); var end = Math.min(this._start + this._config.chunkSize, this._input.size);
var txt = reader.readAsText(slice.call(file, start, end), config.encoding); var txt = reader.readAsText(slice.call(this._input, this._start, end), this._config.encoding);
if (!usingAsyncReader) if (!usingAsyncReader)
chunkLoaded({ target: { result: txt } }); // mimic the async signature this._chunkLoaded({ target: { result: txt } }); // mimic the async signature
} }
function chunkLoaded(event) this._chunkLoaded = function(event)
{ {
// Very important to increment start each time before handling results // Very important to increment start each time before handling results
start += configCopy.chunkSize; this._start += this._config.chunkSize;
this._finished = this._start >= this._input.size;
// Rejoin the line we likely just split in two by chunking the file this._parseChunk(event.target.result);
aggregate += partialLine + event.target.result;
partialLine = "";
finishedWithEntireFile = start >= file.size;
var results = handle.parse(aggregate, baseIndex, !finishedWithEntireFile);
var lastIndex = results.meta.cursor;
if (!finishedWithEntireFile)
{
partialLine = aggregate.substring(lastIndex - baseIndex);
baseIndex = lastIndex;
}
if (results && results.data)
rowCount += results.data.length;
aggregate = "";
var finishedIncludingPreview = finishedWithEntireFile || (configCopy.preview && rowCount >= configCopy.preview);
if (IS_WORKER)
{
global.postMessage({
results: results,
workerId: Papa.WORKER_ID,
finished: finishedIncludingPreview
});
}
else if (isFunction(config.chunk))
{
config.chunk(results, self, file);
if (paused)
return;
results = undefined;
}
if (isFunction(userComplete) && finishedIncludingPreview)
userComplete(results);
if (!finishedIncludingPreview && (!results || !results.meta.paused))
nextChunk();
}
function chunkError()
{
if (isFunction(config.error))
config.error(reader.error, file);
else if (IS_WORKER && config.error)
{
global.postMessage({
workerId: Papa.WORKER_ID,
error: reader.error,
file: file,
finished: false
});
}
} }
function replaceConfig(config) this._chunkError = function()
{ {
// Deep-copy the config so we can edit it; we need this._sendError(reader.error);
// to call the complete function if we are to ensure
// that the last chunk callback, if any, will be called
// BEFORE the complete function.
configCopy = copy(config);
userComplete = configCopy.complete;
configCopy.complete = undefined;
configCopy.chunkSize = parseInt(configCopy.chunkSize); // VERY important so we don't concatenate strings!
handle = new ParserHandle(configCopy);
handle.streamer = self;
} }
} }
FileStreamer.prototype = Object.create(ChunkStreamer.prototype);
FileStreamer.prototype.constructor = FileStreamer;

Loading…
Cancel
Save