Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 87 additions & 49 deletions src/components/Indexer/processors/BaseProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import ERC20Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC20TemplateEnterprise.sol/ERC20TemplateEnterprise.json' with { type: 'json' }
import { fetchTransactionReceipt } from '../../core/utils/validateOrders.js'
import { withRetrial } from '../utils.js'
import { OceanNodeKeys } from '../../../@types/OceanNode.js'

export abstract class BaseEventProcessor {
protected networkId: number
Expand Down Expand Up @@ -205,6 +206,31 @@ export abstract class BaseEventProcessor {
return true
}

private async getNonce(decryptorURL: string, keys: OceanNodeKeys) {
try {
if (URLUtils.isValidUrl(decryptorURL)) {
INDEXER_LOGGER.logMessage(
`decryptDDO: Making HTTP request for nonce. DecryptorURL: ${decryptorURL}`
)
const nonceResponse = await axios.get(
`${decryptorURL}/api/services/nonce?userAddress=${keys.ethAddress}`,
{ timeout: 20000 }
)
return nonceResponse.status === 200 && nonceResponse.data
? String(parseInt(nonceResponse.data.nonce) + 1)
: Date.now().toString()
} else {
return Date.now().toString()
}
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`decryptDDO: Error getting nonce, using timestamp: ${err.message}`
)
return Date.now().toString()
}
}

protected async decryptDDO(
decryptorURL: string,
flag: string,
Expand All @@ -224,75 +250,78 @@ export abstract class BaseEventProcessor {
)
const config = await getConfiguration()
const { keys } = config
let nonce: string
try {
if (URLUtils.isValidUrl(decryptorURL)) {
INDEXER_LOGGER.logMessage(
`decryptDDO: Making HTTP request for nonce. DecryptorURL: ${decryptorURL}`
)
const nonceResponse = await axios.get(
`${decryptorURL}/api/services/nonce?userAddress=${keys.ethAddress}`,
{ timeout: 20000 }
)
nonce =
nonceResponse.status === 200 && nonceResponse.data
? String(parseInt(nonceResponse.data.nonce) + 1)
: Date.now().toString()
} else {
nonce = Date.now().toString()
}
} catch (err) {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`decryptDDO: Error getting nonce, using timestamp: ${err.message}`
)
nonce = Date.now().toString()
}
const nodeId = keys.peerId.toString()

const wallet: ethers.Wallet = new ethers.Wallet(process.env.PRIVATE_KEY as string)

const useTxIdOrContractAddress = txId || contractAddress
const message = String(
useTxIdOrContractAddress + keys.ethAddress + chainId.toString() + nonce
)

const messageHash = ethers.solidityPackedKeccak256(
['bytes'],
[ethers.hexlify(ethers.toUtf8Bytes(message))]
)
const createSignature = async () => {
const nonce: string = await this.getNonce(decryptorURL, keys)
INDEXER_LOGGER.logMessage(
`decryptDDO: Fetched fresh nonce ${nonce} for decrypt attempt`
)

const messageHashBytes = ethers.getBytes(messageHash)
const signature = await wallet.signMessage(messageHashBytes)
const message = String(
useTxIdOrContractAddress + keys.ethAddress + chainId.toString() + nonce
)
const messageHash = ethers.solidityPackedKeccak256(
['bytes'],
[ethers.hexlify(ethers.toUtf8Bytes(message))]
)
const messageHashBytes = ethers.getBytes(messageHash)
const signature = await wallet.signMessage(messageHashBytes)

const recoveredAddress = ethers.verifyMessage(messageHashBytes, signature)
INDEXER_LOGGER.logMessage(
`decryptDDO: recovered address: ${recoveredAddress}, expected: ${keys.ethAddress}`
)
const recoveredAddress = ethers.verifyMessage(messageHashBytes, signature)
INDEXER_LOGGER.logMessage(
`decryptDDO: recovered address: ${recoveredAddress}, expected: ${keys.ethAddress}`
)

return { nonce, signature }
}

if (URLUtils.isValidUrl(decryptorURL)) {
try {
const payload = {
transactionId: txId,
chainId,
decrypterAddress: keys.ethAddress,
dataNftAddress: contractAddress,
signature,
nonce
}
const response = await withRetrial(async () => {
const { nonce, signature } = await createSignature()

const payload = {
transactionId: txId,
chainId,
decrypterAddress: keys.ethAddress,
dataNftAddress: contractAddress,
signature,
nonce
}
try {
const res = await axios({
method: 'post',
url: `${decryptorURL}/api/services/decrypt`,
data: payload,
timeout: 30000
timeout: 30000,
validateStatus: (status) => {
return (
(status >= 200 && status < 300) || status === 400 || status === 403
)
}
})

INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_INFO,
`Decrypt request successful. Status: ${res.status}, ${res.statusText}`
)

if (res.status === 400 || res.status === 403) {
// Return error response, to avoid retry for unnecessary errors
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`bProvider exception on decrypt DDO. Status: ${res.status}, ${res.statusText}`
)
return res
}

if (res.status !== 200 && res.status !== 201) {
const message = `bProvider exception on decrypt DDO. Status: ${res.status}, ${res.statusText}`
INDEXER_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, message)
throw new Error(message) // do NOT retry
throw new Error(message) // Retry 5XX errors
}
return res
} catch (err: any) {
Expand All @@ -313,6 +342,10 @@ export abstract class BaseEventProcessor {
}
})

if (response.status === 400 || response.status === 403) {
throw new Error(`Provider validation failed: ${response.statusText}`)
}

let responseHash
if (response.data instanceof Object) {
responseHash = create256Hash(JSON.stringify(response.data))
Expand All @@ -334,6 +367,9 @@ export abstract class BaseEventProcessor {
} else {
const node = OceanNode.getInstance(config, await getDatabase())
if (nodeId === decryptorURL) {
// Fetch nonce and signature for local node path
const { nonce, signature } = await createSignature()

const decryptDDOTask: DecryptDDOCommand = {
command: PROTOCOL_COMMANDS.DECRYPT_DDO,
transactionId: txId,
Expand All @@ -358,6 +394,8 @@ export abstract class BaseEventProcessor {
}
} else {
try {
const { nonce, signature } = await createSignature()

const p2pNode = await node.getP2PNode()
let isBinaryContent = false
const sink = async function (source: any) {
Expand Down
67 changes: 57 additions & 10 deletions src/components/Indexer/processors/MetadataEventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { INDEXER_LOGGER } from '../../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../../utils/logging/Logger.js'
import { asyncCallWithTimeout } from '../../../utils/util.js'
import { PolicyServer } from '../../policyServer/index.js'
import { wasNFTDeployedByOurFactory, getPricingStatsForDddo } from '../utils.js'
import { wasNFTDeployedByOurFactory, getPricingStatsForDddo, getDid } from '../utils.js'
import { BaseEventProcessor } from './BaseProcessor.js'
import ERC721Template from '@oceanprotocol/contracts/artifacts/contracts/templates/ERC721Template.sol/ERC721Template.json' with { type: 'json' }
import { Purgatory } from '../purgatory.js'
Expand Down Expand Up @@ -49,10 +49,65 @@ export class MetadataEventProcessor extends BaseEventProcessor {
ERC721Template.abi,
eventName
)

const metadata = decodedEventData.args[4]
const metadataHash = decodedEventData.args[5]
const flag = decodedEventData.args[3]
const owner = decodedEventData.args[0]

const dataNftAddress = ethers.getAddress(event.address)

did = getDid(event.address, chainId)

const templateContract = new ethers.Contract(
dataNftAddress,
ERC721Template.abi,
signer
)
const metaData = await templateContract.getMetaData()
const metadataState = Number(metaData[2])

if ([MetadataStates.DEPRECATED, MetadataStates.REVOKED].includes(metadataState)) {
INDEXER_LOGGER.logMessage(
`Delete DDO because Metadata state is ${metadataState}`,
true
)
const { ddo: ddoDatabase } = await getDatabase()
const ddo = await ddoDatabase.retrieve(did)
if (!ddo) {
INDEXER_LOGGER.logMessage(
`Detected MetadataState changed for ${did}, but it does not exists.`
)
return
}

const ddoInstance = DDOManager.getDDOClass(ddo)

INDEXER_LOGGER.logMessage(
`DDO became non-visible from ${
ddoInstance.getAssetFields().indexedMetadata.nft.state
} to ${metadataState}`
)

const shortDdoInstance = DDOManager.getDDOClass({
id: ddo.id,
version: 'deprecated',
chainId,
nftAddress: ddo.nftAddress,
indexedMetadata: {
nft: {
state: metadataState
}
}
})

const savedDDO = await this.createOrUpdateDDO(
shortDdoInstance,
EVENTS.METADATA_STATE
)
return savedDDO
}

const ddo = await this.decryptDDO(
decodedEventData.args[2],
flag,
Expand Down Expand Up @@ -208,17 +263,9 @@ export class MetadataEventProcessor extends BaseEventProcessor {
if (eventName === EVENTS.METADATA_UPDATED) {
if (!previousDdoInstance) {
INDEXER_LOGGER.logMessage(
`Previous DDO with did ${ddoInstance.getDid()} was not found the database. Maybe it was deleted/hidden to some violation issues`,
`Previous DDO with did ${ddoInstance.getDid()} was not found the database`,
true
)
await ddoState.update(
this.networkId,
did,
event.address,
event.transactionHash,
false,
`Previous DDO with did ${ddoInstance.getDid()} was not found the database. Maybe it was deleted/hidden to some violation issues`
)
return
}
const [isUpdateable, error] = this.isUpdateable(
Expand Down
9 changes: 2 additions & 7 deletions src/components/core/handler/ddoHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,7 @@ export class DecryptDdoHandler extends CommandHandler {
)
const metaData = await templateContract.getMetaData()
const metaDataState = Number(metaData[2])
if (
[
MetadataStates.END_OF_LIFE,
MetadataStates.DEPRECATED,
MetadataStates.REVOKED
].includes(metaDataState)
) {
if ([MetadataStates.DEPRECATED, MetadataStates.REVOKED].includes(metaDataState)) {
CORE_LOGGER.logMessage(`Decrypt DDO: error metadata state ${metaDataState}`, true)
return {
stream: null,
Expand All @@ -300,6 +294,7 @@ export class DecryptDdoHandler extends CommandHandler {
if (
![
MetadataStates.ACTIVE,
MetadataStates.END_OF_LIFE,
MetadataStates.ORDERING_DISABLED,
MetadataStates.UNLISTED
].includes(metaDataState)
Expand Down
Loading