-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanager.go
More file actions
174 lines (157 loc) · 5.25 KB
/
manager.go
File metadata and controls
174 lines (157 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Package writebatch implements automatic batching of write operations.
// See types.go for package documentation.
package writebatch
import (
"context"
"database/sql"
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"time"
)
// Manager handles batching of write operations
type Manager struct {
groups sync.Map // map[string]*BatchGroup
config Config
db *sql.DB
closed atomic.Bool
batchCount atomic.Int64
firstInsertIDIsFirst bool // true for MySQL/MariaDB (last_insert_id = first row), false for SQLite (last_insert_rowid = last row)
}
// BatchCount returns the total number of batches executed since the manager was created.
func (m *Manager) BatchCount() int64 {
return m.batchCount.Load()
}
// New creates a new write batch manager
func New(db *sql.DB, config Config) *Manager {
// MySQL/MariaDB return the first auto-generated ID for multi-row INSERTs.
// SQLite (and some other drivers) return the last inserted row's ID instead.
driverType := fmt.Sprintf("%T", db.Driver())
firstIDIsFirst := strings.Contains(strings.ToLower(driverType), "mysql")
return &Manager{
db: db,
config: config,
firstInsertIDIsFirst: firstIDIsFirst,
}
}
// Enqueue adds a write operation to the batch queue and waits for its result.
//
// Parameters:
// - ctx: Context for cancellation
// - batchKey: Key for grouping operations (typically the normalized query)
// - query: The SQL query to execute
// - params: Query parameters
// - batchMs: Maximum wait time in milliseconds (0 = execute immediately)
// - onBatchComplete: Callback invoked when batch executes (receives batch size)
//
// Batching behavior:
// - batchMs == 0: Execute immediately without batching
// - batchMs > 0: Add to batch group, wait up to batchMs for more operations
// - Batch executes when: timer expires OR max_batch_size reached
//
// The function blocks until the operation completes or context is cancelled.
// Each operation receives its individual WriteResult, including:
// - AffectedRows, LastInsertID (for INSERT)
// - BatchSize (number of operations in the batch)
// - Error (if any)
func (m *Manager) Enqueue(ctx context.Context, batchKey, query string, params []interface{}, batchMs int, onBatchComplete func(int)) WriteResult {
hasReturning := hasReturningClause(query)
if m.closed.Load() {
return WriteResult{Error: ErrManagerClosed}
}
// If no wait time specified, execute immediately (no batching)
if batchMs == 0 {
result := m.executeImmediate(ctx, query, params)
// Call callback even for immediate execution
if onBatchComplete != nil {
onBatchComplete(result.BatchSize)
}
return result
}
req := &WriteRequest{
Query: query,
Params: params,
ResultChan: make(chan WriteResult, 1),
EnqueuedAt: time.Now(),
OnBatchComplete: onBatchComplete,
HasReturning: hasReturning,
}
// Get or create batch group
groupInterface, loaded := m.groups.Load(batchKey)
if !loaded {
// Group doesn't exist, create it
newGroup := &BatchGroup{
BatchKey: batchKey,
Requests: make([]*WriteRequest, 0, m.config.MaxBatchSize),
FirstSeen: time.Now(),
}
groupInterface, loaded = m.groups.LoadOrStore(batchKey, newGroup)
}
group := groupInterface.(*BatchGroup)
group.mu.Lock()
isFirst := len(group.Requests) == 0
if group.Requests == nil {
// Group has been processed, this shouldn't happen but handle it
group.mu.Unlock()
// Retry with a fresh lookup
return m.Enqueue(ctx, batchKey, query, params, batchMs, onBatchComplete)
}
group.Requests = append(group.Requests, req)
currentSize := len(group.Requests)
if isFirst {
// First request - start timer with specified delay
delay := time.Duration(batchMs) * time.Millisecond
group.timer = time.AfterFunc(delay, func() {
m.executeBatch(batchKey, group)
})
group.mu.Unlock()
} else if currentSize >= m.config.MaxBatchSize {
// Batch full - execute immediately
timer := group.timer
// Delete group from map so new requests create a fresh batch
m.groups.Delete(batchKey)
group.mu.Unlock()
if timer != nil {
timer.Stop()
}
go m.executeBatch(batchKey, group)
} else {
group.mu.Unlock()
}
// Wait for result
select {
case result := <-req.ResultChan:
return result
case <-ctx.Done():
return WriteResult{Error: ctx.Err()}
}
}
// executeImmediate executes a query immediately without batching
func (m *Manager) executeImmediate(ctx context.Context, query string, params []interface{}) WriteResult {
result, err := m.db.ExecContext(ctx, query, params...)
if err != nil {
log.Printf("[WriteBatch] executeImmediate ERROR: %v", err)
return WriteResult{Error: err}
}
affected, _ := result.RowsAffected()
lastID, _ := result.LastInsertId()
return WriteResult{
AffectedRows: affected,
LastInsertID: lastID,
}
}
// Close shuts down the manager and waits for in-flight batches
func (m *Manager) Close() error {
m.closed.Store(true)
// Wait a moment for in-flight batches to complete
time.Sleep(200 * time.Millisecond)
return nil
}
// hasReturningClause checks if a query contains a RETURNING clause
func hasReturningClause(query string) bool {
// Simple case-insensitive check for RETURNING keyword
q := strings.ToUpper(query)
return strings.Contains(q, " RETURNING ")
}