2025-04-24 17:03:28 +08:00

237 lines
6.2 KiB
JavaScript

'use strict';
const co = require('co');
const defer = require('co-defer');
const Base = require('sdk-base');
const util = require('util');
const utility = require('utility');
const ready = require('get-ready');
const copy = require('copy-to');
const currentIP = require('address').ip();
const RR = 'roundRobin';
const MS = 'masterSlave';
module.exports = function (OssClient) {
function Client(options) {
if (!(this instanceof Client)) {
return new Client(options);
}
if (!options || !Array.isArray(options.cluster)) {
throw new Error('require options.cluster to be an array');
}
Base.call(this);
this.clients = [];
this.availables = {};
for (var i = 0; i < options.cluster.length; i++) {
var opt = options.cluster[i];
copy(options).pick('timeout', 'agent', 'urllib').to(opt);
this.clients.push(new OssClient(opt));
this.availables[i] = true;
}
this.schedule = options.schedule || RR;
// only read from master, default is false
this.masterOnly = !!options.masterOnly;
this.index = 0;
const heartbeatInterval = options.heartbeatInterval || 10000;
this._checkAvailableLock = false;
this._timerId = defer.setInterval(this._checkAvailable.bind(this, true), heartbeatInterval);
this._ignoreStatusFile = options.ignoreStatusFile || false;
this._init();
}
util.inherits(Client, Base);
const proto = Client.prototype;
ready.mixin(proto);
const GET_METHODS = [
'head',
'get',
'getStream',
'list',
'getACL',
];
const PUT_METHODS = [
'put',
'putStream',
'delete',
'deleteMulti',
'copy',
'putMeta',
'putACL',
];
GET_METHODS.forEach(function (method) {
proto[method] = function* () {
const args = utility.argumentsToArray(arguments);
var client = this.chooseAvailable();
var lastError;
try {
return yield client[method].apply(client, args);
} catch (err) {
if (err.status && err.status >= 200 && err.status < 500) {
// 200 ~ 499 belong to normal response, don't try again
throw err;
}
// < 200 || >= 500 need to retry from other cluser node
lastError = err;
}
for (var i = 0; i < this.clients.length; i++) {
var c = this.clients[i];
if (c === client) {
continue;
}
try {
return yield c[method].apply(client, args);
} catch (err){
if (err.status && err.status >= 200 && err.status < 500) {
// 200 ~ 499 belong to normal response, don't try again
throw err;
}
// < 200 || >= 500 need to retry from other cluser node
lastError = err;
}
}
lastError.message += ' (all clients are down)';
throw lastError;
};
});
// must cluster node write success
PUT_METHODS.forEach(function (method) {
proto[method] = function* () {
var args = utility.argumentsToArray(arguments);
var res = yield this.clients.map(function (client) {
return client[method].apply(client, args);
});
return res[0];
};
});
proto.signatureUrl = function signatureUrl(/* name */) {
var args = utility.argumentsToArray(arguments);
var client = this.chooseAvailable();
return client.signatureUrl.apply(client, args);
};
proto.getObjectUrl = function getObjectUrl(/* name, baseUrl */) {
var args = utility.argumentsToArray(arguments);
var client = this.chooseAvailable();
return client.getObjectUrl.apply(client, args);
};
proto._init = function _init() {
const that = this;
co(function*() {
yield that._checkAvailable(that._ignoreStatusFile);
that.ready(true);
}).catch(function(err) {
that.emit('error', err);
});
};
proto._checkAvailable = function* _checkAvailable(ignoreStatusFile) {
const name = '._ali-oss/check.status.' + currentIP + '.txt';
if (!ignoreStatusFile) {
// only start will try to write the file
yield this.put(name, new Buffer('check available started at ' + Date()));
}
if (this._checkAvailableLock) {
return;
}
this._checkAvailableLock = true;
var downStatusFiles = [];
for (var i = 0; i < this.clients.length; i++) {
var client = this.clients[i];
// check 3 times
var available = yield this._checkStatus(client, name);
if (!available) {
// check again
available = yield this._checkStatus(client, name);
}
if (!available) {
// check again
available = yield this._checkStatus(client, name);
if (!available) {
downStatusFiles.push(client._objectUrl(name));
}
}
this.availables[i] = available;
}
this._checkAvailableLock = false;
if (downStatusFiles.length > 0) {
const err = new Error(downStatusFiles.length + ' data node down, please check status file: ' + downStatusFiles.join(', '));
err.name = 'CheckAvailableError';
this.emit('error', err);
}
};
proto._checkStatus = function* _checkStatus(client, name) {
var available = true;
try {
yield client.head(name);
} catch (err) {
// 404 will be available too
if (!err.status || err.status >= 500 || err.status < 200) {
available = false;
}
}
return available;
};
proto.chooseAvailable = function chooseAvailable() {
if (this.schedule === MS) {
// only read from master
if (this.masterOnly) {
return this.clients[0];
}
for (var i = 0; i < this.clients.length; i++) {
if (this.availables[i]) {
return this.clients[i];
}
}
// all down, try to use this first one
return this.clients[0];
}
// RR
var n = this.clients.length;
while (n > 0) {
var i = this._nextRRIndex();
if (this.availables[i]) {
return this.clients[i];
}
n--;
}
// all down, try to use this first one
return this.clients[0];
};
proto._nextRRIndex = function _nextRRIndex() {
var index = this.index++;
if (this.index >= this.clients.length) {
this.index = 0;
}
return index;
};
proto.close = function close() {
clearInterval(this._timerId);
this._timerId = null;
};
return Client;
};