Skip to content

Commit 465aa0b

Browse files
authored
fix: event_observer_requests json writes (#1334)
* fix: event_observer_requests json format * chore: remove tsv from another branch
1 parent cdef6b2 commit 465aa0b

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

src/datastore/pg-write-store.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ import {
7474
} from './helpers';
7575
import { PgNotifier } from './pg-notifier';
7676
import { PgStore } from './pg-store';
77-
import { connectPostgres, PgServer, PgSqlClient } from './connection';
77+
import { connectPostgres, PgJsonb, PgServer, PgSqlClient } from './connection';
7878
import { runMigrations } from './migrations';
7979
import { getPgClientConfig } from './connection-legacy';
8080
import { isProcessableTokenMetadata } from '../token-metadata/helpers';
@@ -162,7 +162,7 @@ export class PgWriteStore extends PgStore {
162162
};
163163
}
164164

165-
async storeRawEventRequest(eventPath: string, payload: string): Promise<void> {
165+
async storeRawEventRequest(eventPath: string, payload: PgJsonb): Promise<void> {
166166
// To avoid depending on the DB more than once and to allow the query transaction to settle,
167167
// we'll take the complete insert result and move that to the output TSV file instead of taking
168168
// only the `id` and performing a `COPY` of that row later.

src/event-stream/event-server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ import {
6868

6969
async function handleRawEventRequest(
7070
eventPath: string,
71-
payload: string,
71+
payload: any,
7272
db: PgWriteStore
7373
): Promise<void> {
7474
await db.storeRawEventRequest(eventPath, payload);
@@ -583,7 +583,7 @@ async function handleNewAttachmentMessage(msg: CoreNodeAttachmentMessage[], db:
583583
}
584584

585585
interface EventMessageHandler {
586-
handleRawEventRequest(eventPath: string, payload: string, db: PgWriteStore): Promise<void> | void;
586+
handleRawEventRequest(eventPath: string, payload: any, db: PgWriteStore): Promise<void> | void;
587587
handleBlockMessage(
588588
chainId: ChainID,
589589
msg: CoreNodeBlockMessage,
@@ -607,7 +607,7 @@ function createMessageProcessorQueue(): EventMessageHandler {
607607
// Create a promise queue so that only one message is handled at a time.
608608
const processorQueue = new PQueue({ concurrency: 1 });
609609
const handler: EventMessageHandler = {
610-
handleRawEventRequest: (eventPath: string, payload: string, db: PgWriteStore) => {
610+
handleRawEventRequest: (eventPath: string, payload: any, db: PgWriteStore) => {
611611
return processorQueue
612612
.add(() => handleRawEventRequest(eventPath, payload, db))
613613
.catch(e => {
@@ -733,7 +733,7 @@ export async function startEventServer(opts: {
733733
asyncHandler(async (req, res, next) => {
734734
const eventPath = req.path;
735735
let payload = JSON.stringify(req.body);
736-
await messageHandler.handleRawEventRequest(eventPath, payload, db);
736+
await messageHandler.handleRawEventRequest(eventPath, req.body, db);
737737
if (logger.isDebugEnabled()) {
738738
// Skip logging massive event payloads, this _should_ only exclude the genesis block payload which is ~80 MB.
739739
if (payload.length > 10_000_000) {

0 commit comments

Comments
 (0)