diff --git a/src/connection.ts b/src/connection.ts index 334faa894..f7bb3d876 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -102,6 +102,13 @@ abstract class Connection extends EventEmitter { this.emit('state', state); } + /** + * Some connection managers may need to run some action when the wallet is ready. + */ + notifyWalletReady() { + return; + } + /** * Connect to the server and start emitting events. **/ diff --git a/src/models/output.ts b/src/models/output.ts index 40876e820..698ecd53a 100644 --- a/src/models/output.ts +++ b/src/models/output.ts @@ -20,7 +20,7 @@ import ScriptData from './script_data' import Network from './network' import { bytesToOutputValue, unpackLen, unpackToInt, intToBytes, signedIntToBytes } from '../utils/buffer'; import { prettyValue } from '../utils/numbers'; -import {parseP2PKH, parseP2SH, parseScriptData} from '../utils/scripts' +import {parseP2PKH, parseP2SH, parseScriptData, parseScript} from '../utils/scripts' import _ from 'lodash' type optionsType = { @@ -178,28 +178,8 @@ class Output { // we can keep throwing the error. Otherwise, we should just return null // because this method will be used together with others when we are trying to parse a given script. - try { - let parsedScript; - if (P2PKH.identify(this.script)) { - // This is a P2PKH script - parsedScript = parseP2PKH(this.script, network); - } else if (P2SH.identify(this.script)) { - // This is a P2SH script - parsedScript = parseP2SH(this.script, network); - } else { - // defaults to data script - parsedScript = parseScriptData(this.script); - } - this.decodedScript = parsedScript; - return parsedScript; - } catch (error) { - if (error instanceof ParseError) { - // We don't know how to parse this script - return null; - } else { - throw error; - } - } + this.decodedScript = parseScript(this.script, network); + return this.decodedScript; } /** diff --git a/src/models/p2pkh.ts b/src/models/p2pkh.ts index 11543caca..0160579fa 100644 --- a/src/models/p2pkh.ts +++ b/src/models/p2pkh.ts @@ -10,6 +10,7 @@ import { util } from 'bitcore-lib'; import { intToBytes } from '../utils/buffer'; import helpers from '../utils/helpers'; import Address from './address'; +import { IHistoryOutputDecoded } from '../types'; type optionsType = { timelock?: number | null | undefined, @@ -75,6 +76,19 @@ class P2PKH { return util.buffer.concat(arr); } + /** + * Get decoded output. + * + * @return {IHistoryOutputDecoded} + */ + getDecoded(): IHistoryOutputDecoded { + return { + type: this.getType(), + address: this.address.base58, + timelock: this.timelock, + } + } + /** * Identify a script as P2PKH or not. * diff --git a/src/models/p2sh.ts b/src/models/p2sh.ts index e43cb069b..5d33fe8ff 100644 --- a/src/models/p2sh.ts +++ b/src/models/p2sh.ts @@ -10,6 +10,7 @@ import { util } from 'bitcore-lib'; import helpers from '../utils/helpers'; import { intToBytes } from '../utils/buffer'; import Address from './address'; +import { IHistoryOutputDecoded } from '../types'; type optionsType = { timelock?: number | null | undefined, @@ -71,6 +72,19 @@ class P2SH { return util.buffer.concat(arr); } + /** + * Get decoded output. + * + * @return {IHistoryOutputDecoded} + */ + getDecoded(): IHistoryOutputDecoded { + return { + type: this.getType(), + address: this.address.base58, + timelock: this.timelock, + } + } + /** * Identify a script as P2SH or not. * diff --git a/src/models/script_data.ts b/src/models/script_data.ts index b10f76dba..db77c4638 100644 --- a/src/models/script_data.ts +++ b/src/models/script_data.ts @@ -9,6 +9,7 @@ import { OP_CHECKSIG } from '../opcodes'; import { util } from 'bitcore-lib'; import helpers from '../utils/helpers'; import buffer from 'buffer'; +import { IHistoryOutputDecoded } from '../types'; class ScriptData { // String of data to store on the script @@ -25,11 +26,11 @@ class ScriptData { /** * Get script type * - * @return {String} + * @return {string} * @memberof ScriptData * @inner */ - getType(): String { + getType(): 'data' { return 'data'; } @@ -47,6 +48,18 @@ class ScriptData { arr.push(OP_CHECKSIG); return util.buffer.concat(arr); } + + /** + * Get decoded output. + * + * @return {IHistoryOutputDecoded} + */ + getDecoded(): IHistoryOutputDecoded { + return { + type: this.getType(), + data: this.data, + } + } } export default ScriptData; diff --git a/src/new/connection.ts b/src/new/connection.ts index 830d19d64..3c4c9e01a 100644 --- a/src/new/connection.ts +++ b/src/new/connection.ts @@ -97,12 +97,4 @@ class WalletConnection extends BaseConnection { } } -// TODO: This is to maintain compatibility until we migrate to typescript -// @ts-ignore -WalletConnection.CLOSED = 0; -// @ts-ignore -WalletConnection.CONNECTING = 1; -// @ts-ignore -WalletConnection.CONNECTED = 2; - export default WalletConnection; diff --git a/src/new/event_queue_connection.ts b/src/new/event_queue_connection.ts new file mode 100644 index 000000000..0f275311e --- /dev/null +++ b/src/new/event_queue_connection.ts @@ -0,0 +1,421 @@ +import GenericWebSocket from '../websocket'; +import helpers from '../utils/helpers'; +import BaseConnection, { + ConnectionParams, +} from '../connection'; +import { + ConnectionState, +} from '../wallet/types'; +import Output from '../models/output'; +import P2PKH from '../models/p2pkh'; +import P2SH from '../models/p2sh'; +import ScriptData from '../models/script_data'; +import Network from '../models/network'; +import { handleSubscribeAddress, handleWsDashboard } from '../utils/connection'; +import { parseScript } from '../utils/scripts'; +import tokenUtils from '../utils/tokens'; +import { IStorage, IHistoryTx, IHistoryOutputDecoded, IHistoryOutput } from '../types'; +import { HATHOR_TOKEN_CONFIG } from '../constants'; +import txApi from '../api/txApi'; +import { FullNodeTxResponse } from '../wallet/types'; +import transactionUtils from '../utils/transaction'; +import { cloneDeep } from 'lodash'; + + +interface IWalletUpdateEvent { + type: 'wallet:address_history', + history: IHistoryTx, +} + +/** + * EventQueueTxData + * + * | Attribute | Type | Description | + * |----------------|------------------|---------------------------------------------------------------------------| + * | `hash` | `str` | The hash of this vertex. | + * | `nonce` | `Optional[int]` | The nonce of this vertex. | + * | `timestamp` | `int` | The timestamp of this vertex. | + * | `version` | `int` | The version of this vertex. | + * | `weight` | `float` | The weight of this vertex. | + * | `inputs` | `List[TxInput]` | The inputs of this vertex. | + * | `outputs` | `List[TxOutput]` | The outputs of this vertex. | + * | `parents` | `List[str]` | The hashes of this vertex's parents. | + * | `tokens` | `List[str]` | The tokens of this vertex. | + * | `token_name` | `Optional[str]` | The token name of this vertex, if it is a `TokenCreationTransaction`. | + * | `token_symbol` | `Optional[str]` | The token symbol of this vertex, if it is a `TokenCreationTransaction`. | + * | `metadata` | `TxMetadata` | The metadata of this vertex. | + * | `aux_pow` | `Optional[str]` | The auxiliary Proof of Work of this vertex, if it is a `MergeMinedBlock`. | + */ +interface EventQueueTxData { + hash: string; + nonce?: number; + timestamp: number; + version: number; + weight: number; + inputs: EventQueueTxInput[]; + outputs: EventQueueTxOutput[]; + parents: string[]; + tokens: string[]; + token_name?: string; + token_symbol?: string; + metadata: EventQueueTxMetadata; + aux_pow?: string; +} + +/** + * EventQueueTxMetadata + * + * | Attribute | Type | + * |----------------------|---------------------| + * | `hash` | `str` | + * | `spent_outputs` | `List[SpentOutput]` | + * | `conflict_with` | `List[str]` | + * | `voided_by` | `List[str]` | + * | `received_by` | `List[int]` | + * | `children` | `List[str]` | + * | `twins` | `List[str]` | + * | `accumulated_weight` | `float` | + * | `score` | `float` | + * | `first_block` | `Optional[str]` | + * | `height` | `int` | + * | `validation` | `str` | + */ +interface EventQueueTxMetadata { + hash: string; + spent_outputs: EventQueueSpentOutput[]; + conflict_with: string[]; + voided_by: string[]; + received_by: number[]; + children: string[]; + twins: string[]; + accumulated_weight: number; + score: number; + first_block?: string; + height: number; + validation: string; +} + + +/** + * EventQueueTxInput + * + * | Attribute | Type | + * |-----------|-------| + * | `tx_id` | `str` | + * | `index` | `int` | + * | `data` | `str` | + */ +interface EventQueueTxInput { + tx_id: string; + index: number; + data: string; +} + +/** + * EventQueueTxOutput + * + * | Attribute | Type | + * |--------------|-------| + * | `value` | `int` | + * | `script` | `str` | + * | `token_data` | `int` | + */ +interface EventQueueTxOutput { + value: number; + script: string; + token_data: number; +} + +/** + * EventQueueSpentOutput + * + * | Attribute | Type | + * |-----------|-------------| + * | `index` | `int` | + * | `tx_ids` | `List[str]` | + */ +interface EventQueueSpentOutput { + index: number; + tx_ids: string[]; +} + +/** + * ReorgData + * + * | Attribute | Type | Description | + * |-----------------------|-------|--------------------------------------------------------------------------| + * | `reorg_size` | `int` | The amount of blocks affected by this reorg. | + * | `previous_best_block` | `str` | The hash of the best block before this reorg happened. | + * | `new_best_block` | `str` | The hash of the best block after this reorg. | + * | `common_block` | `str` | The hash of the last common block between the two differing blockchains. | + */ +interface EventQueueReorgData { + reorg_size: number; + previous_best_block: string; + new_best_block: string; + common_block: string; +} + + +enum EventType { + LOAD_STARTED = 'LOAD_STARTED', + LOAD_FINISHED = 'LOAD_FINISHED', + NEW_VERTEX_ACCEPTED = 'NEW_VERTEX_ACCEPTED', + REORG_STARTED = 'REORG_STARTED', + REORG_FINISHED = 'REORG_FINISHED', + VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED', +} + +type EmptyObject = Record; + +type EventQueueEventDataType = EventQueueTxData | EventQueueReorgData | EmptyObject + +function isEventQueueTxData(data: EventQueueEventDataType): data is EventQueueTxData { + return (data as EventQueueTxData).hash !== undefined; +} + +interface EventQueueBaseEvent { + peer_id: string; + id: number; + timestamp: number; + type: EventType; + data: EventQueueEventDataType; + group_id?: number; +} + +interface EventQueueData { + type: string; + event: EventQueueBaseEvent; + latest_event_id: number; +} + +const EVENT_QUEUE_EVENT_TYPE = 'EVENT'; + +class EventQueueConnection extends BaseConnection { + + private subAddresses: Record; + private storage: IStorage; + private txCache: Record; + + constructor(options: ConnectionParams & { storage: IStorage}) { + super(options); + + const wsOptions = { + wsURL: helpers.getWSServerURL(this.currentServer), + splitMessageType: false, + }; + + if (options.connectionTimeout) { + wsOptions['connectionTimeout'] = options.connectionTimeout; + } + + this.websocket = new GenericWebSocket(wsOptions); + this.subAddresses = {}; + this.txCache = {}; + this.storage = options.storage; + } + + setState(state: ConnectionState): void { + if (state === ConnectionState.CONNECTING) { + // We need to stop the listener to avoid adding transactions when the connection + // is reestablished but the wallet is not ready. + this.websocket?.removeAllListeners(EVENT_QUEUE_EVENT_TYPE); + } + + this.state = state; + this.emit('state', state); + } + + notifyWalletReady(): void { + this.websocket?.on(EVENT_QUEUE_EVENT_TYPE, this.handleEvent.bind(this)); + } + + /** + * Connect to the server and start emitting events. + **/ + start() { + // This should never happen as the websocket is initialized on the constructor + if (!this.websocket) { + throw new Error('Websocket is not initialized'); + } + + this.websocket.on('is_online', this.onConnectionChange); + + this.websocket.on(EVENT_QUEUE_EVENT_TYPE, this.handleEvent.bind(this)) + + this.setState(ConnectionState.CONNECTING); + this.websocket.setup(); + } + + subscribeAddresses(addresses: string[]) { + for (const address of addresses) { + this.subAddresses[address] = true; + } + } + + unsubscribeAddress(address: string) { + delete this.subAddresses[address]; + } + + isTxMine(data: EventQueueTxData): boolean { + for (const output of data.outputs) { + // We will parse the output script of each output + const scriptBuf = Buffer.from(output.script, 'hex'); + const parsedScript = parseScript(scriptBuf, new Network(this.network)); + + // We ignore data outputs and unknown scripts + // Only P2PKH and P2SH scripts have addresses so we can check against subAddresses + if (parsedScript instanceof P2PKH || parsedScript instanceof P2SH) { + if (parsedScript.address.base58 in this.subAddresses) { + // This means that the tx has a subscribed address + return true; + } + } + } + + // alternative, but async + // await this.storage.isAddressMine(parsedScript.address.base58) + + return false; + } + + /** + * Handler for `EVENT` messages from the fullnode. + */ + handleEvent(data: EventQueueData) { + const { event } = data; + if (!(event.type === EventType.NEW_VERTEX_ACCEPTED || event.type === EventType.VERTEX_METADATA_CHANGED)) { + // We only care for vertex events + return; + } + + if (!isEventQueueTxData(event.data)) { + return; + } + + if (!this.isTxMine(event.data)) { + return; + } + + // If the tx is mine, we will emit an event + this.buildEventData(event.data).then(eventData => { + this.emit('tx', eventData); + }); + + // Send ack for this event to the fullnode + // TODO + } + + /** + * Build the `WalletConnection` compatible event + * + * @param {EventQueueTxData} tx + * @returns {Promise} + */ + async buildEventData(tx: EventQueueTxData): Promise { + const historyTx = await this.convertTxData(tx); + return { + type: 'wallet:address_history', + history: historyTx, + }; + } + + async fetchTxData(txId: string): Promise { + + if (txId in this.txCache) { + return this.txCache[txId]; + } + + const storagetx = await this.storage.getTx(txId); + if (storagetx) { + return storagetx; + } + + /// TODO: retry at least 3 times + + // could not find tx in storage, try fullnode + const nodeResponse: FullNodeTxResponse = await new Promise((resolve, reject) => { + txApi.getTransaction(txId, resolve) + .then(() => reject(new Error('API client did not use the callback'))) + .catch(reject); + }); + + if (nodeResponse.success) { + const foundTx = transactionUtils.convertFullnodeTxToHistoryTx(nodeResponse, new Network(this.network)); + this.txCache[txId] = foundTx; + return foundTx; + } + + throw new Error('Could not find transaction'); + } + + /** + * Convert EventQueueTxData to IHistoryTx + * This is needed because of the structural difference between event queue and pubsub transactions + * + * Obs: We cannot call the fullnode api on tx.hash directly because the fullnode may have a different + * metadata and this can leave the wallet state in an inconsistent state. + * + * @param {EventQueueTxData} tx + * @returns {Promise} + */ + async convertTxData(tx: EventQueueTxData): Promise { + const is_voided = !!(tx.metadata && tx.metadata.voided_by && tx.metadata.voided_by.length !== 0); + const historyTx: IHistoryTx = { + tx_id: tx.hash, + signalBits: tx.version & 0xFF00, + version: tx.version & 0x00FF, + weight: tx.weight, + timestamp: tx.timestamp, + is_voided, + nonce: tx.nonce || 0, + inputs: [], + outputs: [], + parents: tx.parents, + tokens: tx.tokens, + }; + if (tx.token_name && tx.token_symbol) { + historyTx.token_name = tx.token_name; + historyTx.token_symbol = tx.token_symbol; + } + if (tx.metadata && tx.metadata.height) { + historyTx.height = tx.metadata.height; + } + + const spentOutputs: Record = {}; + for (const spent of tx.metadata.spent_outputs) { + if (spent.tx_ids.length > 0) { + spentOutputs[spent.index] = spent.tx_ids[0]; + } + } + + for (const [index, output] of tx.outputs.entries()) { + const { value, script, token_data } = output; + + // Get the token uid of this output + const tokenIndex = tokenUtils.getTokenIndexFromData(token_data); + const token = tokenIndex === 0 ? HATHOR_TOKEN_CONFIG.uid : tx.tokens[tokenIndex - 1]; + + // Build decoded data from script + const parsedScript = parseScript(Buffer.from(script, 'hex'), new Network(this.network)); + const decoded = parsedScript?.getDecoded() || {}; + + historyTx.outputs.push({ + value, + token_data, + script, + decoded, + token, + spent_by: spentOutputs[index] || null, + }); + } + + // This should only be used to build the inputs + const referenceTx = await this.fetchTxData(tx.hash); + historyTx.inputs = cloneDeep(referenceTx.inputs); + + return historyTx; + } +} + +export default EventQueueConnection; diff --git a/src/new/wallet.js b/src/new/wallet.js index 4769dbd67..b53e81406 100644 --- a/src/new/wallet.js +++ b/src/new/wallet.js @@ -1200,6 +1200,7 @@ class HathorWallet extends EventEmitter { // Started processing state now, so we prepare the local data to support using this facade interchangable with wallet service facade in both wallets try { await this.processTxQueue(); + this.conn.notifyWalletReady(); this.setState(HathorWallet.READY); } catch (e) { this.setState(HathorWallet.ERROR); @@ -1401,7 +1402,7 @@ class HathorWallet extends EventEmitter { }); if (info.network.indexOf(this.conn.network) >= 0) { this.storage.setApiVersion(info); - this.conn.start(); // XXX: maybe await? + this.conn.start(); } else { this.setState(HathorWallet.CLOSED); throw new Error(`Wrong network. server=${info.network} expected=${this.conn.network}`); diff --git a/src/utils/scripts.ts b/src/utils/scripts.ts index 5b3c32668..af83824ed 100644 --- a/src/utils/scripts.ts +++ b/src/utils/scripts.ts @@ -188,4 +188,24 @@ export function createP2SHRedeemScript(xpubs: string[], numSignatures: number, i // noSorting prevents that and keeps our order const redeemScript = Script.buildMultisigOut(pubkeys, numSignatures, {noSorting: true}); return redeemScript.toBuffer(); -} \ No newline at end of file +} + +export function parseScript(script: Buffer, network: Network): P2PKH | P2SH | ScriptData | null { + try { + if (P2PKH.identify(script)) { + // This is a P2PKH script + return parseP2PKH(script, network); + } else if (P2SH.identify(script)) { + // This is a P2SH script + return parseP2SH(script, network); + } + // defaults to data script + return parseScriptData(script); + } catch (error) { + if (error instanceof ParseError) { + // We don't know how to parse this script + return null; + } + throw error; + } +} diff --git a/src/utils/transaction.ts b/src/utils/transaction.ts index c90971547..667f7f13b 100644 --- a/src/utils/transaction.ts +++ b/src/utils/transaction.ts @@ -23,6 +23,9 @@ import ScriptData from '../models/script_data'; import { ParseError } from '../errors'; import helpers from './helpers'; import { getAddressType } from './address'; +import { FullNodeTxResponse } from '../wallet/types'; +import { parseScript } from './scripts'; +import { tokensUtils } from 'src/lib'; const transaction = { @@ -587,6 +590,100 @@ const transaction = { // If there is no match return 'Unknown'; }, + + /** + * Convert fullnode tx to history tx + * + * @param {FullNodeTxResponse} tx Fullnode tx + * @param {Network} network + * + * @returns {IHistoryTx} History tx + */ + convertFullnodeTxToHistoryTx(tx: FullNodeTxResponse, network: Network): IHistoryTx { + const historyTx: IHistoryTx = { + tx_id: tx.tx.hash, + signalBits: tx.tx.signal_bits, + version: tx.tx.version, + weight: tx.tx.weight, + timestamp: tx.tx.timestamp, + is_voided: tx.meta.voided_by.length !== 0, + nonce: parseInt(tx.tx.nonce, 10) || 0, + inputs: [], + outputs: [], + parents: tx.tx.parents, + tokens: tx.tx.tokens.map(t => t.uid), + }; + if (tx.tx.token_name && tx.tx.token_symbol) { + historyTx.token_name = tx.tx.token_name; + historyTx.token_symbol = tx.tx.token_symbol; + } + if (tx.meta && tx.meta.height) { + historyTx.height = tx.meta.height; + } + + const spentBys: Record = {}; + tx.meta.spent_outputs.forEach(spent => { + spentBys[spent[0]] = spent[1].length > 0 ? spent[1][0] : null; + }); + + for (const [outputIndex, output] of tx.tx.outputs.entries()) { + + let token: string; + if (output.token) { + token = output.token; + } else { + const tokenIndex = tokensUtils.getTokenIndexFromData(output.token_data); + if (tokenIndex === 0) { + token = HATHOR_TOKEN_CONFIG.uid; + } else { + token = tx.tx.tokens[tokenIndex - 1].uid; + } + } + + const parsedScript = parseScript(Buffer.from(output.script, 'hex'), network); + const decoded = parsedScript?.getDecoded() || {}; + + + historyTx.outputs.push({ + value: output.value, + token_data: output.token_data, + script: output.script, + token, + decoded, + spent_by: spentBys[outputIndex] || null, + }); + } + + for (const input of tx.tx.inputs) { + + let token: string; + if (input.token) { + token = input.token; + } else { + const tokenIndex = tokensUtils.getTokenIndexFromData(input.token_data); + if (tokenIndex === 0) { + token = HATHOR_TOKEN_CONFIG.uid; + } else { + token = tx.tx.tokens[tokenIndex - 1].uid; + } + } + + const parsedScript = parseScript(Buffer.from(input.script, 'hex'), network); + const decoded = parsedScript?.getDecoded() || {}; + + historyTx.inputs.push({ + value: input.value, + token_data: input.token_data, + script: input.script, + decoded, + token, + tx_id: input.tx_id, + index: input.index, + }); + } + + return historyTx; + } } export default transaction; diff --git a/src/wallet/types.ts b/src/wallet/types.ts index 8714232dd..7947545e0 100644 --- a/src/wallet/types.ts +++ b/src/wallet/types.ts @@ -500,6 +500,7 @@ export interface FullNodeOutput { export interface FullNodeTx { hash: string; nonce: string; + signal_bits: number; timestamp: number; version: number; weight: number;