Skip to content

Commit 958b00f

Browse files
authored
feat: add Replay (#737)
1 parent 8195d8d commit 958b00f

File tree

7 files changed

+253
-18
lines changed

7 files changed

+253
-18
lines changed

libs/hwdb/helper.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package hwdb
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"github.com/jackc/pgx/v5"
78
"github.com/jackc/pgx/v5/pgtype"
89
"google.golang.org/protobuf/types/known/timestamppb"
10+
"telemetry"
911
"time"
1012
)
1113

@@ -44,3 +46,44 @@ func PbToTimestamp(src *timestamppb.Timestamp) pgtype.Timestamp {
4446
}
4547
return pgtype.Timestamp{Time: (*src).AsTime().UTC(), Valid: true}
4648
}
49+
50+
// DANGERTruncateAllTables truncates all tables of db
51+
func DANGERTruncateAllTables(ctx context.Context, db DBTX) error {
52+
ctx, span, log := telemetry.StartSpan(ctx, "hwdb.DANGERTruncateAllTables")
53+
defer span.End()
54+
55+
rows, err := db.Query(ctx, `
56+
SELECT table_name
57+
FROM information_schema.tables
58+
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
59+
`)
60+
61+
if err != nil {
62+
return fmt.Errorf("DANGERTruncateAllTables: failed to query table names: %w", err)
63+
}
64+
65+
defer rows.Close()
66+
67+
tableNames := make([]string, 0)
68+
69+
for rows.Next() {
70+
var tableName string
71+
if err := rows.Scan(&tableName); err != nil {
72+
return fmt.Errorf("DANGERTruncateAllTables: could not Scan row: %w", err)
73+
}
74+
tableNames = append(tableNames, tableName)
75+
}
76+
77+
log.Info().Strs("tableNames", tableNames).Msg("Start truncating all tables")
78+
79+
for _, tableName := range tableNames {
80+
log.Trace().Str("table_name", tableName).Msg("truncating")
81+
if _, err := db.Exec(ctx, fmt.Sprintf("TRUNCATE TABLE %s CASCADE", tableName)); err != nil {
82+
return err
83+
}
84+
log.Debug().Str("table_name", tableName).Msg("table truncated")
85+
}
86+
log.Debug().Msg("truncate finished")
87+
88+
return rows.Err()
89+
}

libs/hwes/eventstoredb/projections/custom/custom.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package custom
33
import (
44
"context"
55
"errors"
6-
"fmt"
76
"github.com/EventStore/EventStore-Client-Go/v4/esdb"
87
zlog "github.com/rs/zerolog/log"
98
"hwes"
@@ -88,13 +87,14 @@ func (p *CustomProjection) RegisterEventListener(eventType string, eventHandler
8887
return p
8988
}
9089

91-
func (p *CustomProjection) handleEvent(ctx context.Context, event hwes.Event) (error, *esdb.NackAction) {
92-
ctx, span, _ := telemetry.StartSpan(ctx, "custom_projection.handleEvent")
90+
func (p *CustomProjection) HandleEvent(ctx context.Context, event hwes.Event) (error, *esdb.NackAction) {
91+
ctx, span, log := telemetry.StartSpan(ctx, "custom_projection.handleEvent")
9392
defer span.End()
9493

9594
eventHandler, found := p.eventHandlers[event.EventType]
9695
if !found {
97-
return fmt.Errorf("event type '%s' is invalid", event.EventType), hwutil.PtrTo(esdb.NackActionUnknown)
96+
log.Debug().Dict("event", event.GetZerologDict()).Msg("event handler for event type not found, skip")
97+
return nil, hwutil.PtrTo(esdb.NackActionUnknown)
9898
}
9999
return eventHandler(ctx, event)
100100
}
@@ -195,7 +195,7 @@ func (p *CustomProjection) processReceivedEventFromStream(ctx context.Context, s
195195

196196
log.Debug().Dict("event", event.GetZerologDict()).Msg("process event")
197197

198-
err, nackAction := p.handleEvent(ctx, event)
198+
err, nackAction := p.HandleEvent(ctx, event)
199199
if err == nil && nackAction == nil {
200200
// ack event
201201
log.Debug().Dict("event", event.GetZerologDict()).Msg("ack event")

libs/hwes/eventstoredb/replay.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package eventstoredb
2+
3+
import (
4+
"context"
5+
"github.com/EventStore/EventStore-Client-Go/v4/esdb"
6+
zlog "github.com/rs/zerolog"
7+
"hwes"
8+
"telemetry"
9+
)
10+
11+
func hwesEventFromReceivedEventFromStream(ctx context.Context, esdbEvent *esdb.SubscriptionEvent) (hwes.Event, error) {
12+
log := zlog.Ctx(ctx)
13+
14+
if esdbEvent.SubscriptionDropped != nil {
15+
log.Error().Err(esdbEvent.SubscriptionDropped.Error).Msg("Subscription dropped")
16+
return hwes.Event{}, esdbEvent.SubscriptionDropped.Error
17+
}
18+
19+
if esdbEvent.EventAppeared == nil || esdbEvent.EventAppeared.Event == nil {
20+
log.Debug().Msg("Received empty event, skip")
21+
return hwes.Event{}, nil
22+
}
23+
24+
event, err := hwes.NewEventFromRecordedEvent(esdbEvent.EventAppeared.Event)
25+
26+
if err != nil {
27+
log.Error().
28+
Err(err).
29+
Str("EventType", esdbEvent.EventAppeared.Event.EventType).
30+
Msg("could not create new event from recorded event")
31+
return hwes.Event{}, nil
32+
}
33+
34+
return event, nil
35+
}
36+
37+
// Replay provides the ability to re-run existing events from an event stream
38+
// from the first to the latest event. Run will call the passed .OnEvent() handler for every event
39+
// in a filtered $all stream up to the event which was the latest when Replay() was called.
40+
func Replay(ctx context.Context, es *esdb.Client, onEvent func(ctx context.Context, event hwes.Event) error, streamPrefixFilters *[]string) error {
41+
ctx, span, log := telemetry.StartSpan(ctx, "hwes.Replay.Run")
42+
defer span.End()
43+
44+
subscribeToAllOptions := esdb.SubscribeToAllOptions{
45+
From: esdb.Start{},
46+
Filter: esdb.ExcludeSystemEventsFilter(),
47+
}
48+
49+
if streamPrefixFilters != nil {
50+
subscribeToAllOptions.Filter = &esdb.SubscriptionFilter{
51+
Type: esdb.StreamFilterType,
52+
Prefixes: *streamPrefixFilters,
53+
}
54+
}
55+
56+
stream, err := es.SubscribeToAll(ctx, subscribeToAllOptions)
57+
if err != nil {
58+
return err
59+
}
60+
defer stream.Close()
61+
62+
for {
63+
esdbEvent := stream.Recv()
64+
65+
select {
66+
case <-ctx.Done():
67+
return nil
68+
default:
69+
}
70+
71+
if esdbEvent.SubscriptionDropped != nil {
72+
stream.Close()
73+
break
74+
}
75+
76+
if esdbEvent.CaughtUp != nil {
77+
stream.Close()
78+
log.Info().Msg("caught up subscribed event stream")
79+
break
80+
}
81+
82+
event, err := hwesEventFromReceivedEventFromStream(ctx, esdbEvent)
83+
if err != nil {
84+
return err
85+
}
86+
87+
if err := onEvent(ctx, event); err != nil {
88+
return err
89+
}
90+
log.Info().Dict("event", event.GetZerologDict()).Msg("handled event")
91+
}
92+
93+
return nil
94+
}

services/property-svc/internal/property-value/projections/property_value_postgres_projection/property_value_postgres_projection.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ type Projection struct {
2525
propertyValueRepo *property_value_repo.Queries
2626
}
2727

28-
func NewProjection(es *esdb.Client, serviceName string) *Projection {
28+
func NewProjection(es *esdb.Client, serviceName string, db hwdb.DBTX) *Projection {
2929
subscriptionGroupName := fmt.Sprintf("%s-value-postgres-projection", serviceName)
3030
p := &Projection{
3131
CustomProjection: custom.NewCustomProjection(es, subscriptionGroupName, &[]string{fmt.Sprintf("%s-", aggregate.PropertyValueAggregateType)}),
32-
db: hwdb.GetDB(),
33-
propertyRepo: property_repo.New(hwdb.GetDB()),
34-
propertyValueRepo: property_value_repo.New(hwdb.GetDB())}
32+
db: db,
33+
propertyRepo: property_repo.New(db),
34+
propertyValueRepo: property_value_repo.New(db)}
3535
p.initEventListeners()
3636
return p
3737
}

services/property-svc/internal/property/projections/property_postgres_projection/property_postgres_projection.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ type Projection struct {
2525
propertyRepo *property_repo.Queries
2626
}
2727

28-
func NewProjection(es *esdb.Client, serviceName string) *Projection {
28+
func NewProjection(es *esdb.Client, serviceName string, db hwdb.DBTX) *Projection {
2929
subscriptionGroupName := fmt.Sprintf("%s-postgres-projection", serviceName)
3030
p := &Projection{
3131
CustomProjection: custom.NewCustomProjection(es, subscriptionGroupName, &[]string{fmt.Sprintf("%s-", aggregate.PropertyAggregateType)}),
32-
db: hwdb.GetDB(),
33-
propertyRepo: property_repo.New(hwdb.GetDB())}
32+
db: db,
33+
propertyRepo: property_repo.New(db)}
3434
p.initEventListeners()
3535
return p
3636
}

services/property-svc/main.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"common"
55
"context"
6+
"flag"
67
pb "gen/proto/services/property_svc/v1"
78
"hwdb"
89
"hwes/eventstoredb"
@@ -31,27 +32,40 @@ func main() {
3132
ctx, cancel := context.WithCancel(context.Background())
3233
common.Setup(ServiceName, Version, common.WithAuth())
3334

35+
replayMode := flag.Bool("replay", false, "")
36+
flag.Parse()
37+
log.Debug().Bool("replayMode", *replayMode).Msg("flags")
38+
3439
closeDBPool := hwdb.SetupDatabaseFromEnv(ctx)
3540
defer closeDBPool()
3641

3742
eventStore := eventstoredb.SetupEventStoreByEnv()
3843
aggregateStore := eventstoredb.NewAggregateStore(eventStore)
3944

40-
propertyHandlers := ph.NewPropertyHandlers(aggregateStore)
41-
propertySetHandlers := psh.NewPropertySetHandlers(aggregateStore)
42-
propertyViewHandlers := pvih.NewPropertyViewHandlers(aggregateStore)
43-
propertyValueHandlers := pvh.NewPropertyValueHandlers(aggregateStore)
45+
propertyPostgresProjection := property_postgres_projection.
46+
NewProjection(eventStore, ServiceName, hwdb.GetDB())
47+
48+
propertyValuePostgresProjection := property_value_postgres_projection.
49+
NewProjection(eventStore, ServiceName, hwdb.GetDB())
50+
51+
if *replayMode {
52+
if err := replay(ctx, eventStore); err != nil {
53+
log.Err(err).Msg("error during replay")
54+
cancel()
55+
}
56+
// TODO: Find a more generic approach to run common.Shutdown()
57+
common.Shutdown()
58+
return
59+
}
4460

4561
go func() {
46-
propertyPostgresProjection := property_postgres_projection.NewProjection(eventStore, ServiceName)
4762
if err := propertyPostgresProjection.Subscribe(ctx); err != nil {
4863
log.Err(err).Msg("error during property-postgres projection subscription")
4964
cancel()
5065
}
5166
}()
5267

5368
go func() {
54-
propertyValuePostgresProjection := property_value_postgres_projection.NewProjection(eventStore, ServiceName)
5569
if err := propertyValuePostgresProjection.Subscribe(ctx); err != nil {
5670
log.Err(err).Msg("error during propertyValue-postgres projection subscription")
5771
cancel()
@@ -66,6 +80,11 @@ func main() {
6680
}
6781
}()
6882

83+
propertyHandlers := ph.NewPropertyHandlers(aggregateStore)
84+
propertySetHandlers := psh.NewPropertySetHandlers(aggregateStore)
85+
propertyViewHandlers := pvih.NewPropertyViewHandlers(aggregateStore)
86+
propertyValueHandlers := pvh.NewPropertyValueHandlers(aggregateStore)
87+
6988
common.StartNewGRPCServer(context.Background(), common.ResolveAddrFromEnv(), func(server *daprd.Server) {
7089
grpcServer := server.GrpcServer()
7190

services/property-svc/replay.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/EventStore/EventStore-Client-Go/v4/esdb"
8+
"github.com/jackc/pgx/v5"
9+
"hwdb"
10+
"hwes"
11+
"hwes/eventstoredb"
12+
propertyValueAggregate "property-svc/internal/property-value/aggregate"
13+
"property-svc/internal/property-value/projections/property_value_postgres_projection"
14+
propertyAggregate "property-svc/internal/property/aggregate"
15+
"property-svc/internal/property/projections/property_postgres_projection"
16+
"telemetry"
17+
)
18+
19+
// replay mechanism for projections of the property-svc
20+
// replay truncates the whole database and replays all events
21+
func replay(ctx context.Context, eventStore *esdb.Client) error {
22+
ctx, span, log := telemetry.StartSpan(ctx, "property-svc.replay")
23+
defer span.End()
24+
25+
log.Info().Msg("starting in replay mode")
26+
27+
db := hwdb.GetDB()
28+
tx, err := db.Begin(ctx)
29+
if err != nil {
30+
return fmt.Errorf("cannot begin transaction: %w", err)
31+
}
32+
33+
var errToRollback error
34+
defer func() {
35+
if errToRollback != nil {
36+
err = tx.Rollback(ctx)
37+
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
38+
log.Err(err).Msg("failed to rollback transaction")
39+
}
40+
}
41+
}()
42+
43+
log.Info().Msg("truncating all tables...")
44+
45+
if errToRollback = hwdb.DANGERTruncateAllTables(ctx, tx); errToRollback != nil {
46+
return fmt.Errorf("cannot truncate all tables: %w", err)
47+
}
48+
49+
log.Info().Msg("starting event replay")
50+
51+
propertyPostgresProjection := property_postgres_projection.NewProjection(eventStore, ServiceName, tx)
52+
propertyValuePostgresProjection := property_value_postgres_projection.NewProjection(eventStore, ServiceName, tx)
53+
54+
if errToRollback = eventstoredb.Replay(
55+
ctx,
56+
eventStore,
57+
func(ctx context.Context, event hwes.Event) (err error) {
58+
if err, _ = propertyPostgresProjection.HandleEvent(ctx, event); err != nil {
59+
return
60+
}
61+
if err, _ = propertyValuePostgresProjection.HandleEvent(ctx, event); err != nil {
62+
return
63+
}
64+
return
65+
},
66+
&[]string{
67+
fmt.Sprintf("%s-", propertyAggregate.PropertyAggregateType),
68+
fmt.Sprintf("%s-", propertyValueAggregate.PropertyValueAggregateType),
69+
},
70+
); errToRollback != nil {
71+
return errToRollback
72+
}
73+
74+
if err := tx.Commit(ctx); err != nil {
75+
return fmt.Errorf("cannot commit transaction: %w", err)
76+
}
77+
78+
return nil
79+
}

0 commit comments

Comments
 (0)