Skip to content

Conversation

@ThomasRalee
Copy link
Collaborator

@ThomasRalee ThomasRalee commented Feb 6, 2026

RFQ API and Streaming Support

  • gRPC & Websocket Streaming Architecture:
    Introduces a modular, real-time streaming architecture for RFQ (Request For Quote) data, supporting both gRPC and websocket protocols. The websocket layer, implemented within the ws directory, allows scalable and efficient push-based delivery of RFQ updates for clients who prefer websocket over pure gRPC streaming.

  • New API and Transformers:

    • Added IndexerGrpcRFQApi class to provide methods for submitting RFQ requests and quotes, and for fetching settlements via gRPC.
    • Introduced IndexerGrpcRfqTransformer and IndexerRfqStreamTransformer to handle transformation of RFQ-related gRPC responses into SDK-friendly types and streaming callback formats. [1] [2]
    • Implemented IndexerGrpcRfqStreamV2 to enable event-based streaming of RFQ requests, quotes, and settlements, with support for both gRPC and websocket transports.
    • The websocket infrastructure in ws/ allows SDK consumers to subscribe to real-time RFQ events using websockets, making integration easier for web, Node.js, and other environments.
    • Added corresponding exports for the new API, streaming, and transformer modules to the SDK's public interface. [1] [2] [3] [4]
  • Developer Flexibility:
    Developers can choose between gRPC streaming or websocket event subscription based on their infrastructure and requirements, enabling highly interactive and responsive RFQ-enabled applications.

@coderabbitai
Copy link

coderabbitai bot commented Feb 6, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR introduces comprehensive RFQ (Request for Quote) functionality to the Injective TypeScript SDK, adding gRPC API clients, WebSocket-based bidirectional streaming, type definitions, transformers, and transport infrastructure to support both taker and maker RFQ operations across indexer services.

Changes

Cohort / File(s) Summary
RFQ Exception Module
packages/exceptions/src/exceptions/types/modules.ts
Added RFQ indexer error module constant RFQ: 'indexer-rfq' to IndexerErrorModule.
RFQ gRPC API Client
packages/sdk-ts/src/client/indexer/grpc/IndexerGrpcRfqApi.ts, IndexerGrpcRfqApi.spec.ts
Introduced IndexerGrpcRFQApi class with methods for submitting requests/quotes and fetching open requests, pending quotes, and settlements via gRPC; includes comprehensive test coverage.
RFQ gRPC Stream API
packages/sdk-ts/src/client/indexer/grpc_stream/streamV2/IndexerGrpcRfqStreamV2.ts
Added V2 streaming client for RFQ data with callbacks for requests, quotes, and settlements via GrpcWebRpcTransport.
RFQ Types and Transformers
packages/sdk-ts/src/client/indexer/types/rfq.ts, transformers/IndexerGrpcRfqTransformer.ts, transformers/IndexerRfqStreamTransformer.ts
Defined RFQ domain types (RFQRequest, RFQQuote, RFQSettlement) and static transformer methods mapping protobuf shapes to domain models.
RFQ WebSocket Infrastructure
packages/sdk-ts/src/client/indexer/ws/GrpcWebSocketCodec.ts, GrpcWebSocketTransport.ts, types.ts
Implemented low-level WebSocket transport with gRPC framing (5-byte headers), state management, reconnection with backoff, and comprehensive type definitions for transport events and configurations.
RFQ WebSocket Stream Clients
packages/sdk-ts/src/client/indexer/ws/rfq/IndexerWsTakerStream.ts, IndexerWsMakerStream.ts
Added high-level WebSocket stream APIs for taker and maker roles with event-driven lifecycle, message decoding, and periodic ping management.
Module Exports and Public API
packages/sdk-ts/src/client/indexer/grpc/index.ts, grpc_stream/index.ts, transformers/index.ts, types/index.ts, ws/index.ts, index.ts
Updated barrel exports to expose RFQ APIs, streams, types, and WebSocket components to public module surface.
Dependency Version Update
packages/sdk-ts/package.json
Bumped @injectivelabs/indexer-proto-ts-v2 from 1.17.6 to 1.17.7-alpha.4.
Proto Generation and Publishing
proto/indexer/gen.sh, protoV2/core/publish.sh, protoV2/indexer/gen.sh, protoV2/indexer/publish.sh, protoV2/mito/publish.sh
Updated git branch reference to f/rfq in indexer proto generation; refactored publish scripts to support explicit version arguments and prerelease tag detection.
Proto Index Exports
protoV2/indexer/src/index.template.d.ts, index.template.ts, index.ts, protoV2/core/src/index.template.d.ts, protoV2/abacus/src/index.template.d.ts, protoV2/mito/src/index.template.d.ts
Added InjectiveRfqRPCClient and InjectiveRfqRpcPb exports to indexer; removed large barrel exports from core and mito index files to reduce public API surface bloat.
Documentation
packages/sdk-ts/src/client/indexer/ws/README.md
Added comprehensive documentation for WebSocket RFQ streaming feature set, 5-byte framing details, isomorphic transport behavior, and usage examples.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant TakerStream as IndexerWsTakerStream
    participant Transport as GrpcWebSocketTransport
    participant Codec as GrpcWebSocketCodec
    participant Server as RFQ Server

    User->>TakerStream: connect()
    TakerStream->>Transport: connect()
    Transport->>Server: WebSocket.open()
    Server-->>Transport: onopen
    Transport-->>TakerStream: emit('connect')
    TakerStream->>TakerStream: startPingInterval()

    User->>TakerStream: sendRequest(rfqRequest)
    TakerStream->>Codec: encodeTakerRequest(rfqRequest)
    Codec-->>TakerStream: Uint8Array (gRPC frame)
    TakerStream->>Transport: send(frameData)
    Transport->>Server: WebSocket.send(frameData)

    Server-->>Transport: onmessage(frameData)
    Transport->>Codec: decodeTakerResponse(frameData)
    Codec-->>Transport: GrpcFrame<TakerStreamResponse>
    Transport->>TakerStream: emit('message', frameData)
    TakerStream->>TakerStream: handleMessage(frameData)
    TakerStream-->>User: emit('quote', transformedQuote)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

This PR introduces substantial new functionality across multiple cohesive subsystems: RFQ gRPC/WebSocket clients, low-level transport layer with framing and reconnection logic, comprehensive type system, multiple transformers, and stream implementations. The changes span heterogeneous domains (transport protocols, stream handling, protobuf mapping) with dense logic in transport/codec modules, requiring careful review of frame handling, event flow, and type safety.

Poem

🐰 Hops through the codebase with glee!

With RFQ streams now flowing free,
From taker to maker, quotes dance with ease,
Five-byte frames hop through the breeze,
WebSockets and gRPC in harmony—
This feature brings joy, if you please! 🌟

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title 'feat/rfq' is vague and generic, using only a feature prefix and module name without describing the actual changes or deliverables. Enhance the title to clearly describe the primary feature or capability being added, e.g., 'Add RFQ API client with gRPC and WebSocket support' or 'Implement RFQ indexer streaming for Injective SDK'.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/rfq

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@packages/sdk-ts/src/client/indexer/grpc/IndexerGrpcRfqApi.spec.ts`:
- Around line 9-34: The test "submitRequest" wraps the API call in a try/catch
which swallows failures and uses a tautological assertion; remove the try/catch
so exceptions from indexerGrpcRfqApi.submitRequest propagate and fail the test,
and replace the tautological expect(response).toEqual(expect.objectContaining<{
status: string }>(response)) with concrete assertions such as
expect(response).toBeDefined(), expect(response).toHaveProperty('status'), and a
type check like expect(typeof response.status).toBe('string') (adjust
property/type checks to match the actual response shape).
- Around line 55-80: The test labeled "submitQuote" incorrectly calls
indexerGrpcRfqApi.submitRequest and logs "IndexerGrpcRFQApi.submitRequest";
change the call to indexerGrpcRfqApi.submitQuote and update the catch log string
to "IndexerGrpcRFQApi.submitQuote" (and any related variable names/messages) so
the test invokes and reports the correct method.

In `@packages/sdk-ts/src/client/indexer/ws/GrpcWebSocketTransport.ts`:
- Around line 159-166: The createWebSocket function currently calls
require('ws') which fails in ESM strict mode; replace that fallback with a
dynamic import and construct the socket from the module default (e.g.
import('ws').then(m => new m.default(url, protocol))) or alternatively add at
top: import { createRequire } from 'module' and use const require =
createRequire(import.meta.url) before calling require('ws'); also ensure the ws
package is declared in dependencies so the dynamic import/require succeeds at
runtime.

In `@packages/sdk-ts/src/client/indexer/ws/types.ts`:
- Around line 240-267: Update the incorrect JSDoc comments on the RFQQuoteInput
interface: replace the repeated "/** Market ID */" comments for chainId and
contractAddress with accurate descriptions (e.g., chainId -> "/** Chain ID */"
and contractAddress -> "/** Contract address */"), and ensure marketId retains a
proper "/** Market ID */" comment; modify the comments directly on the
RFQQuoteInput type so each field (chainId, contractAddress, marketId) has the
correct, unique JSDoc description.

In `@protoV2/indexer/src/index.d.ts`:
- Line 19: The exports for InjectiveRfqRPCClient
(injective_rfq_rpc_pb.client.js) and injective_rfq_rpc_pb.js are referencing
missing generated RFQ protobuf files; run or add the protobuf generation that
produces these files before importing/exporting them, or gate the exports to
only export when the generated modules exist. Locate the export statements
referencing InjectiveRfqRPCClient and the injective_rfq_rpc_pb module in
index.d.ts and ensure the build step that runs the RFQ proto generator is
invoked prior to module resolution (or commit the generated files into
protoV2/indexer/src/generated/) so the imports succeed at runtime.
🧹 Nitpick comments (11)
proto/indexer/gen.sh (1)

10-10: Prefer a stable ref (tag/commit) over a feature branch for reproducible clones.
Feature branches like f/rfq can be deleted or rebased, breaking the clone step if this script is ever re-enabled. Consider pinning to a tag/commit or at least verify the branch is long-lived.

protoV2/indexer/gen.sh (1)

15-16: Avoid deleting lockfiles pre-install; it can dirty the repo and lose lock state on failure.
Consider preserving lockfiles and using npm ci when package-lock.json exists.

💡 Suggested safer install flow
-  rm -f package-lock.json pnpm-lock.yaml
-  npm install || {
+  if [ -f package-lock.json ]; then
+    npm ci || {
+      echo "❌ Failed to install dependencies"
+      exit 1
+    }
+  else
+    npm install || {
+      echo "❌ Failed to install dependencies"
+      exit 1
+    }
+  fi
-    echo "❌ Failed to install dependencies"
-    exit 1
-  }
protoV2/indexer/publish.sh (1)

38-42: Inconsistent with other publish scripts in this PR.

The mito/publish.sh and core/publish.sh scripts have been updated with:

  1. Support for explicitly provided versions via $1
  2. Automatic prerelease tag detection and --tag publishing

This script only has a commented note about alpha versions but lacks the actual implementation. Consider aligning this script with the others for consistency across the monorepo.

♻️ Suggested alignment with other publish scripts
-npm publish
+# Use provided version or auto-increment
+if [ -n "$1" ]; then
+  v1="$1"
+  echo "using provided version: $v1"
+else
+  v1="${v%.*}.$((${v##*.}+1))"
+  echo "auto-incremented version: $v1"
+fi
+
+# Update version in package.json
+npm version "$v1" --no-git-tag-version
 
-# note: making alpha versions
-# npm version 1.17.7-alpha.1
-# npm publish . --tag alpha
+# Detect prerelease tag (alpha, beta, rc, etc.)
+if [[ "$v1" =~ -([a-z]+) ]]; then
+  tag="${BASH_REMATCH[1]}"
+  echo "publishing prerelease version with tag: $tag"
+  npm publish . --tag "$tag"
+else
+  echo "publishing release version"
+  npm publish .
+fi
packages/sdk-ts/src/client/indexer/ws/README.md (1)

220-228: Add language specifier to fenced code block.

The frame format diagram should have a language specifier for consistency and to satisfy linting rules. Since this is a text diagram, use text or plaintext.

📝 Proposed fix
-```
+```text
 Frame format:
 [compressionFlag: 1 byte][length: 4 bytes BE][payload: N bytes]
packages/sdk-ts/src/client/indexer/grpc_stream/streamV2/IndexerGrpcRfqStreamV2.ts (1)

28-35: Consider removing unused transport field.

The transport field is stored but never accessed after being used to initialize client. You could simplify by not storing it as an instance property.

♻️ Suggested simplification
 export class IndexerGrpcRfqStreamV2 {
   private client: InjectiveRFQRPCClient
-  private transport: GrpcWebRpcTransport

   constructor(endpoint: string, metadata?: Record<string, string>) {
-    this.transport = new GrpcWebRpcTransport(endpoint, metadata)
-    this.client = new InjectiveRFQRPCClient(this.transport)
+    const transport = new GrpcWebRpcTransport(endpoint, metadata)
+    this.client = new InjectiveRFQRPCClient(transport)
   }
packages/sdk-ts/src/client/indexer/grpc/IndexerGrpcRfqApi.ts (1)

18-110: Add explicit return type annotation for consistency.

submitRequest lacks an explicit return type annotation while submitQuote has Promise<{ status: string }>. Adding it improves API clarity and consistency.

♻️ Add return type
-  }): Promise<{ status: string }> {
+  }: { ... }): Promise<{ status: string }> {

Apply at line 46:

-  }) {
+  }): Promise<{ status: string }> {
packages/sdk-ts/src/client/indexer/ws/GrpcWebSocketTransport.ts (2)

250-261: Empty ping interval callback may be confusing.

The ping interval is set up but the callback body is empty with only a comment. While the comment explains that higher-level classes handle gRPC-level pings, this empty interval still creates unnecessary timer overhead. Consider either removing it or adding a TODO to implement transport-level keepalive if needed.

♻️ Consider removing empty interval or adding transport-level ping

If transport-level ping is not needed, remove startPingInterval and clearPingInterval entirely since higher-level stream classes (like IndexerWsTakerStream) handle their own ping intervals.


284-303: Silent error swallowing in reconnection may hide issues.

The empty catch block at line 296 silently discards reconnection errors. Consider logging for observability or emitting an error event.

♻️ Add error logging
       try {
         await this.createConnection()
-      } catch {}
+      } catch (error) {
+        // Connection failed, backoff will continue
+        // Error already emitted via handleError
+      }
packages/sdk-ts/src/client/indexer/ws/rfq/IndexerWsTakerStream.ts (1)

215-228: Consider logging ping failures at debug level instead of error.

Failed pings during normal operation (e.g., during disconnect) might flood error logs. Consider using a debug log level or only logging on repeated failures.

♻️ Reduce log noise
       } catch (error) {
-        console.error('Failed to send ping:', error)
+        // Ping failures are expected during disconnect; logged for debugging
+        if (process.env.NODE_ENV === 'development') {
+          console.debug('Failed to send ping:', error)
+        }
       }

Or simply remove the log if pings failing during disconnect is expected behavior.

packages/sdk-ts/src/client/indexer/ws/GrpcWebSocketCodec.ts (2)

31-55: Consider validating input before BigInt conversion.

BigInt(input.rfqId) on Line 33 will throw a SyntaxError if rfqId is not a valid integer string (e.g., empty string, decimal, or non-numeric). While callers should provide valid inputs, the codec could fail silently or provide a clearer error message.

💡 Optional: Add validation or wrap with descriptive error
 encodeTakerRequest(input: RFQRequestInput): Uint8Array {
+  if (!/^\d+$/.test(input.rfqId)) {
+    throw new Error(`Invalid rfqId: expected numeric string, got "${input.rfqId}"`)
+  }
   const request = RFQRequestType.create({
     rfqId: BigInt(input.rfqId),

75-104: Same BigInt validation concern applies here.

Lines 80 and 85 have the same potential for SyntaxError if rfqId or expiry contain invalid values. Consider applying consistent validation across both encoder methods.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@packages/sdk-ts/src/client/indexer/ws/rfq/IndexerWsTakerStream.ts`:
- Around line 80-89: In sendRequest (IndexerWsTakerStream.sendRequest) remove
the two debug console.log calls so the method only checks isConnected(), encodes
the request with GrpcWebSocketCodec.encodeTakerRequest(request), and forwards
the encoded payload via this.transport.send(encoded); ensure no other logging
remains in that function.
- Around line 108-152: Remove the debug console.log calls in the event pipeline:
delete the console.log in emit and all console.log/debug prints inside
setupTransportHandlers (the 'connect', 'disconnect', 'state_change', 'error'
handlers and the emit call at top). Keep the functionality of emit,
startPingInterval/stopPingInterval, handleMessage, and the transport.on
registrations intact; if needed later, replace these console.log calls with
calls to a configurable logger (e.g., this.logger.debug) — but for this change
simply remove the console.log lines from the emit method and from the
transport.on handlers in setupTransportHandlers.
- Line 1: Remove the unused import "log" from 'console' in
IndexerWsTakerStream.ts; locate the top-level import statement "import { log }
from 'console'" and delete it so ESLint no longer flags an unused import (no
other code changes required).
- Around line 154-231: Remove the debug console.log calls in handleMessage (the
logs around response and messageType and quote) so production doesn't emit stray
logs, and add a null/undefined guard before calling .toString() on
response.requestAck.rfqId in the request_ack branch of handleMessage (use a safe
conversion or skip/emit an ack with a safe rfqId value when rfqId is missing) so
RFQStreamAckData.rfqId is never produced by calling toString() on
null/undefined.
🧹 Nitpick comments (1)
packages/sdk-ts/src/client/indexer/ws/rfq/IndexerWsTakerStream.ts (1)

31-34: Consider improving type safety for event listeners.

The Set<TakerEventListener<any>> loses the type safety provided by the generic TakerStreamEvents interface. This is a common trade-off in event emitter patterns, but you could consider a more type-safe approach using mapped types if stricter typing is desired.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@packages/sdk-ts/package.json`:
- Line 331: The package.json currently pins an alpha release of the indexer
proto package; change the dependency "@injectivelabs/indexer-proto-ts-v2":
"1.17.7-alpha.3" to a stable version (e.g., revert to "1.17.6" or update to a
released "1.17.7" if available) before cutting a production SDK; locate the
dependency entry in packages/sdk-ts/package.json and update the version string,
run a fresh install and tests, and add a note in the changelog or release
checklist that the alpha was replaced with a stable release.

In `@packages/sdk-ts/src/client/indexer/grpc/IndexerGrpcRfqApi.ts`:
- Around line 11-16: Add two methods to IndexerGrpcRFQApi: fetchOpenRequests and
fetchPendingQuotes. For fetchOpenRequests create an
InjectiveRFQExchangeRpcPb.GetOpenRequestsRequest, call this.executeGrpcCall with
that request and this.client.getOpenRequests.bind(this.client), then return
IndexerGrpcRfqTransformer.openRequestsResponseToOpenRequests(response). For
fetchPendingQuotes create an InjectiveRFQExchangeRpcPb.GetPendingQuotesRequest,
call this.executeGrpcCall with that request and
this.client.getPendingQuotes.bind(this.client), then return
IndexerGrpcRfqTransformer.pendingQuotesResponseToPendingQuotes(response). Ensure
you use the same generic request/response types as in the suggested
implementation and keep methods async.
🧹 Nitpick comments (1)
packages/sdk-ts/src/client/indexer/grpc/IndexerGrpcRfqApi.ts (1)

18-46: Add explicit return type annotation for consistency.

The submitQuote method has an explicit return type Promise<{ status: string }>, but submitRequest lacks one. Add the return type for consistency and better type safety.

Proposed fix
   transactionTime?: bigint
-  }) {
+  }): Promise<{ status: string }> {

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@packages/sdk-ts/src/client/indexer/ws/GrpcWebSocketTransport.ts`:
- Around line 120-161: When creating the WebSocket in createConnection(), ensure
the promise is rejected if the socket closes before onopen: update WsState
handling in the onclose handler (and the similar close handling in the other
block) to check for WsState.Connecting and, when seen, call
clearConnectionTimeout() and reject the pending promise with an error (e.g.,
"socket closed before open" or include the CloseEvent), so connect() doesn't
hang; adjust handleClose usage or inline logic to perform the
early-reject/clearTimeout while preserving existing close logic for
non-connecting states.
- Around line 276-286: startPingInterval currently sets a timer but does
nothing; replace the no-op with a real transport-level keepalive by invoking a
concrete ping send inside the interval: add a private helper (e.g.,
sendPingFrame) that uses the underlying socket (this.websocket / this.ws) to
send a WebSocket-level ping or a zero-length binary/frame suitable for your
transport, and call that helper when this.isConnected() is true; keep use of
clearPingInterval, this.pingInterval and this.config.pingIntervalMs unchanged
and ensure sendPingFrame handles errors and non-existent socket gracefully.
- Around line 248-260: The handleMessage method references Buffer directly which
can throw in browser runtimes; update the conditional that checks Buffer to
first ensure Buffer exists (e.g., typeof Buffer !== 'undefined' or using
globalThis.Buffer) before calling Buffer.isBuffer(data), and keep the same
conversion path (new Uint8Array(data).buffer) and this.emit('message', buffer)
inside that guarded branch so browsers won't access the undefined Buffer symbol.

@Frederick-88 Frederick-88 changed the title Feat: rfq feat/rfq Feb 12, 2026
Comment on lines +36 to +38
test('submitQuote', async () => {
try {
const response = await indexerGrpcRfqApi.submitRequest({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test is for submitQuote but we are doing a submitRequest

Comment on lines +150 to +156
const length = payload.length
frame[1] = (length >>> 24) & 0xff
frame[2] = (length >>> 16) & 0xff
frame[3] = (length >>> 8) & 0xff
frame[4] = length & 0xff

frame.set(payload, GRPC_HEADER_SIZE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this can be refactored to:

new DataView(frame.buffer).setUint32(1, payload.length, false)

can you try?

Comment on lines +161 to +194
function decodeGrpcFrame<T>(
data: Uint8Array,
messageType: { fromBinary(bytes: Uint8Array): T },
): GrpcFrame<T> {
if (data.length < GRPC_HEADER_SIZE) {
throw new GrpcDecodeError(
`Frame too short: expected at least ${GRPC_HEADER_SIZE} bytes, got ${data.length}`,
)
}

const compressionFlag = data[0]
const isTrailer = (compressionFlag & COMPRESSION_FLAG_TRAILER) !== 0

const payloadLength =
(data[1] << 24) | (data[2] << 16) | (data[3] << 8) | data[4]

if (data.length < GRPC_HEADER_SIZE + payloadLength) {
throw new GrpcDecodeError(
`Incomplete frame: expected ${GRPC_HEADER_SIZE + payloadLength} bytes, got ${data.length}`,
)
}

const payload = data.subarray(
GRPC_HEADER_SIZE,
GRPC_HEADER_SIZE + payloadLength,
)

if (isTrailer) {
return {
isTrailer: true,
payload,
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the paylaodLength seems very hacky, lets try the example bellow:

function decodeGrpcFrame<T>(
  data: Uint8Array,
  messageType: { fromBinary(bytes: Uint8Array): T },
): GrpcFrame<T> {
  if (data.byteLength < GRPC_HEADER_SIZE) {
    throw new GrpcDecodeError(
      `Frame too short: expected at least ${GRPC_HEADER_SIZE} bytes, got ${data.byteLength}`,
    )
  }

  // DataView over the *exact bytes* in this Uint8Array (handles non-zero byteOffset)
  const view = new DataView(data.buffer, data.byteOffset, data.byteLength)

  const compressionFlag = view.getUint8(0)
  const isTrailer = (compressionFlag & COMPRESSION_FLAG_TRAILER) !== 0
  const payloadLength = view.getUint32(1, false) // false = big-endian

  const totalLength = GRPC_HEADER_SIZE + payloadLength
  if (data.byteLength < totalLength) {
    throw new GrpcDecodeError(
      `Incomplete frame: expected ${totalLength} bytes, got ${data.byteLength}`,
    )
  }

  const payload = data.subarray(GRPC_HEADER_SIZE, totalLength)

  // Trailer frames: return raw payload (you likely parse it elsewhere)
  if (isTrailer) {
    return { isTrailer: true, payload }
  }

  // For non-trailer messages, require "no compression"
  // If you want to allow other bits, mask instead of strict compare.
  if (compressionFlag !== COMPRESSION_FLAG_NONE) {
    throw new GrpcDecodeError(
      `Unsupported compression flag: 0x${compressionFlag.toString(16)}`,
    )
  }

  try {
    const message = messageType.fromBinary(payload)
    return { isTrailer: false, message, payload }
  } catch (error) {
    throw new GrpcDecodeError(
      `Failed to decode protobuf message: ${
        error instanceof Error ? error.message : String(error)
      }`,
    )
  }
}

}

this.setState(WsState.Connecting)
await this.createConnection()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the connect fails, we will be stuck in a connected state, we need to catch the error and update the status acordingly.

Maybe something like this?

try {
    await this.createConnection()
  } catch (error) {
    const reason = WsDisconnectReason.ConnectionFailed

    if (this.shouldAttemptReconnect(reason)) {
      this.setState(WsState.Reconnecting)
      this.emit('disconnect', { reason, willRetry: true })
      this.scheduleReconnect()
    } else {
      this.cleanup(reason, false)
    }

    throw error
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants