Skip to content

Commit 301a52c

Browse files
authored
Merge pull request #1900 from drift-labs/jack/add-laserstream-client
Jack/add laserstream client
2 parents 107f0a8 + 0656f34 commit 301a52c

File tree

6 files changed

+307
-12
lines changed

6 files changed

+307
-12
lines changed

sdk/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
"@triton-one/yellowstone-grpc": "1.3.0",
5656
"anchor-bankrun": "0.3.0",
5757
"gill": "^0.10.2",
58+
"helius-laserstream": "0.1.8",
5859
"nanoid": "3.3.4",
5960
"node-cache": "5.1.2",
6061
"rpc-websockets": "7.5.1",
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
import { GrpcConfigs, ResubOpts } from './types';
2+
import { Program } from '@coral-xyz/anchor';
3+
import { Context, MemcmpFilter, PublicKey } from '@solana/web3.js';
4+
import * as Buffer from 'buffer';
5+
import { WebSocketProgramAccountSubscriber } from './webSocketProgramAccountSubscriber';
6+
7+
import {
8+
LaserCommitmentLevel,
9+
LaserSubscribe,
10+
LaserstreamConfig,
11+
LaserSubscribeRequest,
12+
LaserSubscribeUpdate,
13+
CompressionAlgorithms,
14+
} from '../isomorphic/grpc.node';
15+
import { CommitmentLevel } from '@triton-one/yellowstone-grpc';
16+
17+
type LaserCommitment =
18+
(typeof LaserCommitmentLevel)[keyof typeof LaserCommitmentLevel];
19+
20+
export class LaserstreamProgramAccountSubscriber<
21+
T,
22+
> extends WebSocketProgramAccountSubscriber<T> {
23+
private stream:
24+
| {
25+
id: string;
26+
cancel: () => void;
27+
write?: (req: LaserSubscribeRequest) => Promise<void>;
28+
}
29+
| undefined;
30+
31+
private commitmentLevel: CommitmentLevel;
32+
public listenerId?: number;
33+
34+
private readonly laserConfig: LaserstreamConfig;
35+
36+
private constructor(
37+
laserConfig: LaserstreamConfig,
38+
commitmentLevel: CommitmentLevel,
39+
subscriptionName: string,
40+
accountDiscriminator: string,
41+
program: Program,
42+
decodeBufferFn: (accountName: string, ix: Buffer) => T,
43+
options: { filters: MemcmpFilter[] } = { filters: [] },
44+
resubOpts?: ResubOpts
45+
) {
46+
super(
47+
subscriptionName,
48+
accountDiscriminator,
49+
program,
50+
decodeBufferFn,
51+
options,
52+
resubOpts
53+
);
54+
this.laserConfig = laserConfig;
55+
this.commitmentLevel = this.toLaserCommitment(commitmentLevel);
56+
}
57+
58+
public static async create<U>(
59+
grpcConfigs: GrpcConfigs,
60+
subscriptionName: string,
61+
accountDiscriminator: string,
62+
program: Program,
63+
decodeBufferFn: (accountName: string, ix: Buffer) => U,
64+
options: { filters: MemcmpFilter[] } = {
65+
filters: [],
66+
},
67+
resubOpts?: ResubOpts
68+
): Promise<LaserstreamProgramAccountSubscriber<U>> {
69+
const laserConfig: LaserstreamConfig = {
70+
apiKey: grpcConfigs.token,
71+
endpoint: grpcConfigs.endpoint,
72+
maxReconnectAttempts: grpcConfigs.enableReconnect ? 10 : 0,
73+
channelOptions: {
74+
'grpc.default_compression_algorithm': CompressionAlgorithms.zstd,
75+
'grpc.max_receive_message_length': 1_000_000_000,
76+
},
77+
};
78+
79+
const commitmentLevel =
80+
grpcConfigs.commitmentLevel ?? CommitmentLevel.CONFIRMED;
81+
82+
return new LaserstreamProgramAccountSubscriber<U>(
83+
laserConfig,
84+
commitmentLevel,
85+
subscriptionName,
86+
accountDiscriminator,
87+
program,
88+
decodeBufferFn,
89+
options,
90+
resubOpts
91+
);
92+
}
93+
94+
async subscribe(
95+
onChange: (
96+
accountId: PublicKey,
97+
data: T,
98+
context: Context,
99+
buffer: Buffer
100+
) => void
101+
): Promise<void> {
102+
if (this.listenerId != null || this.isUnsubscribing) return;
103+
104+
this.onChange = onChange;
105+
106+
const filters = this.options.filters.map((filter) => {
107+
return {
108+
memcmp: {
109+
offset: filter.memcmp.offset,
110+
base58: filter.memcmp.bytes,
111+
},
112+
};
113+
});
114+
115+
const request: LaserSubscribeRequest = {
116+
slots: {},
117+
accounts: {
118+
drift: {
119+
account: [],
120+
owner: [this.program.programId.toBase58()],
121+
filters,
122+
},
123+
},
124+
transactions: {},
125+
blocks: {},
126+
blocksMeta: {},
127+
accountsDataSlice: [],
128+
commitment: this.commitmentLevel,
129+
entry: {},
130+
transactionsStatus: {},
131+
};
132+
133+
try {
134+
const stream = await LaserSubscribe(
135+
this.laserConfig,
136+
request,
137+
async (update: LaserSubscribeUpdate) => {
138+
if (update.account) {
139+
const slot = Number(update.account.slot);
140+
const acc = update.account.account;
141+
142+
const accountInfo = {
143+
owner: new PublicKey(acc.owner),
144+
lamports: Number(acc.lamports),
145+
data: Buffer.Buffer.from(acc.data),
146+
executable: acc.executable,
147+
rentEpoch: Number(acc.rentEpoch),
148+
};
149+
150+
const payload = {
151+
accountId: new PublicKey(acc.pubkey),
152+
accountInfo,
153+
};
154+
155+
if (this.resubOpts?.resubTimeoutMs) {
156+
this.receivingData = true;
157+
clearTimeout(this.timeoutId);
158+
this.handleRpcResponse({ slot }, payload);
159+
this.setTimeout();
160+
} else {
161+
this.handleRpcResponse({ slot }, payload);
162+
}
163+
}
164+
},
165+
async (error) => {
166+
console.error('LaserStream client error:', error);
167+
throw error;
168+
}
169+
);
170+
171+
this.stream = stream;
172+
this.listenerId = 1;
173+
174+
if (this.resubOpts?.resubTimeoutMs) {
175+
this.receivingData = true;
176+
this.setTimeout();
177+
}
178+
} catch (err) {
179+
console.error('Failed to start LaserStream client:', err);
180+
throw err;
181+
}
182+
}
183+
184+
public async unsubscribe(onResub = false): Promise<void> {
185+
if (!onResub && this.resubOpts) {
186+
this.resubOpts.resubTimeoutMs = undefined;
187+
}
188+
this.isUnsubscribing = true;
189+
clearTimeout(this.timeoutId);
190+
this.timeoutId = undefined;
191+
192+
if (this.listenerId != null && this.stream) {
193+
try {
194+
this.stream.cancel();
195+
} finally {
196+
this.listenerId = undefined;
197+
this.isUnsubscribing = false;
198+
}
199+
} else {
200+
this.isUnsubscribing = false;
201+
}
202+
}
203+
204+
public toLaserCommitment(
205+
level: string | number | undefined
206+
): LaserCommitment {
207+
if (typeof level === 'string') {
208+
return (
209+
(LaserCommitmentLevel as any)[level.toUpperCase()] ??
210+
LaserCommitmentLevel.CONFIRMED
211+
);
212+
}
213+
return (level as LaserCommitment) ?? LaserCommitmentLevel.CONFIRMED;
214+
}
215+
}

sdk/src/accounts/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ export type GrpcConfigs = {
234234
* Defaults to false, will throw on connection loss.
235235
*/
236236
enableReconnect?: boolean;
237+
client?: 'yellowstone' | 'laser';
237238
};
238239

239240
export interface HighLeverageModeConfigAccountSubscriber {

sdk/src/isomorphic/grpc.node.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,28 @@ import type {
66
} from '@triton-one/yellowstone-grpc';
77
import { ClientDuplexStream, ChannelOptions } from '@grpc/grpc-js';
88

9+
import {
10+
CommitmentLevel as LaserCommitmentLevel,
11+
subscribe as LaserSubscribe,
12+
LaserstreamConfig,
13+
SubscribeRequest as LaserSubscribeRequest,
14+
SubscribeUpdate as LaserSubscribeUpdate,
15+
CompressionAlgorithms,
16+
} from 'helius-laserstream';
17+
918
export {
1019
ClientDuplexStream,
1120
ChannelOptions,
1221
SubscribeRequest,
1322
SubscribeUpdate,
1423
CommitmentLevel,
1524
Client,
25+
LaserSubscribe,
26+
LaserCommitmentLevel,
27+
LaserstreamConfig,
28+
LaserSubscribeRequest,
29+
LaserSubscribeUpdate,
30+
CompressionAlgorithms,
1631
};
1732

1833
// Export a function to create a new Client instance

sdk/src/orderSubscriber/grpcSubscription.ts

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ import { OrderSubscriber } from './OrderSubscriber';
55
import { GrpcConfigs, ResubOpts } from '../accounts/types';
66
import { UserAccount } from '../types';
77
import { getUserFilter, getNonIdleUserFilter } from '../memcmp';
8+
import { LaserstreamProgramAccountSubscriber } from '../accounts/laserProgramAccountSubscriber';
89

910
export class grpcSubscription {
1011
private orderSubscriber: OrderSubscriber;
1112
private skipInitialLoad: boolean;
1213
private resubOpts?: ResubOpts;
1314
private resyncIntervalMs?: number;
1415

15-
private subscriber?: grpcProgramAccountSubscriber<UserAccount>;
16+
private subscriber?:
17+
| grpcProgramAccountSubscriber<UserAccount>
18+
| LaserstreamProgramAccountSubscriber<UserAccount>;
1619
private resyncTimeoutId?: ReturnType<typeof setTimeout>;
1720

1821
private decoded?: boolean;
@@ -47,17 +50,32 @@ export class grpcSubscription {
4750
return;
4851
}
4952

50-
this.subscriber = await grpcProgramAccountSubscriber.create<UserAccount>(
51-
this.grpcConfigs,
52-
'OrderSubscriber',
53-
'User',
54-
this.orderSubscriber.driftClient.program,
55-
this.orderSubscriber.decodeFn,
56-
{
57-
filters: [getUserFilter(), getNonIdleUserFilter()],
58-
},
59-
this.resubOpts
60-
);
53+
if (this.grpcConfigs.client === 'laser') {
54+
this.subscriber =
55+
await LaserstreamProgramAccountSubscriber.create<UserAccount>(
56+
this.grpcConfigs,
57+
'OrderSubscriber',
58+
'User',
59+
this.orderSubscriber.driftClient.program,
60+
this.orderSubscriber.decodeFn,
61+
{
62+
filters: [getUserFilter(), getNonIdleUserFilter()],
63+
},
64+
this.resubOpts
65+
);
66+
} else {
67+
this.subscriber = await grpcProgramAccountSubscriber.create<UserAccount>(
68+
this.grpcConfigs,
69+
'OrderSubscriber',
70+
'User',
71+
this.orderSubscriber.driftClient.program,
72+
this.orderSubscriber.decodeFn,
73+
{
74+
filters: [getUserFilter(), getNonIdleUserFilter()],
75+
},
76+
this.resubOpts
77+
);
78+
}
6179

6280
await this.subscriber.subscribe(
6381
(

sdk/yarn.lock

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2820,6 +2820,51 @@ he@^1.2.0:
28202820
resolved "https://registry.yarnpkg.com/he/-/he-1.2.0.tgz#84ae65fa7eafb165fddb61566ae14baf05664f0f"
28212821
integrity sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==
28222822

2823+
2824+
version "0.1.8"
2825+
resolved "https://registry.yarnpkg.com/helius-laserstream-darwin-arm64/-/helius-laserstream-darwin-arm64-0.1.8.tgz#d78ad15e6cd16dc9379a9a365f9fcb3f958e6c01"
2826+
integrity sha512-p/K2Mi3wZnMxEYSLCvu858VyMvtJFonhdF8cLaMcszFv04WWdsK+hINNZpVRfakypvDfDPbMudEhL1Q9USD5+w==
2827+
2828+
2829+
version "0.1.8"
2830+
resolved "https://registry.yarnpkg.com/helius-laserstream-darwin-x64/-/helius-laserstream-darwin-x64-0.1.8.tgz#e57bc8f03135fd3b5c01a5aebd7b87c42129da50"
2831+
integrity sha512-Hd5irFyfOqQZLdoj5a+OV7vML2YfySSBuKlOwtisMHkUuIXZ4NpAexslDmK7iP5VWRI+lOv9X/tA7BhxW7RGSQ==
2832+
2833+
2834+
version "0.1.8"
2835+
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-arm64-gnu/-/helius-laserstream-linux-arm64-gnu-0.1.8.tgz#1b3c8440804d143f650166842620fc334f9c319b"
2836+
integrity sha512-PlPm1dvOvTGBL1nuzK98Xe40BJq1JWNREXlBHKDVA/B+KCGQnIMJ1s6e1MevSvFE7SOix5i1BxhYIxGioK2GMg==
2837+
2838+
2839+
version "0.1.8"
2840+
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-arm64-musl/-/helius-laserstream-linux-arm64-musl-0.1.8.tgz#28e0645bebc3253d2a136cf0bd13f8cb5256f47b"
2841+
integrity sha512-LFadfMRuTd1zo6RZqLTgHQapo3gJYioS7wFMWFoBOFulG0BpAqHEDNobkxx0002QArw+zX29MQ/5OaOCf8kKTA==
2842+
2843+
2844+
version "0.1.8"
2845+
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-x64-gnu/-/helius-laserstream-linux-x64-gnu-0.1.8.tgz#e59990ca0bcdc27e46f71a8fc2c18fddbe6f07e3"
2846+
integrity sha512-IZWK/OQIe0647QqfYikLb1DFK+nYtXLJiMcpj24qnNVWBOtMXmPc1hL6ebazdEiaKt7fxNd5IiM1RqeaqZAZMw==
2847+
2848+
2849+
version "0.1.8"
2850+
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-x64-musl/-/helius-laserstream-linux-x64-musl-0.1.8.tgz#42aa0919ef266c40f50ac74d6f9d871d4e2e7c9c"
2851+
integrity sha512-riTS6VgxDae1fHOJ2XC/o/v1OZRbEv/3rcoa3NlAOnooDKp5HDgD0zJTcImjQHpYWwGaejx1oX/Ht53lxNoijw==
2852+
2853+
2854+
version "0.1.8"
2855+
resolved "https://registry.yarnpkg.com/helius-laserstream/-/helius-laserstream-0.1.8.tgz#6ee5e0bc9fe2560c03a0d2c9079b9f875c3e6bb7"
2856+
integrity sha512-jXQkwQRWiowbVPGQrGacOkI5shKPhrEixCu93OpoMtL5fs9mnhtD7XKMPi8CX0W8gsqsJjwR4NlaR+EflyANbQ==
2857+
dependencies:
2858+
"@types/protobufjs" "^6.0.0"
2859+
protobufjs "^7.5.3"
2860+
optionalDependencies:
2861+
helius-laserstream-darwin-arm64 "0.1.8"
2862+
helius-laserstream-darwin-x64 "0.1.8"
2863+
helius-laserstream-linux-arm64-gnu "0.1.8"
2864+
helius-laserstream-linux-arm64-musl "0.1.8"
2865+
helius-laserstream-linux-x64-gnu "0.1.8"
2866+
helius-laserstream-linux-x64-musl "0.1.8"
2867+
28232868
humanize-ms@^1.2.1:
28242869
version "1.2.1"
28252870
resolved "https://registry.yarnpkg.com/humanize-ms/-/humanize-ms-1.2.1.tgz#c46e3159a293f6b896da29316d8b6fe8bb79bbed"

0 commit comments

Comments
 (0)