Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions fuel/fuel-dump/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ export class FuelDumper extends Dumper<BlockData, Options> {
return block.block.header.prevRoot
}

protected getBlockTimestamp(block: BlockData): number {
const TAI64_UNIX_OFFSET = BigInt("4611686018427387914");
const taiTimestamp = BigInt(block.block.header.time);
const unixTimeMs = taiTimestamp - TAI64_UNIX_OFFSET;
return Number(unixTimeMs);
}

protected validateChainContinuity(): boolean {
return false
}
Expand Down
11 changes: 11 additions & 0 deletions fuel/fuel-ingest/src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,15 @@ export class FuelIngest extends Ingest<IngestOptions> {
})
}
}

protected getBlockHeight(block: any): number {
return Number(block.header.height) || 0
}

protected getBlockTimestamp(block: any): number {
const TAI64_UNIX_OFFSET = BigInt("4611686018427387914");
const taiTimestamp = BigInt(block.header.time);
const unixTimeMs = taiTimestamp - TAI64_UNIX_OFFSET;
return Number(unixTimeMs);
}
}
4 changes: 4 additions & 0 deletions solana/solana-dump/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export class SolanaDumper extends Dumper<Block, Options> {
return block.block.previousBlockhash
}

protected getBlockTimestamp(block: Block): number {
return Number(block.block.blockTime) || 0
}

@def
private solanaRpc(): Rpc {
return new Rpc(this.rpc())
Expand Down
8 changes: 8 additions & 0 deletions solana/solana-ingest/src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,12 @@ export class SolanaIngest extends Ingest<Options> {
})
}
}

protected getBlockHeight(block: any): number {
return Number(block.header.height) || 0
}

protected getBlockTimestamp(block: any): number {
return Number(block.header.timestamp) || 0
}
}
4 changes: 4 additions & 0 deletions substrate/substrate-dump/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,8 @@ export class SubstrateDumper extends Dumper<BlockData, Options> {
protected getPrevBlockHash(block: BlockData): string {
return block.block.block.header.parentHash
}

protected getBlockTimestamp(block: BlockData): number {
return Math.floor(Date.now() / 1000); //mocked timestamp for now
}
}
8 changes: 8 additions & 0 deletions substrate/substrate-ingest/src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,12 @@ export class SubstrateIngest extends Ingest<Options> {
yield toJSON(blocks)
}
}

protected getBlockHeight(block: any): number {
return block.header.height || 0
}

protected getBlockTimestamp(block: any): number {
return Math.floor(block.header.timestamp / 1000) || 0
}
}
4 changes: 4 additions & 0 deletions tron/tron-dump/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export class TronDumper extends Dumper<BlockData, Options> {
return block.block.block_header.raw_data.parentHash
}

protected getBlockTimestamp(block: BlockData): number {
return block.block.block_header.raw_data.timestamp || 0
}

@def
httpApi(): HttpApi {
let client = new TronHttpClient({
Expand Down
8 changes: 8 additions & 0 deletions tron/tron-ingest/src/ingest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,12 @@ export class TronIngest extends Ingest<IngestOptions> {
})
}
}

protected getBlockHeight(block: any): number {
return block.header.height || 0
}

protected getBlockTimestamp(block: any): number {
return Math.floor(block.header.timestamp / 1000) || 0
}
}
19 changes: 17 additions & 2 deletions util/rpc-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ export interface RpcClientOptions {
log?: Logger | null
}

// Add interface for RPC metrics
export interface RpcMetrics {
url: string
requestsServed: number
connectionErrors: number
notificationsReceived: number
avg_response_time: number
}


export interface CallOptions<R=any> {
priority?: number
Expand Down Expand Up @@ -113,6 +122,7 @@ export class RpcClient {
private connectionErrors = 0
private requestsServed = 0
private notificationsReceived = 0
private totalResponseTime = 0
private backoffEpoch = 0
private backoffTime?: number
private notificationListeners: ((msg: RpcNotification) => void)[] = []
Expand Down Expand Up @@ -179,12 +189,13 @@ export class RpcClient {
return this.maxCapacity
}

getMetrics() {
getMetrics(): RpcMetrics {
return {
url: this.url,
requestsServed: this.requestsServed,
connectionErrors: this.connectionErrors,
notificationsReceived: this.notificationsReceived
notificationsReceived: this.notificationsReceived,
avg_response_time: this.requestsServed > 0 ? this.totalResponseTime / this.requestsServed : 0
}
}

Expand Down Expand Up @@ -364,6 +375,7 @@ export class RpcClient {
this.capacity -= 1
let backoffEpoch = this.backoffEpoch
let promise: Promise<any>
const startTime = Date.now()
if (Array.isArray(req.call)) {
let call = req.call
this.log?.debug({rpcBatchId: [call[0].id, last(call).id]}, 'rpc send')
Expand All @@ -382,6 +394,9 @@ export class RpcClient {
})
}
promise.then(result => {
const responseTimeSeconds = (Date.now() - startTime) / 1000
this.totalResponseTime += responseTimeSeconds

this.requestsServed += 1
if (this.backoffEpoch == backoffEpoch) {
this.connectionErrorsInRow = 0
Expand Down
56 changes: 54 additions & 2 deletions util/util-internal-dump-cli/src/dumper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,37 @@ export interface DumperOptions {
writeBatchSize: number
topDirSize: number
metrics?: number
maxCacheSize?: number
}


export abstract class Dumper<B extends {hash: string, height: number}, O extends DumperOptions = DumperOptions> {

private timestampCache = new Map<number, number>();

private addToCache(block: B): void {
const maxCacheSize = this.options().maxCacheSize ?? this.getDefaultCacheSize();
if (this.timestampCache.size >= maxCacheSize) {
const heights = Array.from(this.timestampCache.keys()).sort((a, b) => a - b);
const removeCount = Math.ceil(maxCacheSize * 0.2);
const keysToRemove = heights.slice(0, removeCount);
for (const key of keysToRemove) {
this.timestampCache.delete(key);
}
this.log().debug(`Cache cleanup: removed ${keysToRemove.length} oldest block timestamps`);
}

this.timestampCache.set(block.height, this.getBlockTimestamp(block));
}

protected abstract getBlocks(range: Range): AsyncIterable<B[]>

protected abstract getFinalizedHeight(): Promise<number>

protected abstract getPrevBlockHash(block: B): string

protected abstract getBlockTimestamp(block: B): number

protected setUpProgram(program: Command): void {}

protected getDefaultChunkSize(): number {
Expand All @@ -43,6 +64,10 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
return 1024
}

protected getDefaultCacheSize(): number {
return 10
}

protected getLoggingNamespace(): string {
return 'sqd:dump'
}
Expand All @@ -61,6 +86,7 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
program.option('--chunk-size <MB>', 'Data chunk size in megabytes', positiveInt, this.getDefaultChunkSize())
program.option('--write-batch-size <number>', 'Number of blocks to write at a time', positiveInt, 10)
program.option('--top-dir-size <number>', 'Number of items in a top level dir', positiveInt, this.getDefaultTopDirSize())
program.option('--max-cache-size <number>', 'Maximum number of blocks to keep in memory cache', positiveInt, this.getDefaultCacheSize())
program.option('--metrics <port>', 'Enable prometheus metrics server', nat)
return program
}
Expand Down Expand Up @@ -176,6 +202,17 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
}
}

const lastBlock = last(blocks)
const mintedTimestamp = this.getBlockTimestamp(lastBlock)

for (const block of blocks) {
this.addToCache(block);
}

this.prometheus().setLatestBlockMetrics(lastBlock.height, mintedTimestamp)
this.log().debug(`Received block ${lastBlock.height} with minted timestamp ${mintedTimestamp}`)
this.log().debug(`Cache size: ${this.timestampCache.size}`)

yield blocks

progress.setCurrentValue(last(blocks).height)
Expand Down Expand Up @@ -206,7 +243,11 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
for (let block of bb) {
process.stdout.write(JSON.stringify(block) + '\n')
}
prometheus.setLastWrittenBlock(last(bb).height)
const lastBlockHeight = last(bb).height;
prometheus.setLastWrittenBlock(lastBlockHeight);
const processedTimestamp = this.getBlockTimestamp(last(bb));
prometheus.setProcessedBlockMetrics(processedTimestamp);
this.log().debug(`Processed block ${lastBlockHeight} at ${processedTimestamp}`);
}
} else {
let archive = new ArchiveLayout(this.destination(), {
Expand All @@ -217,7 +258,18 @@ export abstract class Dumper<B extends {hash: string, height: number}, O extends
range: this.range(),
chunkSize: chunkSize * 1024 * 1024,
writeBatchSize: this.options().writeBatchSize,
onSuccessWrite: ctx => prometheus.setLastWrittenBlock(ctx.blockRange.to.height)
onSuccessWrite: ctx => {
const blockHeight = ctx.blockRange.to.height;
prometheus.setLastWrittenBlock(blockHeight);

const cachedTimestamp = this.timestampCache.get(blockHeight);
if (cachedTimestamp) {
prometheus.setProcessedBlockMetrics(cachedTimestamp);
this.log().debug(`Processed block ${blockHeight} at ${cachedTimestamp}`);
} else {
this.log().warn(`No cached timestamp available for height ${blockHeight}`);
}
}
})
}
}, err => {
Expand Down
Loading
Loading