2025-04-24 17:03:28 +08:00

314 lines
8.3 KiB
JavaScript

'use strict';
// var debug = require('debug')('ali-oss:multipart');
var is = require('is-type-of');
var util = require('util');
var path = require('path');
var mime = require('mime');
var proto = exports;
/**
* Multipart operations
*/
/**
 * Upload a file to OSS, using a multipart upload when the file is large enough.
 *
 * Files smaller than 100 KiB are sent with a single `putStream` call. Anything
 * else is started with `initMultipartUpload` and then handed to
 * `_resumeMultipart`. When `options.checkpoint.uploadId` is set, that
 * checkpoint is resumed straight away and the file is not inspected.
 *
 * `options` is modified in place: `mime` and `headers` are filled in, and
 * `contentLength` is set for the single-request path.
 *
 * @param {String} name
 * @param {String|File} file
 * @param {Object} options
 * {Object} options.checkpoint a checkpoint with an uploadId to resume from
 * {Number} options.partSize requested part size in bytes, at least 100 KiB
 * {Function} options.progress its return value is yielded; called with (1)
 * after a single-request upload, or (0, checkpoint, res) once the multipart
 * upload has been initiated
 * {String} options.mime defaults to the type looked up from the file extension
 * {Object} options.headers
 * {Object} options.meta passed to `_convertMetaToHeaders` with options.headers
 * {Object} options.callback The callback parameter is composed of a JSON string encoded in Base64
 * {String} options.callback.url the OSS sends a callback request to this URL
 * {String} options.callback.host The host header value for initiating callback requests
 * {String} options.callback.body The value of the request body when a callback is initiated
 * {String} options.callback.contentType The Content-Type of the callback requests initiated
 * {Object} options.callback.customValue Custom parameters are a map of key-values, e.g:
 * customValue = {
 * key1: 'value1',
 * key2: 'value2'
 * }
 * @return {Object} for the single-request path `{ res, bucket, name, etag }`
 * (plus `data` when an 'x-oss-callback' header is present); otherwise the
 * result of `_resumeMultipart`
 */
proto.multipartUpload = function* multipartUpload(name, file, options) {
  this.resetCancelFlag();
  if (!options) {
    options = {};
  }

  // A checkpoint that already carries an uploadId short-circuits everything.
  var previous = options.checkpoint;
  if (previous && previous.uploadId) {
    return yield this._resumeMultipart(previous, options);
  }

  var minPartSize = 100 * 1024;

  var filename = file;
  if (is.file(file)) {
    filename = file.name;
  }
  if (!options.mime) {
    options.mime = mime.getType(path.extname(filename));
  }
  if (!options.headers) {
    options.headers = {};
  }
  this._convertMetaToHeaders(options.meta, options.headers);

  var fileSize = yield this._getFileSize(file);

  if (fileSize < minPartSize) {
    // Small file: one plain upload instead of a multipart session.
    var stream = this._createStream(file, 0, fileSize);
    options.contentLength = fileSize;
    var putResult = yield this.putStream(name, stream, options);

    if (options.progress) {
      yield options.progress(1);
    }

    var summary = {
      res: putResult.res,
      bucket: this.options.bucket,
      name: name,
      etag: putResult.res.headers.etag
    };
    if (options.headers && options.headers['x-oss-callback']) {
      summary.data = putResult.data;
    }
    return summary;
  }

  if (options.partSize && options.partSize < minPartSize) {
    throw new Error('partSize must not be smaller than ' + minPartSize);
  }

  var initResult = yield this.initMultipartUpload(name, options);
  var checkpoint = {
    file: file,
    name: name,
    fileSize: fileSize,
    partSize: this._getPartSize(fileSize, options.partSize),
    uploadId: initResult.uploadId,
    doneParts: []
  };

  if (options.progress) {
    yield options.progress(0, checkpoint, initResult.res);
  }

  return yield this._resumeMultipart(checkpoint, options);
};
/*
 * Resume a multipart upload from a checkpoint.
 *
 * Every part number not yet listed in checkpoint.doneParts is uploaded, either
 * one at a time (Internet Explorer 10, or options.parallel === 1) or through
 * `_thunkPool` with `options.parallel` workers (default 5). After each
 * successful part the checkpoint's doneParts is updated and, unless the upload
 * was cancelled, options.progress is yielded with
 * (fraction done, checkpoint, part response).
 *
 * @param {Object} checkpoint { file, name, fileSize, partSize, uploadId, doneParts }
 * @param {Object} options
 * @return {Object} the result of `completeMultipartUpload`
 * Throws the cancel event object when the upload has been cancelled, or the
 * first part error (tagged with `partNum`) when a part fails.
 */
proto._resumeMultipart = function* _resumeMultipart(checkpoint, options) {
  if (this.isCancel()) {
    throw this._makeCancelEvent();
  }

  var file = checkpoint.file;
  var name = checkpoint.name;
  var uploadId = checkpoint.uploadId;
  var doneParts = checkpoint.doneParts;
  var ranges = this._divideParts(checkpoint.fileSize, checkpoint.partSize);
  var totalParts = ranges.length;

  // Uploads a single part (1-based partNo) and records it in the checkpoint.
  var uploadPartJob = function* (self, partNo) {
    if (self.isCancel()) {
      return;
    }
    try {
      var range = ranges[partNo - 1];
      var data = {
        stream: self._createStream(file, range.start, range.end),
        size: range.end - range.start
      };
      var uploaded = yield self._uploadPart(name, uploadId, partNo, data);
      doneParts.push({
        number: partNo,
        etag: uploaded.res.headers.etag
      });
      checkpoint.doneParts = doneParts;
      if (!self.isCancel() && options && options.progress) {
        yield options.progress(doneParts.length / totalParts, checkpoint, uploaded.res);
      }
    } catch (err) {
      // Tag the error so the caller can tell which part failed.
      err.partNum = partNo;
      throw err;
    }
  };

  // Part numbers that still have to be uploaded, in ascending order.
  var finished = doneParts.map(function (part) {
    return part.number;
  });
  var pending = [];
  for (var candidate = 1; candidate <= totalParts; candidate++) {
    if (finished.indexOf(candidate) < 0) {
      pending.push(candidate);
    }
  }

  var parallel = options.parallel || 5;
  var sequential = this.checkBrowserAndVersion('Internet Explorer', '10') || parallel === 1;

  if (sequential) {
    for (var idx = 0; idx < pending.length; idx++) {
      if (this.isCancel()) {
        throw this._makeCancelEvent();
      }
      yield uploadPartJob(this, pending[idx]);
    }
  } else {
    var client = this;
    var jobs = pending.map(function (partNo) {
      return uploadPartJob(client, partNo);
    });

    // Run the jobs through the pool; it reports the collected errors.
    var errors = yield this._thunkPool(jobs, parallel);
    if (this.isCancel()) {
      throw this._makeCancelEvent();
    }

    // Errors are only inspected once every job has finished.
    if (errors && errors.length > 0) {
      var firstError = errors[0];
      firstError.message = 'Failed to upload some parts with error: ' + firstError.toString() + ' part_num: ' + firstError.partNum;
      throw firstError;
    }
  }

  return yield this.completeMultipartUpload(name, uploadId, doneParts, options);
};
/*
 * Tell whether the value is a Web `File`. Always false in environments that
 * do not define a global `File`.
 */
is.file = function (file) {
  if (typeof File === 'undefined') {
    return false;
  }
  return file instanceof File;
};
/**
 * Get the size in bytes of the thing being uploaded.
 *
 * @param {Buffer|File|String} file a Buffer (its length), a Web File (its
 * size) or a string, which is passed to `_statFile` and whose `size` is used
 * @return {Number}
 * Throws an Error for any other kind of value.
 */
proto._getFileSize = function* _getFileSize(file) {
  if (is.buffer(file)) {
    return file.length;
  }
  if (is.file(file)) {
    return file.size;
  }
  if (!is.string(file)) {
    throw new Error('_getFileSize requires Buffer/File/String.');
  }

  var stat = yield this._statFile(file);
  return stat.size;
};
/*
* Readable stream for Web File
*/
var Readable = require('stream').Readable;
/*
 * Readable stream over a Web File/Blob.
 *
 * Can be called with or without `new`.
 *
 * @param {File|Blob} file the data to read; handed to FileReader in `_read`
 * @param {Object} options passed through to the Readable constructor
 *
 * State kept on the instance:
 *   file       - the source, cleared once its content has been loaded
 *   reader     - the FileReader used to load the content
 *   start      - offset into fileBuffer of the next byte to push
 *   finish     - true once end-of-stream has been signalled
 *   fileBuffer - the loaded content, or null while there is none
 */
function WebFileReadStream(file, options) {
  if (!(this instanceof WebFileReadStream)) {
    return new WebFileReadStream(file, options);
  }
  Readable.call(this, options);
  this.file = file;
  this.reader = new FileReader();
  this.start = 0;
  this.finish = false;
  // Was a bare `this.fileBuffer;` expression, which initialised nothing.
  // null is the same "no buffer" value _read assigns when the stream ends.
  this.fileBuffer = null;
}
util.inherits(WebFileReadStream, Readable);
/*
 * Push the loaded buffer downstream in slices of at most `size` bytes,
 * starting at `this.start`. Stops when `push` returns a falsy value or the
 * buffer is exhausted. Does nothing while no buffer is loaded.
 */
WebFileReadStream.prototype.readFileAndPush = function readFileAndPush(size) {
  if (!this.fileBuffer) {
    return;
  }

  var keepPushing = true;
  // fileBuffer is re-checked on each pass because it can be reset to null
  // (see _read) while this loop is running.
  while (keepPushing && this.fileBuffer && this.start < this.fileBuffer.length) {
    var from = this.start;
    var to = from + size;
    if (to > this.fileBuffer.length) {
      to = this.fileBuffer.length;
    }
    // Advance the cursor before pushing.
    this.start = to;
    keepPushing = this.push(this.fileBuffer.slice(from, to));
  }
};
/*
 * Readable `_read` implementation.
 *
 * The first call (start === 0) loads the whole file with
 * FileReader.readAsArrayBuffer and pushes from the resulting buffer when the
 * load completes; later calls push further slices of that buffer.
 * End-of-stream (`push(null)`) is signalled when the file or buffer has been
 * consumed, when the stream is already finished, or when there is nothing to
 * read at all.
 *
 * NOTE(review): only `onload` is handled; no `onerror` handler is installed
 * here, so a failed read never reaches this code - confirm whether that is
 * handled elsewhere.
 *
 * @param {Number} size requested chunk size; defaults to 16 KiB when falsy
 */
WebFileReadStream.prototype._read = function _read(size) {
  if ((this.file && this.start >= this.file.size) ||
      (this.fileBuffer && this.start >= this.fileBuffer.length) ||
      (this.finish) || (this.start === 0 && !this.file)) {
    if (!this.finish) {
      // Drop the reference to the loaded buffer once everything is pushed.
      this.fileBuffer = null;
      this.finish = true;
    }
    this.push(null);
    return;
  }

  var defaultReadSize = 16 * 1024;
  size = size || defaultReadSize;

  if (this.start === 0) {
    var that = this;
    this.reader.onload = function (e) {
      // Buffer.from replaces the deprecated `new Buffer(...)` constructor.
      that.fileBuffer = Buffer.from(new Uint8Array(e.target.result));
      that.file = null;
      that.readFileAndPush(size);
    };
    this.reader.readAsArrayBuffer(this.file);
  } else {
    this.readFileAndPush(size);
  }
};
/*
 * Create a readable stream over the byte range [start, end) of a Web File.
 *
 * Only Web File objects are accepted here; every other value is rejected.
 * The error text still mentions String, and is kept unchanged.
 *
 * @param {File} file
 * @param {Number} start first byte offset, passed to file.slice
 * @param {Number} end end offset, passed to file.slice
 * @return {WebFileReadStream}
 */
proto._createStream = function _createStream(file, start, end) {
  if (!is.file(file)) {
    throw new Error('_createStream requires File/String.');
  }
  return new WebFileReadStream(file.slice(start, end));
};
/*
 * Choose the part size for a multipart upload.
 *
 * The requested size (or the 1 MiB default when none is given) is raised as
 * far as needed to keep the number of parts within maxNumParts (10,000).
 * Previously the default skipped that bound, so a sufficiently large file
 * uploaded without an explicit partSize was split into more than maxNumParts
 * parts.
 *
 * @param {Number} fileSize total size in bytes
 * @param {Number} partSize requested part size in bytes; falsy means default
 * @return {Number} part size in bytes
 */
proto._getPartSize = function _getPartSize(fileSize, partSize) {
  var maxNumParts = 10 * 1000;
  var defaultPartSize = 1 * 1024 * 1024;

  // Smallest part size that still fits the file into maxNumParts parts.
  var safeSize = Math.ceil(fileSize / maxNumParts);

  return Math.max(safeSize, partSize || defaultPartSize);
};
/*
 * Split a file into consecutive byte ranges of partSize bytes; the last
 * range may be shorter.
 *
 * @param {Number} fileSize total size in bytes
 * @param {Number} partSize size of each part in bytes
 * @return {Array} list of { start, end } offsets, `end` exclusive
 * Throws an Error when the part count would be infinite (for example a
 * partSize of 0), which previously made the loop below run forever.
 */
proto._divideParts = function _divideParts(fileSize, partSize) {
  var numParts = Math.ceil(fileSize / partSize);
  if (numParts === Infinity) {
    throw new Error('_divideParts requires a finite fileSize and a non-zero partSize.');
  }

  var partOffs = [];
  for (var i = 0; i < numParts; i++) {
    var start = partSize * i;
    partOffs.push({
      start: start,
      end: Math.min(start + partSize, fileSize)
    });
  }
  return partOffs;
};
/*
 * Build the value thrown when an upload is cancelled. Cancellation is not an
 * error, so this is a plain object rather than an Error instance.
 *
 * @return {Object} { status: 0, name: 'cancel' }
 */
proto._makeCancelEvent = function () {
  return {
    status: 0,
    name: 'cancel'
  };
};