Skip to content

Commit ef7162c

Browse files
committed
Add COPY IN support
Implement PostgreSQL's COPY ... FROM STDIN protocol for bulk data loading. The pull-based CopyInReceiver interface gives callers O(1) bounded memory: the session calls pg_copy_ready after each send_copy_data, so the client controls data flow. Session API: copy_in(sql, receiver), send_copy_data(data), finish_copy(), abort_copy(reason). CopyInReceiver callbacks: pg_copy_ready, pg_copy_complete, pg_copy_failed. The COPY query is sent via the simple query protocol. The server responds with CopyInResponse, then the client streams CopyData messages, finishing with CopyDone (success) or CopyFail (abort). The server responds with CommandComplete + ReadyForQuery on success, or ErrorResponse + ReadyForQuery on failure. New query state _CopyInInFlight manages the lifecycle. Data-stream methods (send_copy_data, finish_copy, abort_copy) use direct type matching on query_state (following the cancel pattern) rather than going through the _QueryState interface, since they are only valid during an active COPY. Design: #104
1 parent 4768b00 commit ef7162c

15 files changed

+1637
-12
lines changed

CLAUDE.md

Lines changed: 24 additions & 11 deletions
Large diffs are not rendered by default.

examples/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ Multi-query workflow mixing `SimpleQuery` and `PreparedQuery`. Creates a table,
3030

3131
Asynchronous notifications using PostgreSQL's LISTEN/NOTIFY mechanism. Subscribes to a channel with `LISTEN`, sends a notification with `NOTIFY`, receives it via the `pg_notification` callback on `SessionStatusNotify`, and unsubscribes with `UNLISTEN`. Shows the `Notification` class fields (channel, payload, pid).
3232

33+
## copy-in
34+
35+
Bulk data loading using `COPY ... FROM STDIN`. Creates a table, loads three rows of tab-delimited text data via `Session.copy_in()`, verifies the data with a SELECT, then drops the table. Demonstrates the pull-based `CopyInReceiver` interface: `pg_copy_ready` fires after each `send_copy_data`, and `finish_copy` completes the operation.
36+
3337
## transaction-status
3438

3539
Transaction status tracking using `pg_transaction_status`. Sends `BEGIN` and `COMMIT` and prints the `TransactionStatus` reported at each step. Shows how `SessionStatusNotify.pg_transaction_status` fires on every `ReadyForQuery` with `TransactionIdle`, `TransactionInBlock`, or `TransactionFailed`.
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
use "cli"
2+
use "collections"
3+
use lori = "lori"
4+
// in your code this `use` statement would be:
5+
// use "postgres"
6+
use "../../postgres"
7+
8+
actor Main
9+
new create(env: Env) =>
10+
let server_info = ServerInfo(env.vars)
11+
let auth = lori.TCPConnectAuth(env.root)
12+
13+
let client = Client(auth, server_info, env.out)
14+
15+
// This example demonstrates COPY IN for bulk data loading. It creates a table,
16+
// uses COPY FROM STDIN to load three rows of tab-delimited text data, verifies
17+
// the data with a SELECT query, then drops the table.
18+
//
19+
// The COPY IN protocol uses a pull-based flow: the session calls pg_copy_ready
20+
// after each send_copy_data, letting the client send the next chunk. When all
21+
// data is sent, call finish_copy to complete the operation.
22+
actor Client is (SessionStatusNotify & ResultReceiver & CopyInReceiver)
23+
let _session: Session
24+
let _out: OutStream
25+
var _phase: USize = 0
26+
var _rows_sent: USize = 0
27+
28+
new create(auth: lori.TCPConnectAuth, info: ServerInfo, out: OutStream) =>
29+
_out = out
30+
_session = Session(
31+
ServerConnectInfo(auth, info.host, info.port),
32+
DatabaseConnectInfo(info.username, info.password, info.database),
33+
this)
34+
35+
be close() =>
36+
_session.close()
37+
38+
be pg_session_authenticated(session: Session) =>
39+
_out.print("Authenticated.")
40+
_phase = 0
41+
session.execute(
42+
SimpleQuery("DROP TABLE IF EXISTS copy_in_example"), this)
43+
44+
be pg_session_authentication_failed(
45+
s: Session,
46+
reason: AuthenticationFailureReason)
47+
=>
48+
_out.print("Failed to authenticate.")
49+
50+
be pg_copy_ready(session: Session) =>
51+
_rows_sent = _rows_sent + 1
52+
if _rows_sent <= 3 then
53+
// Send one row per callback. Tab-delimited, newline-terminated.
54+
let row: Array[U8] val = recover val
55+
("row" + _rows_sent.string() + "\t" + (_rows_sent * 10).string()
56+
+ "\n").array()
57+
end
58+
_out.print(" Sending row " + _rows_sent.string() + "...")
59+
session.send_copy_data(row)
60+
else
61+
_out.print(" All rows sent. Finishing COPY...")
62+
session.finish_copy()
63+
end
64+
65+
be pg_copy_complete(session: Session, count: USize) =>
66+
_out.print("COPY complete: " + count.string() + " rows copied.")
67+
// Verify with a SELECT
68+
_out.print("Verifying with SELECT...")
69+
_session.execute(
70+
SimpleQuery("SELECT name, value FROM copy_in_example ORDER BY name"),
71+
this)
72+
73+
be pg_copy_failed(session: Session,
74+
failure: (ErrorResponseMessage | ClientQueryError))
75+
=>
76+
match failure
77+
| let e: ErrorResponseMessage =>
78+
_out.print("COPY failed: [" + e.severity + "] " + e.code + ": "
79+
+ e.message)
80+
| let e: ClientQueryError =>
81+
_out.print("COPY failed: client error")
82+
end
83+
close()
84+
85+
be pg_query_result(session: Session, result: Result) =>
86+
_phase = _phase + 1
87+
88+
match _phase
89+
| 1 =>
90+
// Table dropped (or didn't exist). Create it.
91+
_out.print("Creating table...")
92+
_session.execute(
93+
SimpleQuery(
94+
"""
95+
CREATE TABLE copy_in_example (
96+
name VARCHAR(50) NOT NULL,
97+
value INT NOT NULL
98+
)
99+
"""),
100+
this)
101+
| 2 =>
102+
// Table created. Start COPY IN.
103+
_out.print("Table created. Starting COPY IN...")
104+
_session.copy_in(
105+
"COPY copy_in_example (name, value) FROM STDIN", this)
106+
| 3 =>
107+
// SELECT done. Print results and drop table.
108+
match result
109+
| let r: ResultSet =>
110+
_out.print("ResultSet (" + r.rows().size().string() + " rows):")
111+
for row in r.rows().values() do
112+
_out.write(" ")
113+
for field in row.fields.values() do
114+
_out.write(" " + field.name + "=")
115+
match field.value
116+
| let v: String => _out.write(v)
117+
| let v: I32 => _out.write(v.string())
118+
| None => _out.write("NULL")
119+
end
120+
end
121+
_out.print("")
122+
end
123+
end
124+
_out.print("Dropping table...")
125+
_session.execute(
126+
SimpleQuery("DROP TABLE copy_in_example"), this)
127+
| 4 =>
128+
// Table dropped. Done.
129+
_out.print("Done.")
130+
close()
131+
end
132+
133+
be pg_query_failed(session: Session, query: Query,
134+
failure: (ErrorResponseMessage | ClientQueryError))
135+
=>
136+
match failure
137+
| let e: ErrorResponseMessage =>
138+
_out.print("Query failed: [" + e.severity + "] " + e.code + ": "
139+
+ e.message)
140+
| let e: ClientQueryError =>
141+
_out.print("Query failed: client error")
142+
end
143+
close()
144+
145+
class val ServerInfo
146+
let host: String
147+
let port: String
148+
let username: String
149+
let password: String
150+
let database: String
151+
152+
new val create(vars: (Array[String] val | None)) =>
153+
let e = EnvVars(vars)
154+
host = try e("POSTGRES_HOST")? else "127.0.0.1" end
155+
port = try e("POSTGRES_PORT")? else "5432" end
156+
username = try e("POSTGRES_USERNAME")? else "postgres" end
157+
password = try e("POSTGRES_PASSWORD")? else "postgres" end
158+
database = try e("POSTGRES_DATABASE")? else "postgres" end

postgres/_backend_messages.pony

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,18 @@ class val _NotificationResponseMessage
169169
channel = channel'
170170
payload = payload'
171171

172+
class val _CopyInResponseMessage
173+
"""
174+
Message from the backend indicating it is ready to receive COPY data.
175+
Contains the overall format (0=text, 1=binary) and per-column format codes.
176+
"""
177+
let format: U8
178+
let column_formats: Array[U8] val
179+
180+
new val create(format': U8, column_formats': Array[U8] val) =>
181+
format = format'
182+
column_formats = column_formats'
183+
172184
primitive _PortalSuspendedMessage
173185
"""
174186
Message from the backend indicating that an Execute command has been

postgres/_frontend_message.pony

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,79 @@ primitive _FrontendMessage
464464
[]
465465
end
466466

467+
fun copy_data(data: Array[U8] val): Array[U8] val =>
468+
"""
469+
Build a CopyData message containing a chunk of COPY stream data.
470+
471+
Format: Byte1('d') Int32(4 + data.size()) Byte[](data)
472+
"""
473+
try
474+
recover val
475+
let length: U32 = 4 + data.size().u32()
476+
let msg_size = (length + 1).usize()
477+
let msg: Array[U8] = Array[U8].init(0, msg_size)
478+
msg.update_u8(0, 'd')?
479+
ifdef bigendian then
480+
msg.update_u32(1, length)?
481+
else
482+
msg.update_u32(1, length.bswap())?
483+
end
484+
msg.copy_from(data, 0, 5, data.size())
485+
msg
486+
end
487+
else
488+
_Unreachable()
489+
[]
490+
end
491+
492+
fun copy_done(): Array[U8] val =>
493+
"""
494+
Build a CopyDone message signaling successful end of COPY data.
495+
496+
Format: Byte1('c') Int32(4)
497+
"""
498+
try
499+
recover val
500+
let msg: Array[U8] = Array[U8].init(0, 5)
501+
msg.update_u8(0, 'c')?
502+
ifdef bigendian then
503+
msg.update_u32(1, U32(4))?
504+
else
505+
msg.update_u32(1, U32(4).bswap())?
506+
end
507+
msg
508+
end
509+
else
510+
_Unreachable()
511+
[]
512+
end
513+
514+
fun copy_fail(reason: String): Array[U8] val =>
515+
"""
516+
Build a CopyFail message aborting the COPY operation.
517+
518+
Format: Byte1('f') Int32(4 + reason.size() + 1) String(reason)
519+
"""
520+
try
521+
recover val
522+
let length: U32 = reason.size().u32() + 5
523+
let msg_size = (length + 1).usize()
524+
let msg: Array[U8] = Array[U8].init(0, msg_size)
525+
msg.update_u8(0, 'f')?
526+
ifdef bigendian then
527+
msg.update_u32(1, length)?
528+
else
529+
msg.update_u32(1, length.bswap())?
530+
end
531+
msg.copy_from(reason.array(), 0, 5, reason.size())
532+
// space for null left here
533+
msg
534+
end
535+
else
536+
_Unreachable()
537+
[]
538+
end
539+
467540
fun terminate(): Array[U8] val =>
468541
"""
469542
Build a Terminate message. Sent before closing the TCP connection to

postgres/_message_type.pony

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,8 @@ primitive _MessageType
3737
fun parameter_status(): U8 =>
3838
'S'
3939

40+
fun copy_in_response(): U8 =>
41+
'G'
42+
4043
fun row_description(): U8 =>
4144
'T'

postgres/_response_message_parser.pony

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ primitive _ResponseMessageParser
5555
s.state.on_backend_key_data(s, msg)
5656
| let msg: _NotificationResponseMessage =>
5757
s.state.on_notification(s, msg)
58+
| let msg: _CopyInResponseMessage =>
59+
s.state.on_copy_in_response(s, msg)
5860
| _SkippedMessage =>
5961
// Known async message (ParameterStatus, NoticeResponse) —
6062
// intentionally not routed.

postgres/_response_parser.pony

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ type _ResponseParserResult is
2323
| _NoDataMessage
2424
| _ParameterDescriptionMessage
2525
| _NotificationResponseMessage
26+
| _CopyInResponseMessage
2627
| _PortalSuspendedMessage
2728
| _SkippedMessage
2829
| _UnsupportedMessage
@@ -218,6 +219,12 @@ primitive _ResponseParser
218219
// and parse the notification payload in an isolated reader
219220
let notification_payload = buffer.block(payload_size)?
220221
return _notification_response(consume notification_payload)?
222+
| _MessageType.copy_in_response() =>
223+
// Slide past the header...
224+
buffer.skip(5)?
225+
// and parse the CopyInResponse payload in an isolated reader
226+
let copy_payload = buffer.block(payload_size)?
227+
return _copy_in_response(consume copy_payload)?
221228
else
222229
buffer.skip(message_size)?
223230
return _UnsupportedMessage
@@ -383,6 +390,23 @@ primitive _ResponseParser
383390
let payload' = String.from_array(consume payload_bytes)
384391
_NotificationResponseMessage(pid, channel, payload')
385392

393+
fun _copy_in_response(payload: Array[U8] val)
394+
: _CopyInResponseMessage ?
395+
=>
396+
"""
397+
Parse a CopyInResponse message.
398+
"""
399+
let reader: Reader = Reader.>append(payload)
400+
let format = reader.u8()?
401+
let num_cols = reader.u16_be()?.usize()
402+
let col_fmts: Array[U8] iso = recover iso Array[U8](num_cols) end
403+
404+
for i in Range(0, num_cols) do
405+
col_fmts.push(reader.u16_be()?.u8())
406+
end
407+
408+
_CopyInResponseMessage(format, consume col_fmts)
409+
386410
fun _parameter_description(payload: Array[U8] val)
387411
: _ParameterDescriptionMessage ?
388412
=>

postgres/_test.pony

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,17 @@ actor \nodoc\ Main is TestList
137137
test(_TestNotificationDelivery)
138138
test(_TestNotificationDuringDataRows)
139139
test(_TestListenNotify)
140+
test(_TestResponseParserCopyInResponseMessage)
141+
test(_TestFrontendMessageCopyData)
142+
test(_TestFrontendMessageCopyDone)
143+
test(_TestFrontendMessageCopyFail)
144+
test(_TestCopyInSuccess)
145+
test(_TestCopyInAbort)
146+
test(_TestCopyInServerError)
147+
test(_TestCopyInShutdownDrainsCopyQueue)
148+
test(_TestCopyInAfterSessionClosed)
149+
test(_TestCopyInInsert)
150+
test(_TestCopyInAbortRollback)
140151

141152
class \nodoc\ iso _TestAuthenticate is UnitTest
142153
"""

0 commit comments

Comments
 (0)