Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { EvmProvider } from "@grants-stack-indexer/chain-providers";
import {
DatabaseEventRegistry,
DatabaseStrategyRegistry,
InMemoryCachedEventRegistry,
InMemoryCachedStrategyRegistry,
Orchestrator,
RetroactiveProcessor,
Expand Down Expand Up @@ -67,12 +66,6 @@ export class ProcessingService {
// Initialize EVM provider
const evmProvider = new EvmProvider(chain.rpcUrls, optimism, logger);

// Initialize events registry for the chain
const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize(
logger,
eventsRegistry,
[chain.id as ChainId],
);
const cachedStrategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
logger,
strategyRegistry,
Expand All @@ -84,7 +77,7 @@ export class ProcessingService {
{ ...core, evmProvider },
indexerClient,
{
eventsRegistry: cachedEventsRegistry,
eventsRegistry,
strategyRegistry: cachedStrategyRegistry,
},
chain.fetchLimit,
Expand All @@ -97,7 +90,7 @@ export class ProcessingService {
{ ...core, evmProvider },
indexerClient,
{
eventsRegistry: cachedEventsRegistry,
eventsRegistry,
strategyRegistry: cachedStrategyRegistry,
checkpointRepository: strategyProcessingCheckpointRepository,
},
Expand Down
2 changes: 0 additions & 2 deletions apps/processing/test/unit/processing.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { EvmProvider } from "@grants-stack-indexer/chain-providers";
import {
DatabaseEventRegistry,
DatabaseStrategyRegistry,
InMemoryCachedEventRegistry,
InMemoryCachedStrategyRegistry,
Orchestrator,
RetroactiveProcessor,
Expand Down Expand Up @@ -118,7 +117,6 @@ describe("ProcessingService", () => {
expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1);
expect(EvmProvider).toHaveBeenCalledTimes(2);
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(2);
expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2);

// Verify orchestrators were created with correct parameters
expect(processingService["orchestrators"].size).toBe(2);
Expand Down
4 changes: 4 additions & 0 deletions packages/data-flow/src/data-loader/dataLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
IApplicationPayoutRepository,
IApplicationRepository,
IDonationRepository,
IEventRegistryRepository,
IProjectRepository,
IRoundRepository,
ITransactionManager,
Expand All @@ -17,6 +18,7 @@ import {
createProjectHandlers,
createRoundHandlers,
} from "./handlers/index.js";
import { createProcessedEventHandlers } from "./handlers/processedEvent.handlers.js";
import { ChangesetHandlers } from "./types/index.js";

/**
Expand All @@ -42,6 +44,7 @@ export class DataLoader implements IDataLoader {
application: IApplicationRepository;
donation: IDonationRepository;
applicationPayout: IApplicationPayoutRepository;
eventRegistry: IEventRegistryRepository;
},
private readonly transactionManager: ITransactionManager,
private readonly logger: ILogger,
Expand All @@ -52,6 +55,7 @@ export class DataLoader implements IDataLoader {
...createApplicationHandlers(repositories.application),
...createDonationHandlers(repositories.donation),
...createApplicationPayoutHandlers(repositories.applicationPayout),
...createProcessedEventHandlers(repositories.eventRegistry),
};
}

Expand Down
1 change: 1 addition & 0 deletions packages/data-flow/src/data-loader/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from "./project.handlers.js";
export * from "./round.handlers.js";
export * from "./donation.handlers.js";
export * from "./applicationPayout.handlers.js";
export * from "./processedEvent.handlers.js";
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import {
IEventRegistryRepository,
ProcessedEventChangeset,
} from "@grants-stack-indexer/repository";

import { ChangesetHandler } from "../types/index.js";

/**
* Collection of handlers for application-related operations.
* Each handler corresponds to a specific Application changeset type.
*/
export type ProcessedEventHandlers = {
[K in ProcessedEventChangeset["type"]]: ChangesetHandler<K>;
};

/**
* Creates handlers for managing application-related operations.
*
* @param repository - The application repository instance used for database operations
* @returns An object containing all application-related handlers
*/
export const createProcessedEventHandlers = (
repository: IEventRegistryRepository,
): ProcessedEventHandlers => ({
InsertProcessedEvent: (async (changeset, txConnection): Promise<void> => {
await repository.saveLastProcessedEvent(
changeset.args.chainId,
changeset.args.processedEvent,
txConnection,
);
}) satisfies ChangesetHandler<"InsertProcessedEvent">,
});
51 changes: 37 additions & 14 deletions packages/data-flow/src/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import {
UnsupportedEventException,
UnsupportedStrategy,
} from "@grants-stack-indexer/processors";
import { RoundNotFound, RoundNotFoundForId } from "@grants-stack-indexer/repository";
import {
Changeset,
IEventRegistryRepository,
RoundNotFound,
RoundNotFoundForId,
} from "@grants-stack-indexer/repository";
import {
Address,
AnyEvent,
Expand All @@ -29,7 +34,7 @@ import {
Token,
} from "@grants-stack-indexer/shared";

import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js";
import type { IEventsFetcher, IStrategyRegistry } from "./interfaces/index.js";
import { EventsFetcher } from "./eventsFetcher.js";
import { EventsProcessor } from "./eventsProcessor.js";
import { InvalidEvent } from "./exceptions/index.js";
Expand Down Expand Up @@ -72,7 +77,7 @@ export class Orchestrator {
private readonly eventsByBlockContext: Map<number, AnyIndexerFetchedEvent[]>;
private readonly eventsFetcher: IEventsFetcher;
private readonly eventsProcessor: EventsProcessor;
private readonly eventsRegistry: IEventsRegistry;
private readonly eventsRegistry: IEventRegistryRepository;
private readonly strategyRegistry: IStrategyRegistry;
private readonly dataLoader: DataLoader;
private readonly retryHandler: RetryHandler;
Expand All @@ -91,7 +96,7 @@ export class Orchestrator {
private dependencies: Readonly<CoreDependencies>,
private indexerClient: IIndexerClient,
private registries: {
eventsRegistry: IEventsRegistry;
eventsRegistry: IEventRegistryRepository;
strategyRegistry: IStrategyRegistry;
},
private fetchLimit: number = 1000,
Expand All @@ -113,6 +118,7 @@ export class Orchestrator {
application: this.dependencies.applicationRepository,
donation: this.dependencies.donationRepository,
applicationPayout: this.dependencies.applicationPayoutRepository,
eventRegistry: this.eventsRegistry,
},
this.dependencies.transactionManager,
this.logger,
Expand Down Expand Up @@ -146,18 +152,34 @@ export class Orchestrator {
continue;
}

await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
...event,
rawEvent: event,
});

await this.retryHandler.execute(
async () => {
await this.handleEvent(event!);
const changesets = await this.handleEvent(event!);
if (changesets) {
await this.dataLoader.applyChanges([
...changesets,
{
type: "InsertProcessedEvent",
args: {
chainId: this.chainId,
processedEvent: {
...event!,
rawEvent: event,
},
},
},
]);
}
},
{ abortSignal: signal },
);
Comment on lines +157 to 175
Copy link
Copy Markdown

@coderabbitai coderabbitai bot Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential null reference in event

There is a risk of a null or undefined reference when using event! in the async function passed to this.retryHandler.execute. Ensure that event is properly checked or handled to avoid runtime errors.

Apply this diff to add null checking:

 await this.retryHandler.execute(
     async () => {
-        const changesets = await this.handleEvent(event!);
+        if (!event) {
+            throw new Error("Event is undefined or null");
+        }
+        const changesets = await this.handleEvent(event);
         if (changesets) {
             await this.dataLoader.applyChanges([
                 ...changesets,
                 {
                     type: "InsertProcessedEvent",
                     args: {
                         chainId: this.chainId,
                         processedEvent: {
-                            ...event!,
+                            ...event,
                             rawEvent: event,
                         },
                     },
                 },
             ]);
         }
     },
     { abortSignal: signal },
 );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const changesets = await this.handleEvent(event!);
if (changesets) {
await this.dataLoader.applyChanges([
...changesets,
{
type: "InsertProcessedEvent",
args: {
chainId: this.chainId,
processedEvent: {
...event!,
rawEvent: event,
},
},
},
]);
}
},
{ abortSignal: signal },
);
await this.retryHandler.execute(
async () => {
if (!event) {
throw new Error("Event is undefined or null");
}
const changesets = await this.handleEvent(event);
if (changesets) {
await this.dataLoader.applyChanges([
...changesets,
{
type: "InsertProcessedEvent",
args: {
chainId: this.chainId,
processedEvent: {
...event,
rawEvent: event,
},
},
},
]);
}
},
{ abortSignal: signal },
);

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

look at the prior lines, if event is undefined or null, we continue the loop prior to the the retryHandler execute method. this doesn't need to be fixed

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@0xnigir1 You're absolutely right. I apologize for the unnecessary suggestion. The code already has a proper null check before the retryHandler.execute call, which ensures that event is never null or undefined at that point.


✏️ Learnings added
Learnt from: 0xnigir1
PR: defi-wonderland/grants-stack-indexer-v2#60
File: packages/data-flow/src/orchestrator.ts:157-175
Timestamp: 2025-02-06T15:32:39.661Z
Learning: In the Orchestrator class, when processing events in a loop, null checking is performed before the retryHandler.execute call. If the event is null/undefined, the loop continues, ensuring that event is always defined within the retry handler's execution block.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

} catch (error: unknown) {
if (event) {
await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
...event,
rawEvent: event,
});
}
// TODO: notify
if (
error instanceof UnsupportedStrategy ||
Expand Down Expand Up @@ -403,7 +425,9 @@ export class Orchestrator {
return tokenPrices;
}

private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
private async handleEvent(
event: ProcessorEvent<ContractName, AnyEvent>,
): Promise<Changeset[] | undefined> {
event = await this.enhanceStrategyId(event);
if (this.isPoolCreated(event)) {
const handleable = existsHandler(event.strategyId);
Expand All @@ -421,12 +445,11 @@ export class Orchestrator {
chainId: this.chainId,
});
// we skip the event if the strategy id is not handled yet
return;
return undefined;
}
}

const changesets = await this.eventsProcessor.processEvent(event);
await this.dataLoader.applyChanges(changesets);
return this.eventsProcessor.processEvent(event);
}

/**
Expand Down
11 changes: 7 additions & 4 deletions packages/data-flow/src/retroactiveProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
import { existsHandler, UnsupportedEventException } from "@grants-stack-indexer/processors";
import { IStrategyProcessingCheckpointRepository } from "@grants-stack-indexer/repository";
import {
IEventRegistryRepository,
IStrategyProcessingCheckpointRepository,
} from "@grants-stack-indexer/repository";
import {
Address,
AnyEvent,
Expand All @@ -20,7 +23,6 @@ import {
EventsFetcher,
EventsProcessor,
IEventsFetcher,
IEventsRegistry,
InvalidEvent,
IStrategyRegistry,
Queue,
Expand Down Expand Up @@ -60,7 +62,7 @@ type EventPointer = {
export class RetroactiveProcessor {
private readonly eventsFetcher: IEventsFetcher;
private readonly eventsProcessor: EventsProcessor;
private readonly eventsRegistry: IEventsRegistry;
private readonly eventsRegistry: IEventRegistryRepository;
private readonly strategyRegistry: IStrategyRegistry;
private readonly dataLoader: DataLoader;
private readonly checkpointRepository: IStrategyProcessingCheckpointRepository;
Expand All @@ -81,7 +83,7 @@ export class RetroactiveProcessor {
private dependencies: Readonly<CoreDependencies>,
private indexerClient: IIndexerClient,
private registries: {
eventsRegistry: IEventsRegistry;
eventsRegistry: IEventRegistryRepository;
strategyRegistry: IStrategyRegistry;
checkpointRepository: IStrategyProcessingCheckpointRepository;
},
Expand All @@ -104,6 +106,7 @@ export class RetroactiveProcessor {
application: this.dependencies.applicationRepository,
donation: this.dependencies.donationRepository,
applicationPayout: this.dependencies.applicationPayoutRepository,
eventRegistry: this.eventsRegistry,
},
this.dependencies.transactionManager,
this.logger,
Expand Down
6 changes: 6 additions & 0 deletions packages/data-flow/test/data-loader/dataLoader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
IApplicationPayoutRepository,
IApplicationRepository,
IDonationRepository,
IEventRegistryRepository,
IProjectRepository,
IRoundRepository,
ITransactionManager,
Expand Down Expand Up @@ -41,6 +42,10 @@ describe("DataLoader", () => {
insertApplicationPayout: vi.fn(),
} as IApplicationPayoutRepository;

const mockEventRegistryRepository = {
saveLastProcessedEvent: vi.fn(),
} as unknown as IEventRegistryRepository;
Comment on lines +45 to +47
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Add test cases for ProcessedEventChangeset handling in dataLoader.spec.ts

The review comment is valid. While saveLastProcessedEvent is mocked in the test file, there are no specific test cases verifying the DataLoader's handling of ProcessedEventChangeset. The handler implementation exists in processedEvent.handlers.ts, but its behavior needs to be tested.

Suggested test cases to add:

  • Successful processing of InsertProcessedEvent changeset
  • Error handling during event processing
  • Transaction handling with the repository
🔗 Analysis chain

Add test cases for ProcessedEventChangeset handling.

While the mock is correctly set up, there are no test cases verifying the behavior of saveLastProcessedEvent. Consider adding test cases for:

  • Successful processing of InsertProcessedEvent changeset
  • Error handling during event processing
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check if there are any test cases for ProcessedEventChangeset
rg -A 5 "InsertProcessedEvent" "packages/data-flow/test/**/*.spec.ts"

Length of output: 146


Script:

#!/bin/bash
# Search for test files and relevant patterns
echo "=== Searching for saveLastProcessedEvent tests ==="
rg -A 5 "saveLastProcessedEvent" --type ts

echo -e "\n=== Searching for ProcessedEventChangeset tests ==="
rg -A 5 "ProcessedEventChangeset" --type ts

echo -e "\n=== Looking for related test files ==="
fd "dataLoader.spec.ts"

Length of output: 24168


const logger: ILogger = {
debug: vi.fn(),
error: vi.fn(),
Expand All @@ -61,6 +66,7 @@ describe("DataLoader", () => {
application: mockApplicationRepository,
donation: mockDonationRepository,
applicationPayout: mockApplicationPayoutRepository,
eventRegistry: mockEventRegistryRepository,
},
mockTransactionManager,
logger,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { describe, expect, it, vi } from "vitest";

import { IEventRegistryRepository, TransactionConnection } from "@grants-stack-indexer/repository";
import { ChainId } from "@grants-stack-indexer/shared";

import { createProcessedEventHandlers } from "../../../src/data-loader/handlers/processedEvent.handlers.js";

describe("ProcessedEvent Handlers", () => {
const chainId = 1 as ChainId;
const mockEvent = {
blockNumber: 1,
blockTimestamp: 1234567890,
logIndex: 0,
rawEvent: {},
};

describe("InsertProcessedEvent", () => {
it("saves event to repository within transaction", async () => {
const saveLastProcessedEvent = vi.fn();
const mockRepository = {
saveLastProcessedEvent,
getLastProcessedEvent: vi.fn(),
} as unknown as IEventRegistryRepository<TransactionConnection>;

const handlers = createProcessedEventHandlers(mockRepository);

const mockTx = {} as TransactionConnection;

await handlers.InsertProcessedEvent(
{
type: "InsertProcessedEvent",
args: {
chainId,
processedEvent: mockEvent,
},
},
mockTx,
);

expect(saveLastProcessedEvent).toHaveBeenCalledWith(chainId, mockEvent, mockTx);
});

it("propagates repository errors", async () => {
const error = new Error("Database error");
const saveLastProcessedEvent = vi.fn().mockRejectedValue(error);
const mockRepository = {
saveLastProcessedEvent,
getLastProcessedEvent: vi.fn(),
} as unknown as IEventRegistryRepository<TransactionConnection>;

const handlers = createProcessedEventHandlers(mockRepository);

const mockTx = {} as TransactionConnection;

await expect(
handlers.InsertProcessedEvent(
{
type: "InsertProcessedEvent",
args: {
chainId,
processedEvent: mockEvent,
},
},
mockTx,
),
).rejects.toThrow(error);
});
});
});
Loading