323 lines
11 KiB
JavaScript
Raw Normal View History

2025-04-23 09:34:08 +08:00
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const command_1 = require("./command");
const util_1 = require("util");
const standard_as_callback_1 = require("standard-as-callback");
const redis_commands_1 = require("redis-commands");
const cluster_key_slot_1 = require("cluster-key-slot");
const PromiseContainer = require("./promiseContainer");
const commander_1 = require("./commander");
function Pipeline(redis) {
commander_1.default.call(this);
this.redis = redis;
this.isCluster = this.redis.constructor.name === "Cluster";
this.options = redis.options;
this._queue = [];
this._result = [];
this._transactions = 0;
this._shaToScript = {};
Object.keys(redis.scriptsSet).forEach((name) => {
const script = redis.scriptsSet[name];
this._shaToScript[script.sha] = script;
this[name] = redis[name];
this[name + "Buffer"] = redis[name + "Buffer"];
});
const Promise = PromiseContainer.get();
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
});
const _this = this;
Object.defineProperty(this, "length", {
get: function () {
return _this._queue.length;
},
});
}
exports.default = Pipeline;
Object.assign(Pipeline.prototype, commander_1.default.prototype);
Pipeline.prototype.fillResult = function (value, position) {
if (this._queue[position].name === "exec" && Array.isArray(value[1])) {
const execLength = value[1].length;
for (let i = 0; i < execLength; i++) {
if (value[1][i] instanceof Error) {
continue;
}
const cmd = this._queue[position - (execLength - i)];
try {
value[1][i] = cmd.transformReply(value[1][i]);
}
catch (err) {
value[1][i] = err;
}
}
}
this._result[position] = value;
if (--this.replyPending) {
return;
}
if (this.isCluster) {
let retriable = true;
let commonError;
for (let i = 0; i < this._result.length; ++i) {
const error = this._result[i][0];
const command = this._queue[i];
if (error) {
if (command.name === "exec" &&
error.message ===
"EXECABORT Transaction discarded because of previous errors.") {
continue;
}
if (!commonError) {
commonError = {
name: error.name,
message: error.message,
};
}
else if (commonError.name !== error.name ||
commonError.message !== error.message) {
retriable = false;
break;
}
}
else if (!command.inTransaction) {
const isReadOnly = redis_commands_1.exists(command.name) && redis_commands_1.hasFlag(command.name, "readonly");
if (!isReadOnly) {
retriable = false;
break;
}
}
}
if (commonError && retriable) {
const _this = this;
const errv = commonError.message.split(" ");
const queue = this._queue;
let inTransaction = false;
this._queue = [];
for (let i = 0; i < queue.length; ++i) {
if (errv[0] === "ASK" &&
!inTransaction &&
queue[i].name !== "asking" &&
(!queue[i - 1] || queue[i - 1].name !== "asking")) {
const asking = new command_1.default("asking");
asking.ignore = true;
this.sendCommand(asking);
}
queue[i].initPromise();
this.sendCommand(queue[i]);
inTransaction = queue[i].inTransaction;
}
let matched = true;
if (typeof this.leftRedirections === "undefined") {
this.leftRedirections = {};
}
const exec = function () {
_this.exec();
};
this.redis.handleError(commonError, this.leftRedirections, {
moved: function (slot, key) {
_this.preferKey = key;
_this.redis.slots[errv[1]] = [key];
_this.redis.refreshSlotsCache();
_this.exec();
},
ask: function (slot, key) {
_this.preferKey = key;
_this.exec();
},
tryagain: exec,
clusterDown: exec,
connectionClosed: exec,
maxRedirections: () => {
matched = false;
},
defaults: () => {
matched = false;
},
});
if (matched) {
return;
}
}
}
let ignoredCount = 0;
for (let i = 0; i < this._queue.length - ignoredCount; ++i) {
if (this._queue[i + ignoredCount].ignore) {
ignoredCount += 1;
}
this._result[i] = this._result[i + ignoredCount];
}
this.resolve(this._result.slice(0, this._result.length - ignoredCount));
};
Pipeline.prototype.sendCommand = function (command) {
if (this._transactions > 0) {
command.inTransaction = true;
}
const position = this._queue.length;
command.pipelineIndex = position;
command.promise
.then((result) => {
this.fillResult([null, result], position);
})
.catch((error) => {
this.fillResult([error], position);
});
this._queue.push(command);
return this;
};
Pipeline.prototype.addBatch = function (commands) {
let command, commandName, args;
for (let i = 0; i < commands.length; ++i) {
command = commands[i];
commandName = command[0];
args = command.slice(1);
this[commandName].apply(this, args);
}
return this;
};
const multi = Pipeline.prototype.multi;
Pipeline.prototype.multi = function () {
this._transactions += 1;
return multi.apply(this, arguments);
};
const execBuffer = Pipeline.prototype.execBuffer;
const exec = Pipeline.prototype.exec;
Pipeline.prototype.execBuffer = util_1.deprecate(function () {
if (this._transactions > 0) {
this._transactions -= 1;
}
return execBuffer.apply(this, arguments);
}, "Pipeline#execBuffer: Use Pipeline#exec instead");
Pipeline.prototype.exec = function (callback) {
if (this._transactions > 0) {
this._transactions -= 1;
return (this.options.dropBufferSupport ? exec : execBuffer).apply(this, arguments);
}
if (!this.nodeifiedPromise) {
this.nodeifiedPromise = true;
standard_as_callback_1.default(this.promise, callback);
}
if (!this._queue.length) {
this.resolve([]);
}
let pipelineSlot;
if (this.isCluster) {
// List of the first key for each command
const sampleKeys = [];
for (let i = 0; i < this._queue.length; i++) {
const keys = this._queue[i].getKeys();
if (keys.length) {
sampleKeys.push(keys[0]);
}
}
if (sampleKeys.length) {
pipelineSlot = cluster_key_slot_1.generateMulti(sampleKeys);
if (pipelineSlot < 0) {
this.reject(new Error("All keys in the pipeline should belong to the same slot"));
return this.promise;
}
}
else {
// Send the pipeline to a random node
pipelineSlot = (Math.random() * 16384) | 0;
}
}
// Check whether scripts exists
const scripts = [];
const addedScriptHashes = {};
for (let i = 0; i < this._queue.length; ++i) {
const item = this._queue[i];
if (this.isCluster && item.isCustomCommand) {
this.reject(new Error("Sending custom commands in pipeline is not supported in Cluster mode."));
return this.promise;
}
if (item.name !== "evalsha") {
continue;
}
const script = this._shaToScript[item.args[0]];
if (!script || addedScriptHashes[script.sha]) {
continue;
}
scripts.push(script);
addedScriptHashes[script.sha] = true;
}
const _this = this;
if (!scripts.length) {
return execPipeline();
}
return this.redis
.script("exists", scripts.map(({ sha }) => sha))
.then(function (results) {
const pending = [];
for (let i = 0; i < results.length; ++i) {
if (!results[i]) {
pending.push(scripts[i]);
}
}
const Promise = PromiseContainer.get();
return Promise.all(pending.map(function (script) {
return _this.redis.script("load", script.lua);
}));
})
.then(execPipeline);
function execPipeline() {
let data = "";
let buffers;
let writePending = (_this.replyPending = _this._queue.length);
let node;
if (_this.isCluster) {
node = {
slot: pipelineSlot,
redis: _this.redis.connectionPool.nodes.all[_this.preferKey],
};
}
let bufferMode = false;
const stream = {
write: function (writable) {
if (writable instanceof Buffer) {
bufferMode = true;
}
if (bufferMode) {
if (!buffers) {
buffers = [];
}
if (typeof data === "string") {
buffers.push(Buffer.from(data, "utf8"));
data = undefined;
}
buffers.push(typeof writable === "string"
? Buffer.from(writable, "utf8")
: writable);
}
else {
data += writable;
}
if (!--writePending) {
let sendData;
if (buffers) {
sendData = Buffer.concat(buffers);
}
else {
sendData = data;
}
if (_this.isCluster) {
node.redis.stream.write(sendData);
}
else {
_this.redis.stream.write(sendData);
}
// Reset writePending for resending
writePending = _this._queue.length;
data = "";
buffers = undefined;
bufferMode = false;
}
},
};
for (let i = 0; i < _this._queue.length; ++i) {
_this.redis.sendCommand(_this._queue[i], stream, node);
}
return _this.promise;
}
};