Design: COPY IN Protocol #104
Replies: 4 comments
Backpressure for COPY IN
Problem
The current design has
Decision: Add backpressure callbacks to CopyInReceiver
Add throttle/unthrottle callbacks to CopyInReceiver:
interface tag CopyInReceiver
  be pg_copy_ready(session: Session)
  be pg_copy_complete(session: Session, count: USize)
  be pg_copy_failed(session: Session,
    failure: (ErrorResponseMessage | ClientQueryError))
  be pg_throttled(session: Session)
    """
    Called when the session is experiencing TCP backpressure. The receiver
    should stop calling send_copy_data() until pg_unthrottled() is called.
    """
  be pg_unthrottled(session: Session)
    """
    Called when TCP backpressure is released. The receiver may resume
    calling send_copy_data().
    """
Internally, Session implements lori's
Lori's
The caller is responsible for respecting throttle/unthrottle. This is cooperative backpressure, the same model used throughout Pony's ecosystem.
Alternatives considered
Pull-based model: instead of the caller pushing chunks, Session calls something like
Single payload:
Reconsidering: the throttle/unthrottle approach could lead to surprising results for programmers who aren't familiar with backpressure systems, and could lead to livelocks. The pull-based model (Session calls
Leaning toward pull-based for now.
Pull-Based Data Flow
Concrete sketch of the pull-based model from the previous comment.
Change from the Original Design
The original design uses a push model: after
The pull model inverts control: Session requests each chunk by calling
API Surface
The public API types are identical to the original design. The change is behavioral, in the contract of CopyInReceiver:
interface tag CopyInReceiver
  be pg_copy_ready(session: Session)
    """
    Called when the session is ready to accept the next chunk of COPY data.
    Called once when the server enters COPY mode, and again after each
    send_copy_data() call.

    The receiver should respond by calling exactly one of:
    - session.send_copy_data(data) to send a chunk (triggers another
      pg_copy_ready)
    - session.finish_copy() to signal successful end of data
    - session.abort_copy(reason) to abort the operation

    The flow control guarantee depends on the receiver calling exactly one
    of these methods per pg_copy_ready. Calling send_copy_data without a
    preceding pg_copy_ready bypasses the bounded-memory guarantee.
    """
  be pg_copy_complete(session: Session, count: USize)
    """
    Called when the COPY operation completes successfully. The count is the
    number of rows copied.
    """
  be pg_copy_failed(session: Session,
    failure: (ErrorResponseMessage | ClientQueryError))
    """
    Called when the COPY operation fails.
    """
No
Session behaviors (unchanged from original):
be copy_in(sql: String, receiver: CopyInReceiver)
be send_copy_data(data: Array[U8] val)
be finish_copy()
be abort_copy(reason: String)
Pull Loop
The loop lives in
Implementation Detail
// _SessionLoggedIn: thin dispatcher
fun ref send_copy_data(s: Session ref, data: Array[U8] val) =>
  match query_state
  | let c: _CopyInInFlight => c.send_copy_data(s, this, data)
  end

// _CopyInInFlight: owns the pull loop
fun ref send_copy_data(s: Session ref, li: _SessionLoggedIn ref,
  data: Array[U8] val)
=>
  // Forward the chunk to the server, then ask the receiver for the next one.
  s._connection().send(_FrontendMessage.copy_data(data))
  try
    (li.query_queue(0)? as _QueuedCopyIn).receiver.pg_copy_ready(s)
  else
    _Unreachable()
  end
This is consistent with how
Backpressure Properties
At steady state, the pipeline contains at most:
This is O(1) memory regardless of dataset size. The actor round-trip (Session → Receiver → Session) is the rate limiter. If the network is slower than actor messaging, CopyData messages accumulate in lori's write buffer at the round-trip rate (microseconds per chunk) rather than all at once. For most workloads, the network keeps up and no accumulation occurs.
Performance
The pull model adds one extra actor message per chunk (the
Chunk size is the receiver's choice. PostgreSQL's COPY protocol doesn't require message boundaries to align with rows, so receivers can batch many rows per
Edge Cases
Server error during COPY (e.g., constraint violation): The server sends ErrorResponse + ReadyForQuery. A
Cancellation: Unchanged from the original design.
Shutdown: Unchanged. Queued
Example
actor BulkLoader is CopyInReceiver
  let _env: Env
  let _rows: Array[String] val
  var _index: USize = 0

  new create(env: Env, rows: Array[String] val) =>
    _env = env
    _rows = rows

  be pg_copy_ready(session: Session) =>
    // Send one row per pg_copy_ready; finish once all rows are sent.
    if _index < _rows.size() then
      try
        session.send_copy_data(_rows(_index)?.array())
        _index = _index + 1
      end
    else
      session.finish_copy()
    end

  be pg_copy_complete(session: Session, count: USize) =>
    _env.out.print("Loaded " + count.string() + " rows")

  be pg_copy_failed(session: Session,
    failure: (ErrorResponseMessage | ClientQueryError)) =>
    _env.out.print("COPY failed")
Relationship to Original Design
Everything not mentioned above is unchanged: internal message types (
Implemented in #112.
Add support for PostgreSQL's COPY ... FROM STDIN protocol for bulk data loading. Users initiate a COPY operation via a dedicated Session.copy_in() API, feed data chunks through send_copy_data(), and signal completion with finish_copy() or abort with abort_copy().
PR type: changelog - added (single type: new feature, no breaking changes).
Background
COPY table FROM STDIN is PostgreSQL's bulk loading mechanism, orders of magnitude faster than individual INSERTs. The protocol is fundamentally different from normal queries: after the initial command, the client streams data to the server.
Wire Protocol
Backend messages:
- CopyInResponse ('G'): Byte1('G') Int32(length) Int8(format) Int16(num_cols) Int16[](col_formats)
  - format: 0=text, 1=binary (overall format)
  - col_formats: per-column format codes (0=text, 1=binary)
Frontend messages:
- CopyData ('d'): Byte1('d') Int32(length) Byte[](data). Arbitrary chunk of copy data; message boundaries do NOT need to align with row boundaries.
- CopyDone ('c'): Byte1('c') Int32(4). Signals successful end of data stream.
- CopyFail ('f'): Byte1('f') Int32(length) String(error_message). Aborts the operation.
Flow:
On error (malformed data, constraint violation), the server sends ErrorResponse + ReadyForQuery. After the server sends ErrorResponse, it ignores any further CopyData/CopyDone/CopyFail messages.
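As a rough illustration of the three frontend messages listed above, the following Pony sketch builds their byte layouts. CopyWireSketch and its method names are hypothetical and are not the driver's _FrontendMessage builders. The sketch assumes the standard PostgreSQL framing, in which Int32(length) is big-endian and counts itself but not the type byte.

```pony
// Sketch only: CopyWireSketch is a hypothetical name used for illustration.
primitive CopyWireSketch
  fun copy_data(data: Array[U8] val): Array[U8] val =>
    // Byte1('d') Int32(length) Byte[](data); length = 4 + payload size
    let out = recover iso Array[U8](data.size() + 5) end
    out.push('d')
    out.append(_be32(data.size() + 4))
    out.append(data)
    consume out

  fun copy_done(): Array[U8] val =>
    // Byte1('c') Int32(4)
    let out = recover iso Array[U8](5) end
    out.push('c')
    out.append(_be32(4))
    consume out

  fun copy_fail(reason: String): Array[U8] val =>
    // Byte1('f') Int32(length) String(reason); the string is null-terminated
    let out = recover iso Array[U8](reason.size() + 6) end
    out.push('f')
    out.append(_be32(reason.size() + 5))
    out.append(reason.array())
    out.push(0)
    consume out

  fun _be32(n: USize): Array[U8] val =>
    // Encode n as a 4-byte big-endian integer.
    let v = n.u32()
    recover val
      [as U8: (v >> 24).u8(); (v >> 16).u8(); (v >> 8).u8(); v.u8()]
    end

actor Main
  new create(env: Env) =>
    let row = "1\tAlice\n".array()
    env.out.print(CopyWireSketch.copy_data(row).size().string()) // 13
    env.out.print(CopyWireSketch.copy_done().size().string()) // 5
    env.out.print(CopyWireSketch.copy_fail("bad input").size().string()) // 15
```

Each message is one type byte plus a length-prefixed body, so a chunk of N payload bytes always costs N + 5 bytes on the wire.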
Current State
No COPY support exists.
CopyInResponse ('G') falls through the parser to _UnsupportedMessage and is silently consumed. If a user sends a COPY FROM STDIN query via session.execute(), the session hangs: the server enters COPY mode waiting for data, while the driver waits for CommandComplete, and neither side makes progress.
Design
Public API
New behaviors on Session:
New receiver interface:
Why a Separate API
COPY IN has fundamentally different semantics from regular queries: the client streams data to the server instead of receiving results. Using session.execute() would be ambiguous: the user would have no callback to know when to start sending data, and the result delivery (row count vs result set) is different. A dedicated copy_in + CopyInReceiver interface follows "distinct semantics deserve distinct representations."
Data Format
The driver sends raw bytes via send_copy_data(). The user is responsible for formatting data in the appropriate format (text or binary). For the default text format, this means tab-delimited columns, newline-terminated rows, \N for NULLs. This follows "it is easier to give than take away": convenience helpers for text format encoding can be added later without changing the core API.
Internal Message Types
_backend_messages.pony, new message:
Add _CopyInResponseMessage to _ResponseParserResult union.
Parser Change
Add to _response_parser.pony:
Frontend Messages
Add to _frontend_message.pony:
State Machine Changes
Queue item:
Add _QueuedCopyIn to _QueueItem union.
Session state interface (_SessionState):
Add four new methods:
copy_in follows the execute/prepare pattern: each pre-auth state must call receiver.pg_copy_failed with the appropriate ClientQueryError:
- _SessionUnopened: receiver.pg_copy_failed(s, SessionNeverOpened)
- _SessionClosed: receiver.pg_copy_failed(s, SessionClosed)
- _SessionSSLNegotiating, _SessionConnected, _SessionSCRAMAuthenticating: receiver.pg_copy_failed(s, SessionNotAuthenticated)
send_copy_data, finish_copy, abort_copy default to no-op in _ConnectedState and _UnconnectedState (like cancel). _SessionSSLNegotiating also needs explicit no-op implementations for these three methods since it mixes in neither trait.
on_copy_in_response defaults to _IllegalState() in _NotAuthenticated (like other post-auth callbacks).
_SessionLoggedIn:
copy_in queues a _QueuedCopyIn and tries to run. The three data methods (send_copy_data, finish_copy, abort_copy) check query_state and delegate only to _CopyInInFlight:
_QueryReady dispatch:
COPY FROM STDIN is sent as a regular simple query. The server recognizes it and responds with CopyInResponse.
New query state:
_CopyInInFlight
Callbacks:
- on_copy_in_response: Access receiver via li.query_queue(0)? as _QueuedCopyIn. Call receiver.pg_copy_ready(s).
- on_error_response: Set _error = true. Access receiver via li.query_queue(0)? as _QueuedCopyIn. Call receiver.pg_copy_failed(s, msg).
- on_command_complete: Store msg.value in _complete_count.
- on_ready_for_query: li.query_queue(0)? as _QueuedCopyIn, call receiver.pg_copy_complete(s, _complete_count). Shift queue. If msg.idle(), transition to _QueryReady and try_run_query; else transition to _QueryNotReady.
- on_data_row: shutdown (protocol anomaly)
- on_row_description: shutdown (protocol anomaly)
- on_empty_query_response: shutdown (protocol anomaly)
- send_copy_data: s._connection().send(_FrontendMessage.copy_data(data))
- finish_copy: s._connection().send(_FrontendMessage.copy_done())
- abort_copy: s._connection().send(_FrontendMessage.copy_fail(reason))
_QueryState interface changes:
Add on_copy_in_response to the _QueryState interface. Default in _QueryNoQueryInFlight: li.shutdown(s) (protocol anomaly: CopyInResponse without a query in flight). Existing in-flight states (_SimpleQueryInFlight, _ExtendedQueryInFlight, _PrepareInFlight, _CloseStatementInFlight) must also implement on_copy_in_response as li.shutdown(s), because receiving CopyInResponse during any other operation is a protocol anomaly.
The send_copy_data, finish_copy, abort_copy methods are NOT on _QueryState; they're handled by _SessionLoggedIn via direct type matching on the query state (following the cancel pattern).
Query Cancellation
cancel() works automatically during COPY IN with no changes needed. The existing _SessionLoggedIn.cancel() fires for any in-flight query state (anything that isn't _QueryReady or _QueryNotReady). During COPY IN, the CancelRequest tells the server to abort the operation; the server then sends ErrorResponse + ReadyForQuery, which _CopyInInFlight.on_error_response and on_ready_for_query handle correctly.
Shutdown Handling
_SessionLoggedIn.on_shutdown needs to drain _QueuedCopyIn items:
Router Change
Add to _response_message_parser.pony:
Files Changed
- postgres/copy_in_receiver.pony: CopyInReceiver interface
- postgres/session.pony: add copy_in, send_copy_data, finish_copy, abort_copy behaviors on Session; add copy_in, send_copy_data, finish_copy, abort_copy, on_copy_in_response to _SessionState interface; add copy_in with pg_copy_failed callbacks to _SessionUnopened, _SessionClosed, _SessionSSLNegotiating, _SessionConnected, _SessionSCRAMAuthenticating; add send_copy_data/finish_copy/abort_copy no-ops to _ConnectedState, _UnconnectedState, and _SessionSSLNegotiating; add on_copy_in_response default _IllegalState() to _NotAuthenticated; add _QueuedCopyIn queue item; add _CopyInInFlight query state; update _SessionLoggedIn (queue dispatch, COPY methods, shutdown drain); add on_copy_in_response to _QueryState interface, _QueryNoQueryInFlight trait, and all existing in-flight states (_SimpleQueryInFlight, _ExtendedQueryInFlight, _PrepareInFlight, _CloseStatementInFlight) as li.shutdown(s)
- postgres/_backend_messages.pony: add _CopyInResponseMessage; add to _ResponseParserResult
- postgres/_response_parser.pony: parse CopyInResponse ('G')
- postgres/_response_message_parser.pony: route _CopyInResponseMessage
- postgres/_frontend_message.pony: add copy_data, copy_done, copy_fail builders
- postgres/_test_copy_in.pony: mock server COPY IN tests and pre-auth state tests
- postgres/_test_frontend_message.pony: unit tests for new frontend messages
- postgres/_test_response_parser.pony: parser unit test for CopyInResponse
- postgres/_test_query.pony: COPY IN integration tests
- postgres/_test.pony: register new test classes
- examples/copy-in/copy-in-example.pony: COPY IN example program
- CLAUDE.md: update architecture, query execution flow, public API types, file layout, supported features, test organization
Tests
Frontend Message Unit Tests
In _test_frontend_message.pony:
- _TestFrontendMessageCopyData: verify CopyData wire format: type byte 'd', length, data payload
- _TestFrontendMessageCopyDone: verify CopyDone wire format: type byte 'c', length = 4
- _TestFrontendMessageCopyFail: verify CopyFail wire format: type byte 'f', length, null-terminated error string
Parser Unit Test
In _test_response_parser.pony:
- _TestResponseParserCopyInResponseMessage: parse CopyInResponse bytes with text format and multiple column format codes; verify fields.
Unit Tests (Mock Servers)
New test file: postgres/_test_copy_in.pony
Mock port assignments: 7688, 7689, 7690, 7691
- _TestCopyInSuccess (port 7688): mock server authenticates, becomes ready. Client sends copy_in. Server responds with CopyInResponse. Client sends two CopyData chunks + CopyDone. Server verifies CopyDone received, responds with CommandComplete("COPY 2") + ReadyForQuery. Client verifies pg_copy_ready and pg_copy_complete(2) callbacks.
- _TestCopyInAbort (port 7689): mock server authenticates, becomes ready. Client sends copy_in. Server responds with CopyInResponse. Client sends CopyFail. Server responds with ErrorResponse + ReadyForQuery. Client verifies pg_copy_ready and pg_copy_failed callbacks.
- _TestCopyInServerError (port 7690): mock server authenticates, becomes ready. Client sends copy_in. Server responds with CopyInResponse. Client sends CopyData. Server responds with ErrorResponse (constraint violation) + ReadyForQuery. Client verifies pg_copy_failed fires and session remains usable (send a follow-up query).
- _TestCopyInShutdownDrainsCopyQueue (port 7691): mock server authenticates but never becomes ready. Client queues copy_in. Close session. Verify pg_copy_failed with SessionClosed. Follows existing patterns _TestUnansweredQueriesFailOnShutdown and _TestPrepareShutdownDrainsPrepareQueue.
Pre-Auth State Tests
In _test_copy_in.pony (no mock server needed):
- _TestCopyInAfterSessionClosed: connect and authenticate, then close session. Call copy_in. Verify pg_copy_failed with SessionClosed. Follows Query/AfterSessionHasBeenClosed pattern.
Integration Tests
Added to postgres/_test_query.pony:
- _TestCopyInInsert: create a temp table, COPY 3 rows of tab-delimited text data, verify row count via SELECT.
- _TestCopyInAbortRollback: create a temp table, start COPY, send some data, abort with abort_copy, verify table remains empty.
Existing Tests
No regressions expected. The new 'G' parser case handles a message type that previously fell through to _UnsupportedMessage. The new queue item and query state are additive. The on_copy_in_response callback is added to interfaces with appropriate defaults.
Example
New example: examples/copy-in/copy-in-example.pony
Demonstrates:
- execute
- copy_in to bulk load text-format data
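For the text format described under Data Format (tab-delimited columns, newline-terminated rows, \N for NULLs), a caller-side row encoder might look like the sketch below. CopyTextRowSketch and encode are hypothetical names and not part of the driver. The sketch assumes column values contain no tab, newline, or backslash characters, which the text format would otherwise require escaping.

```pony
// Sketch only: a hypothetical helper, not part of the proposed API.
primitive CopyTextRowSketch
  fun encode(cols: Array[(String | None)] box): Array[U8] val =>
    // Join columns with tabs, write \N for None, end the row with a newline.
    let line = recover iso String end
    var first = true
    for col in cols.values() do
      if not first then line.push('\t') end
      first = false
      match col
      | let s: String => line.append(s)
      | None => line.append("\\N")
      end
    end
    line.push('\n')
    let done: String val = consume line
    done.array()

actor Main
  new create(env: Env) =>
    let cols = [as (String | None): "1"; None; "Alice"]
    // Writes the row 1<TAB>\N<TAB>Alice followed by a newline.
    env.out.write(CopyTextRowSketch.encode(cols))
```

The resulting bytes could be passed to send_copy_data() as-is, or several encoded rows could be concatenated into one chunk, since message boundaries need not align with rows.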