Skip to content

Commit 9e1ba3c

Browse files
committed
triggers
1 parent 5a5d7a4 commit 9e1ba3c

File tree

21 files changed

+1123
-79
lines changed

21 files changed

+1123
-79
lines changed

database/cassandra/cassandra.go

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,14 @@ type Config struct {
4343
MultiStatementEnabled bool
4444
MultiStatementMaxSize int
4545

46-
Triggers map[string]func(d database.Driver, detail interface{}) error
46+
Triggers map[string]func(response interface{}) error
47+
}
48+
49+
type TriggerResponse struct {
50+
Driver *Cassandra
51+
Config *Config
52+
Trigger string
53+
Detail interface{}
4754
}
4855

4956
type Cassandra struct {
@@ -200,7 +207,7 @@ func (c *Cassandra) Close() error {
200207
return nil
201208
}
202209

203-
func (c *Cassandra) AddTriggers(t map[string]func(d database.Driver, detail interface{}) error) {
210+
func (c *Cassandra) AddTriggers(t map[string]func(response interface{}) error) {
204211
c.config.Triggers = t
205212
}
206213

@@ -210,7 +217,12 @@ func (c *Cassandra) Trigger(name string, detail interface{}) error {
210217
}
211218

212219
if trigger, ok := c.config.Triggers[name]; ok {
213-
return trigger(c, detail)
220+
return trigger(TriggerResponse{
221+
Driver: c,
222+
Config: c.config,
223+
Trigger: name,
224+
Detail: detail,
225+
})
214226
}
215227

216228
return nil
@@ -238,10 +250,22 @@ func (c *Cassandra) Run(migration io.Reader) error {
238250
if tq == "" {
239251
return true
240252
}
253+
if e := c.Trigger(database.TrigRunPre, struct {
254+
Query string
255+
}{Query: tq}); e != nil {
256+
err = database.Error{OrigErr: e, Err: "failed to trigger RunPre"}
257+
return false
258+
}
241259
if e := c.session.Query(tq).Exec(); e != nil {
242260
err = database.Error{OrigErr: e, Err: "migration failed", Query: m}
243261
return false
244262
}
263+
if e := c.Trigger(database.TrigRunPost, struct {
264+
Query string
265+
}{Query: tq}); e != nil {
266+
err = database.Error{OrigErr: e, Err: "failed to trigger RunPost"}
267+
return false
268+
}
245269
return true
246270
}); e != nil {
247271
return e
@@ -253,15 +277,32 @@ func (c *Cassandra) Run(migration io.Reader) error {
253277
if err != nil {
254278
return err
255279
}
280+
if err := c.Trigger(database.TrigRunPre, struct {
281+
Query string
282+
}{Query: string(migr)}); err != nil {
283+
return database.Error{OrigErr: err, Err: "failed to trigger RunPre"}
284+
}
256285
// run migration
257286
if err := c.session.Query(string(migr)).Exec(); err != nil {
258287
// TODO: cast to Cassandra error and get line number
259288
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
260289
}
290+
if err := c.Trigger(database.TrigRunPre, struct {
291+
Query string
292+
}{Query: string(migr)}); err != nil {
293+
return database.Error{OrigErr: err, Err: "failed to trigger RunPost"}
294+
}
261295
return nil
262296
}
263297

264298
func (c *Cassandra) SetVersion(version int, dirty bool) error {
299+
if err := c.Trigger(database.TrigSetVersionPre, struct {
300+
Version int
301+
Dirty bool
302+
}{Version: version, Dirty: dirty}); err != nil {
303+
return &database.Error{OrigErr: err, Err: "failed to trigger SetVersionPre"}
304+
}
305+
265306
// DELETE instead of TRUNCATE because AWS Keyspaces does not support it
266307
// see: https://docs.aws.amazon.com/keyspaces/latest/devguide/cassandra-apis.html
267308
squery := `SELECT version FROM "` + c.config.MigrationsTable + `"`
@@ -287,6 +328,13 @@ func (c *Cassandra) SetVersion(version int, dirty bool) error {
287328
}
288329
}
289330

331+
if err := c.Trigger(database.TrigSetVersionPost, struct {
332+
Version int
333+
Dirty bool
334+
}{Version: version, Dirty: dirty}); err != nil {
335+
return &database.Error{OrigErr: err, Err: "failed to trigger SetVersionPost"}
336+
}
337+
290338
return nil
291339
}
292340

@@ -342,13 +390,22 @@ func (c *Cassandra) ensureVersionTable() (err error) {
342390
}
343391
}()
344392

393+
if err := c.Trigger(database.TrigVersionTablePre, nil); err != nil {
394+
return &database.Error{OrigErr: err, Err: "failed to trigger VersionTablePre"}
395+
}
396+
345397
err = c.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))", c.config.MigrationsTable)).Exec()
346398
if err != nil {
347399
return err
348400
}
349401
if _, _, err = c.Version(); err != nil {
350402
return err
351403
}
404+
405+
if err := c.Trigger(database.TrigVersionTablePost, nil); err != nil {
406+
return &database.Error{OrigErr: err, Err: "failed to trigger VersionTablePost"}
407+
}
408+
352409
return nil
353410
}
354411

database/clickhouse/clickhouse.go

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Config struct {
3535
MultiStatementEnabled bool
3636
MultiStatementMaxSize int
3737

38-
Triggers map[string]func(d database.Driver, detail interface{}) error
38+
Triggers map[string]func(response interface{}) error
3939
}
4040

4141
func init() {
@@ -69,6 +69,13 @@ type ClickHouse struct {
6969
isLocked atomic.Bool
7070
}
7171

72+
type TriggerResponse struct {
73+
Driver *ClickHouse
74+
Config *Config
75+
Trigger string
76+
Detail interface{}
77+
}
78+
7279
func (ch *ClickHouse) Open(dsn string) (database.Driver, error) {
7380
purl, err := url.Parse(dsn)
7481
if err != nil {
@@ -143,10 +150,22 @@ func (ch *ClickHouse) Run(r io.Reader) error {
143150
if tq == "" {
144151
return true
145152
}
153+
if e := ch.Trigger(database.TrigRunPre, struct {
154+
Query string
155+
}{Query: tq}); e != nil {
156+
err = database.Error{OrigErr: e, Err: "failed to trigger RunPre"}
157+
return false
158+
}
146159
if _, e := ch.conn.Exec(string(m)); e != nil {
147160
err = database.Error{OrigErr: e, Err: "migration failed", Query: m}
148161
return false
149162
}
163+
if e := ch.Trigger(database.TrigRunPost, struct {
164+
Query string
165+
}{Query: tq}); e != nil {
166+
err = database.Error{OrigErr: e, Err: "failed to trigger RunPost"}
167+
return false
168+
}
150169
return true
151170
}); e != nil {
152171
return e
@@ -159,10 +178,22 @@ func (ch *ClickHouse) Run(r io.Reader) error {
159178
return err
160179
}
161180

181+
if err := ch.Trigger(database.TrigRunPre, struct {
182+
Query string
183+
}{Query: string(migration)}); err != nil {
184+
return database.Error{OrigErr: err, Err: "failed to trigger RunPre"}
185+
}
186+
162187
if _, err := ch.conn.Exec(string(migration)); err != nil {
163188
return database.Error{OrigErr: err, Err: "migration failed", Query: migration}
164189
}
165190

191+
if err := ch.Trigger(database.TrigRunPost, struct {
192+
Query string
193+
}{Query: string(migration)}); err != nil {
194+
return database.Error{OrigErr: err, Err: "failed to trigger RunPost"}
195+
}
196+
166197
return nil
167198
}
168199
func (ch *ClickHouse) Version() (int, bool, error) {
@@ -182,7 +213,7 @@ func (ch *ClickHouse) Version() (int, bool, error) {
182213

183214
func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
184215
var (
185-
bool = func(v bool) uint8 {
216+
booln = func(v bool) uint8 {
186217
if v {
187218
return 1
188219
}
@@ -194,11 +225,25 @@ func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
194225
return err
195226
}
196227

228+
if err := ch.Trigger(database.TrigSetVersionPre, struct {
229+
Version int
230+
Dirty bool
231+
}{Version: version, Dirty: dirty}); err != nil {
232+
return &database.Error{OrigErr: err, Err: "failed to trigger SetVersionPre"}
233+
}
234+
197235
query := "INSERT INTO " + ch.config.MigrationsTable + " (version, dirty, sequence) VALUES (?, ?, ?)"
198-
if _, err := tx.Exec(query, version, bool(dirty), time.Now().UnixNano()); err != nil {
236+
if _, err := tx.Exec(query, version, booln(dirty), time.Now().UnixNano()); err != nil {
199237
return &database.Error{OrigErr: err, Query: []byte(query)}
200238
}
201239

240+
if err := ch.Trigger(database.TrigSetVersionPost, struct {
241+
Version int
242+
Dirty bool
243+
}{Version: version, Dirty: dirty}); err != nil {
244+
return &database.Error{OrigErr: err, Err: "failed to trigger SetVersionPost"}
245+
}
246+
202247
return tx.Commit()
203248
}
204249

@@ -230,9 +275,16 @@ func (ch *ClickHouse) ensureVersionTable() (err error) {
230275
return &database.Error{OrigErr: err, Query: []byte(query)}
231276
}
232277
} else {
278+
if err := ch.Trigger(database.TrigVersionTableExists, nil); err != nil {
279+
return &database.Error{OrigErr: err, Err: "failed to trigger VersionTableExists"}
280+
}
233281
return nil
234282
}
235283

284+
if err := ch.Trigger(database.TrigVersionTablePre, nil); err != nil {
285+
return &database.Error{OrigErr: err, Err: "failed to trigger VersionTablePre"}
286+
}
287+
236288
// if not, create the empty migration table
237289
if len(ch.config.ClusterName) > 0 {
238290
query = fmt.Sprintf(`
@@ -257,6 +309,11 @@ func (ch *ClickHouse) ensureVersionTable() (err error) {
257309
if _, err := ch.conn.Exec(query); err != nil {
258310
return &database.Error{OrigErr: err, Query: []byte(query)}
259311
}
312+
313+
if err := ch.Trigger(database.TrigVersionTablePost, nil); err != nil {
314+
return &database.Error{OrigErr: err, Err: "failed to trigger VersionTablePost"}
315+
}
316+
260317
return nil
261318
}
262319

@@ -308,7 +365,7 @@ func (ch *ClickHouse) Unlock() error {
308365
}
309366
func (ch *ClickHouse) Close() error { return ch.conn.Close() }
310367

311-
func (ch *ClickHouse) AddTriggers(t map[string]func(d database.Driver, detail interface{}) error) {
368+
func (ch *ClickHouse) AddTriggers(t map[string]func(response interface{}) error) {
312369
ch.config.Triggers = t
313370
}
314371

@@ -318,7 +375,12 @@ func (ch *ClickHouse) Trigger(name string, detail interface{}) error {
318375
}
319376

320377
if trigger, ok := ch.config.Triggers[name]; ok {
321-
return trigger(ch, detail)
378+
return trigger(TriggerResponse{
379+
Driver: ch,
380+
Config: ch.config,
381+
Trigger: name,
382+
Detail: detail,
383+
})
322384
}
323385

324386
return nil

0 commit comments

Comments
 (0)