Skip to content

Commit 13a5961

Browse files
authored
fix(indexer): resolve sync issues with fastnear (#837)
1 parent ff23be2 commit 13a5961

File tree

8 files changed

+176
-41
lines changed

8 files changed

+176
-41
lines changed

apps/indexer-base/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
"lint:check": "tsc --noEmit && eslint ./"
1717
},
1818
"dependencies": {
19+
"@aws-sdk/client-s3": "3.750.0",
1920
"@near-lake/framework": "0.1.5",
2021
"@scure/base": "1.1.3",
2122
"@sentry/node": "7.74.1",
23+
"@smithy/node-http-handler": "4.0.3",
2224
"axios": "1.5.1",
2325
"envalid": "8.0.0",
2426
"ethers": "6.13.2",
2527
"lodash-es": "4.17.21",
26-
"near-lake-framework": "2.0.0",
2728
"stream-json": "1.8.0"
2829
},
2930
"devDependencies": {

apps/indexer-base/src/config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
import { types } from '@near-lake/framework';
12
import { bool, cleanEnv, num, str, url } from 'envalid';
2-
import { types } from 'near-lake-framework';
33

44
import { Network } from 'nb-types';
55

apps/indexer-base/src/libs/utils.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ import { createRequire } from 'module';
33
import { base58 } from '@scure/base';
44
import { decodeBase64, hexlify, Transaction } from 'ethers';
55
import { snakeCase, toUpper } from 'lodash-es';
6-
import { types } from 'near-lake-framework';
76

87
import { logger } from 'nb-logger';
8+
import { Action, ExecutionStatus, ReceiptEnum } from 'nb-neardata';
99
import {
1010
AccessKeyPermissionKind,
1111
ActionKind,
@@ -38,7 +38,7 @@ export const decodeArgs = <T>(args: string): T =>
3838
json.parse(Buffer.from(args, 'base64').toString());
3939

4040
export const mapExecutionStatus = (
41-
status: types.ExecutionStatus,
41+
status: ExecutionStatus,
4242
): ExecutionOutcomeStatus => {
4343
const key = toUpper(
4444
snakeCase(Object.keys(status)[0]),
@@ -47,13 +47,13 @@ export const mapExecutionStatus = (
4747
return ExecutionOutcomeStatus[key];
4848
};
4949

50-
export const mapReceiptKind = (receipt: types.ReceiptEnum): ReceiptKind => {
50+
export const mapReceiptKind = (receipt: ReceiptEnum): ReceiptKind => {
5151
const key = toUpper(Object.keys(receipt)[0]) as keyof typeof ReceiptKind;
5252

5353
return ReceiptKind[key];
5454
};
5555

56-
export const mapActionKind = (action: types.Action): ReceiptAction => {
56+
export const mapActionKind = (action: Action): ReceiptAction => {
5757
let kind = ActionKind.UNKNOWN;
5858
let args = {};
5959
let rlpHash = null;
@@ -172,12 +172,12 @@ export const mapActionKind = (action: types.Action): ReceiptAction => {
172172
};
173173

174174
export const camelCaseToSnakeCase = (
175-
value: Exclude<types.Action, 'CreateAccount'>,
175+
value: Exclude<Action, 'CreateAccount'>,
176176
) => {
177177
const newValue: Record<string, unknown> = {};
178178

179179
for (const key in value) {
180-
newValue[snakeCase(key)] = value[key as keyof types.Action];
180+
newValue[snakeCase(key)] = value[key as keyof Action];
181181
}
182182

183183
return newValue;
@@ -193,7 +193,7 @@ export const publicKeyFromImplicitAccount = (account: string) => {
193193
}
194194
};
195195

196-
export const isExecutionSuccess = (status: types.ExecutionStatus) => {
196+
export const isExecutionSuccess = (status: ExecutionStatus) => {
197197
if ('SuccessValue' in status || 'SuccessReceiptId' in status) {
198198
return true;
199199
}

apps/indexer-base/src/services/s3.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
2+
import { NodeHttpHandler } from '@smithy/node-http-handler';
23

34
import { Message } from 'nb-neardata';
45

@@ -11,7 +12,12 @@ const s3Client = new S3Client({
1112
},
1213
endpoint: config.s3Endpoint,
1314
forcePathStyle: true,
15+
maxAttempts: 3,
1416
region: config.s3Region,
17+
requestHandler: new NodeHttpHandler({
18+
connectionTimeout: 5000,
19+
requestTimeout: 10000,
20+
}),
1521
});
1622

1723
export const uploadJson = async (message: Message) => {

apps/indexer-base/src/types/types.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import { types } from 'near-lake-framework';
1+
import { types } from '@near-lake/framework';
22

3+
import { Action } from 'nb-neardata';
34
import { AccessKeyPermissionKind, ActionKind, Network } from 'nb-types';
45

56
export interface Config {
@@ -38,7 +39,7 @@ export interface Config {
3839

3940
export type ActionReceipt = {
4041
Action: {
41-
actions: types.Action[];
42+
actions: Action[];
4243
gasPrice: string;
4344
inputDataIds: string[];
4445
outputDataReceivers: DataReceiver[];

packages/nb-neardata/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "nb-neardata",
3-
"version": "0.2.2",
3+
"version": "0.2.3",
44
"author": "NearBlocks",
55
"license": "Business Source License 1.1",
66
"type": "module",

packages/nb-neardata/src/index.ts

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ const fetchBlock = async (url: string, block: number): Promise<Message> => {
6464

6565
if (!response.ok) {
6666
if (response.status === 404) {
67-
await sleep(700);
67+
await sleep(100);
6868
}
6969

7070
throw new Error(`status: ${response.status}`);
@@ -78,7 +78,6 @@ const fetchBlock = async (url: string, block: number): Promise<Message> => {
7878
);
7979
};
8080

81-
/*
8281
const fetchFinal = async (url: string): Promise<Message> => {
8382
return await retry(
8483
async () => {
@@ -98,14 +97,13 @@ const fetchFinal = async (url: string): Promise<Message> => {
9897
{ exponential: true, logger: retryLogger, retries },
9998
);
10099
};
101-
*/
102100

103101
export const streamBlock = (config: BlockStreamConfig) => {
104102
const url = config.url ?? endpoint(config.network);
105103
const limit = config.limit ?? 10;
106104
let isFetching = false;
107105
let block = config.start;
108-
const highWaterMark = limit * 2;
106+
const highWaterMark = limit * 5;
109107

110108
const readable = new Readable({
111109
highWaterMark,
@@ -119,31 +117,47 @@ export const streamBlock = (config: BlockStreamConfig) => {
119117
isFetching = true;
120118

121119
try {
122-
/*
123120
const remaining = highWaterMark - readable.readableLength;
124121

122+
logger.warn({ fetchingBlock: block, queueSize: readable.readableLength });
123+
125124
if (block % 10 === 0 && remaining >= 5) {
126-
const final = (await fetchFinal(url)).block.header.height;
125+
const finalBlocks = [];
126+
127+
for (let i = 0; i < 2; i++) {
128+
finalBlocks.push((await fetchFinal(url)).block.header.height);
129+
130+
if (i === 0) {
131+
await sleep(100);
132+
}
133+
}
127134

135+
const final = Math.max(...finalBlocks);
128136
const promises: Promise<Message>[] = [];
129137
const concurrency = Math.min(limit, final - block, remaining);
130138

131-
for (let i = 0; i < concurrency; i++) {
132-
promises.push(fetchBlock(url, block + i));
133-
}
139+
logger.warn({ concurrency, finalBlock: final });
134140

135-
const results = await Promise.all(promises);
141+
if (concurrency > 0) {
142+
for (let i = 0; i < concurrency; i++) {
143+
promises.push(fetchBlock(url, block + i));
144+
}
145+
146+
const results = await Promise.all(promises);
147+
148+
for (const result of results) {
149+
if (result) {
150+
const message: Message = camelCaseKeys(result);
136151

137-
for (const result of results) {
138-
if (result && !readable.push(camelCaseKeys(result))) {
139-
return;
152+
if (!readable.push(message)) {
153+
return;
154+
}
155+
156+
block = message.block.header.height + 1;
157+
}
140158
}
141159
}
142-
143-
block += concurrency;
144-
return;
145160
}
146-
*/
147161

148162
const result = await fetchBlock(url, block);
149163

@@ -164,7 +178,7 @@ export const streamBlock = (config: BlockStreamConfig) => {
164178
}
165179
};
166180

167-
const interval = setInterval(fetchBlocks, 500);
181+
const interval = setInterval(fetchBlocks, 100);
168182

169183
readable.on('close', () => clearInterval(interval));
170184

0 commit comments

Comments
 (0)