Skip to content

Commit e430e7e

Browse files
Spriteclaude
andcommitted
server: intercept pg_cursors queries for cursor emulation
psycopg's ServerCursor queries pg_cursors to check if a cursor exists before closing it: SELECT 1 FROM pg_cursors WHERE name = 'cursor_name' DuckDB doesn't have the pg_cursors system view, causing a Catalog Error. Since pg_cursors reflects per-connection state, it can't be implemented as a static view - we intercept the query and return synthetic results from the cursor emulation state (c.cursors map). Handles all three query paths: - Simple Query Protocol (handleQuery) - Multi-statement Simple Query (executeSingleStatement) - Extended Query Protocol (handleParse/handleDescribe/handleExecute) Supports both literal and parameterized ($1) cursor name variants, with optional pg_catalog schema prefix. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 0bec3ce commit e430e7e

File tree

1 file changed

+144
-4
lines changed

1 file changed

+144
-4
lines changed

server/conn.go

Lines changed: 144 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,11 @@ var errCancelHandled = errors.New("cancel request handled")
3535
type cursorOp int
3636

3737
const (
38-
cursorOpNone cursorOp = iota
39-
cursorOpDeclare // DECLARE cursor_name CURSOR FOR ...
40-
cursorOpFetch // FETCH [count] FROM cursor_name
41-
cursorOpClose // CLOSE cursor_name / CLOSE ALL
38+
cursorOpNone cursorOp = iota
39+
cursorOpDeclare // DECLARE cursor_name CURSOR FOR ...
40+
cursorOpFetch // FETCH [count] FROM cursor_name
41+
cursorOpClose // CLOSE cursor_name / CLOSE ALL
42+
cursorOpPgCursorsQuery // SELECT ... FROM pg_cursors WHERE name = ...
4243
)
4344

4445
// cursorState holds the state of an emulated server-side cursor.
@@ -722,6 +723,12 @@ func (c *clientConn) handleQuery(body []byte) error {
722723
}
723724
}
724725

726+
// Intercept pg_cursors queries (e.g. psycopg's "SELECT 1 FROM pg_cursors WHERE name = ...").
727+
// DuckDB doesn't have this system view; return synthetic results from cursor emulation state.
728+
if cursorName, _, ok := matchPgCursorsQuery(query); ok {
729+
return c.handlePgCursorsQuery(cursorName)
730+
}
731+
725732
// Passthrough mode: skip all transpilation, send query directly to DuckDB
726733
if c.passthrough {
727734
upperQuery := strings.ToUpper(query)
@@ -1164,6 +1171,19 @@ func (c *clientConn) executeSingleStatement(query string) (errSent bool, fatalEr
11641171
}
11651172
}
11661173

1174+
// Intercept pg_cursors queries
1175+
if cursorName, _, ok := matchPgCursorsQuery(query); ok {
1176+
_, exists := c.cursors[cursorName]
1177+
_ = c.sendPgCursorsRowDescription()
1178+
rowCount := 0
1179+
if exists {
1180+
_ = c.sendDataRowWithFormats([]interface{}{int64(1)}, nil, []int32{23})
1181+
rowCount = 1
1182+
}
1183+
_ = writeCommandComplete(c.writer, fmt.Sprintf("SELECT %d", rowCount))
1184+
return false, nil
1185+
}
1186+
11671187
// Transpile
11681188
tr := c.newTranspiler(false)
11691189
result, err := tr.Transpile(query)
@@ -3270,6 +3290,25 @@ func (c *clientConn) handleParse(body []byte) {
32703290
}
32713291
}
32723292

3293+
// Intercept pg_cursors queries (e.g. psycopg's "SELECT 1 FROM pg_cursors WHERE name = $1").
3294+
// DuckDB doesn't have this system view; return synthetic results from cursor emulation state.
3295+
if cursorName, parameterized, ok := matchPgCursorsQuery(query); ok {
3296+
delete(c.stmts, stmtName)
3297+
ps := &preparedStmt{
3298+
query: query,
3299+
convertedQuery: query,
3300+
cursorOp: cursorOpPgCursorsQuery,
3301+
cursorName: cursorName,
3302+
}
3303+
if parameterized {
3304+
ps.numParams = 1
3305+
ps.paramTypes = []int32{25} // text OID
3306+
}
3307+
c.stmts[stmtName] = ps
3308+
_ = writeParseComplete(c.writer)
3309+
return
3310+
}
3311+
32733312
// Passthrough mode: skip transpilation, store query directly
32743313
if c.passthrough {
32753314
// Count $N parameters with a simple regex (pg_query.Parse may fail on DuckDB-native SQL)
@@ -3484,6 +3523,10 @@ func (c *clientConn) handleDescribe(body []byte) {
34843523
_ = c.sendRowDescription(cols, colTypes)
34853524
ps.described = true
34863525
return
3526+
case cursorOpPgCursorsQuery:
3527+
_ = c.sendPgCursorsRowDescription()
3528+
ps.described = true
3529+
return
34873530
}
34883531

34893532
// For queries that return results, we need to send RowDescription
@@ -3555,6 +3598,10 @@ func (c *clientConn) handleDescribe(body []byte) {
35553598
p.described = true
35563599
_ = c.sendRowDescription(cols, colTypes)
35573600
return
3601+
case cursorOpPgCursorsQuery:
3602+
_ = c.sendPgCursorsRowDescription()
3603+
p.described = true
3604+
return
35583605
}
35593606

35603607
// For queries that don't return results, send NoData
@@ -3635,6 +3682,9 @@ func (c *clientConn) handleExecute(body []byte) {
36353682
case cursorOpClose:
36363683
c.handleCloseCursorExtended(p)
36373684
return
3685+
case cursorOpPgCursorsQuery:
3686+
c.handlePgCursorsQueryExtended(p)
3687+
return
36383688
}
36393689

36403690
// Handle empty queries - PostgreSQL returns EmptyQueryResponse for these
@@ -3937,6 +3987,96 @@ func isFetchForwardOnly(dir pg_query.FetchDirection) bool {
39373987
dir == pg_query.FetchDirection_FETCH_FORWARD
39383988
}
39393989

3990+
// pgCursorsLiteralRegex matches pg_cursors queries with a literal name value:
3991+
// SELECT 1 FROM pg_cursors WHERE name = 'cursor_name'
3992+
// SELECT 1 FROM pg_catalog.pg_cursors WHERE name = 'cursor_name'
3993+
var pgCursorsLiteralRegex = regexp.MustCompile(
3994+
`(?i)^\s*SELECT\s+.+\s+FROM\s+(?:pg_catalog\s*\.\s*)?pg_cursors\s+WHERE\s+name\s*=\s*'([^']*)'`,
3995+
)
3996+
3997+
// pgCursorsParamRegex matches pg_cursors queries with a parameterized name:
3998+
// SELECT 1 FROM pg_cursors WHERE name = $1
3999+
var pgCursorsParamRegex = regexp.MustCompile(
4000+
`(?i)^\s*SELECT\s+.+\s+FROM\s+(?:pg_catalog\s*\.\s*)?pg_cursors\s+WHERE\s+name\s*=\s*\$1\s*$`,
4001+
)
4002+
4003+
// matchPgCursorsQuery checks if a query is a pg_cursors lookup and extracts the cursor name.
4004+
// Returns the cursor name and true for literal queries, empty string and true for parameterized queries.
4005+
func matchPgCursorsQuery(query string) (cursorName string, parameterized bool, ok bool) {
4006+
if !strings.Contains(query, "pg_cursors") {
4007+
return "", false, false
4008+
}
4009+
if m := pgCursorsLiteralRegex.FindStringSubmatch(query); m != nil {
4010+
return m[1], false, true
4011+
}
4012+
if pgCursorsParamRegex.MatchString(query) {
4013+
return "", true, true
4014+
}
4015+
return "", false, false
4016+
}
4017+
4018+
// handlePgCursorsQuery handles SELECT FROM pg_cursors in the Simple Query protocol.
4019+
// Returns a single row with value "1" if the cursor exists, or zero rows if not.
4020+
func (c *clientConn) handlePgCursorsQuery(cursorName string) error {
4021+
_, exists := c.cursors[cursorName]
4022+
4023+
// Send RowDescription: single int4 column named "?column?"
4024+
if err := c.sendPgCursorsRowDescription(); err != nil {
4025+
return err
4026+
}
4027+
4028+
rowCount := 0
4029+
if exists {
4030+
if err := c.sendDataRowWithFormats([]interface{}{int64(1)}, nil, []int32{23}); err != nil {
4031+
return err
4032+
}
4033+
rowCount = 1
4034+
}
4035+
4036+
_ = writeCommandComplete(c.writer, fmt.Sprintf("SELECT %d", rowCount))
4037+
_ = writeReadyForQuery(c.writer, c.txStatus)
4038+
_ = c.writer.Flush()
4039+
return nil
4040+
}
4041+
4042+
// handlePgCursorsQueryExtended handles SELECT FROM pg_cursors in the Extended Query protocol.
4043+
func (c *clientConn) handlePgCursorsQueryExtended(p *portal) {
4044+
// Resolve cursor name: either from literal in query or from bind parameter
4045+
cursorName := p.stmt.cursorName
4046+
if cursorName == "" && len(p.paramValues) > 0 && p.paramValues[0] != nil {
4047+
cursorName = string(p.paramValues[0])
4048+
}
4049+
4050+
_, exists := c.cursors[cursorName]
4051+
4052+
if !p.stmt.described {
4053+
_ = c.sendPgCursorsRowDescription()
4054+
}
4055+
4056+
rowCount := 0
4057+
if exists {
4058+
_ = c.sendDataRowWithFormats([]interface{}{int64(1)}, p.resultFormats, []int32{23})
4059+
rowCount = 1
4060+
}
4061+
4062+
_ = writeCommandComplete(c.writer, fmt.Sprintf("SELECT %d", rowCount))
4063+
}
4064+
4065+
// sendPgCursorsRowDescription sends a RowDescription for a pg_cursors query result (single int4 column).
4066+
func (c *clientConn) sendPgCursorsRowDescription() error {
4067+
var buf bytes.Buffer
4068+
_ = binary.Write(&buf, binary.BigEndian, int16(1)) // 1 column
4069+
buf.WriteString("?column?")
4070+
buf.WriteByte(0)
4071+
_ = binary.Write(&buf, binary.BigEndian, int32(0)) // table OID
4072+
_ = binary.Write(&buf, binary.BigEndian, int16(0)) // column attr
4073+
_ = binary.Write(&buf, binary.BigEndian, int32(23)) // int4 OID
4074+
_ = binary.Write(&buf, binary.BigEndian, int16(4)) // type size
4075+
_ = binary.Write(&buf, binary.BigEndian, int32(-1)) // typmod
4076+
_ = binary.Write(&buf, binary.BigEndian, int16(0)) // text format
4077+
return writeMessage(c.writer, msgRowDescription, buf.Bytes())
4078+
}
4079+
39404080
// handleDeclareCursor handles DECLARE cursor in the Simple Query protocol.
39414081
func (c *clientConn) handleDeclareCursor(stmt *pg_query.DeclareCursorStmt) error {
39424082
innerSQL := deparseInnerQuery(stmt.Query)

0 commit comments

Comments
 (0)