-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwritebatch_test.go
More file actions
276 lines (227 loc) · 7.15 KB
/
writebatch_test.go
File metadata and controls
276 lines (227 loc) · 7.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package mariadb
import (
"database/sql"
"testing"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/mevdschee/tqdbproxy/cache"
"github.com/mevdschee/tqdbproxy/config"
"github.com/mevdschee/tqdbproxy/replica"
)
// TestWriteBatchIntegration drives a proxy with write batching enabled
// end to end through database/sql. It is skipped unconditionally because it
// needs a MariaDB backend on 127.0.0.1:3306; remove the Skip call to run it
// manually.
//
// The subtests only verify the results the client observes (rows affected,
// last insert ID, row counts). They do not inspect whether the proxy actually
// grouped statements into a batch.
func TestWriteBatchIntegration(t *testing.T) {
	// This test requires a running MariaDB backend.
	// Skip if not available.
	t.Skip("Integration test requires running MariaDB backend - run manually when testing")

	// Create a proxy with write batching enabled.
	pcfg := config.ProxyConfig{
		Listen:  ":13307", // Use different port to avoid conflicts
		Default: "main",
		Backends: map[string]config.BackendConfig{
			"main": {
				Primary: "127.0.0.1:3306",
			},
		},
		WriteBatch: config.WriteBatchConfig{
			MaxBatchSize: 100,
		},
	}

	pools := map[string]*replica.Pool{
		"main": replica.NewPool(
			"127.0.0.1:3306",
			[]string{},
		),
	}

	c, err := cache.New(cache.DefaultCacheConfig())
	if err != nil {
		t.Fatalf("Failed to create cache: %v", err)
	}

	proxy := New(pcfg, pools, c)

	// Start the proxy.
	if err := proxy.Start(); err != nil {
		t.Fatalf("Failed to start proxy: %v", err)
	}
	defer func() {
		// Best-effort shutdown: report the error without failing the test.
		if err := proxy.Stop(); err != nil {
			t.Logf("Failed to stop proxy: %v", err)
		}
	}()

	// Give the proxy time to start.
	time.Sleep(100 * time.Millisecond)

	// Connect to the proxy. Note that sql.Open may only validate its
	// arguments; the first statement below is what actually dials.
	db, err := sql.Open("mysql", "tqdbproxy:tqdbproxy@tcp(127.0.0.1:13307)/tqdbproxy")
	if err != nil {
		t.Fatalf("Failed to connect to proxy: %v", err)
	}
	defer db.Close()

	// Create a test table.
	_, err = db.Exec("CREATE TABLE IF NOT EXISTS test_batch (id INT PRIMARY KEY AUTO_INCREMENT, value VARCHAR(255))")
	if err != nil {
		t.Fatalf("Failed to create table: %v", err)
	}
	defer func() {
		// Best-effort cleanup: a leftover table should be visible in the
		// test log but must not change the test result.
		if _, err := db.Exec("DROP TABLE IF EXISTS test_batch"); err != nil {
			t.Logf("Failed to drop table: %v", err)
		}
	}()

	// Test 1: Single write should be batched.
	t.Run("SingleWrite", func(t *testing.T) {
		result, err := db.Exec("INSERT INTO test_batch (value) VALUES ('test1')")
		if err != nil {
			t.Fatalf("INSERT failed: %v", err)
		}

		rows, err := result.RowsAffected()
		if err != nil {
			t.Fatalf("RowsAffected failed: %v", err)
		}
		if rows != 1 {
			t.Errorf("Expected 1 row affected, got %d", rows)
		}

		lastID, err := result.LastInsertId()
		if err != nil {
			t.Fatalf("LastInsertId failed: %v", err)
		}
		if lastID == 0 {
			t.Error("Expected non-zero last insert ID")
		}
	})

	// Test 2: Multiple concurrent writes should be batched together.
	t.Run("ConcurrentWrites", func(t *testing.T) {
		const numWrites = 10

		// Buffered to numWrites so no writer goroutine can block on send,
		// even if this subtest stops receiving early.
		errChan := make(chan error, numWrites)

		// Send concurrent writes.
		for i := 0; i < numWrites; i++ {
			go func(idx int) {
				_, err := db.Exec("INSERT INTO test_batch (value) VALUES (?)", idx)
				errChan <- err
			}(i)
		}

		// Wait for all writes to complete. The index reported on failure is
		// the order in which results arrived, not the writer's idx.
		for i := 0; i < numWrites; i++ {
			if err := <-errChan; err != nil {
				t.Errorf("Concurrent write %d failed: %v", i, err)
			}
		}

		// Verify all rows were inserted.
		var count int
		err := db.QueryRow("SELECT COUNT(*) FROM test_batch").Scan(&count)
		if err != nil {
			t.Fatalf("SELECT COUNT failed: %v", err)
		}

		// Lower bound only: the table is created with IF NOT EXISTS, so it
		// may already contain rows from an earlier run.
		expected := numWrites + 1 // +1 from SingleWrite test
		if count < expected {
			t.Errorf("Expected at least %d rows, got %d", expected, count)
		}
	})

	// Test 3: Writes in transaction should NOT be batched.
	t.Run("TransactionWrites", func(t *testing.T) {
		tx, err := db.Begin()
		if err != nil {
			t.Fatalf("BEGIN failed: %v", err)
		}
		// Safety net for the early t.Fatalf exits below. After the explicit
		// rollback further down this call just returns sql.ErrTxDone, which
		// is intentionally ignored.
		defer tx.Rollback()

		// This should execute immediately, not batched.
		result, err := tx.Exec("INSERT INTO test_batch (value) VALUES ('in-transaction')")
		if err != nil {
			t.Fatalf("INSERT in transaction failed: %v", err)
		}

		rows, err := result.RowsAffected()
		if err != nil {
			t.Fatalf("RowsAffected failed: %v", err)
		}
		if rows != 1 {
			t.Errorf("Expected 1 row affected, got %d", rows)
		}

		// Rollback to clean up. A failed rollback would make the count
		// check below meaningless, so stop here in that case.
		if err := tx.Rollback(); err != nil {
			t.Fatalf("ROLLBACK failed: %v", err)
		}

		// Verify the row was NOT committed (should be rolled back).
		var count int
		err = db.QueryRow("SELECT COUNT(*) FROM test_batch WHERE value = 'in-transaction'").Scan(&count)
		if err != nil {
			t.Fatalf("SELECT COUNT failed: %v", err)
		}
		if count != 0 {
			t.Errorf("Expected 0 rows (rolled back), got %d", count)
		}
	})

	// Test 4: Non-batchable writes (UPDATE) should still work.
	t.Run("UpdateWrite", func(t *testing.T) {
		// First insert a row to update.
		result, err := db.Exec("INSERT INTO test_batch (value) VALUES ('to-update')")
		if err != nil {
			t.Fatalf("INSERT failed: %v", err)
		}

		// Without a valid ID the UPDATE below would target the wrong row
		// (or none) and fail with a misleading "rows affected" message.
		lastID, err := result.LastInsertId()
		if err != nil {
			t.Fatalf("LastInsertId failed: %v", err)
		}

		// UPDATE queries are not batchable (different WHERE clauses).
		result, err = db.Exec("UPDATE test_batch SET value = 'updated' WHERE id = ?", lastID)
		if err != nil {
			t.Fatalf("UPDATE failed: %v", err)
		}

		rows, err := result.RowsAffected()
		if err != nil {
			t.Fatalf("RowsAffected failed: %v", err)
		}
		if rows != 1 {
			t.Errorf("Expected 1 row affected, got %d", rows)
		}
	})
}
// TestWriteBatchManagerLifecycle walks a proxy through New, Start and Stop
// and checks the write-batch fields along the way: the context and cancel
// func must exist right after New, the manager must not exist before Start,
// and Stop must return without error. No backend is required.
func TestWriteBatchManagerLifecycle(t *testing.T) {
	const primaryAddr = "127.0.0.1:3306"

	backends := map[string]config.BackendConfig{
		"main": {Primary: primaryAddr},
	}
	cfg := config.ProxyConfig{
		Listen:     ":13308",
		Default:    "main",
		Backends:   backends,
		WriteBatch: config.WriteBatchConfig{MaxBatchSize: 1000},
	}
	replicaPools := map[string]*replica.Pool{
		"main": replica.NewPool(primaryAddr, []string{}),
	}

	queryCache, err := cache.New(cache.DefaultCacheConfig())
	if err != nil {
		t.Fatalf("Failed to create cache: %v", err)
	}

	p := New(cfg, replicaPools, queryCache)

	// State expected straight after construction: context and cancel func
	// present, write-batch manager still absent until Start() runs.
	preStartChecks := []struct {
		violated bool
		message  string
	}{
		{p.wbCtx == nil, "wbCtx should be initialized"},
		{p.wbCancel == nil, "wbCancel should be initialized"},
		{p.writeBatch != nil, "writeBatch should be nil before Start()"},
	}
	for _, check := range preStartChecks {
		if check.violated {
			t.Error(check.message)
		}
	}

	// The Start error is deliberately discarded: without a real backend the
	// call is expected to fail, and only the lifecycle is under test here.
	// For the same reason writeBatch is not inspected afterwards; it stays
	// nil when the connection could not be made.
	_ = p.Start()

	// Shutting down has to succeed whether or not Start did.
	stopErr := p.Stop()
	if stopErr != nil {
		t.Errorf("Stop() failed: %v", stopErr)
	}
}
// BenchmarkWriteBatchVsImmediate is a placeholder for a comparison of
// batched and unbatched write performance. It has no measured body yet and
// always skips, because a real backend connection would be needed to run it.
func BenchmarkWriteBatchVsImmediate(b *testing.B) {
	const skipReason = "Benchmark requires running MariaDB backend - run manually when testing"
	b.Skip(skipReason)
}
// TestBatchKeyGrouping is a placeholder for an integration test asserting
// that multiple clients sending the same INSERT are grouped into a single
// batch. It reports itself as skipped rather than passing vacuously, so the
// missing coverage stays visible in test output.
func TestBatchKeyGrouping(t *testing.T) {
	// Per the original note, the unit-level pieces are covered elsewhere:
	// GetBatchKey() in parser_test.go and the actual batching behavior in
	// writebatch/manager_test.go. What is still missing here is the
	// end-to-end check through the proxy.
	t.Skip("Batch key grouping integration test is not implemented yet")
}
// TestWriteBatchConfig builds a ProxyConfig with a WriteBatch section and
// reads MaxBatchSize back. It only exercises the struct literal itself; no
// other code consumes the config in this test.
func TestWriteBatchConfig(t *testing.T) {
	cfg := config.ProxyConfig{
		WriteBatch: config.WriteBatchConfig{MaxBatchSize: 1000},
	}

	// The value read back must match what was put into the literal above.
	if got := cfg.WriteBatch.MaxBatchSize; got != 1000 {
		t.Errorf("Expected MaxBatchSize 1000, got %d", got)
	}
}