diff --git a/coins/src/adapters/bridges/index.ts b/coins/src/adapters/bridges/index.ts index 02e6539d5b..f226a2fdf1 100644 --- a/coins/src/adapters/bridges/index.ts +++ b/coins/src/adapters/bridges/index.ts @@ -129,6 +129,7 @@ import { getCurrentUnixTimestamp } from "../../utils/date"; import produceKafkaTopics from "../../utils/coins3/produce"; import { chainsThatShouldNotBeLowerCased } from "../../utils/shared/constants"; import { sendMessage } from "../../../../defi/src/utils/discord"; +import { isDistressed } from "../../utils/shared/distressedCoins"; const craftToPK = (to: string) => (to.includes("#") ? to : `asset#${to}`); @@ -206,6 +207,8 @@ async function _storeTokensOfBridge(bridge: Bridge, i: number) { const writes: any[] = []; await Promise.all( unlisted.map(async (token) => { + if (await isDistressed(token.to)) return; + const finalPK = toAddressToRecord[craftToPK(token.to)]; if (finalPK === undefined) return; diff --git a/coins/src/scripts/coingecko.ts b/coins/src/scripts/coingecko.ts index c55e7ef4e9..5ed45056a8 100644 --- a/coins/src/scripts/coingecko.ts +++ b/coins/src/scripts/coingecko.ts @@ -20,6 +20,7 @@ import { storeAllTokens } from "../utils/shared/bridgedTvlPostgres"; import { sendMessage } from "../../../defi/src/utils/discord"; import { chainsThatShouldNotBeLowerCased } from "../utils/shared/constants"; import { cacheSolanaTokens, getSymbolAndDecimals } from "./coingeckoUtils"; +import { batchIsDistressed, isDistressed } from "../utils/shared/distressedCoins"; // Kill the script after 5 minutes to prevent infinite execution const TIMEOUT_MS = 10 * 60 * 1000; // 5 minutes in milliseconds @@ -46,6 +47,8 @@ interface IdToSymbol { } async function storeCoinData(coinData: Write[]) { + const distressedIds = await batchIsDistressed(coinData.map((i) => i.PK)); + const items = coinData .map((c) => ({ PK: c.PK, @@ -58,7 +61,8 @@ async function storeCoinData(coinData: Write[]) { volume: c.volume, adapter: 'coingecko' })) - .filter((c: Write) => c.symbol != null); + .filter((c: Write) => c.symbol != null && !distressedIds[c.PK]); + await Promise.all([ produceKafkaTopics( items.map((i) => { @@ -71,13 +75,16 @@ async function storeCoinData(coinData: Write[]) { } async function storeHistoricalCoinData(coinData: Write[]) { + const distressedIds = await batchIsDistressed(coinData.map((i) => i.PK)); + const items = coinData.map((c) => ({ SK: c.SK, PK: c.PK, price: c.price, confidence: c.confidence, volume: c.volume, - })); + })).filter((c: Write) => !distressedIds[c.PK]); + await Promise.all([ produceKafkaTopics( items.map((i) => ({ @@ -335,6 +342,8 @@ async function getAndStoreHourly( } const PK = cgPK(coin.id); + if (await isDistressed(PK)) return; + const prevWritenItems = await batchReadPostgres( `coingecko:${coin.id}`, toUNIXTimestamp(coinData.prices[0][0]), diff --git a/coins/src/scripts/defiCoins.ts b/coins/src/scripts/defiCoins.ts index 0a7139579f..09bdaa81c4 100644 --- a/coins/src/scripts/defiCoins.ts +++ b/coins/src/scripts/defiCoins.ts @@ -8,6 +8,8 @@ console.log(process.version); import adapters from "../adapters/index"; console.log("adapters imported"); import { PromisePool } from "@supercharge/promise-pool"; +import { batchIsDistressed } from "../utils/shared/distressedCoins"; + console.log("imports successful"); @@ -25,7 +27,7 @@ async function storeDefiCoins() { process.env.tableName = "prod-coins-table"; const adaptersArray = Object.entries(adapters); const protocolIndexes: number[] = Array.from( - Array(adaptersArray.length).keys(), + Array(adaptersArray.length).keys() ); shuffleArray(protocolIndexes); const a = Object.entries(adapters); @@ -40,14 +42,22 @@ async function storeDefiCoins() { try { const adapterFn = typeof b === "function" ? b : b[adapterKey]; const results = await withTimeout(timeout, adapterFn(timestamp)); + const blacklist = await batchIsDistressed( + results.flat().map((c: any) => c.PK) + ); + const resultsWithoutDuplicates = await filterWritesWithLowConfidence( - results.flat().filter((c: any) => c.symbol != null || c.SK != 0), + results + .flat() + .filter( + (c: any) => (c.symbol != null || c.SK != 0) && !blacklist[c.PK] + ) ); for (let i = 0; i < resultsWithoutDuplicates.length; i += step) { await Promise.all([ batchWriteWithAlerts( resultsWithoutDuplicates.slice(i, i + step), - true, + true ), ]); // await batchWrite2WithAlerts( @@ -76,3 +86,4 @@ async function storeDefiCoins() { } storeDefiCoins(); // ts-node coins/src/scripts/defiCoins.ts + diff --git a/defi/src/storeTvlInterval/computeTVL.ts b/defi/src/storeTvlInterval/computeTVL.ts index 117806d72b..d17a0d2b0f 100644 --- a/defi/src/storeTvlInterval/computeTVL.ts +++ b/defi/src/storeTvlInterval/computeTVL.ts @@ -5,6 +5,7 @@ import * as sdk from '@defillama/sdk' import { once, EventEmitter } from 'events' import { searchWidth } from "../utils/shared/constants"; import { Client } from "@elastic/elasticsearch"; +import { logDistressedCoins } from "../utils/shared/distressedCoins"; const ethereumAddress = "0x0000000000000000000000000000000000000000"; const weth = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"; @@ -59,7 +60,9 @@ export default async function (balances: { [address: string]: string }, timestam const usdTokenBalances = {} as Balances; const now = timestamp === "now" ? Math.round(Date.now() / 1000) : timestamp; const tokenData = await getTokenData(readKeys, timestamp) + const mcapData = await getMcapData(readKeys, timestamp); const staleCoinsInclusive: any = {}; + const distressedCoins: any[] = [] tokenData.forEach((response: any) => { if (Math.abs(response.timestamp - now) < searchWidth) { PKsToTokens[response.PK].forEach((address) => { @@ -75,6 +78,7 @@ export default async function (balances: { [address: string]: string }, timestam amount = new BigNumber(balance).div(10 ** decimals).toNumber(); } const usdAmount = amount * price; + checkMcaps(address, mcapData, usdAmount, distressedCoins, protocol) checkForStaleness(usdAmount, response, now, protocol, staleCoinsInclusive); tokenBalances[symbol] = (tokenBalances[symbol] ?? 0) + amount; usdTokenBalances[symbol] = (usdTokenBalances[symbol] ?? 0) + usdAmount; @@ -85,6 +89,8 @@ export default async function (balances: { [address: string]: string }, timestam appendToStaleCoins(usdTvl, staleCoinsInclusive, staleCoins); + if (distressedCoins.length) await logDistressedCoins(distressedCoins); + return { usdTvl, tokenBalances, @@ -92,6 +98,12 @@ export default async function (balances: { [address: string]: string }, timestam }; } +function checkMcaps(address: string, mcapData: any, usdAmount: number, distressedCoins: any[], protocol: string) { + if (usdAmount < 1e7) return true; + const mcap = mcapData[address]; + if (mcap && usdAmount > mcap) distressedCoins.push({ address, usdAmount, mcap, protocol }); +} + function replaceETHwithWETH(balances: { [address: string]: string }) { const keys = [ethereumAddress, 'ethereum:' + ethereumAddress] for (const key of keys) { @@ -215,6 +227,100 @@ async function getTokenData(readKeys: string[], timestamp: string | number): Pro } } +const mcapCache: { [PK: string]: any } = {} + +async function getMcapData(readKeys: string[], timestamp: string | number): Promise { + if (!readKeys.length) return [] + + + const currentId = counter.requestCount++ + const eventId = `${currentId}` + + if (counter.activeWorkers > maxParallelCalls) { + counter.queue.push(eventId) + await once(emitter, eventId) + } + + counter.activeWorkers++ + + const showEveryX = counter.queue.length > 100 ? 30 : 10 // show log fewer times if lot more are queued up + if (currentId % showEveryX === 0) sdk.log(`request #: ${currentId} queue: ${counter.queue.length} active requests: ${counter.activeWorkers}`) + + let response + try { + response = await _getMcapData() + onComplete() + } catch (e) { + onComplete() + throw e + } + + return response + + function onComplete() { + counter.activeWorkers-- + if (counter.queue.length) { + const nextRequestId = counter.pickFromTop ? counter.queue.shift() : counter.queue.pop() + counter.pickFromTop = !counter.pickFromTop + emitter.emit(nextRequestId) + } + } + + async function _getMcapData() { + let cachedMcapData: { [PK: string]: number } = {} + + // read data from cache where possible + readKeys = readKeys.filter((PK: string) => { + if (timestamp !== 'now') return true; + if (mcapCache[PK]) { + cachedMcapData[PK] = mcapCache[PK]; + return false; + } + return true; + }) + + if (!readKeys.length) return cachedMcapData; + + const readRequests: any[] = []; + sdk.log(`mcap request count: ${readKeys.length}`) + for (let i = 0; i < readKeys.length; i += 100) { + const body = { + "coins": readKeys.slice(i, i + 100), + } as any + if (timestamp !== "now") { + body.timestamp = timestamp; + } + readRequests.push( + fetch(`https://coins.llama.fi/mcaps${process.env.COINS_KEY ? `?apikey=${process.env.COINS_KEY}` : ""}`, { + method: "POST", + body: JSON.stringify(body), + headers: { "Content-Type": "application/json" }, + }).then((r) => r.json()).then(r => { + const mcaps: { [PK: string]: number } = {} + Object.keys(r).map((PK) => { + const mcap = r[PK].mcap + if (!mcap) return; + mcapCache[PK] = mcap + mcaps[PK] = mcap + }) + + return mcaps; + }) + ); + } + const tokenData = [cachedMcapData].concat(...(await Promise.all(readRequests))); + + const mcapObject: { [PK: string]: number } = {} + tokenData.map((mcaps) => { + Object.entries(mcaps).forEach(([PK, mcap]: any) => { + mcapObject[PK] = mcap + }) + }) + + return mcapObject; + } +} + interface Counter { activeWorkers: number; requestCount: number; diff --git a/defi/src/utils/shared/distressedCoins.ts b/defi/src/utils/shared/distressedCoins.ts new file mode 100644 index 0000000000..a5d8429bec --- /dev/null +++ b/defi/src/utils/shared/distressedCoins.ts @@ -0,0 +1,136 @@ +import { chainsThatShouldNotBeLowerCased } from "../../utils/shared/constants"; +import { elastic } from "@defillama/sdk"; +import { getClient as getCoinsESClient } from "../../storeTvlInterval/computeTVL"; +import { runInPromisePool } from "@defillama/sdk/build/generalUtil"; +import { batchWrite } from "./dynamodb"; + +const client: any = elastic.getClient(); + +function sanitizeKey(key: string): string { + const chain = key.split(":")[0]; + const address = key.substring(chain.length + 1); + const normalizedAddress = chainsThatShouldNotBeLowerCased.includes(chain) ? address : address.toLowerCase(); + return `${chain}:${normalizedAddress}`; +} + +// batch check if a list of coins are distressed +export async function batchIsDistressed(keys: string[]) { + const results: { [key: string]: boolean } = {}; + + await runInPromisePool({ + items: keys, + concurrency: 5, + processor: async (PK: any) => { + const key = PK.replace("asset#", "").replace("#", ":"); + const isBlacklisted = await isDistressed(key); + results[PK] = isBlacklisted; + }, + }); + + return results; +} + +// check if a coin is distressed +export async function isDistressed(key: string) { + + const _id = sanitizeKey(key); + const { hits } = await client.search({ + index: "distressed-assets-store*", + body: { + query: { + match: { _id }, + }, + }, + }); + + return hits?.hits?.length > 0; +} + +// write sorted logs, and manual entries, to the assets store +export async function addToDistressed(keys: string[]) { + const body: any[] = []; + keys.map((key: string) => { + const _id = sanitizeKey(key); + body.push({ index: { _index: "distressed-assets-store", _id } }); + body.push({}) + }); + + await storeDistressedCoins(keys); + const res = await client.bulk({ body }); +} + +// initial write all to temp logs index +export async function logDistressedCoins(keys: any[]) { + for (const data of keys) { + data.reportTime = Math.floor(Date.now() / 1000); + await elastic.writeLog("distressed-assets", data); + } +} + +// get list of possible distressed coins from ES logs in the last week +export async function readDistressedLogs() { + const aWeekAgo = Math.floor(Date.now() / 1000) - 3600 * 24 * 7; + + let { + hits: { hits }, + }: any = await client.search({ + index: "distressed-assets*", + size: 999, + body: { + query: { + range: { + // find records with reportTime > lastCheckTS + reportTime: { + gt: aWeekAgo, // reportTime is in ms + }, + }, + }, + }, + }); + + return hits.map((hit: any) => hit._source); +} + +// store distressed metadata to DDB SK0 for metadata retrieval +export async function storeDistressedCoins(keys: string[], coinsESClient?: any) { + if (!coinsESClient) coinsESClient = getCoinsESClient(); + const metadata: { [key: string]: any } = {}; + await runInPromisePool({ + items: keys, + concurrency: 5, + processor: async (key: string) => { + const chain = key.split(":")[0]; + const address = key.substring(chain.length + 1); + const pid = ["coingecko", "ethereum"].includes(chain) ? address : key; + const matches = await coinsESClient.search({ + index: "coins-metadata", + body: { + query: { + match: { + pid: pid.toLowerCase(), + }, + }, + }, + }); + + if (!matches?.hits?.hits?.length) return; + metadata[key] = matches.hits.hits[0]._source; + }, + }); + + const items: any[] = []; + keys.map((key: string) => { + const PK = key.split(":")[0] == "coingecko" ? key.replace(":", "#") : `asset#${key}`; + items.push({ + PK, + SK: 0, + confidence: 1.01, + adapter: "distressed", + timestamp: Math.floor(Date.now() / 1000), + symbol: metadata[key]?.symbol ?? "-", + decimals: metadata[key]?.decimals ?? 0, + }); + }); + + await batchWrite(items, false); +}