diff --git a/sdk/package.json b/sdk/package.json index f26e7faa52..c5b9c00e2a 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -55,6 +55,7 @@ "@triton-one/yellowstone-grpc": "1.3.0", "anchor-bankrun": "0.3.0", "gill": "^0.10.2", + "helius-laserstream": "0.1.8", "nanoid": "3.3.4", "node-cache": "5.1.2", "rpc-websockets": "7.5.1", diff --git a/sdk/src/accounts/laserProgramAccountSubscriber.ts b/sdk/src/accounts/laserProgramAccountSubscriber.ts new file mode 100644 index 0000000000..914cf1f34a --- /dev/null +++ b/sdk/src/accounts/laserProgramAccountSubscriber.ts @@ -0,0 +1,215 @@ +import { GrpcConfigs, ResubOpts } from './types'; +import { Program } from '@coral-xyz/anchor'; +import { Context, MemcmpFilter, PublicKey } from '@solana/web3.js'; +import * as Buffer from 'buffer'; +import { WebSocketProgramAccountSubscriber } from './webSocketProgramAccountSubscriber'; + +import { + LaserCommitmentLevel, + LaserSubscribe, + LaserstreamConfig, + LaserSubscribeRequest, + LaserSubscribeUpdate, + CompressionAlgorithms, +} from '../isomorphic/grpc.node'; +import { CommitmentLevel } from '@triton-one/yellowstone-grpc'; + +type LaserCommitment = + (typeof LaserCommitmentLevel)[keyof typeof LaserCommitmentLevel]; + +export class LaserstreamProgramAccountSubscriber< + T, +> extends WebSocketProgramAccountSubscriber { + private stream: + | { + id: string; + cancel: () => void; + write?: (req: LaserSubscribeRequest) => Promise; + } + | undefined; + + private commitmentLevel: CommitmentLevel; + public listenerId?: number; + + private readonly laserConfig: LaserstreamConfig; + + private constructor( + laserConfig: LaserstreamConfig, + commitmentLevel: CommitmentLevel, + subscriptionName: string, + accountDiscriminator: string, + program: Program, + decodeBufferFn: (accountName: string, ix: Buffer) => T, + options: { filters: MemcmpFilter[] } = { filters: [] }, + resubOpts?: ResubOpts + ) { + super( + subscriptionName, + accountDiscriminator, + program, + decodeBufferFn, + options, + resubOpts + ); + this.laserConfig = laserConfig; + this.commitmentLevel = this.toLaserCommitment(commitmentLevel); + } + + public static async create( + grpcConfigs: GrpcConfigs, + subscriptionName: string, + accountDiscriminator: string, + program: Program, + decodeBufferFn: (accountName: string, ix: Buffer) => U, + options: { filters: MemcmpFilter[] } = { + filters: [], + }, + resubOpts?: ResubOpts + ): Promise> { + const laserConfig: LaserstreamConfig = { + apiKey: grpcConfigs.token, + endpoint: grpcConfigs.endpoint, + maxReconnectAttempts: grpcConfigs.enableReconnect ? 10 : 0, + channelOptions: { + 'grpc.default_compression_algorithm': CompressionAlgorithms.zstd, + 'grpc.max_receive_message_length': 1_000_000_000, + }, + }; + + const commitmentLevel = + grpcConfigs.commitmentLevel ?? CommitmentLevel.CONFIRMED; + + return new LaserstreamProgramAccountSubscriber( + laserConfig, + commitmentLevel, + subscriptionName, + accountDiscriminator, + program, + decodeBufferFn, + options, + resubOpts + ); + } + + async subscribe( + onChange: ( + accountId: PublicKey, + data: T, + context: Context, + buffer: Buffer + ) => void + ): Promise { + if (this.listenerId != null || this.isUnsubscribing) return; + + this.onChange = onChange; + + const filters = this.options.filters.map((filter) => { + return { + memcmp: { + offset: filter.memcmp.offset, + base58: filter.memcmp.bytes, + }, + }; + }); + + const request: LaserSubscribeRequest = { + slots: {}, + accounts: { + drift: { + account: [], + owner: [this.program.programId.toBase58()], + filters, + }, + }, + transactions: {}, + blocks: {}, + blocksMeta: {}, + accountsDataSlice: [], + commitment: this.commitmentLevel, + entry: {}, + transactionsStatus: {}, + }; + + try { + const stream = await LaserSubscribe( + this.laserConfig, + request, + async (update: LaserSubscribeUpdate) => { + if (update.account) { + const slot = Number(update.account.slot); + const acc = update.account.account; + + const accountInfo = { + owner: new PublicKey(acc.owner), + lamports: Number(acc.lamports), + data: Buffer.Buffer.from(acc.data), + executable: acc.executable, + rentEpoch: Number(acc.rentEpoch), + }; + + const payload = { + accountId: new PublicKey(acc.pubkey), + accountInfo, + }; + + if (this.resubOpts?.resubTimeoutMs) { + this.receivingData = true; + clearTimeout(this.timeoutId); + this.handleRpcResponse({ slot }, payload); + this.setTimeout(); + } else { + this.handleRpcResponse({ slot }, payload); + } + } + }, + async (error) => { + console.error('LaserStream client error:', error); + throw error; + } + ); + + this.stream = stream; + this.listenerId = 1; + + if (this.resubOpts?.resubTimeoutMs) { + this.receivingData = true; + this.setTimeout(); + } + } catch (err) { + console.error('Failed to start LaserStream client:', err); + throw err; + } + } + + public async unsubscribe(onResub = false): Promise { + if (!onResub && this.resubOpts) { + this.resubOpts.resubTimeoutMs = undefined; + } + this.isUnsubscribing = true; + clearTimeout(this.timeoutId); + this.timeoutId = undefined; + + if (this.listenerId != null && this.stream) { + try { + this.stream.cancel(); + } finally { + this.listenerId = undefined; + this.isUnsubscribing = false; + } + } else { + this.isUnsubscribing = false; + } + } + + public toLaserCommitment( + level: string | number | undefined + ): LaserCommitment { + if (typeof level === 'string') { + return ( + (LaserCommitmentLevel as any)[level.toUpperCase()] ?? + LaserCommitmentLevel.CONFIRMED + ); + } + return (level as LaserCommitment) ?? LaserCommitmentLevel.CONFIRMED; + } +} diff --git a/sdk/src/accounts/types.ts b/sdk/src/accounts/types.ts index 98ab7133fb..c572ae2798 100644 --- a/sdk/src/accounts/types.ts +++ b/sdk/src/accounts/types.ts @@ -234,6 +234,7 @@ export type GrpcConfigs = { * Defaults to false, will throw on connection loss. */ enableReconnect?: boolean; + client?: 'yellowstone' | 'laser'; }; export interface HighLeverageModeConfigAccountSubscriber { diff --git a/sdk/src/isomorphic/grpc.node.ts b/sdk/src/isomorphic/grpc.node.ts index 4d58f734d1..8652858ef0 100644 --- a/sdk/src/isomorphic/grpc.node.ts +++ b/sdk/src/isomorphic/grpc.node.ts @@ -6,6 +6,15 @@ import type { } from '@triton-one/yellowstone-grpc'; import { ClientDuplexStream, ChannelOptions } from '@grpc/grpc-js'; +import { + CommitmentLevel as LaserCommitmentLevel, + subscribe as LaserSubscribe, + LaserstreamConfig, + SubscribeRequest as LaserSubscribeRequest, + SubscribeUpdate as LaserSubscribeUpdate, + CompressionAlgorithms, +} from 'helius-laserstream'; + export { ClientDuplexStream, ChannelOptions, @@ -13,6 +22,12 @@ export { SubscribeUpdate, CommitmentLevel, Client, + LaserSubscribe, + LaserCommitmentLevel, + LaserstreamConfig, + LaserSubscribeRequest, + LaserSubscribeUpdate, + CompressionAlgorithms, }; // Export a function to create a new Client instance diff --git a/sdk/src/orderSubscriber/grpcSubscription.ts b/sdk/src/orderSubscriber/grpcSubscription.ts index 41101435ab..04160c0bff 100644 --- a/sdk/src/orderSubscriber/grpcSubscription.ts +++ b/sdk/src/orderSubscriber/grpcSubscription.ts @@ -5,6 +5,7 @@ import { OrderSubscriber } from './OrderSubscriber'; import { GrpcConfigs, ResubOpts } from '../accounts/types'; import { UserAccount } from '../types'; import { getUserFilter, getNonIdleUserFilter } from '../memcmp'; +import { LaserstreamProgramAccountSubscriber } from '../accounts/laserProgramAccountSubscriber'; export class grpcSubscription { private orderSubscriber: OrderSubscriber; @@ -12,7 +13,9 @@ export class grpcSubscription { private resubOpts?: ResubOpts; private resyncIntervalMs?: number; - private subscriber?: grpcProgramAccountSubscriber; + private subscriber?: + | grpcProgramAccountSubscriber + | LaserstreamProgramAccountSubscriber; private resyncTimeoutId?: ReturnType; private decoded?: boolean; @@ -47,17 +50,32 @@ export class grpcSubscription { return; } - this.subscriber = await grpcProgramAccountSubscriber.create( - this.grpcConfigs, - 'OrderSubscriber', - 'User', - this.orderSubscriber.driftClient.program, - this.orderSubscriber.decodeFn, - { - filters: [getUserFilter(), getNonIdleUserFilter()], - }, - this.resubOpts - ); + if (this.grpcConfigs.client === 'laser') { + this.subscriber = + await LaserstreamProgramAccountSubscriber.create( + this.grpcConfigs, + 'OrderSubscriber', + 'User', + this.orderSubscriber.driftClient.program, + this.orderSubscriber.decodeFn, + { + filters: [getUserFilter(), getNonIdleUserFilter()], + }, + this.resubOpts + ); + } else { + this.subscriber = await grpcProgramAccountSubscriber.create( + this.grpcConfigs, + 'OrderSubscriber', + 'User', + this.orderSubscriber.driftClient.program, + this.orderSubscriber.decodeFn, + { + filters: [getUserFilter(), getNonIdleUserFilter()], + }, + this.resubOpts + ); + } await this.subscriber.subscribe( ( diff --git a/sdk/yarn.lock b/sdk/yarn.lock index 4013c2cc05..3e80163db6 100644 --- a/sdk/yarn.lock +++ b/sdk/yarn.lock @@ -2820,6 +2820,51 @@ he@^1.2.0: resolved "https://registry.yarnpkg.com/he/-/he-1.2.0.tgz#84ae65fa7eafb165fddb61566ae14baf05664f0f" integrity sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw== +helius-laserstream-darwin-arm64@0.1.8: + version "0.1.8" + resolved "https://registry.yarnpkg.com/helius-laserstream-darwin-arm64/-/helius-laserstream-darwin-arm64-0.1.8.tgz#d78ad15e6cd16dc9379a9a365f9fcb3f958e6c01" + integrity sha512-p/K2Mi3wZnMxEYSLCvu858VyMvtJFonhdF8cLaMcszFv04WWdsK+hINNZpVRfakypvDfDPbMudEhL1Q9USD5+w== + +helius-laserstream-darwin-x64@0.1.8: + version "0.1.8" + resolved "https://registry.yarnpkg.com/helius-laserstream-darwin-x64/-/helius-laserstream-darwin-x64-0.1.8.tgz#e57bc8f03135fd3b5c01a5aebd7b87c42129da50" + integrity sha512-Hd5irFyfOqQZLdoj5a+OV7vML2YfySSBuKlOwtisMHkUuIXZ4NpAexslDmK7iP5VWRI+lOv9X/tA7BhxW7RGSQ== + +helius-laserstream-linux-arm64-gnu@0.1.8: + version "0.1.8" + resolved "https://registry.yarnpkg.com/helius-laserstream-linux-arm64-gnu/-/helius-laserstream-linux-arm64-gnu-0.1.8.tgz#1b3c8440804d143f650166842620fc334f9c319b" + integrity sha512-PlPm1dvOvTGBL1nuzK98Xe40BJq1JWNREXlBHKDVA/B+KCGQnIMJ1s6e1MevSvFE7SOix5i1BxhYIxGioK2GMg== + +helius-laserstream-linux-arm64-musl@0.1.8: + version "0.1.8" + resolved "https://registry.yarnpkg.com/helius-laserstream-linux-arm64-musl/-/helius-laserstream-linux-arm64-musl-0.1.8.tgz#28e0645bebc3253d2a136cf0bd13f8cb5256f47b" + integrity sha512-LFadfMRuTd1zo6RZqLTgHQapo3gJYioS7wFMWFoBOFulG0BpAqHEDNobkxx0002QArw+zX29MQ/5OaOCf8kKTA== + +helius-laserstream-linux-x64-gnu@0.1.8: + version "0.1.8" + resolved "https://registry.yarnpkg.com/helius-laserstream-linux-x64-gnu/-/helius-laserstream-linux-x64-gnu-0.1.8.tgz#e59990ca0bcdc27e46f71a8fc2c18fddbe6f07e3" + integrity sha512-IZWK/OQIe0647QqfYikLb1DFK+nYtXLJiMcpj24qnNVWBOtMXmPc1hL6ebazdEiaKt7fxNd5IiM1RqeaqZAZMw== + +helius-laserstream-linux-x64-musl@0.1.8: + version "0.1.8" + resolved "https://registry.yarnpkg.com/helius-laserstream-linux-x64-musl/-/helius-laserstream-linux-x64-musl-0.1.8.tgz#42aa0919ef266c40f50ac74d6f9d871d4e2e7c9c" + integrity sha512-riTS6VgxDae1fHOJ2XC/o/v1OZRbEv/3rcoa3NlAOnooDKp5HDgD0zJTcImjQHpYWwGaejx1oX/Ht53lxNoijw== + +helius-laserstream@0.1.8: + version "0.1.8" + resolved "https://registry.yarnpkg.com/helius-laserstream/-/helius-laserstream-0.1.8.tgz#6ee5e0bc9fe2560c03a0d2c9079b9f875c3e6bb7" + integrity sha512-jXQkwQRWiowbVPGQrGacOkI5shKPhrEixCu93OpoMtL5fs9mnhtD7XKMPi8CX0W8gsqsJjwR4NlaR+EflyANbQ== + dependencies: + "@types/protobufjs" "^6.0.0" + protobufjs "^7.5.3" + optionalDependencies: + helius-laserstream-darwin-arm64 "0.1.8" + helius-laserstream-darwin-x64 "0.1.8" + helius-laserstream-linux-arm64-gnu "0.1.8" + helius-laserstream-linux-arm64-musl "0.1.8" + helius-laserstream-linux-x64-gnu "0.1.8" + helius-laserstream-linux-x64-musl "0.1.8" + humanize-ms@^1.2.1: version "1.2.1" resolved "https://registry.yarnpkg.com/humanize-ms/-/humanize-ms-1.2.1.tgz#c46e3159a293f6b896da29316d8b6fe8bb79bbed"