Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 146 additions & 2 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
TABLE_QUERY_PARAM,
REPLICA_PARAM,
FORCE_DISCONNECT_AND_REFRESH,
EXPERIMENTAL_LIVE_SSE_QUERY_PARAM,
} from './constants'

const RESERVED_PARAMS: Set<ReservedParamKeys> = new Set([
Expand Down Expand Up @@ -244,6 +245,11 @@ export interface ShapeStreamOptions<T = never> {
*/
subscribe?: boolean

/**
* Experimental support for Server-Sent Events (SSE) for live updates.
*/
experimentalLiveSse?: boolean

signal?: AbortSignal
fetchClient?: typeof fetch
backoffOptions?: BackoffOptions
Expand Down Expand Up @@ -281,8 +287,9 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
}

/**
* Reads updates to a shape from Electric using HTTP requests and long polling. Notifies subscribers
* when new messages come in. Doesn't maintain any history of the
* Reads updates to a shape from Electric using HTTP requests and long polling or
* Server-Sent Events (SSE).
* Notifies subscribers when new messages come in. Doesn't maintain any history of the
* log but does keep track of the offset position and is the best way
* to consume the HTTP `GET /v1/shape` api.
*
Expand All @@ -297,6 +304,14 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
* })
* ```
*
* To use Server-Sent Events (SSE) for real-time updates:
* ```
* const stream = new ShapeStream({
* url: `http://localhost:3000/v1/shape`,
* liveMode: 'sse'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling this accept to mimic http could make sense

* })
* ```
*
* To abort the stream, abort the `signal`
* passed in via the `ShapeStreamOptions`.
* ```
Expand Down Expand Up @@ -484,6 +499,26 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
}

// If using SSE mode we handle the connection differently using the
// this.#connectSSE method which wraps the EventSource API.
if (this.#isUpToDate && this.options.experimentalLiveSse) {
fetchUrl.searchParams.set(EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, `true`)
try {
await this.#connectSSE(fetchUrl.toString())
} catch (error) {
if (error instanceof SSEConnectionAborted) {
break
}
this.#sendErrorToSubscribers(
error instanceof Error ? error : new Error(String(error))
)
throw error
}
// TODO: What should we do here? Is this the behaviour we want?
// Skip the regular fetch and continue the loop to reconnect if needed
continue
}

let response!: Response
try {
response = await this.#fetchClient(fetchUrl.toString(), {
Expand Down Expand Up @@ -714,6 +749,108 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#connected = false
this.#schema = undefined
}

/**
* Connects to the server using Server-Sent Events.
* Returns a promise that resolves when the connection is closed.
*/
async #connectSSE(url: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
try {
if (!this.#requestAbortController) {
reject(
new Error(
`Request abort controller is not set - this should never happen`
)
)
return
}

if (this.#requestAbortController.signal.aborted) {
reject(
new SSEConnectionAborted(
`Connection aborted before SSE connection established`
)
)
return
}

// Create an EventSource instance
const eventSource = new EventSource(url)

// Set up event handlers
eventSource.onopen = () => {
this.#connected = true
}

eventSource.onmessage = async (event: MessageEvent) => {
try {
if (event.data) {
// Process the SSE message
// Provide an empty schema object if schema is undefined, which it
// should not be as we only get to SSE mode after being in normal mode
// and getting a schema from a header then.
// The event.data is a single JSON object, so we wrap it in an array
// to be consistent with the way we parse the response from the HTTP API.
// TODO: Is this needed?
const batch = this.#messageParser.parse(
`[${event.data}]`,
this.#schema || {}
)

if (batch.length > 0) {
const lastMessage = batch[batch.length - 1]
if (isUpToDateMessage(lastMessage)) {
const upToDateMsg = lastMessage as typeof lastMessage & {
headers: { global_last_seen_lsn: string }
}
this.#lastSyncedAt = Date.now()
this.#isUpToDate = true
this.#lastOffset =
`${upToDateMsg.headers.global_last_seen_lsn}_0` as Offset
// TODO: we also need the cache buster `cursor` value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it on the live SSE response as a header? I’m thinking that’s fine, as in, the client only needs to update the cursor when the response is empty, so it’s fine to get it from the header.

}

await this.#publish(batch)
}
}
} catch (error) {
// Handle parsing errors
this.#sendErrorToSubscribers(
error instanceof Error ? error : new Error(String(error))
)
}
}

eventSource.onerror = (_error: Event) => {
// Connection was closed or errored
// EventSource would normally automatically reconnect but want to close the
// connection and reconnect on the next outer loop iteration with the new
// url and offset.
// TODO: It may be that some errors we should elevate to the user
eventSource.close()
resolve()
}

// Listen for abort signals
const abortHandler = () => {
eventSource.close()
reject(new SSEConnectionAborted(`SSE connection aborted`))
}

this.#requestAbortController.signal.addEventListener(
`abort`,
abortHandler,
{ once: true }
)
} catch (error) {
this.#sendErrorToSubscribers(
error instanceof Error ? error : new Error(String(error))
)
reject(error)
}
})
}
}

/**
Expand Down Expand Up @@ -782,3 +919,10 @@ function convertWhereParamsToObj(
}
return allPgParams
}

class SSEConnectionAborted extends Error {
constructor(message: string) {
super(message)
this.name = `SSEConnectionAborted`
}
}
1 change: 1 addition & 0 deletions packages/typescript-client/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ export const TABLE_QUERY_PARAM = `table`
export const WHERE_QUERY_PARAM = `where`
export const REPLICA_PARAM = `replica`
export const WHERE_PARAMS_PARAM = `params`
export const EXPERIMENTAL_LIVE_SSE_QUERY_PARAM = `experimental_live_sse`
export const FORCE_DISCONNECT_AND_REFRESH = `force-disconnect-and-refresh`
2 changes: 1 addition & 1 deletion packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ describe(`Shape`, () => {
todo: `fail`,
},
fetchClient: async (input, _init) => {
const url = new URL(input)
const url = new URL(input instanceof Request ? input.url : input)
if (url.searchParams.get(`todo`) === `fail`) {
return new Response(undefined, {
status: 401,
Expand Down
9 changes: 8 additions & 1 deletion packages/typescript-client/tsconfig.build.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
{
"extends": "../../tsconfig.build.json",
"include": ["src/**/*"],
"exclude": ["node_modules", "tests", "dist"]
"exclude": ["node_modules", "tests", "dist"],
"compilerOptions": {
"lib": [
"ESNext",
"DOM",
"dom.iterable"
]
}
}
4 changes: 3 additions & 1 deletion packages/typescript-client/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
/* Language and Environment */
"target": "es2016" /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */,
"lib": [
"ESNext"
"ESNext",
"DOM",
"dom.iterable"
] /* Specify a set of bundled library declaration files that describe the target runtime environment. */,
"jsx": "preserve" /* Specify what JSX code is generated. */,
// "experimentalDecorators": true, /* Enable experimental support for TC39 stage 2 draft decorators. */
Expand Down
Loading