Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions coins/src/adapters/bridges/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ import { getCurrentUnixTimestamp } from "../../utils/date";
import produceKafkaTopics from "../../utils/coins3/produce";
import { chainsThatShouldNotBeLowerCased } from "../../utils/shared/constants";
import { sendMessage } from "../../../../defi/src/utils/discord";
import { isDistressed } from "../../utils/shared/distressedCoins";

const craftToPK = (to: string) => (to.includes("#") ? to : `asset#${to}`);

Expand Down Expand Up @@ -206,6 +207,8 @@ async function _storeTokensOfBridge(bridge: Bridge, i: number) {
const writes: any[] = [];
await Promise.all(
unlisted.map(async (token) => {
if (await isDistressed(token.to)) return;

const finalPK = toAddressToRecord[craftToPK(token.to)];
if (finalPK === undefined) return;

Expand Down
13 changes: 11 additions & 2 deletions coins/src/scripts/coingecko.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import { storeAllTokens } from "../utils/shared/bridgedTvlPostgres";
import { sendMessage } from "../../../defi/src/utils/discord";
import { chainsThatShouldNotBeLowerCased } from "../utils/shared/constants";
import { cacheSolanaTokens, getSymbolAndDecimals } from "./coingeckoUtils";
import { batchIsDistressed, isDistressed } from "../utils/shared/distressedCoins";

// Kill the script after 5 minutes to prevent infinite execution
const TIMEOUT_MS = 10 * 60 * 1000; // 5 minutes in milliseconds
Expand All @@ -46,6 +47,8 @@ interface IdToSymbol {
}

async function storeCoinData(coinData: Write[]) {
const distressedIds = await batchIsDistressed(coinData.map((i) => i.PK));

const items = coinData
.map((c) => ({
PK: c.PK,
Expand All @@ -58,7 +61,8 @@ async function storeCoinData(coinData: Write[]) {
volume: c.volume,
adapter: 'coingecko'
}))
.filter((c: Write) => c.symbol != null);
.filter((c: Write) => c.symbol != null && !distressedIds[c.PK]);

await Promise.all([
produceKafkaTopics(
items.map((i) => {
Expand All @@ -71,13 +75,16 @@ async function storeCoinData(coinData: Write[]) {
}

async function storeHistoricalCoinData(coinData: Write[]) {
const distressedIds = await batchIsDistressed(coinData.map((i) => i.PK));

const items = coinData.map((c) => ({
SK: c.SK,
PK: c.PK,
price: c.price,
confidence: c.confidence,
volume: c.volume,
}));
})).filter((c: Write) => !distressedIds[c.PK]);

await Promise.all([
produceKafkaTopics(
items.map((i) => ({
Expand Down Expand Up @@ -335,6 +342,8 @@ async function getAndStoreHourly(
}
const PK = cgPK(coin.id);

if (await isDistressed(PK)) return;

const prevWritenItems = await batchReadPostgres(
`coingecko:${coin.id}`,
toUNIXTimestamp(coinData.prices[0][0]),
Expand Down
17 changes: 14 additions & 3 deletions coins/src/scripts/defiCoins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ console.log(process.version);
import adapters from "../adapters/index";
console.log("adapters imported");
import { PromisePool } from "@supercharge/promise-pool";
import { batchIsDistressed } from "../utils/shared/distressedCoins";


console.log("imports successful");

Expand All @@ -25,7 +27,7 @@ async function storeDefiCoins() {
process.env.tableName = "prod-coins-table";
const adaptersArray = Object.entries(adapters);
const protocolIndexes: number[] = Array.from(
Array(adaptersArray.length).keys(),
Array(adaptersArray.length).keys()
);
shuffleArray(protocolIndexes);
const a = Object.entries(adapters);
Expand All @@ -40,14 +42,22 @@ async function storeDefiCoins() {
try {
const adapterFn = typeof b === "function" ? b : b[adapterKey];
const results = await withTimeout(timeout, adapterFn(timestamp));
const blacklist = await batchIsDistressed(
results.flat().map((c: any) => c.PK)
);

const resultsWithoutDuplicates = await filterWritesWithLowConfidence(
results.flat().filter((c: any) => c.symbol != null || c.SK != 0),
results
.flat()
.filter(
(c: any) => (c.symbol != null || c.SK != 0) && !blacklist[c.PK]
)
);
for (let i = 0; i < resultsWithoutDuplicates.length; i += step) {
await Promise.all([
batchWriteWithAlerts(
resultsWithoutDuplicates.slice(i, i + step),
true,
true
),
]);
// await batchWrite2WithAlerts(
Expand Down Expand Up @@ -76,3 +86,4 @@ async function storeDefiCoins() {
}
storeDefiCoins();
// ts-node coins/src/scripts/defiCoins.ts

106 changes: 106 additions & 0 deletions defi/src/storeTvlInterval/computeTVL.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as sdk from '@defillama/sdk'
import { once, EventEmitter } from 'events'
import { searchWidth } from "../utils/shared/constants";
import { Client } from "@elastic/elasticsearch";
import { logDistressedCoins } from "../utils/shared/distressedCoins";

const ethereumAddress = "0x0000000000000000000000000000000000000000";
const weth = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2";
Expand Down Expand Up @@ -59,7 +60,9 @@ export default async function (balances: { [address: string]: string }, timestam
const usdTokenBalances = {} as Balances;
const now = timestamp === "now" ? Math.round(Date.now() / 1000) : timestamp;
const tokenData = await getTokenData(readKeys, timestamp)
const mcapData = await getMcapData(readKeys, timestamp);
const staleCoinsInclusive: any = {};
const distressedCoins: any[] = []
tokenData.forEach((response: any) => {
if (Math.abs(response.timestamp - now) < searchWidth) {
PKsToTokens[response.PK].forEach((address) => {
Expand All @@ -75,6 +78,7 @@ export default async function (balances: { [address: string]: string }, timestam
amount = new BigNumber(balance).div(10 ** decimals).toNumber();
}
const usdAmount = amount * price;
checkMcaps(address, mcapData, usdAmount, distressedCoins, protocol)
checkForStaleness(usdAmount, response, now, protocol, staleCoinsInclusive);
tokenBalances[symbol] = (tokenBalances[symbol] ?? 0) + amount;
usdTokenBalances[symbol] = (usdTokenBalances[symbol] ?? 0) + usdAmount;
Expand All @@ -85,13 +89,21 @@ export default async function (balances: { [address: string]: string }, timestam

appendToStaleCoins(usdTvl, staleCoinsInclusive, staleCoins);

if (distressedCoins.length) await logDistressedCoins(distressedCoins);

return {
usdTvl,
tokenBalances,
usdTokenBalances,
};
}

function checkMcaps(address: string, mcapData: any, usdAmount: number, distressedCoins: any[], protocol: string) {
if (usdAmount < 1e7) return true;
const mcap = mcapData[address];
if (mcap && usdAmount > mcap) distressedCoins.push({ address, usdAmount, mcap, protocol });
}

function replaceETHwithWETH(balances: { [address: string]: string }) {
const keys = [ethereumAddress, 'ethereum:' + ethereumAddress]
for (const key of keys) {
Expand Down Expand Up @@ -215,6 +227,100 @@ async function getTokenData(readKeys: string[], timestamp: string | number): Pro
}
}

const mcapCache: { [PK: string]: any } = {}

async function getMcapData(readKeys: string[], timestamp: string | number): Promise<any> {
if (!readKeys.length) return []


const currentId = counter.requestCount++
const eventId = `${currentId}`

if (counter.activeWorkers > maxParallelCalls) {
counter.queue.push(eventId)
await once(emitter, eventId)
}

counter.activeWorkers++

const showEveryX = counter.queue.length > 100 ? 30 : 10 // show log fewer times if lot more are queued up
if (currentId % showEveryX === 0) sdk.log(`request #: ${currentId} queue: ${counter.queue.length} active requests: ${counter.activeWorkers}`)

let response
try {
response = await _getMcapData()
onComplete()
} catch (e) {
onComplete()
throw e
}

return response

function onComplete() {
counter.activeWorkers--
if (counter.queue.length) {
const nextRequestId = counter.pickFromTop ? counter.queue.shift() : counter.queue.pop()
counter.pickFromTop = !counter.pickFromTop
emitter.emit(<string>nextRequestId)
}
}

async function _getMcapData() {
let cachedMcapData: { [PK: string]: number } = {}

// read data from cache where possible
readKeys = readKeys.filter((PK: string) => {
if (timestamp !== 'now') return true;
if (mcapCache[PK]) {
cachedMcapData[PK] = mcapCache[PK];
return false;
}
return true;
})

if (!readKeys.length) return cachedMcapData;

const readRequests: any[] = [];
sdk.log(`mcap request count: ${readKeys.length}`)
for (let i = 0; i < readKeys.length; i += 100) {
const body = {
"coins": readKeys.slice(i, i + 100),
} as any
if (timestamp !== "now") {
body.timestamp = timestamp;
}
readRequests.push(
fetch(`https://coins.llama.fi/mcaps${process.env.COINS_KEY ? `?apikey=${process.env.COINS_KEY}` : ""}`, {
method: "POST",
body: JSON.stringify(body),
headers: { "Content-Type": "application/json" },
}).then((r) => r.json()).then(r => {
const mcaps: { [PK: string]: number } = {}
Object.keys(r).map((PK) => {
const mcap = r[PK].mcap
if (!mcap) return;
mcapCache[PK] = mcap
mcaps[PK] = mcap
})

return mcaps;
})
);
}
const tokenData = [cachedMcapData].concat(...(await Promise.all(readRequests)));

const mcapObject: { [PK: string]: number } = {}
tokenData.map((mcaps) => {
Object.entries(mcaps).forEach(([PK, mcap]: any) => {
mcapObject[PK] = mcap
})
})

return mcapObject;
}
}

interface Counter {
activeWorkers: number;
requestCount: number;
Expand Down
Loading