220 lines
6.6 KiB
JavaScript
220 lines
6.6 KiB
JavaScript
|
'use strict';
|
||
|
|
||
|
var debug = require('debug')('ali-oss:multipart-copy');
|
||
|
var copy = require('copy-to');
|
||
|
var proto = exports;
|
||
|
|
||
|
|
||
|
/**
|
||
|
* Upload a part copy in a multipart from the source bucket/object, used with initMultipartUpload and completeMultipartUpload.
|
||
|
* @param {String} name copy object name
|
||
|
* @param {String} uploadId the upload id
|
||
|
* @param {Number} partNo the part number
|
||
|
* @param {String} range like 0-102400 part size need to copy
|
||
|
* @param {Object} sourceData
|
||
|
* {String} sourceData.sourceKey the source object name
|
||
|
* {String} sourceData.sourceBucketName the source bucket name
|
||
|
* @param {Object} options
|
||
|
*/
|
||
|
proto.uploadPartCopy = function* uploadPartCopy(name, uploadId, partNo, range, sourceData, options) {
|
||
|
options = options || {};
|
||
|
options.headers = options.headers || {};
|
||
|
var copySource = '/' + sourceData.sourceBucketName + '/' + encodeURIComponent(sourceData.sourceKey);
|
||
|
options.headers["x-oss-copy-source"] = copySource;
|
||
|
if (range) {
|
||
|
options.headers["x-oss-copy-source-range"] = 'bytes=' + range;
|
||
|
}
|
||
|
|
||
|
options.subres = {
|
||
|
partNumber: partNo,
|
||
|
uploadId: uploadId
|
||
|
};
|
||
|
var params = this._objectRequestParams('PUT', name, options);
|
||
|
params.mime = options.mime;
|
||
|
params.successStatuses = [200];
|
||
|
|
||
|
var result = yield this.request(params);
|
||
|
|
||
|
return {
|
||
|
name: name,
|
||
|
etag: result.res.headers.etag,
|
||
|
res: result.res
|
||
|
};
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* @param {String} name copy object name
|
||
|
* @param {Object} sourceData
|
||
|
* {String} sourceData.sourceKey the source object name
|
||
|
* {String} sourceData.sourceBucketName the source bucket name
|
||
|
* {Number} sourceData.startOffset data copy start byte offset, e.g: 0
|
||
|
* {Number} sourceData.endOffset data copy end byte offset, e.g: 102400
|
||
|
* @param {Object} options
|
||
|
* {Number} options.partSize
|
||
|
*/
|
||
|
proto.multipartUploadCopy = function* multipartUploadCopy(name, sourceData, options) {
|
||
|
this.resetCancelFlag();
|
||
|
options = options || {};
|
||
|
var objectMeta = yield this._getObjectMeta(sourceData.sourceBucketName, sourceData.sourceKey, {});
|
||
|
var fileSize = objectMeta.res.headers['content-length'];
|
||
|
sourceData.startOffset = sourceData.startOffset || 0;
|
||
|
sourceData.endOffset = sourceData.endOffset || fileSize;
|
||
|
|
||
|
if (options.checkpoint && options.checkpoint.uploadId) {
|
||
|
return yield this._resumeMultipartCopy(options.checkpoint, sourceData, options);
|
||
|
}
|
||
|
|
||
|
var minPartSize = 100 * 1024;
|
||
|
|
||
|
var copySize = sourceData.endOffset - sourceData.startOffset;
|
||
|
if (copySize < minPartSize) {
|
||
|
throw new Error('copySize must not be smaller than ' + minPartSize);
|
||
|
}
|
||
|
|
||
|
if (options.partSize && options.partSize < minPartSize) {
|
||
|
throw new Error('partSize must not be smaller than ' + minPartSize);
|
||
|
}
|
||
|
|
||
|
var result = yield this.initMultipartUpload(name, options);
|
||
|
var uploadId = result.uploadId;
|
||
|
var partSize = this._getPartSize(copySize, options.partSize);
|
||
|
|
||
|
var checkpoint = {
|
||
|
name: name,
|
||
|
copySize: copySize,
|
||
|
partSize: partSize,
|
||
|
uploadId: uploadId,
|
||
|
doneParts: []
|
||
|
};
|
||
|
|
||
|
if (options && options.progress) {
|
||
|
yield options.progress(0, checkpoint, result.res);
|
||
|
}
|
||
|
|
||
|
return yield this._resumeMultipartCopy(checkpoint, sourceData, options);
|
||
|
};
|
||
|
|
||
|
/*
|
||
|
* Resume multipart copy from checkpoint. The checkpoint will be
|
||
|
* updated after each successful part copy.
|
||
|
* @param {Object} checkpoint the checkpoint
|
||
|
* @param {Object} options
|
||
|
*/
|
||
|
proto._resumeMultipartCopy = function* _resumeMultipartCopy(checkpoint, sourceData, options) {
|
||
|
if (this.isCancel()) {
|
||
|
throw this._makeCancelEvent();
|
||
|
}
|
||
|
var copySize = checkpoint.copySize;
|
||
|
var partSize = checkpoint.partSize;
|
||
|
var uploadId = checkpoint.uploadId;
|
||
|
var doneParts = checkpoint.doneParts;
|
||
|
var name = checkpoint.name;
|
||
|
|
||
|
var partOffs = this._divideMultipartCopyParts(copySize, partSize, sourceData.startOffset);
|
||
|
var numParts = partOffs.length;
|
||
|
|
||
|
var uploadPartCopyOptions = {
|
||
|
headers:{}
|
||
|
};
|
||
|
|
||
|
if (options.copyheaders) {
|
||
|
copy(options.copyheaders).to(uploadPartCopyOptions.headers);
|
||
|
}
|
||
|
|
||
|
var uploadPartJob = function* (self, partNo, sourceData) {
|
||
|
if (!self.isCancel()) {
|
||
|
try {
|
||
|
var pi = partOffs[partNo - 1];
|
||
|
var range = pi.start + '-' + (pi.end - 1);
|
||
|
|
||
|
var result = yield self.uploadPartCopy(name, uploadId, partNo, range, sourceData, uploadPartCopyOptions);
|
||
|
|
||
|
if (!self.isCancel()) {
|
||
|
debug('content-range ' + result.res.headers['content-range']);
|
||
|
doneParts.push({
|
||
|
number: partNo,
|
||
|
etag: result.res.headers.etag
|
||
|
});
|
||
|
checkpoint.doneParts = doneParts;
|
||
|
|
||
|
if (options && options.progress) {
|
||
|
yield options.progress(doneParts.length / numParts, checkpoint, result.res);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
} catch (err) {
|
||
|
err.partNum = partNo;
|
||
|
throw err;
|
||
|
}
|
||
|
}
|
||
|
};
|
||
|
|
||
|
var all = Array.from(new Array(numParts), (x, i) => i + 1);
|
||
|
var done = doneParts.map(p => p.number);
|
||
|
var todo = all.filter(p => done.indexOf(p) < 0);
|
||
|
var defaultParallel = 5;
|
||
|
var parallel = options.parallel || defaultParallel;
|
||
|
|
||
|
if (this.checkBrowserAndVersion('Internet Explorer', '10') || parallel === 1) {
|
||
|
for (var i = 0; i < todo.length; i++) {
|
||
|
if (this.isCancel()) {
|
||
|
throw this._makeCancelEvent();
|
||
|
}
|
||
|
yield uploadPartJob(this, todo[i], sourceData);
|
||
|
}
|
||
|
} else {
|
||
|
// upload in parallel
|
||
|
var jobs = [];
|
||
|
for (var i = 0; i < todo.length; i++) {
|
||
|
jobs.push(uploadPartJob(this, todo[i], sourceData));
|
||
|
}
|
||
|
|
||
|
// start uploads jobs
|
||
|
var errors = yield this._thunkPool(jobs, parallel);
|
||
|
|
||
|
if (this.isCancel()) {
|
||
|
jobs = null;
|
||
|
throw this._makeCancelEvent();
|
||
|
}
|
||
|
|
||
|
// check errors after all jobs are completed
|
||
|
if (errors && errors.length > 0) {
|
||
|
var err = errors[0];
|
||
|
err.message = 'Failed to copy some parts with error: ' + err.toString() + " part_num: "+ err.partNum;
|
||
|
throw err;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return yield this.completeMultipartUpload(name, uploadId, doneParts, options);
|
||
|
};
|
||
|
|
||
|
proto._divideMultipartCopyParts = function _divideMultipartCopyParts(fileSize, partSize, startOffset) {
|
||
|
var numParts = Math.ceil(fileSize / partSize);
|
||
|
|
||
|
var partOffs = [];
|
||
|
for (var i = 0; i < numParts; i++) {
|
||
|
var start = partSize * i + startOffset;
|
||
|
var end = Math.min(start + partSize, fileSize + startOffset);
|
||
|
|
||
|
partOffs.push({
|
||
|
start: start,
|
||
|
end: end
|
||
|
});
|
||
|
}
|
||
|
|
||
|
return partOffs;
|
||
|
};
|
||
|
|
||
|
/**
|
||
|
* Get Object Meta
|
||
|
* @param {String} bucket bucket name
|
||
|
* @param {String} name object name
|
||
|
* @param {Object} options
|
||
|
*/
|
||
|
proto._getObjectMeta = function* _getObjectMeta(bucket, name, options) {
|
||
|
var currentBucket = this.getBucket();
|
||
|
this.setBucket(bucket);
|
||
|
var data = yield this.head(name, options);
|
||
|
this.setBucket(currentBucket);
|
||
|
return data;
|
||
|
};
|