-
Notifications
You must be signed in to change notification settings - Fork 39
Expand file tree
/
Copy pathcommand_describe_test.go
More file actions
150 lines (116 loc) · 4.65 KB
/
command_describe_test.go
File metadata and controls
150 lines (116 loc) · 4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package wire
import (
"bytes"
"context"
"testing"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jeroenrinzema/psql-wire/pkg/buffer"
"github.com/jeroenrinzema/psql-wire/pkg/mock"
"github.com/jeroenrinzema/psql-wire/pkg/types"
"github.com/neilotoole/slogt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestHandleDescribe_ParallelPipeline_StatementSuccess verifies that a
// successful Describe of a prepared statement in parallel pipeline mode writes
// nothing to the wire and instead enqueues exactly one ResponseStmtDescribe
// event carrying the statement's parameter OIDs and columns.
func TestHandleDescribe_ParallelPipeline_StatementSuccess(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	logger := slogt.New(t)

	// Register a statement with a single int4 parameter and a single int4
	// column so the resulting describe event has recognisable content.
	statements := &DefaultStatementCache{}
	stmt := NewStatement(
		func(ctx context.Context, writer DataWriter, parameters []Parameter) error { return nil },
		WithParameters([]uint32{pgtype.Int4OID}),
		WithColumns(Columns{{Name: "col1", Oid: pgtype.Int4OID}}),
	)
	require.NoError(t, statements.Set(ctx, "test_stmt", stmt))

	session := &Session{
		Server:           &Server{logger: logger},
		Statements:       statements,
		ParallelPipeline: ParallelPipelineConfig{Enabled: true},
		ResponseQueue:    NewResponseQueue(),
	}

	reader := mock.NewDescribeReader(t, logger, types.DescribeStatement, "test_stmt")
	outBuf := &bytes.Buffer{}
	writer := buffer.NewWriter(logger, outBuf)

	err := session.handleDescribe(ctx, reader, writer)
	require.NoError(t, err)

	// In parallel pipeline mode, nothing should be written to the wire immediately
	assert.Equal(t, 0, outBuf.Len(), "parallel pipeline should not write to wire on success")
	assert.Equal(t, 1, session.ResponseQueue.Len())

	events := session.ResponseQueue.DrainAll()
	require.Len(t, events, 1)

	event := events[0]
	assert.Equal(t, ResponseStmtDescribe, event.Kind)
	assert.Equal(t, []uint32{pgtype.Int4OID}, event.Parameters)

	// require rather than assert: Columns[0] is indexed right after, and a
	// non-fatal length failure would turn into an index-out-of-range panic
	// that obscures the real assertion message.
	require.Len(t, event.Columns, 1)
	assert.Equal(t, "col1", event.Columns[0].Name)
}
// TestHandleDescribe_ParallelPipeline_PortalSuccess verifies that a successful
// Describe of a bound portal in parallel pipeline mode writes nothing to the
// wire and instead enqueues exactly one ResponsePortalDescribe event carrying
// the portal's columns and the result format codes it was bound with.
func TestHandleDescribe_ParallelPipeline_PortalSuccess(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	logger := slogt.New(t)

	portals := &DefaultPortalCache{}
	stmt := NewStatement(
		func(ctx context.Context, writer DataWriter, parameters []Parameter) error { return nil },
		WithParameters([]uint32{pgtype.Int4OID}),
		WithColumns(Columns{{Name: "col1", Oid: pgtype.Int4OID}}),
	)

	// Bind a portal that reuses only the column definitions of stmt; the bound
	// statement declares no parameters, matching the empty parameter list.
	formats := []FormatCode{BinaryFormat}
	err := portals.Bind(ctx, "test_portal", &Statement{
		parameters: []uint32{},
		columns:    stmt.columns,
	}, []Parameter{}, formats)
	require.NoError(t, err)

	session := &Session{
		Server:           &Server{logger: logger},
		Portals:          portals,
		ParallelPipeline: ParallelPipelineConfig{Enabled: true},
		ResponseQueue:    NewResponseQueue(),
	}

	reader := mock.NewDescribeReader(t, logger, types.DescribePortal, "test_portal")
	outBuf := &bytes.Buffer{}
	writer := buffer.NewWriter(logger, outBuf)

	err = session.handleDescribe(ctx, reader, writer)
	require.NoError(t, err)

	// In parallel pipeline mode, nothing should be written to the wire immediately
	assert.Equal(t, 0, outBuf.Len(), "parallel pipeline should not write to wire on success")
	assert.Equal(t, 1, session.ResponseQueue.Len())

	events := session.ResponseQueue.DrainAll()
	require.Len(t, events, 1)

	event := events[0]
	assert.Equal(t, ResponsePortalDescribe, event.Kind)

	// require rather than assert: Columns[0] is indexed right after, and a
	// non-fatal length failure would turn into an index-out-of-range panic
	// that obscures the real assertion message.
	require.Len(t, event.Columns, 1)
	assert.Equal(t, "col1", event.Columns[0].Name)
	assert.Equal(t, formats, event.Formats)
}
// TestHandleDescribe_ParallelPipeline_Error checks the failure path of
// handleDescribe in parallel pipeline mode: the response queue must end up
// empty, and the wire must contain the previously queued ParseComplete
// followed by an ErrorResponse.
func TestHandleDescribe_ParallelPipeline_Error(t *testing.T) {
	t.Parallel()

	logger := slogt.New(t)
	ctx := context.Background()

	// The cache holds a nil entry under "unknown_stmt"; the test relies on the
	// describe of that name being reported to the client as an error.
	session := &Session{
		Server: &Server{logger: logger},
		Statements: &DefaultStatementCache{
			statements: map[string]*Statement{"unknown_stmt": nil},
		},
		ParallelPipeline: ParallelPipelineConfig{Enabled: true},
		ResponseQueue:    NewResponseQueue(),
		inExtendedQuery:  true,
	}

	// Seed the queue with an earlier response so the flush ordering is observable.
	session.ResponseQueue.Enqueue(NewParseCompleteEvent())

	describeMsg := mock.NewDescribeReader(t, logger, types.DescribeStatement, "unknown_stmt")
	var wireOut bytes.Buffer
	out := buffer.NewWriter(logger, &wireOut)

	// The handler itself is expected to succeed; the failure is delivered to
	// the client over the wire rather than returned to the caller.
	require.NoError(t, session.handleDescribe(ctx, describeMsg, out))
	assert.Equal(t, 0, session.ResponseQueue.Len())

	replies := mock.NewReader(t, &wireOut)

	// First message on the wire: the ParseComplete that was queued beforehand.
	gotType, _, readErr := replies.ReadTypedMsg()
	require.NoError(t, readErr)
	assert.Equal(t, types.ServerParseComplete, gotType)

	// Second message on the wire: the ErrorResponse for the failed describe.
	gotType, _, readErr = replies.ReadTypedMsg()
	require.NoError(t, readErr)
	assert.Equal(t, types.ServerErrorResponse, gotType)
}