60 changes: 60 additions & 0 deletions coins/src/adapters/other/distressedUI.ts
@@ -0,0 +1,60 @@
import getBlock from "../utils/block";
import { getTokenInfo } from "../utils/erc20";
import { Write } from "../utils/dbInterfaces";
import { addToDBWritesList } from "../utils/database";
import { getCurrentUnixTimestamp } from "../../utils/date";
import { getR2JSONString } from "../../utils/r2";
import PromisePool from "@supercharge/promise-pool";

const r2Key = "distressedAssetsList.json";

export default async function getTokenPrices(timestamp: number) {
const writes: Write[] = [];
const assets = await getR2JSONString(r2Key);
Member

so, does this get manually triggered?
or do we store the same distressed list to ddb every hour?

Collaborator Author

currently we store every hour, but we could change this by

  • writing new distressed assets to SK=0 so we can fetch metadata
  • adding a case to the coins endpoints so latency checks are skipped when isDistressed() returns true (see the sketch below)
  • adding cases to deficoins, bridged, and coingecko so writes are dropped when isDistressed() returns true


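A minimal sketch of the latency-skip idea from the second bullet, assuming a hypothetical `CoinRecord` shape and `latencyCheckPassed` helper; only `isDistressed` comes from this PR, and the import path is illustrative:

```ts
import { isDistressed } from "../utils/shared/distressedCoins"; // illustrative path

// hypothetical record shape and freshness window, for illustration only
interface CoinRecord { PK: string; price: number; timestamp: number; }
const searchWidth = 4 * 3600;

function latencyCheckPassed(record: CoinRecord, now: number): boolean {
  return Math.abs(record.timestamp - now) < searchWidth;
}

async function resolveCoin(record: CoinRecord, now: number): Promise<CoinRecord | undefined> {
  // distressed assets are hard-priced at 0 with a one-off write, so their
  // timestamps go stale by design; return them without the freshness check
  if (await isDistressed(record.PK)) return record;
  return latencyCheckPassed(record, now) ? record : undefined;
}
```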
await PromisePool.withConcurrency(5)
.for(Object.keys(assets))
.process(async (chain) => {
const block: number | undefined = await getBlock(chain, timestamp);

if (chain === "coingecko") {
assets[chain].forEach((id: string) => {
writes.push(
{
PK: `coingecko#${id}`,
SK: 0,
confidence: 1.01,
price: 0,
symbol: "-",
adapter: "distressed",
timestamp: timestamp == 0 ? getCurrentUnixTimestamp() : timestamp,
},
{
PK: `coingecko#${id}`,
SK: timestamp,
confidence: 1.01,
price: 0,
adapter: "distressed",
}
);
});
} else {
const tokenInfos = await getTokenInfo(chain, assets[chain], block);
assets[chain].forEach((a: string, i: number) => {
addToDBWritesList(
writes,
chain,
a,
0,
tokenInfos.decimals[i].output ?? 0,
tokenInfos.symbols[i].output ?? "-",
timestamp,
"distressed",
1.01
);
});
}
});

return writes;
}
106 changes: 106 additions & 0 deletions defi/src/storeTvlInterval/computeTVL.ts
@@ -5,6 +5,7 @@ import * as sdk from '@defillama/sdk'
import { once, EventEmitter } from 'events'
import { searchWidth } from "../utils/shared/constants";
import { Client } from "@elastic/elasticsearch";
import { logDistressedCoins } from "../utils/shared/distressedCoins";

const ethereumAddress = "0x0000000000000000000000000000000000000000";
const weth = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";
@@ -59,7 +60,9 @@ export default async function (balances: { [address: string]: string }, timestam
const usdTokenBalances = {} as Balances;
const now = timestamp === "now" ? Math.round(Date.now() / 1000) : timestamp;
const tokenData = await getTokenData(readKeys, timestamp)
const mcapData = await getMcapData(readKeys, timestamp);
const staleCoinsInclusive: any = {};
const distressedCoins: string[] = []
tokenData.forEach((response: any) => {
if (Math.abs(response.timestamp - now) < searchWidth) {
PKsToTokens[response.PK].forEach((address) => {
@@ -75,6 +78,7 @@
amount = new BigNumber(balance).div(10 ** decimals).toNumber();
}
const usdAmount = amount * price;
checkMcaps(address, mcapData, usdAmount, distressedCoins)
checkForStaleness(usdAmount, response, now, protocol, staleCoinsInclusive);
tokenBalances[symbol] = (tokenBalances[symbol] ?? 0) + amount;
usdTokenBalances[symbol] = (usdTokenBalances[symbol] ?? 0) + usdAmount;
@@ -85,13 +89,21 @@

appendToStaleCoins(usdTvl, staleCoinsInclusive, staleCoins);

if (distressedCoins.length) await logDistressedCoins(distressedCoins, protocol);

return {
usdTvl,
tokenBalances,
usdTokenBalances,
};
}

function checkMcaps(address: string, mcapData: any, usdAmount: number, distressedCoins: string[]) {
if (usdAmount < 1e7) return; // only positions above $10M are worth flagging
const mcap = mcapData[address];
if (mcap && usdAmount > mcap) distressedCoins.push(address);
}
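For example, a $25M reported balance of a token whose market cap is only $4M clears the $10M floor and exceeds the token's entire mcap, so the key gets flagged (illustrative key and values):

```ts
const distressed: string[] = [];
// 2.5e7 is above the 1e7 floor and greater than the 4e6 mcap, so the key is pushed
checkMcaps("ethereum:0xabc", { "ethereum:0xabc": 4e6 }, 2.5e7, distressed);
console.log(distressed); // ["ethereum:0xabc"]
```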

function replaceETHwithWETH(balances: { [address: string]: string }) {
const keys = [ethereumAddress, 'ethereum:' + ethereumAddress]
for (const key of keys) {
@@ -215,6 +227,100 @@ async function getTokenData(readKeys: string[], timestamp: string | number): Pro
}
}

const mcapCache: { [PK: string]: any } = {}

async function getMcapData(readKeys: string[], timestamp: string | number): Promise<any> {
if (!readKeys.length) return {}


const currentId = counter.requestCount++
const eventId = `${currentId}`

if (counter.activeWorkers > maxParallelCalls) {
counter.queue.push(eventId)
await once(emitter, eventId)
}

counter.activeWorkers++

const showEveryX = counter.queue.length > 100 ? 30 : 10 // log less often when many more requests are queued up
if (currentId % showEveryX === 0) sdk.log(`request #: ${currentId} queue: ${counter.queue.length} active requests: ${counter.activeWorkers}`)

let response
try {
response = await _getMcapData()
onComplete()
} catch (e) {
onComplete()
throw e
}

return response

function onComplete() {
counter.activeWorkers--
if (counter.queue.length) {
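// alternate between the front and back of the queue so neither end starves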
const nextRequestId = counter.pickFromTop ? counter.queue.shift() : counter.queue.pop()
counter.pickFromTop = !counter.pickFromTop
emitter.emit(<string>nextRequestId)
}
}

async function _getMcapData() {
let cachedMcapData: { [PK: string]: number } = {}

// read data from cache where possible
readKeys = readKeys.filter((PK: string) => {
if (timestamp !== 'now') return true;
if (mcapCache[PK]) {
cachedMcapData[PK] = mcapCache[PK];
return false;
}
return true;
})

if (!readKeys.length) return cachedMcapData;

const readRequests: any[] = [];
sdk.log(`mcap request count: ${readKeys.length}`)
for (let i = 0; i < readKeys.length; i += 100) {
const body = {
"coins": readKeys.slice(i, i + 100),
} as any
if (timestamp !== "now") {
body.timestamp = timestamp;
}
readRequests.push(
fetch(`https://coins.llama.fi/mcaps${process.env.COINS_KEY ? `?apikey=${process.env.COINS_KEY}` : ""}`, {
method: "POST",
body: JSON.stringify(body),
headers: { "Content-Type": "application/json" },
}).then((r) => r.json()).then(r => {
const mcaps: { [PK: string]: number } = {}
Object.keys(r).forEach((PK) => {
const mcap = r[PK].mcap
if (!mcap) return;
mcapCache[PK] = mcap
mcaps[PK] = mcap
})

return mcaps;
})
);
}
const mcapResults = [cachedMcapData].concat(...(await Promise.all(readRequests)));

const mcapObject: { [PK: string]: number } = {}
mcapResults.forEach((mcaps) => {
Object.entries(mcaps).forEach(([PK, mcap]: any) => {
mcapObject[PK] = mcap
})
})

return mcapObject;
}
}

interface Counter {
activeWorkers: number;
requestCount: number;
91 changes: 91 additions & 0 deletions defi/src/utils/shared/distressedCoins.ts
Member

hmm, don't like the idea of using R2 strings as a DB, can you replace it with a dedicated index (table in ES) in our coins ES DB? are we storing the token mcaps in the coins-current index now?

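A sketch of the dedicated-index suggestion, assuming the v8 `@elastic/elasticsearch` client API and reusing the PR's `distressed-assets-store` index name; the `addedAt` mapping field is illustrative:

```ts
import { Client } from "@elastic/elasticsearch";

async function ensureDistressedIndex(client: Client) {
  const index = "distressed-assets-store";
  // idempotent: only create the index if it does not already exist
  if (await client.indices.exists({ index })) return;
  await client.indices.create({
    index,
    mappings: {
      properties: {
        addedAt: { type: "date" }, // illustrative metadata field
      },
    },
  });
}
```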
@@ -0,0 +1,91 @@
import { getUnixTimeNow } from "../../api2/utils/time";
import { chainsThatShouldNotBeLowerCased } from "../../utils/shared/constants";
import { elastic, cache } from "@defillama/sdk";

function sanitizeKey(key: string) {
const chain = key.split(":")[0];
const address = key.substring(chain.length + 1);
const normalizedAddress = chainsThatShouldNotBeLowerCased.includes(chain) ? address : address.toLowerCase();
return `${chain}:${normalizedAddress}`;
}

export async function isDistressed(key: string, client?: any) {
const isLocalClient: boolean = client == undefined
if (isLocalClient) client = elastic.getClient();

const _id = sanitizeKey(key)
const { hits } = await client.search({
index: "distressed-assets-store*",
body: {
query: {
match: { _id },
},
},
});

if (isLocalClient) await client?.close();
Member

can leave the connection open, can close it once when the process exits, noticed this pattern in other places as well, not sure if it is a good idea to open & close a connection per request, most of our scripts run under an hour, so don't see why we need to close it per write

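A minimal sketch of that pattern, assuming a module-level client that is closed once via a `beforeExit` hook rather than per call:

```ts
import { elastic } from "@defillama/sdk";

let sharedClient: any;

function getSharedClient() {
  if (!sharedClient) {
    sharedClient = elastic.getClient();
    // close the connection once, when the process is about to exit,
    // instead of per read/write
    process.on("beforeExit", async () => {
      await sharedClient?.close();
      sharedClient = undefined;
    });
  }
  return sharedClient;
}
```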

return hits?.hits?.length > 0;
}

export async function addToDistressed(keys: string[], client?: any) {
const isLocalClient: boolean = client == undefined
if (isLocalClient) client = elastic.getClient();

const body: any[] = [];
keys.forEach((key: string) => {
const _id = sanitizeKey(key)
// each bulk `index` action line must be followed by a source document
body.push({ index: { _index: "distressed-assets-store", _id } }, {});
});

await client.bulk({ body });

if (isLocalClient) await client?.close();
}

export async function logDistressedCoins(keys: string[], protocol: string) {
await elastic.writeLog("distressed-assets", { keys, protocol, reportTime: getUnixTimeNow() });
}

export async function readDistressedLogs() {
const esClient = elastic.getClient();
const hourAgo = Math.floor(Date.now() / 1000) - 3600;
let { lastCheckTS } = (await cache.readExpiringJsonCache("distressed-assets-last-check")) || { lastCheckTS: 0 };
Member

can use this: elastic.getAllLogs() and skip the cache?

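A sketch of that simplification, using `addToDistressed` from above; the `elastic.getAllLogs(index)` signature is an assumption here, and idempotent bulk writes by `_id` replace the `lastCheckTS` bookkeeping:

```ts
import { elastic } from "@defillama/sdk";

async function syncDistressedFromLogs() {
  // assumed helper: resolves to every stored log document in the index
  const logs: any[] = await elastic.getAllLogs("distressed-assets");

  const keys = new Set<string>();
  logs.forEach((log: any) => (log.keys ?? []).forEach((k: string) => keys.add(k)));

  // indexing by sanitized _id is idempotent, so re-processing old log
  // entries is harmless and no last-check timestamp cache is needed
  if (keys.size) await addToDistressed([...keys]);
}
```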
if (!lastCheckTS || lastCheckTS < hourAgo) lastCheckTS = hourAgo - 1;

let {
hits: { hits },
}: any = await esClient?.search({
index: "distressed-assets*",
size: 9999,
body: {
query: {
range: {
// find records with reportTime > lastCheckTS
reportTime: {
gt: lastCheckTS, // reportTime is in ms
},
},
},
},
});

if (!hits?.length) return;

const newDistressedCoins: string[] = [];
hits.forEach(({ _source: { keys } }: any) => {
newDistressedCoins.push(...keys);
});

await addToDistressed(newDistressedCoins, esClient);

const timeNow = Math.floor(Date.now() / 1000);

await cache.writeExpiringJsonCache(
"distressed-assets-last-check",
{ lastCheckTS: timeNow },
{ expireAfter: 7 * 24 * 3600 }
);

await esClient?.close();
Member

no need to close this, or to use the expiring cache. can use the ES index as the single source of truth

}