Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"@triton-one/yellowstone-grpc": "1.3.0",
"anchor-bankrun": "0.3.0",
"gill": "^0.10.2",
"helius-laserstream": "0.1.8",
"nanoid": "3.3.4",
"node-cache": "5.1.2",
"rpc-websockets": "7.5.1",
Expand Down
215 changes: 215 additions & 0 deletions sdk/src/accounts/laserProgramAccountSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import { GrpcConfigs, ResubOpts } from './types';
import { Program } from '@coral-xyz/anchor';
import { Context, MemcmpFilter, PublicKey } from '@solana/web3.js';
import * as Buffer from 'buffer';
import { WebSocketProgramAccountSubscriber } from './webSocketProgramAccountSubscriber';

import {
LaserCommitmentLevel,
LaserSubscribe,
LaserstreamConfig,
LaserSubscribeRequest,
LaserSubscribeUpdate,
CompressionAlgorithms,
} from '../isomorphic/grpc.node';
import { CommitmentLevel } from '@triton-one/yellowstone-grpc';

type LaserCommitment =
(typeof LaserCommitmentLevel)[keyof typeof LaserCommitmentLevel];

export class LaserstreamProgramAccountSubscriber<
T,
> extends WebSocketProgramAccountSubscriber<T> {
private stream:
| {
id: string;
cancel: () => void;
write?: (req: LaserSubscribeRequest) => Promise<void>;
}
| undefined;

private commitmentLevel: CommitmentLevel;
public listenerId?: number;

private readonly laserConfig: LaserstreamConfig;

private constructor(
laserConfig: LaserstreamConfig,
commitmentLevel: CommitmentLevel,
subscriptionName: string,
accountDiscriminator: string,
program: Program,
decodeBufferFn: (accountName: string, ix: Buffer) => T,
options: { filters: MemcmpFilter[] } = { filters: [] },
resubOpts?: ResubOpts
) {
super(
subscriptionName,
accountDiscriminator,
program,
decodeBufferFn,
options,
resubOpts
);
this.laserConfig = laserConfig;
this.commitmentLevel = this.toLaserCommitment(commitmentLevel);
}

public static async create<U>(
grpcConfigs: GrpcConfigs,
subscriptionName: string,
accountDiscriminator: string,
program: Program,
decodeBufferFn: (accountName: string, ix: Buffer) => U,
options: { filters: MemcmpFilter[] } = {
filters: [],
},
resubOpts?: ResubOpts
): Promise<LaserstreamProgramAccountSubscriber<U>> {
const laserConfig: LaserstreamConfig = {
apiKey: grpcConfigs.token,
endpoint: grpcConfigs.endpoint,
maxReconnectAttempts: grpcConfigs.enableReconnect ? 10 : 0,
channelOptions: {
'grpc.default_compression_algorithm': CompressionAlgorithms.zstd,
'grpc.max_receive_message_length': 1_000_000_000,
},
};

const commitmentLevel =
grpcConfigs.commitmentLevel ?? CommitmentLevel.CONFIRMED;

return new LaserstreamProgramAccountSubscriber<U>(
laserConfig,
commitmentLevel,
subscriptionName,
accountDiscriminator,
program,
decodeBufferFn,
options,
resubOpts
);
}

async subscribe(
onChange: (
accountId: PublicKey,
data: T,
context: Context,
buffer: Buffer
) => void
): Promise<void> {
if (this.listenerId != null || this.isUnsubscribing) return;

this.onChange = onChange;

const filters = this.options.filters.map((filter) => {
return {
memcmp: {
offset: filter.memcmp.offset,
base58: filter.memcmp.bytes,
},
};
});

const request: LaserSubscribeRequest = {
slots: {},
accounts: {
drift: {
account: [],
owner: [this.program.programId.toBase58()],
filters,
},
},
transactions: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
commitment: this.commitmentLevel,
entry: {},
transactionsStatus: {},
};

try {
const stream = await LaserSubscribe(
this.laserConfig,
request,
async (update: LaserSubscribeUpdate) => {
if (update.account) {
const slot = Number(update.account.slot);
const acc = update.account.account;

const accountInfo = {
owner: new PublicKey(acc.owner),
lamports: Number(acc.lamports),
data: Buffer.Buffer.from(acc.data),
executable: acc.executable,
rentEpoch: Number(acc.rentEpoch),
};

const payload = {
accountId: new PublicKey(acc.pubkey),
accountInfo,
};

if (this.resubOpts?.resubTimeoutMs) {
this.receivingData = true;
clearTimeout(this.timeoutId);
this.handleRpcResponse({ slot }, payload);
this.setTimeout();
} else {
this.handleRpcResponse({ slot }, payload);
}
}
},
async (error) => {
console.error('LaserStream client error:', error);
throw error;
}
);

this.stream = stream;
this.listenerId = 1;

if (this.resubOpts?.resubTimeoutMs) {
this.receivingData = true;
this.setTimeout();
}
} catch (err) {
console.error('Failed to start LaserStream client:', err);
throw err;
}
}

public async unsubscribe(onResub = false): Promise<void> {
if (!onResub && this.resubOpts) {
this.resubOpts.resubTimeoutMs = undefined;
}
this.isUnsubscribing = true;
clearTimeout(this.timeoutId);
this.timeoutId = undefined;

if (this.listenerId != null && this.stream) {
try {
this.stream.cancel();
} finally {
this.listenerId = undefined;
this.isUnsubscribing = false;
}
} else {
this.isUnsubscribing = false;
}
}

public toLaserCommitment(
level: string | number | undefined
): LaserCommitment {
if (typeof level === 'string') {
return (
(LaserCommitmentLevel as any)[level.toUpperCase()] ??
LaserCommitmentLevel.CONFIRMED
);
}
return (level as LaserCommitment) ?? LaserCommitmentLevel.CONFIRMED;
}
}
1 change: 1 addition & 0 deletions sdk/src/accounts/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ export type GrpcConfigs = {
* Defaults to false, will throw on connection loss.
*/
enableReconnect?: boolean;
client?: 'yellowstone' | 'laser';
};

export interface HighLeverageModeConfigAccountSubscriber {
Expand Down
15 changes: 15 additions & 0 deletions sdk/src/isomorphic/grpc.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,28 @@ import type {
} from '@triton-one/yellowstone-grpc';
import { ClientDuplexStream, ChannelOptions } from '@grpc/grpc-js';

import {
CommitmentLevel as LaserCommitmentLevel,
subscribe as LaserSubscribe,
LaserstreamConfig,
SubscribeRequest as LaserSubscribeRequest,
SubscribeUpdate as LaserSubscribeUpdate,
CompressionAlgorithms,
} from 'helius-laserstream';

export {
ClientDuplexStream,
ChannelOptions,
SubscribeRequest,
SubscribeUpdate,
CommitmentLevel,
Client,
LaserSubscribe,
LaserCommitmentLevel,
LaserstreamConfig,
LaserSubscribeRequest,
LaserSubscribeUpdate,
CompressionAlgorithms,
};

// Export a function to create a new Client instance
Expand Down
42 changes: 30 additions & 12 deletions sdk/src/orderSubscriber/grpcSubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import { OrderSubscriber } from './OrderSubscriber';
import { GrpcConfigs, ResubOpts } from '../accounts/types';
import { UserAccount } from '../types';
import { getUserFilter, getNonIdleUserFilter } from '../memcmp';
import { LaserstreamProgramAccountSubscriber } from '../accounts/laserProgramAccountSubscriber';

export class grpcSubscription {
private orderSubscriber: OrderSubscriber;
private skipInitialLoad: boolean;
private resubOpts?: ResubOpts;
private resyncIntervalMs?: number;

private subscriber?: grpcProgramAccountSubscriber<UserAccount>;
private subscriber?:
| grpcProgramAccountSubscriber<UserAccount>
| LaserstreamProgramAccountSubscriber<UserAccount>;
private resyncTimeoutId?: ReturnType<typeof setTimeout>;

private decoded?: boolean;
Expand Down Expand Up @@ -47,17 +50,32 @@ export class grpcSubscription {
return;
}

this.subscriber = await grpcProgramAccountSubscriber.create<UserAccount>(
this.grpcConfigs,
'OrderSubscriber',
'User',
this.orderSubscriber.driftClient.program,
this.orderSubscriber.decodeFn,
{
filters: [getUserFilter(), getNonIdleUserFilter()],
},
this.resubOpts
);
if (this.grpcConfigs.client === 'laser') {
this.subscriber =
await LaserstreamProgramAccountSubscriber.create<UserAccount>(
this.grpcConfigs,
'OrderSubscriber',
'User',
this.orderSubscriber.driftClient.program,
this.orderSubscriber.decodeFn,
{
filters: [getUserFilter(), getNonIdleUserFilter()],
},
this.resubOpts
);
} else {
this.subscriber = await grpcProgramAccountSubscriber.create<UserAccount>(
this.grpcConfigs,
'OrderSubscriber',
'User',
this.orderSubscriber.driftClient.program,
this.orderSubscriber.decodeFn,
{
filters: [getUserFilter(), getNonIdleUserFilter()],
},
this.resubOpts
);
}

await this.subscriber.subscribe(
(
Expand Down
45 changes: 45 additions & 0 deletions sdk/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2820,6 +2820,51 @@ he@^1.2.0:
resolved "https://registry.yarnpkg.com/he/-/he-1.2.0.tgz#84ae65fa7eafb165fddb61566ae14baf05664f0f"
integrity sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==

[email protected]:
version "0.1.8"
resolved "https://registry.yarnpkg.com/helius-laserstream-darwin-arm64/-/helius-laserstream-darwin-arm64-0.1.8.tgz#d78ad15e6cd16dc9379a9a365f9fcb3f958e6c01"
integrity sha512-p/K2Mi3wZnMxEYSLCvu858VyMvtJFonhdF8cLaMcszFv04WWdsK+hINNZpVRfakypvDfDPbMudEhL1Q9USD5+w==

[email protected]:
version "0.1.8"
resolved "https://registry.yarnpkg.com/helius-laserstream-darwin-x64/-/helius-laserstream-darwin-x64-0.1.8.tgz#e57bc8f03135fd3b5c01a5aebd7b87c42129da50"
integrity sha512-Hd5irFyfOqQZLdoj5a+OV7vML2YfySSBuKlOwtisMHkUuIXZ4NpAexslDmK7iP5VWRI+lOv9X/tA7BhxW7RGSQ==

[email protected]:
version "0.1.8"
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-arm64-gnu/-/helius-laserstream-linux-arm64-gnu-0.1.8.tgz#1b3c8440804d143f650166842620fc334f9c319b"
integrity sha512-PlPm1dvOvTGBL1nuzK98Xe40BJq1JWNREXlBHKDVA/B+KCGQnIMJ1s6e1MevSvFE7SOix5i1BxhYIxGioK2GMg==

[email protected]:
version "0.1.8"
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-arm64-musl/-/helius-laserstream-linux-arm64-musl-0.1.8.tgz#28e0645bebc3253d2a136cf0bd13f8cb5256f47b"
integrity sha512-LFadfMRuTd1zo6RZqLTgHQapo3gJYioS7wFMWFoBOFulG0BpAqHEDNobkxx0002QArw+zX29MQ/5OaOCf8kKTA==

[email protected]:
version "0.1.8"
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-x64-gnu/-/helius-laserstream-linux-x64-gnu-0.1.8.tgz#e59990ca0bcdc27e46f71a8fc2c18fddbe6f07e3"
integrity sha512-IZWK/OQIe0647QqfYikLb1DFK+nYtXLJiMcpj24qnNVWBOtMXmPc1hL6ebazdEiaKt7fxNd5IiM1RqeaqZAZMw==

[email protected]:
version "0.1.8"
resolved "https://registry.yarnpkg.com/helius-laserstream-linux-x64-musl/-/helius-laserstream-linux-x64-musl-0.1.8.tgz#42aa0919ef266c40f50ac74d6f9d871d4e2e7c9c"
integrity sha512-riTS6VgxDae1fHOJ2XC/o/v1OZRbEv/3rcoa3NlAOnooDKp5HDgD0zJTcImjQHpYWwGaejx1oX/Ht53lxNoijw==

[email protected]:
version "0.1.8"
resolved "https://registry.yarnpkg.com/helius-laserstream/-/helius-laserstream-0.1.8.tgz#6ee5e0bc9fe2560c03a0d2c9079b9f875c3e6bb7"
integrity sha512-jXQkwQRWiowbVPGQrGacOkI5shKPhrEixCu93OpoMtL5fs9mnhtD7XKMPi8CX0W8gsqsJjwR4NlaR+EflyANbQ==
dependencies:
"@types/protobufjs" "^6.0.0"
protobufjs "^7.5.3"
optionalDependencies:
helius-laserstream-darwin-arm64 "0.1.8"
helius-laserstream-darwin-x64 "0.1.8"
helius-laserstream-linux-arm64-gnu "0.1.8"
helius-laserstream-linux-arm64-musl "0.1.8"
helius-laserstream-linux-x64-gnu "0.1.8"
helius-laserstream-linux-x64-musl "0.1.8"

humanize-ms@^1.2.1:
version "1.2.1"
resolved "https://registry.yarnpkg.com/humanize-ms/-/humanize-ms-1.2.1.tgz#c46e3159a293f6b896da29316d8b6fe8bb79bbed"
Expand Down
Loading