Skip to content

Commit 656cf7b

Browse files
authored
refactoring TxnEvent to support callback data and return error with callback function (ISCP integration Step 1) (#22596)
### **User description** ## What type of PR is this? - [ ] API-change - [ ] BUG - [ ] Improvement - [ ] Documentation - [ ] Feature - [ ] Test and CI - [x] Code Refactoring ## Which issue(s) this PR fixes: issue #22595 ## What this PR does / why we need it: refactoring txnevent ___ ### **PR Type** Enhancement ___ ### **Description** - Refactor TxnEvent callback system to support error handling - Add context parameter to all callback functions - Replace simple function callbacks with TxnEventCallback struct - Update all callback registrations across services ___ ### Diagram Walkthrough ```mermaid flowchart LR A["Old Callback System"] --> B["TxnEventCallback Struct"] B --> C["Error Handling Support"] B --> D["Context Parameter"] C --> E["Service Updates"] D --> E ``` <details> <summary><h3> File Walkthrough</h3></summary> <table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Tests</strong></td><td><details><summary>9 files</summary><table> <tr> <td><strong>service_test.go</strong><dd><code>Update test mock callback signature</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-1545c670565109e9085a67dbb7f5a2282ecc827539c27f35c7e873f086400b12">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>engine_mock.go</strong><dd><code>Update mock method signatures and parameters</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-7f25d270cc6d85f3c1655194a9dd2493991b59c88f63be8bf658e28360dd398a">+22/-20</a>&nbsp; </td> </tr> <tr> <td><strong>txn_mock.go</strong><dd><code>Update mock TxnOperator callback signature</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-68e826bd15c3f0b74a031fad521dfb4ed97d3d9a22a29d40d0e1372c814dc646">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>txn_test.go</strong><dd><code>Update test TxnOperator callback signature</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-a62be3f347c14388335d2c9f5215f4cc4047061478716e92a7ff2008746667af">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>store_sql_test.go</strong><dd><code>Update test callback signature</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-7be9152998e4522d6fdd16ff7a5f3eefee803a694058c564957fd1ddc935017f">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>storage_test.go</strong><dd><code>Refactor test callbacks with error handling</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-63b189c4aababc87abb2af810b34dc7a30186c06fa339dc1b0c89c1ba540baf3">+24/-20</a>&nbsp; </td> </tr> <tr> <td><strong>storage_test.go</strong><dd><code>Update test storage callbacks</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-da396b17f40cc6b15087240192c7f3f90448da0512eab47ff78b9cba0372d4ca">+20/-16</a>&nbsp; </td> </tr> <tr> <td><strong>operator_events_test.go</strong><dd><code>Update test with new callback structure</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-de84152fc282697406e741e8fb983e52f1d11ac349071a4a6efe03eddfed3624">+6/-3</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>entire_engine_test.go</strong><dd><code>Update test operator callback signature</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-1e6781313efa2d3f2c37c97bcc6e8f3cf63d5af77cf95536e506e548d126be55">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> </table></details></td></tr><tr><td><strong>Enhancement</strong></td><td><details><summary>10 files</summary><table> <tr> <td><strong>service.go</strong><dd><code>Update callback registration with new wrapper</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-720e966f9f67df5a9aa7a53b758545f61570d432058f4cebe0421da898eac954">+4/-3</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>store_mem.go</strong><dd><code>Refactor callback to return error and accept context</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-d45949e111d7a1330d3efe078395b2e8fb90a9607b05fd2a5d75c4dcb6656471">+12/-10</a>&nbsp; </td> </tr> <tr> <td><strong>service.go</strong><dd><code>Update callback with error handling support</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-88d584535426969228aa47bd0449a801cc45faadb922595fa8493f7f13d7b495">+7/-5</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>service.go</strong><dd><code>Update service callbacks with new structure</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-013c985deca4c0e8533c2d68bc956730ca890c3181430ee095454481f55dce32">+26/-22</a>&nbsp; </td> </tr> <tr> <td><strong>client.go</strong><dd><code>Update client callback registration and handlers</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-f4f177db5c0c25f4d24f1aced66ebcc38f7b0589ea867e55022a096211ff1565">+11/-5</a>&nbsp; &nbsp; </td> </tr> <tr> <td><strong>operator.go</strong><dd><code>Core refactoring of callback system implementation</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-43f44107ace02cc2170c3138f30bd1e0ba258be040543c7e43e3a49b89d67c99">+37/-17</a>&nbsp; </td> </tr> <tr> <td><strong>operator_events.go</strong><dd><code>Implement new callback structure and error handling</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-d466c303cce94e5a172125a9733d6e3075a6f8ec03fe6e8dbfd242878c486fb5">+11/-6</a>&nbsp; &nbsp; </td> </tr> <tr> <td><strong>types.go</strong><dd><code>Define TxnEventCallback struct and helper functions</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-04dd5377bb096d114087521ac2b2f926bbe80f0459b98e1bb1d5a2774282b08a">+18/-1</a>&nbsp; &nbsp; </td> </tr> <tr> <td><strong>storage_txn_client.go</strong><dd><code>Update storage callback signature</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-f3de47afbf03f0313e955d1981e7739483e2583ab25715940cbd860078c45d6a">+1/-1</a>&nbsp; &nbsp; &nbsp; </td> </tr> <tr> <td><strong>service_txn_event.go</strong><dd><code>Update trace service callbacks with error handling</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; </dd></td> <td><a href="https://github.com/matrixorigin/matrixone/pull/22596/files#diff-ccd9582830fcde4b37f16f52bfc51a3b6e5c88a5f9c3afdb63c45ba57ab873a3">+26/-20</a>&nbsp; </td> </tr> </table></details></td></tr></tr></tbody></table> </details> ___
1 parent ffe9351 commit 656cf7b

File tree

18 files changed

+225
-148
lines changed

18 files changed

+225
-148
lines changed

pkg/bootstrap/service_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (tTxnOp *testTxnOperator) GetWorkspace() client.Workspace {
177177
panic("implement me")
178178
}
179179

180-
func (tTxnOp *testTxnOperator) AppendEventCallback(event client.EventType, callbacks ...func(client.TxnEvent)) {
180+
func (tTxnOp *testTxnOperator) AppendEventCallback(event client.EventType, callbacks ...client.TxnEventCallback) {
181181
//TODO implement me
182182
panic("implement me")
183183
}

pkg/frontend/test/engine_mock.go

Lines changed: 22 additions & 20 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/frontend/test/txn_mock.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/frontend/txn_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,7 @@ func (txnop *testTxnOp) GetWorkspace() client.Workspace {
931931
return txnop.wp
932932
}
933933

934-
func (txnop *testTxnOp) AppendEventCallback(event client.EventType, callbacks ...func(client.TxnEvent)) {
934+
func (txnop *testTxnOp) AppendEventCallback(event client.EventType, callbacks ...client.TxnEventCallback) {
935935
//TODO implement me
936936
panic("implement me")
937937
}

pkg/incrservice/service.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (s *service) Create(
9696

9797
txnOp.AppendEventCallback(
9898
client.ClosedEvent,
99-
s.txnClosed)
99+
client.NewTxnEventCallback(s.txnClosed))
100100
if err := s.store.Create(ctx, tableID, cols, txnOp); err != nil {
101101
s.logger.Error("create auto increment cache failed",
102102
zap.Uint64("table-id", tableID),
@@ -180,7 +180,7 @@ func (s *service) Delete(
180180

181181
txnOp.AppendEventCallback(
182182
client.ClosedEvent,
183-
s.txnClosed)
183+
client.NewTxnEventCallback(s.txnClosed))
184184

185185
s.mu.Lock()
186186
defer s.mu.Unlock()
@@ -345,12 +345,13 @@ func (s *service) getCommittedTableCache(
345345
return c, nil
346346
}
347347

348-
func (s *service) txnClosed(event client.TxnEvent) {
348+
func (s *service) txnClosed(ctx context.Context, txnOp client.TxnOperator, event client.TxnEvent, v any) error {
349349
s.mu.Lock()
350350
defer s.mu.Unlock()
351351

352352
s.handleCreatesLocked(event.Txn)
353353
s.handleDeletesLocked(event.Txn)
354+
return nil
354355
}
355356

356357
func (s *service) handleCreatesLocked(txnMeta txn.TxnMeta) {

pkg/incrservice/store_mem.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,19 @@ func (s *memStore) Create(
6060
s.uncommitted[string(txnOp.Txn().ID)] = m
6161
txnOp.AppendEventCallback(
6262
client.ClosedEvent,
63-
func(event client.TxnEvent) {
64-
txnMeta := event.Txn
65-
s.Lock()
66-
defer s.Unlock()
67-
delete(s.uncommitted, string(txnMeta.ID))
68-
if txnMeta.Status == txn.TxnStatus_Committed {
69-
for k, v := range m {
70-
s.caches[k] = v
63+
client.NewTxnEventCallback(
64+
func(ctx context.Context, txnOp client.TxnOperator, event client.TxnEvent, v any) error {
65+
txnMeta := event.Txn
66+
s.Lock()
67+
defer s.Unlock()
68+
delete(s.uncommitted, string(txnMeta.ID))
69+
if txnMeta.Status == txn.TxnStatus_Committed {
70+
for k, v := range m {
71+
s.caches[k] = v
72+
}
7173
}
72-
}
73-
})
74+
return nil
75+
}))
7476
}
7577

7678
caches := m[tableID]

pkg/incrservice/store_sql_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (tTxnOp *testTxnOperator) GetWorkspace() client.Workspace {
285285
panic("implement me")
286286
}
287287

288-
func (tTxnOp *testTxnOperator) AppendEventCallback(event client.EventType, callbacks ...func(client.TxnEvent)) {
288+
func (tTxnOp *testTxnOperator) AppendEventCallback(event client.EventType, callbacks ...client.TxnEventCallback) {
289289
//TODO implement me
290290
panic("implement me")
291291
}

pkg/partitionservice/storage_test.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,20 @@ func (s *memStorage) Create(
116116

117117
txnOp.AppendEventCallback(
118118
client.ClosedEvent,
119-
func(txn client.TxnEvent) {
120-
s.Lock()
121-
defer s.Unlock()
122-
123-
v, ok := s.uncommitted[def.TblId]
124-
if txn.Committed() {
125-
if ok {
126-
s.committed[def.TblId] = v
119+
client.NewTxnEventCallback(
120+
func(ctx context.Context, txnOp client.TxnOperator, txn client.TxnEvent, cbdata any) error {
121+
s.Lock()
122+
defer s.Unlock()
123+
124+
v, ok := s.uncommitted[def.TblId]
125+
if txn.Committed() {
126+
if ok {
127+
s.committed[def.TblId] = v
128+
}
127129
}
128-
}
129-
delete(s.uncommitted, def.TblId)
130-
},
130+
delete(s.uncommitted, def.TblId)
131+
return nil
132+
}),
131133
)
132134
return nil
133135
}
@@ -154,15 +156,17 @@ func (s *memStorage) Delete(
154156

155157
txnOp.AppendEventCallback(
156158
client.ClosedEvent,
157-
func(txn client.TxnEvent) {
158-
s.Lock()
159-
defer s.Unlock()
159+
client.NewTxnEventCallback(
160+
func(ctx context.Context, txnOp client.TxnOperator, txn client.TxnEvent, v any) error {
161+
s.Lock()
162+
defer s.Unlock()
160163

161-
delete(s.uncommitted, table)
162-
if txn.Committed() {
163-
delete(s.committed, table)
164-
}
165-
},
164+
delete(s.uncommitted, table)
165+
if txn.Committed() {
166+
delete(s.committed, table)
167+
}
168+
return nil
169+
}),
166170
)
167171
return nil
168172
}

pkg/shardservice/service.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -181,21 +181,23 @@ func (s *service) Create(
181181
if !s.options.disableAppendCreateCallback {
182182
txnOp.AppendEventCallback(
183183
client.ClosedEvent,
184-
func(txn client.TxnEvent) {
185-
if txn.Committed() {
186-
s.logger.Info("sharding table created",
187-
zap.Uint64("table", table),
188-
zap.String("committed", txn.Txn.CommitTS.DebugString()),
189-
)
190-
191-
// The callback here is not guaranteed to execute after the transaction has
192-
// already committed.
193-
// The creation will lazy execute in Read.
194-
s.createC <- table
195-
} else {
196-
s.atomic.abort.Add(1)
197-
}
198-
},
184+
client.NewTxnEventCallback(
185+
func(ctx context.Context, _txnOp client.TxnOperator, txn client.TxnEvent, v any) error {
186+
if txn.Committed() {
187+
s.logger.Info("sharding table created",
188+
zap.Uint64("table", table),
189+
zap.String("committed", txn.Txn.CommitTS.DebugString()),
190+
)
191+
192+
// The callback here is not guaranteed to execute after the transaction has
193+
// already committed.
194+
// The creation will lazy execute in Read.
195+
s.createC <- table
196+
} else {
197+
s.atomic.abort.Add(1)
198+
}
199+
return nil
200+
}),
199201
)
200202
}
201203

@@ -220,13 +222,15 @@ func (s *service) Delete(
220222
if !s.options.disableAppendDeleteCallback {
221223
txnOp.AppendEventCallback(
222224
client.ClosedEvent,
223-
func(txn client.TxnEvent) {
224-
if txn.Committed() {
225-
s.deleteC <- table
226-
} else {
227-
s.atomic.abort.Add(1)
228-
}
229-
},
225+
client.NewTxnEventCallback(
226+
func(ctx context.Context, _txnOp client.TxnOperator, txn client.TxnEvent, v any) error {
227+
if txn.Committed() {
228+
s.deleteC <- table
229+
} else {
230+
s.atomic.abort.Add(1)
231+
}
232+
return nil
233+
}),
230234
)
231235
}
232236

pkg/shardservice/storage_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,17 @@ func (s *MemShardStorage) Create(
125125
}
126126
txnOp.AppendEventCallback(
127127
client.ClosedEvent,
128-
func(txn client.TxnEvent) {
129-
s.Lock()
130-
defer s.Unlock()
128+
client.NewTxnEventCallback(
129+
func(ctx context.Context, txnOp client.TxnOperator, txn client.TxnEvent, cbdata any) error {
130+
s.Lock()
131+
defer s.Unlock()
131132

132-
if txn.Committed() {
133-
s.committed[table] = v
134-
}
135-
delete(s.uncommittedAdd, table)
136-
},
133+
if txn.Committed() {
134+
s.committed[table] = v
135+
}
136+
delete(s.uncommittedAdd, table)
137+
return nil
138+
}),
137139
)
138140
return true, nil
139141
}
@@ -158,15 +160,17 @@ func (s *MemShardStorage) Delete(
158160
s.uncommittedDelete[table] = struct{}{}
159161
txnOp.AppendEventCallback(
160162
client.ClosedEvent,
161-
func(txn client.TxnEvent) {
162-
s.Lock()
163-
defer s.Unlock()
163+
client.NewTxnEventCallback(
164+
func(ctx context.Context, txnOp client.TxnOperator, txn client.TxnEvent, v any) error {
165+
s.Lock()
166+
defer s.Unlock()
164167

165-
if txn.Committed() {
166-
delete(s.committed, table)
167-
}
168-
delete(s.uncommittedDelete, table)
169-
},
168+
if txn.Committed() {
169+
delete(s.committed, table)
170+
}
171+
delete(s.uncommittedDelete, table)
172+
return nil
173+
}),
170174
)
171175
return true, nil
172176
}

0 commit comments

Comments
 (0)