2025-04-24 17:03:28 +08:00

97 lines
2.0 KiB
JavaScript

'use strict';
var co = require('co');
var Queue = require('co-priority-queue');
var proto = exports;
/**
* use parallel operations pools
* @param thunks
* @param parallel concurrent number
* @returns {Promise | Promise<any>}
* @private
*/
proto._thunkPool = function thunkPool(thunks, parallel) {
var that = this;
var _Promise = require('any-promise');
var _done = 0;
var _errs = [];
function checkFinish(thunks, resolve) {
_done++;
if (_done === thunks.length) {
resolve(_errs);
}
}
function Limiter(concurrency, resolve) {
var queue = new Queue();
var endQueueSum = 0;
function* concurrencyJob() {
var yieldable;
while (!that.isCancel()) {
yieldable = yield queue.next();
yield yieldable();
}
endQueueSum += 1;
if (endQueueSum === concurrency) {
queue.fns = [];
queue.buffer = [];
resolve();
}
}
function catchError(err) {
console.error(err.stack);
}
// Create consumers
for (var i = 0; i < concurrency; i++) {
co(concurrencyJob).catch(catchError);
}
var limit = function(generator, priority) {
return function(cb) {
queue.push(function *() {
try {
cb(null, yield generator);
} catch (err) {
cb(err);
}
}, priority);
};
};
return limit;
}
return new _Promise(function (resolve) {
var _limit = new Limiter(parallel, resolve);
function* job() {
yield _limit(thunks[i]);
checkFinish(thunks, resolve);
}
function catchError(err) {
_errs.push(err);
checkFinish(thunks, resolve);
}
for (var i = 0; i < thunks.length; i++) {
co(job).catch(catchError);
}
});
};
/**
* cancel operation, now can use with multipartUpload
*/
proto.cancel = function () {
this.options.cancelFlag = true;
}
proto.isCancel = function () {
return this.options.cancelFlag;
}
proto.resetCancelFlag = function () {
this.options.cancelFlag = false;
}