Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/hip-lamps-draw.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/node': minor
---

Introduced support for specifying proxy environment variables for the connection methods. For HTTP it supports `HTTP_PROXY` or `HTTPS_PROXY`, and for WebSockets it supports `WS_PROXY` and `WSS_PROXY`.
6 changes: 6 additions & 0 deletions .changeset/thick-lies-invent.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/common': minor
---

Added `fetchOptions` to AbstractRemoteOptions. Allows consumers to include fields such as `dispatcher` (e.g. for proxy support) to the fetch invocations.
Also ensuring all options provided to `connect()` are passed onwards, allows packages to have their own option definitions for `connect()` and the abstract `generateSyncStreamImplementation()`.
4 changes: 2 additions & 2 deletions demos/example-node/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import repl_factory from 'node:repl';
import { once } from 'node:events';
import repl_factory from 'node:repl';

import { PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
import { default as Logger } from 'js-logger';
import { AppSchema, DemoConnector } from './powersync.js';
import { exit } from 'node:process';
import { AppSchema, DemoConnector } from './powersync.js';

const main = async () => {
const logger = Logger.get('PowerSyncDemo');
Expand Down
38 changes: 18 additions & 20 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
// Use the options passed in during connect, or fallback to the options set during database creation or fallback to the default options
resolvedConnectionOptions(options?: PowerSyncConnectionOptions): RequiredAdditionalConnectionOptions {
return {
...options,
retryDelayMs:
options?.retryDelayMs ?? this.options.retryDelayMs ?? this.options.retryDelay ?? DEFAULT_RETRY_DELAY_MS,
crudUploadThrottleMs:
Expand All @@ -436,12 +437,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
throw new Error('Cannot connect using a closed client');
}

const { retryDelayMs, crudUploadThrottleMs } = this.resolvedConnectionOptions(options);
const resolvedConnectOptions = this.resolvedConnectionOptions(options);

this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, {
retryDelayMs,
crudUploadThrottleMs
});
this.syncStreamImplementation = this.generateSyncStreamImplementation(connector, resolvedConnectOptions);
this.syncStatusListenerDisposer = this.syncStreamImplementation.registerListener({
statusChanged: (status) => {
this.currentStatus = new SyncStatus({
Expand Down Expand Up @@ -555,7 +553,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* This method does include transaction ids in the result, but does not group
* data by transaction. One batch may contain data from multiple transactions,
* and a single transaction may be split over multiple batches.
*
*
* @param limit Maximum number of CRUD entries to include in the batch
* @returns A batch of CRUD operations to upload, or null if there are none
*/
Expand Down Expand Up @@ -594,7 +592,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*
* Unlike {@link getCrudBatch}, this only returns data from a single transaction at a time.
* All data for the transaction is loaded into memory.
*
*
* @returns A transaction of CRUD operations to upload, or null if there are none
*/
async getNextCrudTransaction(): Promise<CrudTransaction | null> {
Expand Down Expand Up @@ -633,7 +631,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Get an unique client id for this database.
*
* The id is not reset when the database is cleared, only when the database is deleted.
*
*
* @returns A unique identifier for the database instance
*/
async getClientId(): Promise<string> {
Expand Down Expand Up @@ -661,7 +659,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
/**
* Execute a SQL write (INSERT/UPDATE/DELETE) query
* and optionally return results.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The query result as an object with structured key-value pairs
Expand All @@ -674,7 +672,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
/**
* Execute a SQL write (INSERT/UPDATE/DELETE) query directly on the database without any PowerSync processing.
* This bypasses certain PowerSync abstractions and is useful for accessing the raw database results.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The raw query result from the underlying database as a nested array of raw values, where each row is
Expand All @@ -689,7 +687,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a write query (INSERT/UPDATE/DELETE) multiple times with each parameter set
* and optionally return results.
* This is faster than executing separately with each parameter set.
*
*
* @param sql The SQL query to execute
* @param parameters Optional 2D array of parameter sets, where each inner array is a set of parameters for one execution
* @returns The query result
Expand All @@ -701,7 +699,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return results.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns An array of results
Expand All @@ -713,7 +711,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The first result if found, or null if no results are returned
Expand All @@ -725,7 +723,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

/**
* Execute a read-only query and return the first result, error if the ResultSet is empty.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @returns The first result matching the query
Expand Down Expand Up @@ -761,7 +759,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Open a read-only transaction.
* Read transactions can run concurrently to a write transaction.
* Changes from any write transaction are not visible to read transactions started before it.
*
*
* @param callback Function to execute within the transaction
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
* @returns The result of the callback
Expand All @@ -786,7 +784,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Open a read-write transaction.
* This takes a global lock - only one write transaction can execute against the database at a time.
* Statements within the transaction must be done on the provided {@link Transaction} interface.
*
*
* @param callback Function to execute within the transaction
* @param lockTimeout Time in milliseconds to wait for a lock before throwing an error
* @returns The result of the callback
Expand Down Expand Up @@ -865,7 +863,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
*
* Note that the `onChange` callback member of the handler is required.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @param handler Callbacks for handling results and errors
Expand Down Expand Up @@ -915,7 +913,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Execute a read query every time the source tables are modified.
* Use {@link SQLWatchOptions.throttleMs} to specify the minimum interval between queries.
* Source tables are automatically detected using `EXPLAIN QUERY PLAN`.
*
*
* @param sql The SQL query to execute
* @param parameters Optional array of parameters to bind to the query
* @param options Options for configuring watch behavior
Expand Down Expand Up @@ -944,7 +942,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Resolves the list of tables that are used in a SQL query.
* If tables are specified in the options, those are used directly.
* Otherwise, analyzes the query using EXPLAIN to determine which tables are accessed.
*
*
* @param sql The SQL query to analyze
* @param parameters Optional parameters for the SQL query
* @param options Optional watch options that may contain explicit table list
Expand Down Expand Up @@ -1077,7 +1075,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
*
* This is preferred over {@link watchWithAsyncGenerator} when multiple queries need to be performed
* together when data is changed.
*
*
* Note: do not declare this as `async *onChange` as it will not work in React Native.
*
* @param options Options for configuring watch behavior
Expand Down
19 changes: 17 additions & 2 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,23 @@ export type AbstractRemoteOptions = {
* Binding should be done before passing here.
*/
fetchImplementation: FetchImplementation | FetchImplementationProvider;

/**
* Optional options to pass directly to all `fetch` calls.
*
* This can include fields such as `dispatcher` (e.g. for proxy support),
* `cache`, or any other fetch-compatible options.
*/
fetchOptions?: {};
};

export const DEFAULT_REMOTE_OPTIONS: AbstractRemoteOptions = {
socketUrlTransformer: (url) =>
url.replace(/^https?:\/\//, function (match) {
return match === 'https://' ? 'wss://' : 'ws://';
}),
fetchImplementation: new FetchImplementationProvider()
fetchImplementation: new FetchImplementationProvider(),
fetchOptions: {}
};

export abstract class AbstractRemote {
Expand Down Expand Up @@ -231,6 +240,10 @@ export abstract class AbstractRemote {
*/
abstract getBSON(): Promise<BSONImplementation>;

protected createSocket(url: string): WebSocket {
return new WebSocket(url);
}

/**
* Connects to the sync/stream websocket endpoint
*/
Expand All @@ -249,7 +262,8 @@ export abstract class AbstractRemote {

const connector = new RSocketConnector({
transport: new WebsocketClientTransport({
url: this.options.socketUrlTransformer(request.url)
url: this.options.socketUrlTransformer(request.url),
wsCreator: (url) => this.createSocket(url)
}),
setup: {
keepAlive: KEEP_ALIVE_MS,
Expand Down Expand Up @@ -421,6 +435,7 @@ export abstract class AbstractRemote {
body: JSON.stringify(data),
signal: controller.signal,
cache: 'no-store',
...(this.options.fetchOptions ?? {}),
...options.fetchOptions
}).catch((ex) => {
if (ex.name == 'AbortError') {
Expand Down
16 changes: 16 additions & 0 deletions packages/node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ contains everything you need to know to get started implementing PowerSync in yo

A simple example using `@powersync/node` is available in the [`demos/example-node/`](../demos/example-node) directory.

# Proxy Support

This SDK supports HTTP, HTTPS, and WebSocket proxies via environment variables.

## HTTP Connection Method

Internally we probe the http environment variables and apply it to fetch requests ([undici](https://www.npmjs.com/package/undici/v/5.6.0))

- Set the `HTTPS_PROXY` or `HTTP_PROXY` environment variable to automatically route HTTP requests through a proxy.

## WEB Socket Connection Method

Internally the [proxy-agent](https://www.npmjs.com/package/proxy-agent) dependency for WebSocket proxies, which has its own internal code for automatically picking up the appropriate environment variables:

- Set the `WS_PROXY` or `WSS_PROXY` environment variable to route the webocket connections through a proxy.

# Found a bug or need help?

- Join our [Discord server](https://discord.gg/powersync) where you can browse topics from our community, ask questions, share feedback, or just say hello :)
Expand Down
7 changes: 5 additions & 2 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@
"@powersync/common": "workspace:*",
"async-lock": "^1.4.0",
"bson": "^6.6.0",
"comlink": "^4.4.2"
"comlink": "^4.4.2",
"proxy-agent": "^6.5.0",
"undici": "^7.8.0",
"ws": "^8.18.1"
},
"devDependencies": {
"@powersync/drizzle-driver": "workspace:*",
"@types/async-lock": "^1.4.0",
"drizzle-orm": "^0.35.2",
"@powersync/drizzle-driver": "workspace:*",
"rollup": "4.14.3",
"typescript": "^5.5.3",
"vitest": "^3.0.5"
Expand Down
22 changes: 20 additions & 2 deletions packages/node/src/db/PowerSyncDatabase.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import {
AbstractPowerSyncDatabase,
AbstractStreamingSyncImplementation,
AdditionalConnectionOptions,
BucketStorageAdapter,
DBAdapter,
PowerSyncBackendConnector,
PowerSyncConnectionOptions,
PowerSyncDatabaseOptions,
PowerSyncDatabaseOptionsWithSettings,
RequiredAdditionalConnectionOptions,
SqliteBucketStorage,
SQLOpenFactory
} from '@powersync/common';
Expand All @@ -15,11 +18,18 @@ import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyn

import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
import { NodeSQLOpenOptions } from './options.js';
import { Dispatcher } from 'undici';

export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
database: DBAdapter | SQLOpenFactory | NodeSQLOpenOptions;
};

export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & {
dispatcher?: Dispatcher;
};

export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAdditionalConnectionOptions;

/**
* A PowerSync database which provides SQLite functionality
* which is automatically synced.
Expand Down Expand Up @@ -54,10 +64,18 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
return new SqliteBucketStorage(this.database, AbstractPowerSyncDatabase.transactionMutex);
}

connect(
connector: PowerSyncBackendConnector,
options?: PowerSyncConnectionOptions & { dispatcher?: Dispatcher }
): Promise<void> {
return super.connect(connector, options);
}

protected generateSyncStreamImplementation(
connector: PowerSyncBackendConnector
connector: PowerSyncBackendConnector,
options: NodeAdditionalConnectionOptions
): AbstractStreamingSyncImplementation {
const remote = new NodeRemote(connector);
const remote = new NodeRemote(connector, this.options.logger, { dispatcher: options.dispatcher });

return new NodeStreamingSyncImplementation({
adapter: this.bucketStorageAdapter,
Expand Down
27 changes: 25 additions & 2 deletions packages/node/src/sync/stream/NodeRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import {
RemoteConnector
} from '@powersync/common';
import { BSON } from 'bson';
import Agent from 'proxy-agent';
import { EnvHttpProxyAgent, Dispatcher } from 'undici';
import { WebSocket } from 'ws';

export const STREAMING_POST_TIMEOUT_MS = 30_000;

Expand All @@ -21,18 +24,38 @@ class NodeFetchProvider extends FetchImplementationProvider {
}
}

export type NodeRemoteOptions = AbstractRemoteOptions & {
dispatcher?: Dispatcher;
};

export class NodeRemote extends AbstractRemote {
constructor(
protected connector: RemoteConnector,
protected logger: ILogger = DEFAULT_REMOTE_LOGGER,
options?: Partial<AbstractRemoteOptions>
options?: Partial<NodeRemoteOptions>
) {
// EnvHttpProxyAgent automatically uses relevant env vars for HTTP
const dispatcher = options?.dispatcher ?? new EnvHttpProxyAgent();

super(connector, logger, {
...(options ?? {}),
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider()
fetchImplementation: options?.fetchImplementation ?? new NodeFetchProvider(),
fetchOptions: {
dispatcher
}
});
}

protected createSocket(url: string): globalThis.WebSocket {
return new WebSocket(url, {
// Automatically uses relevant env vars for web sockets
agent: new Agent.ProxyAgent(),
headers: {
'User-Agent': this.getUserAgent()
}
}) as any as globalThis.WebSocket; // This is compatible in Node environments
}

getUserAgent(): string {
return [
super.getUserAgent(),
Expand Down
Loading