Skip to content

Commit bf4835d

Browse files
committed
expose GetFutureEvents in sqlite backend
1 parent cda71d8 commit bf4835d

File tree

3 files changed

+58
-112
lines changed

3 files changed

+58
-112
lines changed

backend/monoprocess/monoprocess_test.go

Lines changed: 4 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@ package monoprocess
22

33
import (
44
"context"
5-
"database/sql"
6-
"fmt"
7-
"reflect"
5+
"errors"
86
"testing"
97
"time"
10-
"unsafe"
118

129
"github.com/cschleiden/go-workflows/backend"
1310
"github.com/cschleiden/go-workflows/backend/sqlite"
@@ -44,57 +41,8 @@ func Test_EndToEndMonoprocessBackend(t *testing.T) {
4441
var _ test.TestBackend = (*monoprocessBackend)(nil)
4542

4643
func (b *monoprocessBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
47-
// Reflection hack to access db in private field of sqlite.sqliteBackend
48-
privateDbField := reflect.ValueOf(b.Backend).Elem().FieldByName("db")
49-
hackedDbField := reflect.NewAt(privateDbField.Type(), unsafe.Pointer(privateDbField.UnsafeAddr())).Elem()
50-
db := hackedDbField.Interface().(*sql.DB)
51-
52-
tx, err := db.BeginTx(ctx, nil)
53-
if err != nil {
54-
return nil, err
55-
}
56-
defer tx.Rollback()
57-
58-
// There is no index on `visible_at`, but this is okay for test only usage.
59-
futureEvents, err := tx.QueryContext(
60-
ctx,
61-
"SELECT id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
62-
)
63-
if err != nil {
64-
return nil, fmt.Errorf("getting history: %w", err)
65-
}
66-
67-
f := make([]*history.Event, 0)
68-
69-
for futureEvents.Next() {
70-
var instanceID, executionID string
71-
var attributes []byte
72-
73-
fe := &history.Event{}
74-
75-
if err := futureEvents.Scan(
76-
&fe.ID,
77-
&fe.SequenceID,
78-
&instanceID,
79-
&executionID,
80-
&fe.Type,
81-
&fe.Timestamp,
82-
&fe.ScheduleEventID,
83-
&attributes,
84-
&fe.VisibleAt,
85-
); err != nil {
86-
return nil, fmt.Errorf("scanning event: %w", err)
87-
}
88-
89-
a, err := history.DeserializeAttributes(fe.Type, attributes)
90-
if err != nil {
91-
return nil, fmt.Errorf("deserializing attributes: %w", err)
92-
}
93-
94-
fe.Attributes = a
95-
96-
f = append(f, fe)
44+
if testBackend, ok := b.Backend.(test.TestBackend); ok {
45+
return testBackend.GetFutureEvents(ctx)
9746
}
98-
99-
return f, nil
47+
return nil, errors.New("not implemented")
10048
}

backend/sqlite/events.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,64 @@ import (
77
"strings"
88
"time"
99

10+
"github.com/cschleiden/go-workflows/backend/test"
1011
"github.com/cschleiden/go-workflows/internal/core"
1112
"github.com/cschleiden/go-workflows/internal/history"
1213
)
1314

15+
var _ test.TestBackend = (*sqliteBackend)(nil)
16+
17+
func (sb *sqliteBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
18+
tx, err := sb.db.BeginTx(ctx, nil)
19+
if err != nil {
20+
return nil, err
21+
}
22+
defer tx.Rollback()
23+
24+
// There is no index on `visible_at`, but this is okay for test only usage.
25+
futureEvents, err := tx.QueryContext(
26+
ctx,
27+
"SELECT id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
28+
)
29+
if err != nil {
30+
return nil, fmt.Errorf("getting history: %w", err)
31+
}
32+
33+
f := make([]*history.Event, 0)
34+
35+
for futureEvents.Next() {
36+
var instanceID, executionID string
37+
var attributes []byte
38+
39+
fe := &history.Event{}
40+
41+
if err := futureEvents.Scan(
42+
&fe.ID,
43+
&fe.SequenceID,
44+
&instanceID,
45+
&executionID,
46+
&fe.Type,
47+
&fe.Timestamp,
48+
&fe.ScheduleEventID,
49+
&attributes,
50+
&fe.VisibleAt,
51+
); err != nil {
52+
return nil, fmt.Errorf("scanning event: %w", err)
53+
}
54+
55+
a, err := history.DeserializeAttributes(fe.Type, attributes)
56+
if err != nil {
57+
return nil, fmt.Errorf("deserializing attributes: %w", err)
58+
}
59+
60+
fe.Attributes = a
61+
62+
f = append(f, fe)
63+
}
64+
65+
return f, nil
66+
}
67+
1468
func getPendingEvents(ctx context.Context, tx *sql.Tx, instance *core.WorkflowInstance) ([]*history.Event, error) {
1569
now := time.Now()
1670
events, err := tx.QueryContext(

backend/sqlite/sqlite_test.go

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
package sqlite
22

33
import (
4-
"context"
5-
"fmt"
64
"testing"
75

86
"github.com/cschleiden/go-workflows/backend"
97
"github.com/cschleiden/go-workflows/backend/test"
10-
"github.com/cschleiden/go-workflows/internal/history"
118
)
129

1310
func Test_SqliteBackend(t *testing.T) {
@@ -35,56 +32,3 @@ func Test_EndToEndSqliteBackend(t *testing.T) {
3532
return NewInMemoryBackend(options...)
3633
}, nil)
3734
}
38-
39-
var _ test.TestBackend = (*sqliteBackend)(nil)
40-
41-
func (sb *sqliteBackend) GetFutureEvents(ctx context.Context) ([]*history.Event, error) {
42-
tx, err := sb.db.BeginTx(ctx, nil)
43-
if err != nil {
44-
return nil, err
45-
}
46-
defer tx.Rollback()
47-
48-
// There is no index on `visible_at`, but this is okay for test only usage.
49-
futureEvents, err := tx.QueryContext(
50-
ctx,
51-
"SELECT id, sequence_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at FROM `pending_events` WHERE visible_at IS NOT NULL",
52-
)
53-
if err != nil {
54-
return nil, fmt.Errorf("getting history: %w", err)
55-
}
56-
57-
f := make([]*history.Event, 0)
58-
59-
for futureEvents.Next() {
60-
var instanceID, executionID string
61-
var attributes []byte
62-
63-
fe := &history.Event{}
64-
65-
if err := futureEvents.Scan(
66-
&fe.ID,
67-
&fe.SequenceID,
68-
&instanceID,
69-
&executionID,
70-
&fe.Type,
71-
&fe.Timestamp,
72-
&fe.ScheduleEventID,
73-
&attributes,
74-
&fe.VisibleAt,
75-
); err != nil {
76-
return nil, fmt.Errorf("scanning event: %w", err)
77-
}
78-
79-
a, err := history.DeserializeAttributes(fe.Type, attributes)
80-
if err != nil {
81-
return nil, fmt.Errorf("deserializing attributes: %w", err)
82-
}
83-
84-
fe.Attributes = a
85-
86-
f = append(f, fe)
87-
}
88-
89-
return f, nil
90-
}

0 commit comments

Comments
 (0)