One option could be to change the format in buckets to be a concatenation of BSON objects for:

```ts
interface SyncBucketDataReference {
  bucket: string;
  url: string;
  // If set, points to the start index (in bytes) of a BSON object representing an oplog
  // entry with an id <= after
  start_offset: number | undefined;
  // If set, points to the end index (exclusive, in bytes) of a BSON object representing an oplog
  // entry with an id >= next_after
  end_offset: number | undefined;
  after: ProtocolOpId;
  next_after: ProtocolOpId;
}
```

That might also let us support appending to existing data in directory buckets.
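As an illustration of how a client could use those offsets (assuming the file host honours HTTP `Range` requests; the helper name is made up, and `SyncBucketDataReference` is the interface above):

```ts
// Sketch: fetch only the referenced slice of a concatenated-BSON file.
async function fetchReferencedSlice(ref: SyncBucketDataReference): Promise<Uint8Array> {
  const headers: Record<string, string> = {};
  if (ref.start_offset !== undefined && ref.end_offset !== undefined) {
    // HTTP ranges are inclusive; end_offset is exclusive in the interface above.
    headers['Range'] = `bytes=${ref.start_offset}-${ref.end_offset - 1}`;
  }
  const response = await fetch(ref.url, { headers });
  return new Uint8Array(await response.arrayBuffer());
}
```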
A decent way to handle backpressure could be to have clients push chunks of response data into the Rust client directly. The client could buffer data until the end of each BSON object, then deserialize and write into the database before returning. Because this process is asynchronous, awaiting that in the client would throttle the stream.
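A rough sketch of that idea (the `pushSyncData` method and `CoreExtension` interface are hypothetical; the point is that awaiting each push throttles the download):

```ts
// Hypothetical core-extension interface: pushSyncData resolves only once the
// buffered BSON documents have been deserialized and written to the database.
interface CoreExtension {
  pushSyncData(chunk: Uint8Array): Promise<void>;
}

// Awaiting each push means the next chunk is only read from the network once the
// previous one has been applied, which gives us backpressure for free.
async function pumpDownload(response: Response, core: CoreExtension): Promise<void> {
  const reader = response.body!.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    await core.pushSyncData(value);
  }
}
```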
One benefit of option 2 (skipping the old checkpoint) is that it is closer to the current behavior, and we rely on this for stream priorities (since new data in a higher-priority bucket while we're syncing a lower-priority bucket would interrupt the process). However, the protocol currently relies on the fact that the service always knows which oplog entries have already been synced (their ids are part of the original request, and the serialized nature of the protocol lets the service track sent lines). This is no longer possible, so we can't reliably determine whether a new checkpoint should interrupt an old one. Maybe this logic needs to be moved to the client.

I wonder if we really need something like a …
IMO, no. Realistically all clients need the core extension anyway (I think the test client for the service is the only exception, but when we migrate that to support object storage downloads, it's probably easier to adopt the Rust client for everything instead of implementing that logic twice). So since the core extension can deal with BSON, adding JSON support feels like a complication.
## Background
PowerSync currently syncs all data directly via either an HTTP stream or a WebSocket connection.
This proposal specifies an optional addition to the protocol that allows syncing bulk data "out-of-band", by including links to the data in the stream instead of the data itself.
The main goals are to improve server-side efficiency and to improve sync throughput, especially for initial sync.
## Status
- 2025-10-15: Initial ideas in place; need to dig into details on back-pressure and bucket priorities.
- 2025-10-16: Changed from one large BSON document to individual concatenated BSON documents. Clarified some details on bucket priorities.
## Proposal - protocol changes
The current protocol generally sends data in three steps:

1. `checkpoint` or `checkpoint_diff`: Sends a list of (changed) buckets, priorities and checksums.
2. `data` lines: Send the data inline.
3. A `checkpoint_complete` (or `partial_checkpoint_complete`) line to indicate that all data for the checkpoint has been sent.

The `data` line is in this format (in JSON or BSON):
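A rough sketch of that shape, assuming the fields referenced below (`has_more`, `after`, `next_after`); the exact definition may differ:

```ts
// Simplified placeholder types, for illustration only.
type ProtocolOpId = string;
interface OplogEntry { op_id: ProtocolOpId; [key: string]: unknown }

// Assumed shape of the current inline `data` line.
interface SyncBucketData {
  bucket: string;
  data: OplogEntry[]; // oplog entries, sent inline
  has_more: boolean;
  after: ProtocolOpId;
  next_after: ProtocolOpId;
}
```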
This replaces the `data` line with `data_references`:
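A sketch of the new line; only `url`, `after` and `last` are described below, so the wrapper shape and everything else is illustrative:

```ts
// Sketch only - not a pinned-down wire format. ProtocolOpId as in the sketch above.
interface SyncBucketDataReference {
  bucket: string;
  // Path on the current endpoint (starting with '/') or an absolute URL.
  url: string;
  // The client ignores referenced entries with op_id <= after.
  after: ProtocolOpId;
  // The client ignores referenced entries with op_id > last; `last` replaces `next_after`.
  last: ProtocolOpId;
}

interface DataReferencesLine {
  // One or more references per message, reducing message counts for large buckets.
  data_references: SyncBucketDataReference[];
}
```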
Each message may contain one or more references (we explicitly support multiple in a single message, to reduce the total number of messages when syncing large buckets). We remove `has_more` - this is generally unused by the client. We do keep `after`, and `last` replaces `next_after` - the client uses those to filter the referenced data.

Here, the URL is either a path on the current endpoint or an absolute URL. If the string is a path starting with `/`, it is joined with the endpoint configured on the client. Otherwise, it is interpreted as an absolute URL.
In both cases, the client is expected to make a GET request to the URL within 5 minutes. If the client makes a request after that period, the server may respond with a 401 or 403 response, in which case the client must re-establish the streaming connection to get a new URL.
When making the request, the client must not include any authorization headers. Authorization is performed by including a token in the URL path. The client must include the following headers:
- `User-Agent`
- `Accept-Encoding`, to indicate support for gzip or zstd compression.
- `Accept: application/bson`
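A sketch of such a request (in browsers the `User-Agent` and `Accept-Encoding` headers are managed by the runtime and can't be overridden; shown here for completeness, and the URL-joining shown is just one possible interpretation):

```ts
// Resolve the reference URL against the configured endpoint and fetch it with the
// required headers. Note: no Authorization header - the token is already in the URL.
async function fetchReferencedData(url: string, endpoint: string): Promise<Response> {
  const resolved = url.startsWith('/') ? new URL(url, endpoint).toString() : url;
  return fetch(resolved, {
    headers: {
      'User-Agent': 'powersync-sketch/0.0.0', // illustrative value
      'Accept-Encoding': 'gzip, zstd',
      Accept: 'application/bson',
    },
  });
}
```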
The server may then respond with:
Each response in the chain must have the same CORS headers as the service itself.
The server may respond with zstd or gzip-compressed data, if supported by the client.
The response is a stream (concatenated bytes) of BSON-encoded documents. The first document is a "header":
Every document after the first is an `OplogEntry` document. These are always sorted by op_id. The data here may include data outside the range indicated by the streamed SyncBucketData - the client must ignore any data entry with `op_id <= after`, or `op_id > last`. Since the data is sorted, the client may be able to skip some of the parsing for entries outside the range (although the data still has to be downloaded). The client may completely stop downloading and/or parsing when it reaches the entry matching `last`. If there is no entry matching `last`, this is considered an error, and the client should retry the download.
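For illustration, one way a client could split the concatenated documents and apply the range filter (using the `bson` npm package; treating op_ids as numeric strings is an assumption):

```ts
import { deserialize } from 'bson';

// Each BSON document begins with a little-endian int32 holding its total length,
// so a buffer of concatenated documents can be split without fully parsing them.
function* splitBsonDocuments(buffer: Uint8Array): Generator<Uint8Array> {
  let offset = 0;
  while (offset + 4 <= buffer.length) {
    const view = new DataView(buffer.buffer, buffer.byteOffset + offset, 4);
    const length = view.getInt32(0, true);
    if (length < 5 || offset + length > buffer.length) break; // incomplete or invalid tail
    yield buffer.subarray(offset, offset + length);
    offset += length;
  }
}

// Apply the (after, last] filter described above. Assumes the header document has
// already been consumed and that op_ids compare numerically.
function* entriesInRange(documents: Iterable<Uint8Array>, after: string, last: string) {
  for (const bytes of documents) {
    const entry = deserialize(bytes);
    const opId = BigInt(entry.op_id as string);
    if (opId <= BigInt(after)) continue; // before the requested range: skip
    if (opId > BigInt(last)) return;     // data is sorted: nothing further is needed
    yield entry;
  }
}
```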
### Indicating support

Older clients may not support this new response format. Clients must indicate support by including `supports_data_url: true` in the request. If that is not included in the request, the server must respond with the data inline.

Optionally, the server may disable support for clients that could negatively affect performance:
## Service implementation
The service will be modified to store bucket data on object storage, such as AWS S3. The `bucket_data` collection/table (or alternatively a new one) will still be used to store an "index" of this data. Each entry would include:

- `op_id`
- `op_id`
The service may choose to store the data inline, instead of as a path, especially in cases where the data is small. A reasonable size threshold should be used for this, e.g. around 10-100KB.
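Purely as an illustration of such an index entry (these field names are not part of the proposal):

```ts
// Hypothetical index entry: a range of op_ids that either points at an object
// storage file or stores small data inline.
interface BucketDataIndexEntry {
  bucket: string;
  start_op_id: string;
  end_op_id: string;
  byte_size: number;
  // Set when the data lives in object storage.
  storage_path?: string;
  // Set instead of storage_path when the data is small enough to keep inline.
  inline_data?: Uint8Array;
}
```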
The service may store batches of data directly in object storage when replicating, and/or move data to object storage when compacting. The compact process may also merge multiple object storage files into one.
When the client requests a file, the service will respond with a 307 redirect pointing directly to S3 or a CDN (CDN integration TBD).
For self-hosting purposes, the service will support any S3-compatible object storage (exact requirements TBD). To simplify the setup, the service may support proxying all requests, rather than redirecting. The service may also support using a local filesystem as object storage for simplicity in development.
Proposed initial service compression strategy:

- … `Accept-Encoding` header there.

There are some options to do the compression outside the main service itself. Some examples include Cloudflare Workers, Lambda@Edge, or even just a dedicated service process for this. Those are not important for the initial implementation, but could be options later.
## Client implementation
This will only be supported when using the Rust implementation.
The client will send the sync line to the Rust client as always. If the Rust client detects a reference, it will send a message indicating the client should download the URL.
The client should continue sending more sync lines to the Rust client while downloading the URL.
When the client has finished a download, it must send the data to the Rust client using a new message type.
If the request fails, the client should send the response code to the Rust client, which may update the status and/or ask the client to re-download.
Once the Rust client has received all the required data for a checkpoint, it will respond in the usual way.
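A sketch of the messages this implies between the host SDK and the Rust client (all names here are illustrative, not an actual API):

```ts
// Instructions emitted by the Rust client to the host SDK.
type RustInstruction =
  | { type: 'download'; request_id: number; url: string } // please fetch this reference
  | { type: 'checkpoint_applied' };                        // all data received, status updated

// Messages the host SDK sends to the Rust client.
type HostMessage =
  | { type: 'sync_line'; line: Uint8Array }                          // forwarded sync lines
  | { type: 'download_data'; request_id: number; data: Uint8Array }  // completed (or chunked) download
  | { type: 'download_failed'; request_id: number; status: number }; // Rust client may ask to retry
```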
TBD: How do we manage back-pressure here? If we reach a certain threshold of in-progress requests, we may want to pause streaming. And for example, if a client receives a new checkpoint while still downloading data for the last one, does it:
### Bucket priorities and interruptions
In the current protocol and service implementation, bulk data in low-priority buckets may be interrupted with a new checkpoint for high-priority buckets. When using data references, there may not be anything to interrupt in the service, since the service only sends references, which are sent pretty much instantly.
This means the checkpoint interruption logic will shift to the client: The client could receive one checkpoint containing bulk data in low-priority buckets, as well as data in high-priority buckets. Or alternatively, the high-priority buckets could be in a following checkpoint, while the client is still downloading data for the previous checkpoint.
Instead of relying on partial_checkpoint_complete messages from the service, the client may implement its own prioritized downloads based on the received bucket priorities.
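For example, a client could order pending downloads by bucket priority (assuming lower numbers mean higher priority), so high-priority buckets finish first even when they arrive in a later checkpoint:

```ts
// Hypothetical bookkeeping for prioritized downloads on the client side.
interface PendingDownload {
  bucket: string;
  priority: number; // assumption: lower value = higher priority
  url: string;
}

// Pick the highest-priority pending download to start next.
function nextDownload(pending: PendingDownload[]): PendingDownload | undefined {
  return pending.reduce<PendingDownload | undefined>(
    (best, candidate) => (best === undefined || candidate.priority < best.priority ? candidate : best),
    undefined,
  );
}
```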
## Other considerations

- `Range` headers? Or rather soft-limit files to smaller sizes, e.g. 10MB?
- `Long` for `op_id`, instead of a `String`?

This does not include data-level compression yet (see #330). Adding data-level compression would require another protocol change, but would make the project too big if we include it here.
## Performance advantages
There are a couple of advantages on the service side:
## Notes
We specifically do not use the original authentication token in the file download request, since authorization could be computationally expensive (it requires evaluating Sync Rules to determine whether the bucket may be synced). By including a signed token in the query parameters, only the original streaming request needs to perform that check.