276 lines
9.5 KiB
JavaScript
276 lines
9.5 KiB
JavaScript
|
/*
|
||
|
This file is part of web3.js.
|
||
|
|
||
|
web3.js is free software: you can redistribute it and/or modify
|
||
|
it under the terms of the GNU Lesser General Public License as published by
|
||
|
the Free Software Foundation, either version 3 of the License, or
|
||
|
(at your option) any later version.
|
||
|
|
||
|
web3.js is distributed in the hope that it will be useful,
|
||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
GNU Lesser General Public License for more details.
|
||
|
|
||
|
You should have received a copy of the GNU Lesser General Public License
|
||
|
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
|
||
|
*/
|
||
|
/**
|
||
|
* @file subscription.js
|
||
|
* @author Fabian Vogelsteller <fabian@ethereum.org>
|
||
|
* @date 2017
|
||
|
*/
|
||
|
"use strict";
|
||
|
var _ = require('underscore');
|
||
|
var errors = require('web3-core-helpers').errors;
|
||
|
var EventEmitter = require('eventemitter3');
|
||
|
var formatters = require('web3-core-helpers').formatters;
|
||
|
function Subscription(options) {
|
||
|
EventEmitter.call(this);
|
||
|
this.id = null;
|
||
|
this.callback = _.identity;
|
||
|
this.arguments = null;
|
||
|
this.lastBlock = null; // "from" block tracker for backfilling events on reconnection
|
||
|
this.options = {
|
||
|
subscription: options.subscription,
|
||
|
type: options.type,
|
||
|
requestManager: options.requestManager
|
||
|
};
|
||
|
}
|
||
|
// INHERIT
|
||
|
Subscription.prototype = Object.create(EventEmitter.prototype);
|
||
|
Subscription.prototype.constructor = Subscription;
|
||
|
/**
|
||
|
* Should be used to extract callback from array of arguments. Modifies input param
|
||
|
*
|
||
|
* @method extractCallback
|
||
|
* @param {Array} arguments
|
||
|
* @return {Function|Null} callback, if exists
|
||
|
*/
|
||
|
Subscription.prototype._extractCallback = function (args) {
|
||
|
if (_.isFunction(args[args.length - 1])) {
|
||
|
return args.pop(); // modify the args array!
|
||
|
}
|
||
|
};
|
||
|
/**
|
||
|
* Should be called to check if the number of arguments is correct
|
||
|
*
|
||
|
* @method validateArgs
|
||
|
* @param {Array} arguments
|
||
|
* @throws {Error} if it is not
|
||
|
*/
|
||
|
Subscription.prototype._validateArgs = function (args) {
|
||
|
var subscription = this.options.subscription;
|
||
|
if (!subscription)
|
||
|
subscription = {};
|
||
|
if (!subscription.params)
|
||
|
subscription.params = 0;
|
||
|
if (args.length !== subscription.params) {
|
||
|
throw errors.InvalidNumberOfParams(args.length, subscription.params, subscription.subscriptionName);
|
||
|
}
|
||
|
};
|
||
|
/**
|
||
|
* Should be called to format input args of method
|
||
|
*
|
||
|
* @method formatInput
|
||
|
* @param {Array}
|
||
|
* @return {Array}
|
||
|
*/
|
||
|
Subscription.prototype._formatInput = function (args) {
|
||
|
var subscription = this.options.subscription;
|
||
|
if (!subscription) {
|
||
|
return args;
|
||
|
}
|
||
|
if (!subscription.inputFormatter) {
|
||
|
return args;
|
||
|
}
|
||
|
var formattedArgs = subscription.inputFormatter.map(function (formatter, index) {
|
||
|
return formatter ? formatter(args[index]) : args[index];
|
||
|
});
|
||
|
return formattedArgs;
|
||
|
};
|
||
|
/**
|
||
|
* Should be called to format output(result) of method
|
||
|
*
|
||
|
* @method formatOutput
|
||
|
* @param result {Object}
|
||
|
* @return {Object}
|
||
|
*/
|
||
|
Subscription.prototype._formatOutput = function (result) {
|
||
|
var subscription = this.options.subscription;
|
||
|
return (subscription && subscription.outputFormatter && result) ? subscription.outputFormatter(result) : result;
|
||
|
};
|
||
|
/**
|
||
|
* Should create payload from given input args
|
||
|
*
|
||
|
* @method toPayload
|
||
|
* @param {Array} args
|
||
|
* @return {Object}
|
||
|
*/
|
||
|
Subscription.prototype._toPayload = function (args) {
|
||
|
var params = [];
|
||
|
this.callback = this._extractCallback(args) || _.identity;
|
||
|
if (!this.subscriptionMethod) {
|
||
|
this.subscriptionMethod = args.shift();
|
||
|
// replace subscription with given name
|
||
|
if (this.options.subscription.subscriptionName) {
|
||
|
this.subscriptionMethod = this.options.subscription.subscriptionName;
|
||
|
}
|
||
|
}
|
||
|
if (!this.arguments) {
|
||
|
this.arguments = this._formatInput(args);
|
||
|
this._validateArgs(this.arguments);
|
||
|
args = []; // make empty after validation
|
||
|
}
|
||
|
// re-add subscriptionName
|
||
|
params.push(this.subscriptionMethod);
|
||
|
params = params.concat(this.arguments);
|
||
|
if (args.length) {
|
||
|
throw new Error('Only a callback is allowed as parameter on an already instantiated subscription.');
|
||
|
}
|
||
|
return {
|
||
|
method: this.options.type + '_subscribe',
|
||
|
params: params
|
||
|
};
|
||
|
};
|
||
|
/**
|
||
|
* Unsubscribes and clears callbacks
|
||
|
*
|
||
|
* @method unsubscribe
|
||
|
* @return {Object}
|
||
|
*/
|
||
|
Subscription.prototype.unsubscribe = function (callback) {
|
||
|
this.options.requestManager.removeSubscription(this.id, callback);
|
||
|
this.id = null;
|
||
|
this.lastBlock = null;
|
||
|
this.removeAllListeners();
|
||
|
};
|
||
|
/**
|
||
|
* Subscribes and watches for changes
|
||
|
*
|
||
|
* @method subscribe
|
||
|
* @param {String} subscription the subscription
|
||
|
* @param {Object} options the options object with address topics and fromBlock
|
||
|
* @return {Object}
|
||
|
*/
|
||
|
Subscription.prototype.subscribe = function () {
|
||
|
var _this = this;
|
||
|
var args = Array.prototype.slice.call(arguments);
|
||
|
var payload = this._toPayload(args);
|
||
|
if (!payload) {
|
||
|
return this;
|
||
|
}
|
||
|
// throw error, if provider is not set
|
||
|
if (!this.options.requestManager.provider) {
|
||
|
setTimeout(function () {
|
||
|
var err1 = new Error('No provider set.');
|
||
|
_this.callback(err1, null, _this);
|
||
|
_this.emit('error', err1);
|
||
|
}, 0);
|
||
|
return this;
|
||
|
}
|
||
|
// throw error, if provider doesnt support subscriptions
|
||
|
if (!this.options.requestManager.provider.on) {
|
||
|
setTimeout(function () {
|
||
|
var err2 = new Error('The current provider doesn\'t support subscriptions: ' +
|
||
|
_this.options.requestManager.provider.constructor.name);
|
||
|
_this.callback(err2, null, _this);
|
||
|
_this.emit('error', err2);
|
||
|
}, 0);
|
||
|
return this;
|
||
|
}
|
||
|
// Re-subscription only: continue fetching from the last block we received.
|
||
|
// a dropped connection may have resulted in gaps in the logs...
|
||
|
if (this.lastBlock && _.isObject(this.options.params)) {
|
||
|
payload.params[1] = this.options.params;
|
||
|
payload.params[1].fromBlock = formatters.inputBlockNumberFormatter(this.lastBlock + 1);
|
||
|
}
|
||
|
// if id is there unsubscribe first
|
||
|
if (this.id) {
|
||
|
this.unsubscribe();
|
||
|
}
|
||
|
// store the params in the options object
|
||
|
this.options.params = payload.params[1];
|
||
|
// get past logs, if fromBlock is available
|
||
|
if (payload.params[0] === 'logs' && _.isObject(payload.params[1]) && payload.params[1].hasOwnProperty('fromBlock') && isFinite(payload.params[1].fromBlock)) {
|
||
|
// send the subscription request
|
||
|
// copy the params to avoid race-condition with deletion below this block
|
||
|
var blockParams = Object.assign({}, payload.params[1]);
|
||
|
this.options.requestManager.send({
|
||
|
method: 'eth_getLogs',
|
||
|
params: [blockParams]
|
||
|
}, function (err, logs) {
|
||
|
if (!err) {
|
||
|
logs.forEach(function (log) {
|
||
|
var output = _this._formatOutput(log);
|
||
|
_this.callback(null, output, _this);
|
||
|
_this.emit('data', output);
|
||
|
});
|
||
|
// TODO subscribe here? after the past logs?
|
||
|
}
|
||
|
else {
|
||
|
setTimeout(function () {
|
||
|
_this.callback(err, null, _this);
|
||
|
_this.emit('error', err);
|
||
|
}, 0);
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
// create subscription
|
||
|
// TODO move to separate function? so that past logs can go first?
|
||
|
if (typeof payload.params[1] === 'object')
|
||
|
delete payload.params[1].fromBlock;
|
||
|
this.options.requestManager.send(payload, function (err, result) {
|
||
|
if (!err && result) {
|
||
|
_this.id = result;
|
||
|
_this.method = payload.params[0];
|
||
|
_this.emit('connected', result);
|
||
|
// call callback on notifications
|
||
|
_this.options.requestManager.addSubscription(_this, function (error, result) {
|
||
|
if (!error) {
|
||
|
if (!_.isArray(result)) {
|
||
|
result = [result];
|
||
|
}
|
||
|
result.forEach(function (resultItem) {
|
||
|
var output = _this._formatOutput(resultItem);
|
||
|
// Track current block (for gaps introduced by dropped connections)
|
||
|
_this.lastBlock = _.isObject(output) ? output.blockNumber : null;
|
||
|
if (_.isFunction(_this.options.subscription.subscriptionHandler)) {
|
||
|
return _this.options.subscription.subscriptionHandler.call(_this, output);
|
||
|
}
|
||
|
else {
|
||
|
_this.emit('data', output);
|
||
|
}
|
||
|
// call the callback, last so that unsubscribe there won't affect the emit above
|
||
|
_this.callback(null, output, _this);
|
||
|
});
|
||
|
}
|
||
|
else {
|
||
|
_this.callback(error, false, _this);
|
||
|
_this.emit('error', error);
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
else {
|
||
|
setTimeout(function () {
|
||
|
_this.callback(err, false, _this);
|
||
|
_this.emit('error', err);
|
||
|
}, 0);
|
||
|
}
|
||
|
});
|
||
|
// return an object to cancel the subscription
|
||
|
return this;
|
||
|
};
|
||
|
/**
|
||
|
* Resubscribe
|
||
|
*
|
||
|
* @method resubscribe
|
||
|
*
|
||
|
* @returns {void}
|
||
|
*/
|
||
|
Subscription.prototype.resubscribe = function () {
|
||
|
this.options.requestManager.removeSubscription(this.id); // unsubscribe
|
||
|
this.id = null;
|
||
|
this.subscribe(this.callback);
|
||
|
};
|
||
|
module.exports = Subscription;
|