Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions lazer/sdk/js/examples/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
/* eslint-disable no-console */
/* eslint-disable @typescript-eslint/no-empty-function */

import { PythLazerClient } from "../src/index.js";

/* eslint-disable no-console */
const client = new PythLazerClient("ws://127.0.0.1:1234/v1/stream", "ctoken1");
// Ignore debug messages
console.debug = () => {};

const client = new PythLazerClient(
["wss://pyth-lazer.dourolabs.app/v1/stream"],
"access_token",
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
console // Optionally log socket operations (to the console in this case.)
);

client.addMessageListener((message) => {
console.log("got message:", message);
console.info("got message:", message);
switch (message.type) {
case "json": {
if (message.value.type == "streamUpdated") {
console.log(
console.info(
"stream updated for subscription",
message.value.subscriptionId,
":",
Expand All @@ -18,24 +29,42 @@ client.addMessageListener((message) => {
}
case "binary": {
if ("solana" in message.value) {
console.log("solana message:", message.value.solana?.toString("hex"));
console.info("solana message:", message.value.solana?.toString("hex"));
}
if ("evm" in message.value) {
console.log("evm message:", message.value.evm?.toString("hex"));
console.info("evm message:", message.value.evm?.toString("hex"));
}
break;
}
}
});
client.ws.addEventListener("open", () => {
client.send({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
chains: ["solana"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
jsonBinaryEncoding: "hex",
});

// Create and remove one or more subscriptions on the fly
await client.subscribe({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
chains: ["solana"],
deliveryFormat: "binary",
channel: "fixed_rate@200ms",
parsed: false,
jsonBinaryEncoding: "base64",
});
await client.subscribe({
type: "subscribe",
subscriptionId: 2,
priceFeedIds: [1, 2, 3, 4, 5],
properties: ["price"],
chains: ["evm"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
parsed: true,
jsonBinaryEncoding: "hex",
});

await new Promise((resolve) => setTimeout(resolve, 10_000));

await client.unsubscribe(1);
await client.unsubscribe(2);
client.shutdown();
6 changes: 4 additions & 2 deletions lazer/sdk/js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pythnetwork/pyth-lazer-sdk",
"version": "0.1.2",
"version": "0.2.0",
"description": "Pyth Lazer SDK",
"publishConfig": {
"access": "public"
Expand Down Expand Up @@ -63,6 +63,8 @@
"@solana/buffer-layout": "^4.0.1",
"@solana/web3.js": "^1.98.0",
"isomorphic-ws": "^5.0.0",
"ws": "^8.18.0"
"ws": "^8.18.0",
"@isaacs/ttlcache": "^1.4.1",
"ts-log": "^2.2.7"
}
}
69 changes: 46 additions & 23 deletions lazer/sdk/js/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import WebSocket from "isomorphic-ws";
import { dummyLogger, type Logger } from "ts-log";

import {
BINARY_UPDATE_FORMAT_MAGIC,
Expand All @@ -9,6 +10,7 @@ import {
type Response,
SOLANA_FORMAT_MAGIC_BE,
} from "./protocol.js";
import { WebSocketPool } from "./socket/web-socket-pool.js";

export type BinaryResponse = {
subscriptionId: number;
Expand All @@ -28,52 +30,58 @@ const UINT32_NUM_BYTES = 4;
const UINT64_NUM_BYTES = 8;

export class PythLazerClient {
ws: WebSocket;
wsp: WebSocketPool;

constructor(url: string, token: string) {
const finalUrl = new URL(url);
finalUrl.searchParams.append("ACCESS_TOKEN", token);
this.ws = new WebSocket(finalUrl);
/**
* Creates a new PythLazerClient instance.
* @param urls - List of WebSocket URLs of the Pyth Lazer service
* @param token - The access token for authentication
* @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
* @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
*/
constructor(
urls: string[],
token: string,
numConnections = 3,
logger: Logger = dummyLogger
) {
this.wsp = new WebSocketPool(urls, token, numConnections, logger);
}

addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
this.ws.addEventListener("message", (event: WebSocket.MessageEvent) => {
if (typeof event.data == "string") {
this.wsp.addMessageListener((data: WebSocket.Data) => {
if (typeof data == "string") {
handler({
type: "json",
value: JSON.parse(event.data) as Response,
value: JSON.parse(data) as Response,
});
} else if (Buffer.isBuffer(event.data)) {
} else if (Buffer.isBuffer(data)) {
let pos = 0;
const magic = event.data
.subarray(pos, pos + UINT32_NUM_BYTES)
.readUint32BE();
const magic = data.subarray(pos, pos + UINT32_NUM_BYTES).readUint32BE();
pos += UINT32_NUM_BYTES;
if (magic != BINARY_UPDATE_FORMAT_MAGIC) {
throw new Error("binary update format magic mismatch");
}
// TODO: some uint64 values may not be representable as Number.
const subscriptionId = Number(
event.data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE()
data.subarray(pos, pos + UINT64_NUM_BYTES).readBigInt64BE()
);
pos += UINT64_NUM_BYTES;

const value: BinaryResponse = { subscriptionId };
while (pos < event.data.length) {
const len = event.data
.subarray(pos, pos + UINT16_NUM_BYTES)
.readUint16BE();
while (pos < data.length) {
const len = data.subarray(pos, pos + UINT16_NUM_BYTES).readUint16BE();
pos += UINT16_NUM_BYTES;
const magic = event.data
const magic = data
.subarray(pos, pos + UINT32_NUM_BYTES)
.readUint32BE();
if (magic == EVM_FORMAT_MAGIC) {
value.evm = event.data.subarray(pos, pos + len);
value.evm = data.subarray(pos, pos + len);
} else if (magic == SOLANA_FORMAT_MAGIC_BE) {
value.solana = event.data.subarray(pos, pos + len);
value.solana = data.subarray(pos, pos + len);
} else if (magic == PARSED_FORMAT_MAGIC) {
value.parsed = JSON.parse(
event.data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString()
data.subarray(pos + UINT32_NUM_BYTES, pos + len).toString()
) as ParsedPayload;
} else {
throw new Error("unknown magic: " + magic.toString());
Expand All @@ -87,7 +95,22 @@ export class PythLazerClient {
});
}

send(request: Request) {
this.ws.send(JSON.stringify(request));
async subscribe(request: Request): Promise<void> {
if (request.type !== "subscribe") {
throw new Error("Request must be a subscribe request");
}
await this.wsp.addSubscription(request);
}

async unsubscribe(subscriptionId: number): Promise<void> {
await this.wsp.removeSubscription(subscriptionId);
}

async send(request: Request): Promise<void> {
await this.wsp.sendRequest(request);
}

shutdown(): void {
this.wsp.shutdown();
}
}
Loading
Loading