defi/src/storeTvlInterval/computeTVL.ts (17 changes: 7 additions & 10 deletions)
@@ -5,7 +5,7 @@ import * as sdk from '@defillama/sdk'
import { once, EventEmitter } from 'events'
import { searchWidth } from "../utils/shared/constants";
import { Client } from "@elastic/elasticsearch";
-import { addToDistressedList } from "../utils/shared/distressedCoins";
+import { logDistressedCoins } from "../utils/shared/distressedCoins";

const ethereumAddress = "0x0000000000000000000000000000000000000000";
const weth = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";
@@ -62,7 +62,7 @@ export default async function (balances: { [address: string]: string }, timestam
const tokenData = await getTokenData(readKeys, timestamp)
const mcapData = await getMcapData(readKeys, timestamp);
const staleCoinsInclusive: any = {};
-const distressedCoinsPromises: Promise<void>[] = [];
+const distressedCoins: string[] = []
tokenData.forEach((response: any) => {
if (Math.abs(response.timestamp - now) < searchWidth) {
PKsToTokens[response.PK].forEach((address) => {
@@ -78,7 +78,7 @@ export default async function (balances: { [address: string]: string }, timestam
amount = new BigNumber(balance).div(10 ** decimals).toNumber();
}
const usdAmount = amount * price;
-checkMcaps(address, mcapData, usdAmount, distressedCoinsPromises)
+checkMcaps(address, mcapData, usdAmount, distressedCoins)
checkForStaleness(usdAmount, response, now, protocol, staleCoinsInclusive);
tokenBalances[symbol] = (tokenBalances[symbol] ?? 0) + amount;
usdTokenBalances[symbol] = (usdTokenBalances[symbol] ?? 0) + usdAmount;
@@ -88,23 +88,20 @@ export default async function (balances: { [address: string]: string }, timestam
});

appendToStaleCoins(usdTvl, staleCoinsInclusive, staleCoins);
-await Promise.all(distressedCoinsPromises);
+if (distressedCoins.length) await logDistressedCoins(distressedCoins, protocol);

return {
usdTvl,
tokenBalances,
usdTokenBalances,
};
}

-function checkMcaps(address: string, mcapData: any, usdAmount: number, promises: Promise<void>[]) {
+function checkMcaps(address: string, mcapData: any, usdAmount: number, distressedCoins: string[]) {
  if (usdAmount < 1e7) return true;
  const mcap = mcapData[address];
-  if (mcap && usdAmount > mcap) {
-    promises.push(addToDistressedList(address));
-    return false;
-  }
-  return true;
+  if (mcap && usdAmount > mcap) distressedCoins.push(address);
}

function replaceETHwithWETH(balances: { [address: string]: string }) {
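The net effect of this hunk: instead of firing one addToDistressedList() promise per suspicious coin and awaiting them all, computeTVL now accumulates addresses in a plain array and writes a single log entry at the end. A minimal sketch of that pattern under the same assumptions as the diff (checkMcaps and logDistressedCoins are adapted from the changed code; the wrapper function and its inputs are illustrative only):

```ts
// Illustrative sketch of the collect-then-log pattern introduced by this diff.
import { logDistressedCoins } from "../utils/shared/distressedCoins";

function checkMcaps(address: string, mcapData: any, usdAmount: number, distressedCoins: string[]) {
  if (usdAmount < 1e7) return; // small positions are never flagged
  const mcap = mcapData[address];
  // a TVL contribution larger than the token's mcap looks distressed
  if (mcap && usdAmount > mcap) distressedCoins.push(address);
}

async function flagDistressed(
  protocol: string,
  positions: { address: string; usdAmount: number }[],
  mcapData: any
) {
  const distressedCoins: string[] = [];
  for (const { address, usdAmount } of positions) {
    checkMcaps(address, mcapData, usdAmount, distressedCoins);
  }
  // one write for the whole batch instead of one per coin
  if (distressedCoins.length) await logDistressedCoins(distressedCoins, protocol);
}
```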
defi/src/utils/shared/distressedCoins.ts (101 changes: 80 additions & 21 deletions)
Reviewer comment (Member):

Hmm, I don't like the idea of using R2 strings as a DB. Can you replace it with a dedicated index (a table in ES) in our coins ES DB? Are we storing the token mcaps in the coins-current index now?

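For reference, a dedicated index along the lines the reviewer suggests could be provisioned roughly as below. This is a sketch only: the index name matches the one used later in this diff, but the mapping and the v8-style client calls are assumptions, not part of the PR.

```ts
// Sketch: provisioning a dedicated ES index for distressed assets.
// Assumes an Elasticsearch v8 JS client (indices.exists resolves to a boolean).
import { Client } from "@elastic/elasticsearch";

export async function ensureDistressedIndex(client: Client) {
  const index = "distressed-assets-store";
  const exists = await client.indices.exists({ index });
  if (!exists) {
    // Membership is carried by the document _id ("chain:address"),
    // so the mapping only needs a bookkeeping timestamp.
    await client.indices.create({
      index,
      mappings: { properties: { addedAt: { type: "date" } } },
    });
  }
}
```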
@@ -1,32 +1,91 @@
-import { storeR2JSONString, getR2JSONString } from "../../utils/r2";
+import { getUnixTimeNow } from "../../api2/utils/time";
import { chainsThatShouldNotBeLowerCased } from "../../utils/shared/constants";
+import { elastic, cache } from "@defillama/sdk";

-const r2Key = "distressedAssetsList.json";

-export async function isDistressed(key: string) {
+function sanitizeKey(key: string) {
  const chain = key.split(":")[0];
  const address = key.substring(chain.length + 1);
-  const normalizedAddress = chainsThatShouldNotBeLowerCased.includes(chain)
-    ? address
-    : address.toLowerCase();
-  const data = await getR2JSONString(r2Key);
+  const normalizedAddress = chainsThatShouldNotBeLowerCased.includes(chain) ? address : address.toLowerCase();
+  return `${chain}:${normalizedAddress}`;
}

+export async function isDistressed(key: string, client?: any) {
+  const isLocalClient: boolean = client == undefined
+  if (isLocalClient) client = elastic.getClient();
+
-  if (!data[chain]) return false;
-  if (data[chain][normalizedAddress]) return true;
+  const _id = sanitizeKey(key)
+  const { hits } = await client.search({
+    index: "distressed-assets-store*",
+    body: {
+      query: {
+        match: { _id },
+      },
+    },
+  });
+
-  return false;
+  if (isLocalClient) await client?.close();
Reviewer comment (Member), on the close() call above:

You can leave the connection open and close it once when the process exits. I've noticed this pattern in other places as well; I'm not sure opening and closing a connection per request is a good idea. Most of our scripts run under an hour, so I don't see why we need to close it per write.


+  return hits?.hits?.length > 0;
}
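A sketch of what the reviewer is asking for: one shared client per process, closed once on shutdown rather than per call. elastic.getClient() comes from the diff; the module-level caching and the exit hook are illustrative assumptions.

```ts
// Sketch: reuse one ES client for the whole process lifetime.
// elastic.getClient() is from @defillama/sdk as used in this PR; the rest is assumed.
import { elastic } from "@defillama/sdk";

let sharedClient: any;

export function getSharedEsClient() {
  if (!sharedClient) {
    sharedClient = elastic.getClient();
    // Close exactly once, when the event loop is about to drain.
    process.once("beforeExit", async () => {
      await sharedClient?.close();
      sharedClient = undefined;
    });
  }
  return sharedClient;
}
```

With something like this in place, helpers such as isDistressed() and addToDistressed() could take the shared client and drop their isLocalClient/close() bookkeeping entirely.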

-export async function addToDistressedList(key: string) {
-  const chain = key.split(":")[0];
-  const address = key.substring(chain.length + 1);
-  const normalizedAddress = chainsThatShouldNotBeLowerCased.includes(chain)
-    ? address
-    : address.toLowerCase();
-  const data = await getR2JSONString(r2Key);
+export async function addToDistressed(keys: string[], client?: any) {
+  const isLocalClient: boolean = client == undefined
+  if (isLocalClient) client = elastic.getClient();

+  const body: any[] = [];
+  keys.map((key: string) => {
+    const _id = sanitizeKey(key)
+    body.push({ index: { _index: "distressed-assets-store", _id } });
+  });

+  await client.bulk({ body });
+
+  if (isLocalClient) await client?.close();
}

+export async function logDistressedCoins(keys: string[], protocol: string) {
+  await elastic.writeLog("distressed-assets", { keys, protocol, reportTime: getUnixTimeNow() });
+}

+export async function readDistressedLogs() {
+  const esClient = elastic.getClient();
+  const hourAgo = Math.floor(Date.now() / 1000) - 3600;
+  let { lastCheckTS } = (await cache.readExpiringJsonCache("distressed-assets-last-check")) || { lastCheckTS: 0 };
Reviewer comment (Member), on the cache read above:

Can you use elastic.getAllLogs() here and skip the cache?
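If the sdk's elastic.getAllLogs() is used as suggested, the cache watermark could be dropped entirely. The call below is an assumption about that helper's shape (it is not shown in this PR), so treat it strictly as a sketch:

```ts
// Sketch: read back every "distressed-assets" log record via the sdk helper
// the reviewer mentions, skipping the expiring-cache watermark.
// NOTE: elastic.getAllLogs() is assumed here; its real signature may differ.
import { elastic } from "@defillama/sdk";

export async function collectLoggedDistressedKeys(): Promise<string[]> {
  const records: any[] = (await (elastic as any).getAllLogs("distressed-assets")) ?? [];
  // Each record was written as { keys, protocol, reportTime }; flatten the keys.
  return records.flatMap((r: any) => r?.keys ?? r?._source?.keys ?? []);
}
```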

+  if (!lastCheckTS || lastCheckTS < hourAgo) lastCheckTS = hourAgo - 1;
+
+  let {
+    hits: { hits },
+  }: any = await esClient?.search({
+    index: "distressed-assets*",
+    size: 9999,
+    body: {
+      query: {
+        range: {
+          // find records with reportTime > lastCheckTS
+          reportTime: {
+            gt: lastCheckTS, // reportTime is in ms
+          },
+        },
+      },
+    },
+  });

+  if (!hits?.length) return;
+
+  const newDistressedCoins: string[] = [];
+  hits.map(({ _source: { keys } }: any) => {
+    newDistressedCoins.push(...keys);
+  });
+
+  await addToDistressed(newDistressedCoins, esClient);
+
+  const timeNow = Math.floor(Date.now() / 1000);

-  if (!data[chain]) data[chain] = {};
-  data[chain].push(normalizedAddress);
+  await cache.writeExpiringJsonCache(
+    "distressed-assets-last-check",
+    { lastCheckTS: timeNow },
+    { expireAfter: 7 * 24 * 3600 }
+  );
+
-  await storeR2JSONString(r2Key, JSON.stringify(data));
+  await esClient?.close();
Reviewer comment (Member), on the close() and cache write above:

There's no need to close this, or to use the expiring cache. We can use the ES index as the single source of truth.

}
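On that last comment: because addToDistressed() indexes each asset under a deterministic _id (chain:address), re-processing old log records is an idempotent overwrite, so the expiring-cache watermark adds little. A hedged sketch of the slimmed-down sync loop (addToDistressed comes from the diff; the log-reading callback and import path are illustrative):

```ts
// Sketch: treat the distressed-assets-store index as the single source of truth.
// Re-indexing the same chain:address _id is a harmless overwrite, so no
// "last check" cache is needed; just replay the recent logs each run.
// readRecentDistressedLogs() is a stand-in for however the logs are fetched.
import { addToDistressed } from "./distressedCoins";

export async function syncDistressedAssets(
  readRecentDistressedLogs: () => Promise<{ keys: string[] }[]>
) {
  const records = await readRecentDistressedLogs();
  const keys = records.flatMap((r) => r.keys ?? []);
  if (keys.length) await addToDistressed(keys); // idempotent bulk index by _id
}
```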