Skip to content

Commit 6ab6a35

Browse files
authored
Merge pull request #1879 from drift-labs/jack/add-laserstream-client
chore: add laserstream client
2 parents 9228924 + 42c8b10 commit 6ab6a35

File tree

5 files changed

+317
-12
lines changed

5 files changed

+317
-12
lines changed

sdk/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
"@triton-one/yellowstone-grpc": "1.3.0",
5656
"anchor-bankrun": "0.3.0",
5757
"gill": "^0.10.2",
58+
"helius-laserstream": "^0.1.7",
5859
"nanoid": "3.3.4",
5960
"node-cache": "5.1.2",
6061
"rpc-websockets": "7.5.1",
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
import { GrpcConfigs, ResubOpts } from './types';
2+
import { Program } from '@coral-xyz/anchor';
3+
import { Context, MemcmpFilter, PublicKey } from '@solana/web3.js';
4+
import * as Buffer from 'buffer';
5+
import { WebSocketProgramAccountSubscriber } from './webSocketProgramAccountSubscriber';
6+
7+
import {
8+
CommitmentLevel as LaserCommitmentLevel,
9+
subscribe as LaserSubscribe,
10+
LaserstreamConfig,
11+
SubscribeRequest,
12+
SubscribeUpdate,
13+
CompressionAlgorithms,
14+
} from 'helius-laserstream';
15+
import { CommitmentLevel } from '@triton-one/yellowstone-grpc';
16+
17+
type LaserCommitment =
18+
(typeof LaserCommitmentLevel)[keyof typeof LaserCommitmentLevel];
19+
20+
export class LaserstreamProgramAccountSubscriber<
21+
T,
22+
> extends WebSocketProgramAccountSubscriber<T> {
23+
private stream:
24+
| {
25+
id: string;
26+
cancel: () => void;
27+
write?: (req: SubscribeRequest) => Promise<void>;
28+
}
29+
| undefined;
30+
31+
private commitmentLevel: CommitmentLevel;
32+
public listenerId?: number;
33+
34+
private readonly laserConfig: LaserstreamConfig;
35+
36+
private constructor(
37+
laserConfig: LaserstreamConfig,
38+
commitmentLevel: CommitmentLevel,
39+
subscriptionName: string,
40+
accountDiscriminator: string,
41+
program: Program,
42+
decodeBufferFn: (accountName: string, ix: Buffer) => T,
43+
options: { filters: MemcmpFilter[] } = { filters: [] },
44+
resubOpts?: ResubOpts
45+
) {
46+
super(
47+
subscriptionName,
48+
accountDiscriminator,
49+
program,
50+
decodeBufferFn,
51+
options,
52+
resubOpts
53+
);
54+
this.laserConfig = laserConfig;
55+
this.commitmentLevel = this.toLaserCommitment(commitmentLevel);
56+
}
57+
58+
public static async create<U>(
59+
grpcConfigs: GrpcConfigs,
60+
subscriptionName: string,
61+
accountDiscriminator: string,
62+
program: Program,
63+
decodeBufferFn: (accountName: string, ix: Buffer) => U,
64+
options: { filters: MemcmpFilter[] } = {
65+
filters: [],
66+
},
67+
resubOpts?: ResubOpts
68+
): Promise<LaserstreamProgramAccountSubscriber<U>> {
69+
const laserConfig: LaserstreamConfig = {
70+
apiKey: grpcConfigs.token,
71+
endpoint: grpcConfigs.endpoint,
72+
maxReconnectAttempts: grpcConfigs.enableReconnect ? 10 : 0,
73+
channelOptions: {
74+
'grpc.default_compression_algorithm': CompressionAlgorithms.zstd,
75+
'grpc.max_receive_message_length': 1_000_000_000,
76+
},
77+
};
78+
79+
const commitmentLevel =
80+
grpcConfigs.commitmentLevel ?? CommitmentLevel.CONFIRMED;
81+
82+
return new LaserstreamProgramAccountSubscriber<U>(
83+
laserConfig,
84+
commitmentLevel,
85+
subscriptionName,
86+
accountDiscriminator,
87+
program,
88+
decodeBufferFn,
89+
options,
90+
resubOpts
91+
);
92+
}
93+
94+
async subscribe(
95+
onChange: (
96+
accountId: PublicKey,
97+
data: T,
98+
context: Context,
99+
buffer: Buffer
100+
) => void
101+
): Promise<void> {
102+
if (this.listenerId != null || this.isUnsubscribing) return;
103+
104+
this.onChange = onChange;
105+
106+
const filters = this.options.filters.map((filter) => {
107+
return {
108+
memcmp: {
109+
offset: filter.memcmp.offset,
110+
base58: filter.memcmp.bytes,
111+
},
112+
};
113+
});
114+
115+
const request: SubscribeRequest = {
116+
slots: {},
117+
accounts: {
118+
drift: {
119+
account: [],
120+
owner: [this.program.programId.toBase58()],
121+
filters,
122+
},
123+
},
124+
transactions: {},
125+
blocks: {},
126+
blocksMeta: {},
127+
accountsDataSlice: [],
128+
commitment: this.commitmentLevel,
129+
entry: {},
130+
transactionsStatus: {},
131+
};
132+
133+
try {
134+
const stream = await LaserSubscribe(
135+
this.laserConfig,
136+
request,
137+
async (update: SubscribeUpdate) => {
138+
if (update.account) {
139+
const slot = Number(update.account.slot);
140+
const acc = update.account.account;
141+
142+
const accountInfo = {
143+
owner: new PublicKey(acc.owner),
144+
lamports: Number(acc.lamports),
145+
data: Buffer.Buffer.from(acc.data),
146+
executable: acc.executable,
147+
rentEpoch: Number(acc.rentEpoch),
148+
};
149+
150+
const payload = {
151+
accountId: new PublicKey(acc.pubkey),
152+
accountInfo,
153+
};
154+
155+
if (this.resubOpts?.resubTimeoutMs) {
156+
this.receivingData = true;
157+
clearTimeout(this.timeoutId);
158+
this.handleRpcResponse({ slot }, payload);
159+
this.setTimeout();
160+
} else {
161+
this.handleRpcResponse({ slot }, payload);
162+
}
163+
}
164+
},
165+
async (error) => {
166+
console.error('LaserStream client error:', error);
167+
throw error;
168+
}
169+
);
170+
171+
this.stream = stream;
172+
this.listenerId = 1;
173+
174+
if (this.resubOpts?.resubTimeoutMs) {
175+
this.receivingData = true;
176+
this.setTimeout();
177+
}
178+
} catch (err) {
179+
console.error('Failed to start LaserStream client:', err);
180+
throw err;
181+
}
182+
}
183+
184+
public async unsubscribe(onResub = false): Promise<void> {
185+
if (!onResub && this.resubOpts) {
186+
this.resubOpts.resubTimeoutMs = undefined;
187+
}
188+
this.isUnsubscribing = true;
189+
clearTimeout(this.timeoutId);
190+
this.timeoutId = undefined;
191+
192+
if (this.listenerId != null && this.stream) {
193+
try {
194+
this.stream.cancel();
195+
} finally {
196+
this.listenerId = undefined;
197+
this.isUnsubscribing = false;
198+
}
199+
} else {
200+
this.isUnsubscribing = false;
201+
}
202+
}
203+
204+
public toLaserCommitment(
205+
level: string | number | undefined
206+
): LaserCommitment {
207+
if (typeof level === 'string') {
208+
return (
209+
(LaserCommitmentLevel as any)[level.toUpperCase()] ??
210+
LaserCommitmentLevel.CONFIRMED
211+
);
212+
}
213+
return (level as LaserCommitment) ?? LaserCommitmentLevel.CONFIRMED;
214+
}
215+
}

sdk/src/accounts/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ export type GrpcConfigs = {
234234
* Defaults to false, will throw on connection loss.
235235
*/
236236
enableReconnect?: boolean;
237+
client?: 'yellowstone' | 'lazer';
237238
};
238239

239240
export interface HighLeverageModeConfigAccountSubscriber {

sdk/src/orderSubscriber/grpcSubscription.ts

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ import { OrderSubscriber } from './OrderSubscriber';
55
import { GrpcConfigs, ResubOpts } from '../accounts/types';
66
import { UserAccount } from '../types';
77
import { getUserFilter, getNonIdleUserFilter } from '../memcmp';
8+
import { LaserstreamProgramAccountSubscriber } from '../accounts/laserProgramAccountSubscriber';
89

910
export class grpcSubscription {
1011
private orderSubscriber: OrderSubscriber;
1112
private skipInitialLoad: boolean;
1213
private resubOpts?: ResubOpts;
1314
private resyncIntervalMs?: number;
1415

15-
private subscriber?: grpcProgramAccountSubscriber<UserAccount>;
16+
private subscriber?:
17+
| grpcProgramAccountSubscriber<UserAccount>
18+
| LaserstreamProgramAccountSubscriber<UserAccount>;
1619
private resyncTimeoutId?: ReturnType<typeof setTimeout>;
1720

1821
private decoded?: boolean;
@@ -47,17 +50,32 @@ export class grpcSubscription {
4750
return;
4851
}
4952

50-
this.subscriber = await grpcProgramAccountSubscriber.create<UserAccount>(
51-
this.grpcConfigs,
52-
'OrderSubscriber',
53-
'User',
54-
this.orderSubscriber.driftClient.program,
55-
this.orderSubscriber.decodeFn,
56-
{
57-
filters: [getUserFilter(), getNonIdleUserFilter()],
58-
},
59-
this.resubOpts
60-
);
53+
if (this.grpcConfigs.client === 'lazer') {
54+
this.subscriber =
55+
await LaserstreamProgramAccountSubscriber.create<UserAccount>(
56+
this.grpcConfigs,
57+
'OrderSubscriber',
58+
'User',
59+
this.orderSubscriber.driftClient.program,
60+
this.orderSubscriber.decodeFn,
61+
{
62+
filters: [getUserFilter(), getNonIdleUserFilter()],
63+
},
64+
this.resubOpts
65+
);
66+
} else {
67+
this.subscriber = await grpcProgramAccountSubscriber.create<UserAccount>(
68+
this.grpcConfigs,
69+
'OrderSubscriber',
70+
'User',
71+
this.orderSubscriber.driftClient.program,
72+
this.orderSubscriber.decodeFn,
73+
{
74+
filters: [getUserFilter(), getNonIdleUserFilter()],
75+
},
76+
this.resubOpts
77+
);
78+
}
6179

6280
await this.subscriber.subscribe(
6381
(

sdk/yarn.lock

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1281,6 +1281,13 @@
12811281
dependencies:
12821282
undici-types "~5.26.4"
12831283

1284+
"@types/protobufjs@^6.0.0":
1285+
version "6.0.0"
1286+
resolved "https://registry.yarnpkg.com/@types/protobufjs/-/protobufjs-6.0.0.tgz#aeabb43f9507bb19c8adfb479584c151082353e4"
1287+
integrity sha512-A27RDExpAf3rdDjIrHKiJK6x8kqqJ4CmoChwtipfhVAn1p7+wviQFFP7dppn8FslSbHtQeVPvi8wNKkDjSYjHw==
1288+
dependencies:
1289+
protobufjs "*"
1290+
12841291
"@types/semver@^7.5.0":
12851292
version "7.7.0"
12861293
resolved "https://registry.yarnpkg.com/@types/semver/-/semver-7.7.0.tgz#64c441bdae033b378b6eef7d0c3d77c329b9378e"
@@ -2787,6 +2794,51 @@ he@^1.2.0:
27872794
resolved "https://registry.yarnpkg.com/he/-/he-1.2.0.tgz#84ae65fa7eafb165fddb61566ae14baf05664f0f"
27882795
integrity sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==
27892796

2797+
2798+
version "0.1.7"
2799+
resolved "https://registry.yarnpkg.com/helius-laserstream-darwin-arm64/-/helius-laserstream-darwin-arm64-0.1.7.tgz#c84df402bdb8a2159bcfc2711bf2b64fe09edd24"
2800+
integrity sha512-oMkt6qr7EQLfgiOCVO/9lTQLi8futBVqhUSRvsCmYcAqmaFNsvmf+/rRVq/o56+iq0PseqV6DcRF+5s88tYEIg==
2801+
2802+
2803+
version "0.1.7"
2804+
resolved "https://registry.yarnpkg.com/helius-laserstream-darwin-x64/-/helius-laserstream-darwin-x64-0.1.7.tgz#fe15b0513a8d1f55075f27b1a5836bf73ec470ff"
2805+
integrity sha512-88utpg/ZMtsnF9RX268D50fl6B3kQOFS0nCzoenS1CwMvMprTqHSEDqYFgBX8O7t52gox5aw8+x3XqhHX0fIMQ==
2806+
2807+
2808+
version "0.1.7"
2809+
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-arm64-gnu/-/helius-laserstream-linux-arm64-gnu-0.1.7.tgz#cc7a636bc12961d314a4c46008d63f4fb6fe6b03"
2810+
integrity sha512-4YQuISaa3OWOBQCUnslT+HguuGRMO1KRQeWSjtuSHDYn7oO/KXePCtEo9vqnQx7HtQFDnS1/kuFExIa2L4Sp2w==
2811+
2812+
2813+
version "0.1.7"
2814+
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-arm64-musl/-/helius-laserstream-linux-arm64-musl-0.1.7.tgz#96dcf87b855698e9f28214d96031d0934324db45"
2815+
integrity sha512-VyN/5nzqUtBN88PLDIMVJmFDgImMCL1sjBWKf49ppAWKN3LVvYVYT8tGMi4nzb5vj7ObduIi1ZZ+cGWNM6H2kA==
2816+
2817+
2818+
version "0.1.7"
2819+
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-x64-gnu/-/helius-laserstream-linux-x64-gnu-0.1.7.tgz#0bcaed9697b975033e86a74ab4d734ee3d4d3e1c"
2820+
integrity sha512-0//p5wlITWbWKBaW2CIYfS3/9fBJCvMn8fBvBKot28psIWSQ6Uc5u/IqS2ls438NvTiEvBp6pgScWoYHXKU+VQ==
2821+
2822+
2823+
version "0.1.7"
2824+
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-x64-musl/-/helius-laserstream-linux-x64-musl-0.1.7.tgz#6c38ca7f97bd1ff46947794fd907ecd8492a0249"
2825+
integrity sha512-4VFxsKE+X3Jj/DBrdKQCUs/6ljuYadiPrF5QBMIyaflzZka/hOOvd2bDAo8Bi9/qGGCCaJG6F3U3OFKGKSiE9w==
2826+
2827+
helius-laserstream@^0.1.7:
2828+
version "0.1.7"
2829+
resolved "https://registry.yarnpkg.com/helius-laserstream/-/helius-laserstream-0.1.7.tgz#67f9d570f56ba9bb801d210ebff5e9cf1cc97faa"
2830+
integrity sha512-xsCbc8dApJpLb6OShOCeJ5/6pQMdGk6sQojEgihTNGZaGhaAzwYzJcxL5q4uszE3qG/viJO67Mi/MxArDb+QaQ==
2831+
dependencies:
2832+
"@types/protobufjs" "^6.0.0"
2833+
protobufjs "^7.5.3"
2834+
optionalDependencies:
2835+
helius-laserstream-darwin-arm64 "0.1.7"
2836+
helius-laserstream-darwin-x64 "0.1.7"
2837+
helius-laserstream-linux-arm64-gnu "0.1.7"
2838+
helius-laserstream-linux-arm64-musl "0.1.7"
2839+
helius-laserstream-linux-x64-gnu "0.1.7"
2840+
helius-laserstream-linux-x64-musl "0.1.7"
2841+
27902842
humanize-ms@^1.2.1:
27912843
version "1.2.1"
27922844
resolved "https://registry.yarnpkg.com/humanize-ms/-/humanize-ms-1.2.1.tgz#c46e3159a293f6b896da29316d8b6fe8bb79bbed"
@@ -3635,6 +3687,24 @@ pretty-ms@^7.0.1:
36353687
dependencies:
36363688
parse-ms "^2.1.0"
36373689

3690+
protobufjs@*, protobufjs@^7.5.3:
3691+
version "7.5.4"
3692+
resolved "https://registry.yarnpkg.com/protobufjs/-/protobufjs-7.5.4.tgz#885d31fe9c4b37f25d1bb600da30b1c5b37d286a"
3693+
integrity sha512-CvexbZtbov6jW2eXAvLukXjXUW1TzFaivC46BpWc/3BpcCysb5Vffu+B3XHMm8lVEuy2Mm4XGex8hBSg1yapPg==
3694+
dependencies:
3695+
"@protobufjs/aspromise" "^1.1.2"
3696+
"@protobufjs/base64" "^1.1.2"
3697+
"@protobufjs/codegen" "^2.0.4"
3698+
"@protobufjs/eventemitter" "^1.1.0"
3699+
"@protobufjs/fetch" "^1.1.0"
3700+
"@protobufjs/float" "^1.0.2"
3701+
"@protobufjs/inquire" "^1.1.0"
3702+
"@protobufjs/path" "^1.1.2"
3703+
"@protobufjs/pool" "^1.1.0"
3704+
"@protobufjs/utf8" "^1.1.0"
3705+
"@types/node" ">=13.7.0"
3706+
long "^5.0.0"
3707+
36383708
protobufjs@^7.2.5, protobufjs@^7.4.0:
36393709
version "7.5.3"
36403710
resolved "https://registry.yarnpkg.com/protobufjs/-/protobufjs-7.5.3.tgz#13f95a9e3c84669995ec3652db2ac2fb00b89363"

0 commit comments

Comments
 (0)