Skip to content

Commit fe2792a

Browse files
rocwangtim-smart
andauthored
Fix query result streaming in @effect/sql-pg (#5758)
Co-authored-by: Tim Smart <[email protected]>
1 parent 1deeb6a commit fe2792a

File tree

3 files changed

+25
-4
lines changed

3 files changed

+25
-4
lines changed

.changeset/many-spies-raise.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/sql-pg": patch
3+
---
4+
5+
Fix query result streaming in @effect/sql-pg

packages/sql-pg/src/PgClient.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,10 +289,10 @@ export const make = (
289289
// eslint-disable-next-line @typescript-eslint/no-this-alias
290290
const self = this
291291
return Effect.gen(function*() {
292-
const cursor = yield* Effect.acquireRelease(
293-
Effect.sync(() => (self.pg ?? pool).query(new Cursor(sql, params as any))),
294-
(cursor) => Effect.sync(() => cursor.close())
295-
)
292+
const scope = yield* Effect.scope
293+
const client = self.pg ?? (yield* reserveRaw)
294+
yield* Scope.addFinalizer(scope, Effect.promise(() => cursor.close()))
295+
const cursor = client.query(new Cursor(sql, params as any))
296296
const pull = Effect.async<Chunk.Chunk<any>, Option.Option<SqlError>>((resume) => {
297297
cursor.read(128, (err, rows) => {
298298
if (err) {

packages/sql-pg/test/Client.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import * as SqlClient from "@effect/sql/SqlClient"
33
import * as Statement from "@effect/sql/Statement"
44
import { assert, expect, it } from "@effect/vitest"
55
import { Effect, Redacted, String } from "effect"
6+
import * as Chunk from "effect/Chunk"
7+
import * as Stream from "effect/Stream"
68
import * as TestServices from "effect/TestServices"
79
import { parse as parsePgConnectionString } from "pg-connection-string"
810
import { PgContainer } from "./utils.js"
@@ -237,6 +239,20 @@ it.layer(PgContainer.ClientLive, { timeout: "30 seconds" })("PgClient", (it) =>
237239
const rows = yield* sql<{ json: unknown }>`select ${{ testValue: 123 }}::jsonb as json`
238240
expect(rows[0].json).toEqual({ testValue: 123 })
239241
}))
242+
243+
it.effect("stream", () =>
244+
Effect.gen(function*() {
245+
const sql = yield* SqlClient.SqlClient
246+
const rows = yield* sql`SELECT generate_series(1, 3)`.stream.pipe(
247+
Stream.runCollect,
248+
Effect.map(Chunk.toReadonlyArray)
249+
)
250+
expect(rows).toEqual([
251+
{ "generate_series": 1 },
252+
{ "generate_series": 2 },
253+
{ "generate_series": 3 }
254+
])
255+
}))
240256
})
241257

242258
it.layer(PgContainer.ClientTransformLive, { timeout: "30 seconds" })("PgClient transforms", (it) => {

0 commit comments

Comments
 (0)