Skip to content

Commit a06f346

Browse files
committed
Added implementation of stopper module.
1 parent 1a74970 commit a06f346

File tree

3 files changed

+97
-1
lines changed

3 files changed

+97
-1
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ module github.com/dipdup-net/indexer-sdk
22

33
go 1.20
44

5-
65
require (
6+
github.com/dipdup-io/workerpool v0.0.4
77
github.com/dipdup-net/go-lib v0.3.0
88
github.com/ethereum/go-ethereum v1.12.0
99
github.com/go-testfixtures/testfixtures/v3 v3.9.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5il
4646
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs=
4747
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
4848
github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw=
49+
github.com/dipdup-io/workerpool v0.0.4 h1:m58fuFY3VIPRc+trWpjw2Lsm4FvIgtjP/4VRe79r+/s=
50+
github.com/dipdup-io/workerpool v0.0.4/go.mod h1:m6YMqx7M+fORTyabHD/auKymBRpbDax0t1aPZ1i7GZA=
4951
github.com/dipdup-net/go-lib v0.3.0 h1:56OImCLIHyLL4Da7UI5H2xyTP65i2FUXSO6Gyxll460=
5052
github.com/dipdup-net/go-lib v0.3.0/go.mod h1:oBDOSsM/F8fEnmuDnaJ6QA/cHH4lne49ASbsh8WXDe4=
5153
github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8=

pkg/modules/stopper/stopper.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package stopper
2+
3+
import (
4+
"context"
5+
"github.com/dipdup-io/workerpool"
6+
"github.com/dipdup-net/indexer-sdk/pkg/modules"
7+
"github.com/pkg/errors"
8+
"github.com/rs/zerolog"
9+
"github.com/rs/zerolog/log"
10+
)
11+
12+
const (
13+
InputName = "signal"
14+
)
15+
16+
// Module - cancels context of all application if get signal.
17+
//
18+
// |----------------|
19+
// | |
20+
// -- struct{} -> | MODULE |
21+
// | |
22+
// |----------------|
23+
type Module struct {
24+
input *modules.Input
25+
stop context.CancelFunc
26+
log zerolog.Logger
27+
g workerpool.Group
28+
}
29+
30+
func NewModule(cancelFunc context.CancelFunc) Module {
31+
m := Module{
32+
input: modules.NewInput(InputName),
33+
stop: cancelFunc,
34+
g: workerpool.NewGroup(),
35+
}
36+
m.log = log.With().Str("module", m.Name()).Logger()
37+
38+
return m
39+
}
40+
41+
func (*Module) Name() string {
42+
return "stopper"
43+
}
44+
45+
// Start -
46+
func (s *Module) Start(ctx context.Context) {
47+
s.g.GoCtx(ctx, s.listen)
48+
}
49+
50+
func (s *Module) listen(ctx context.Context) {
51+
for {
52+
select {
53+
case <-ctx.Done():
54+
return
55+
case <-s.input.Listen():
56+
log.Info().Msg("stop signal received")
57+
if s.stop != nil {
58+
log.Info().Msg("cancelling context...")
59+
s.stop()
60+
return
61+
}
62+
}
63+
}
64+
}
65+
66+
// Close -
67+
func (s *Module) Close() error {
68+
s.g.Wait()
69+
return s.input.Close()
70+
}
71+
72+
// Output -
73+
func (*Module) Output(name string) (*modules.Output, error) {
74+
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
75+
}
76+
77+
// Input -
78+
func (s *Module) Input(name string) (*modules.Input, error) {
79+
if name != InputName {
80+
return nil, errors.Wrap(modules.ErrUnknownInput, name)
81+
}
82+
return s.input, nil
83+
}
84+
85+
// AttachTo -
86+
func (s *Module) AttachTo(name string, input *modules.Input) error {
87+
output, err := s.Output(name)
88+
if err != nil {
89+
return err
90+
}
91+
92+
output.Attach(input)
93+
return nil
94+
}

0 commit comments

Comments
 (0)