Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -324,5 +324,8 @@ export async function monitorTransactionsProposedOrderBook(
})
);

console.log("All proposals have been checked!");
Logger.debug({
at: "PolymarketMonitor",
message: `All ${allProposals.length} proposals have been checked!`,
});
}
255 changes: 254 additions & 1 deletion packages/monitor-v2/src/monitor-polymarket/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ export interface MonitoringParams {
fillEventsProposalGapSeconds: number;
httpClient: ReturnType<typeof createHttpClient>;
orderBookBatchSize: number;
orderBookSubgraphEndpoint: string;
ooV2Addresses: string[];
ooV1Addresses: string[];
aiConfig?: AIConfig;
subgraphSyncTolerance: number;
}
interface PolymarketMarketGraphql {
question: string;
Expand Down Expand Up @@ -374,6 +376,38 @@ export const getPolymarketMarketInformation = async (
});
};

interface OrderFilledEventSubgraph {
id: string;
transactionHash: string;
makerAssetId: string;
takerAssetId: string;
maker: string;
taker: string;
makerAmountFilled: string;
takerAmountFilled: string;
fee: string;
timestamp: string;
orderHash: string;
}

interface SubgraphOrderFilledResponse {
data?: {
orderFilledEvents: OrderFilledEventSubgraph[];
};
errors?: { message: string }[];
}

interface SubgraphMetaResponse {
data?: {
_meta: {
block: {
number: number;
};
};
};
errors?: { message: string }[];
}

const getTradeInfoFromOrderFilledEvent = async (
provider: Provider,
event: any
Expand All @@ -392,7 +426,186 @@ const getTradeInfoFromOrderFilledEvent = async (
};
};

export const getOrderFilledEvents = async (
const getTradeInfoFromSubgraphEvent = (event: OrderFilledEventSubgraph): PolymarketTradeInformation => {
const isBuy = event.makerAssetId === "0";
const makerAmountFilled = BigNumber.from(event.makerAmountFilled);
const takerAmountFilled = BigNumber.from(event.takerAmountFilled);

const numerator = (isBuy ? makerAmountFilled : takerAmountFilled).mul(1000);
const denominator = isBuy ? takerAmountFilled : makerAmountFilled;
const price = numerator.div(denominator).toNumber() / 1000;

return {
price,
type: isBuy ? "buy" : "sell",
timestamp: parseInt(event.timestamp),
// Convert to decimal value with 2 decimals
amount: (isBuy ? takerAmountFilled : makerAmountFilled).div(10_000).toNumber() / 100,
};
};

const querySubgraphOrderFilledEvents = async (
httpClient: AxiosInstance,
subgraphEndpoint: string,
whereField: "takerAssetId" | "makerAssetId",
assetId: string,
pageSize = 1000,
startTimestamp?: number
): Promise<OrderFilledEventSubgraph[]> => {
const allEvents: OrderFilledEventSubgraph[] = [];
let skip = 0;
let hasMore = true;

while (hasMore) {
// Build where clause with optional timestamp filter
const whereClause = startTimestamp
? `{timestamp_gt: ${startTimestamp}, ${whereField}: "${assetId}"}`
: `{${whereField}: "${assetId}"}`;

const query = `
{
orderFilledEvents(
where: ${whereClause},
first: ${pageSize},
skip: ${skip},
orderBy: timestamp,
orderDirection: asc
) {
id
transactionHash
makerAssetId
takerAssetId
maker
taker
makerAmountFilled
takerAmountFilled
fee
timestamp
orderHash
}
}
`;

const response = await httpClient.post<SubgraphOrderFilledResponse>(subgraphEndpoint, { query });

if (response.data.errors?.length) {
throw new Error(response.data.errors.map((e) => e.message).join("; "));
}

if (!response.data.data?.orderFilledEvents) {
throw new Error("Invalid response from subgraph");
}

const events = response.data.data.orderFilledEvents;
allEvents.push(...events);

// If we got fewer events than pageSize, we've reached the end
hasMore = events.length === pageSize;
skip += pageSize;
}

return allEvents;
};

const getOrderFilledEventsFromSubgraph = async (
params: MonitoringParams,
clobTokenIds: [string, string],
startTimestamp?: number
): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => {
// Query 4 combinations: takerAssetId for both tokens, makerAssetId for both tokens
const queries = [
{ whereField: "takerAssetId" as const, assetId: clobTokenIds[0], tokenIndex: 0 },
{ whereField: "takerAssetId" as const, assetId: clobTokenIds[1], tokenIndex: 1 },
{ whereField: "makerAssetId" as const, assetId: clobTokenIds[0], tokenIndex: 0 },
{ whereField: "makerAssetId" as const, assetId: clobTokenIds[1], tokenIndex: 1 },
];

// Execute all queries in parallel
const queryResults = await Promise.all(
queries.map((q) =>
querySubgraphOrderFilledEvents(
params.httpClient,
params.orderBookSubgraphEndpoint,
q.whereField,
q.assetId,
1000,
startTimestamp
)
)
);

// Group events by token index, deduplicating per token (same event can appear for both tokens)
const tokenOneEventIds = new Set<string>();
const tokenTwoEventIds = new Set<string>();
const tokenOneEvents: PolymarketTradeInformation[] = [];
const tokenTwoEvents: PolymarketTradeInformation[] = [];

// Process takerAssetId queries (index 0 and 1)
queryResults[0].forEach((event) => {
if (!tokenOneEventIds.has(event.id)) {
tokenOneEventIds.add(event.id);
tokenOneEvents.push(getTradeInfoFromSubgraphEvent(event));
}
});
queryResults[1].forEach((event) => {
if (!tokenTwoEventIds.has(event.id)) {
tokenTwoEventIds.add(event.id);
tokenTwoEvents.push(getTradeInfoFromSubgraphEvent(event));
}
});

// Process makerAssetId queries (index 2 and 3)
queryResults[2].forEach((event) => {
if (!tokenOneEventIds.has(event.id)) {
tokenOneEventIds.add(event.id);
tokenOneEvents.push(getTradeInfoFromSubgraphEvent(event));
}
});
queryResults[3].forEach((event) => {
if (!tokenTwoEventIds.has(event.id)) {
tokenTwoEventIds.add(event.id);
tokenTwoEvents.push(getTradeInfoFromSubgraphEvent(event));
}
});

// Sort by timestamp
const sortByTimestamp = (events: PolymarketTradeInformation[]): PolymarketTradeInformation[] => {
return events.sort((a, b) => a.timestamp - b.timestamp);
};

return [sortByTimestamp(tokenOneEvents), sortByTimestamp(tokenTwoEvents)];
};

const checkSubgraphSyncStatus = async (httpClient: AxiosInstance, subgraphEndpoint: string): Promise<number | null> => {
const query = `
{
_meta {
block {
number
}
}
}
`;

try {
const response = await httpClient.post<SubgraphMetaResponse>(subgraphEndpoint, { query });

if (response.data.errors?.length) {
throw new Error(response.data.errors.map((e) => e.message).join("; "));
}

if (!response.data.data?._meta?.block?.number) {
throw new Error("Invalid response from subgraph meta query");
}

return response.data.data._meta.block.number;
} catch (error) {
// Return null if we can't check sync status, caller should handle gracefully
return null;
}
};

const getOrderFilledEventsSlow = async (
params: MonitoringParams,
clobTokenIds: [string, string],
startBlockNumber: number
Expand Down Expand Up @@ -439,6 +652,38 @@ export const getOrderFilledEvents = async (
return [outcomeTokenOne, outcomeTokenTwo];
};

export const getOrderFilledEvents = async (
params: MonitoringParams,
clobTokenIds: [string, string],
startBlockNumber: number
): Promise<[PolymarketTradeInformation[], PolymarketTradeInformation[]]> => {
try {
// Check subgraph sync status first
const subgraphBlockNumber = await checkSubgraphSyncStatus(params.httpClient, params.orderBookSubgraphEndpoint);

if (subgraphBlockNumber !== null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when subgraphBlockNumber === null I don't think we should even try getOrderFilledEventsFromSubgraph below and instead fallback to getOrderFilledEventsSlow as subgraphSyncTolerance cannot be evaluated in this case.

// Get current block from provider
const currentBlockNumber = await params.provider.getBlockNumber();
const blockDifference = currentBlockNumber - subgraphBlockNumber;

// If subgraph is behind by more than tolerance, use slow method
if (blockDifference >= params.subgraphSyncTolerance) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the subgraphSyncTolerance naming implies that == should still be tolerated, so we might want to use > comparison here. Also running this couple of times I always observed subgraph lagging 1 block behind in my tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also might want to debug log the fallback

}
}

// Get the block timestamp from startBlockNumber
const startBlock = await params.provider.getBlock(startBlockNumber);
const startTimestamp = startBlock.timestamp;

// Try the fast subgraph version
return await getOrderFilledEventsFromSubgraph(params, clobTokenIds, startTimestamp);
} catch (error) {
// Fallback to the slow version if subgraph fails
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we debug log this for better observability

return await getOrderFilledEventsSlow(params, clobTokenIds, startBlockNumber);
}
};

export const calculatePolymarketQuestionID = (ancillaryData: string): string => {
return ethers.utils.keccak256(ancillaryData);
};
Expand Down Expand Up @@ -907,6 +1152,12 @@ export const initMonitoringParams = async (

const orderBookBatchSize = env.ORDER_BOOK_BATCH_SIZE ? Number(env.ORDER_BOOK_BATCH_SIZE) : 499;

const orderBookSubgraphEndpoint =
env.ORDER_BOOK_SUBGRAPH_ENDPOINT ||
"https://api.goldsky.com/api/public/project_cl6mb8i9h0003e201j6li0diw/subgraphs/orderbook-subgraph/0.0.1/gn";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


const subgraphSyncTolerance = env.SUBGRAPH_SYNC_TOLERANCE ? Number(env.SUBGRAPH_SYNC_TOLERANCE) : 1;

// Rate limit and retry with exponential backoff and jitter to handle rate limiting and errors from the APIs.
const httpClient = createHttpClient({
axios: { timeout: httpTimeout },
Expand Down Expand Up @@ -948,9 +1199,11 @@ export const initMonitoringParams = async (
fillEventsProposalGapSeconds,
httpClient,
orderBookBatchSize,
orderBookSubgraphEndpoint,
ooV2Addresses,
ooV1Addresses,
aiConfig,
subgraphSyncTolerance,
};
};

Expand Down