Skip to content

Commit 943316b

Browse files
Feature: add duplicator module
1 parent 11cee5b commit 943316b

File tree

5 files changed

+271
-1
lines changed

5 files changed

+271
-1
lines changed

examples/duplicator/main.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
10+
"github.com/dipdup-net/indexer-sdk/pkg/modules"
11+
"github.com/dipdup-net/indexer-sdk/pkg/modules/cron"
12+
"github.com/dipdup-net/indexer-sdk/pkg/modules/duplicator"
13+
"github.com/dipdup-net/indexer-sdk/pkg/modules/printer"
14+
)
15+
16+
func main() {
17+
cronModule1, err := cron.NewModule(&cron.Config{
18+
Jobs: map[string]string{
19+
"ticker": "@every 5s",
20+
},
21+
})
22+
if err != nil {
23+
log.Panic(err)
24+
}
25+
cronModule2, err := cron.NewModule(&cron.Config{
26+
Jobs: map[string]string{
27+
"ticker": "* * * * * *",
28+
},
29+
})
30+
if err != nil {
31+
log.Panic(err)
32+
}
33+
34+
dup := duplicator.NewDuplicator(2, 1)
35+
36+
print := printer.NewPrinter()
37+
38+
if err := modules.Connect(cronModule1, dup, "ticker", duplicator.GetInputName(0)); err != nil {
39+
log.Panic(err)
40+
}
41+
if err := modules.Connect(cronModule2, dup, "ticker", duplicator.GetInputName(1)); err != nil {
42+
log.Panic(err)
43+
}
44+
if err := modules.Connect(dup, print, duplicator.GetOutputName(0), printer.InputName); err != nil {
45+
log.Panic(err)
46+
}
47+
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
48+
print.Start(ctx)
49+
dup.Start(ctx)
50+
cronModule2.Start(ctx)
51+
cronModule1.Start(ctx)
52+
53+
<-ctx.Done()
54+
cancel()
55+
56+
if err := cronModule1.Close(); err != nil {
57+
log.Print(err)
58+
}
59+
if err := cronModule2.Close(); err != nil {
60+
log.Print(err)
61+
}
62+
if err := dup.Close(); err != nil {
63+
log.Print(err)
64+
}
65+
if err := print.Close(); err != nil {
66+
log.Print(err)
67+
}
68+
}

pkg/modules/README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,8 @@ gRPC module where realized default client and server. Detailed docs can be found
9696

9797
### Cron
9898

99-
Cron module implements cron scheduler. Detailed docs can be found [here](/pkg/modules/cron/).
99+
Cron module implements cron scheduler. Detailed docs can be found [here](/pkg/modules/cron/).
100+
101+
### Duplicator
102+
103+
Duplicator module repeats signal from one of inputs to all outputs. Detailed docs can be found [here](/pkg/modules/duplicator/).

pkg/modules/duplicator/README.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Duplicator
2+
3+
Duplicator module repeats signal from one of inputs to all outputs
4+
5+
## Usage
6+
7+
Usage of duplicator module is described by the [example](/examples/duplicator/).
8+
9+
To import module in your code write following line:
10+
11+
```go
12+
import "github.com/dipdup-net/indexer-sdk/pkg/modules/duplicator"
13+
```
14+
15+
Duplicator module implements interface `Module`. So you can use it like any other module. For example:
16+
17+
```go
18+
// create duplicator module with 2 inputs and 2 outputs. Any signal from one of 2 output will be passed to 2 outputs.
19+
duplicatorModule, err := duplicator.NewModule(2, 2)
20+
if err != nil {
21+
log.Panic(err)
22+
}
23+
// start duplicator module
24+
duplicatorModule.Start(ctx)
25+
26+
// your code is here
27+
28+
// close duplicator module
29+
if err := duplicatorModule.Close(); err != nil {
30+
log.Panic(err)
31+
}
32+
```
33+
34+
## Input
35+
36+
Module reply to its outputs all signals from inputs. You should connect it with next code:
37+
38+
```go
39+
// with helper function. You should pass a input number to function GetInputName for receiving input name
40+
41+
if err := modules.Connect(customModule, dup, "your_output_of_module", duplicator.GetInputName(0)); err != nil {
42+
log.Panic(err)
43+
}
44+
45+
```
46+
47+
## Output
48+
49+
You should connect it with next code:
50+
51+
```go
52+
// with helper function. You should pass a output number to function GetOutputName for receiving output name
53+
54+
if err := modules.Connect(dup, customModule, duplicator.GetOutputName(0), "your_output_of_module"); err != nil {
55+
log.Panic(err)
56+
}
57+
58+
```
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package duplicator
2+
3+
import "fmt"
4+
5+
// GetInputName -
6+
func GetInputName(idx int) string {
7+
return fmt.Sprintf("%s_%d", InputName, idx)
8+
}
9+
10+
// GetOutputName -
11+
func GetOutputName(idx int) string {
12+
return fmt.Sprintf("%s_%d", OutputName, idx)
13+
}

pkg/modules/duplicator/module.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package duplicator
2+
3+
import (
4+
"context"
5+
"reflect"
6+
"sync"
7+
8+
"github.com/dipdup-net/indexer-sdk/pkg/modules"
9+
"github.com/pkg/errors"
10+
)
11+
12+
// names
13+
const (
14+
InputName = "input"
15+
OutputName = "output"
16+
)
17+
18+
// Duplicator - the structure which is responsible for duplicate signal from one of inputs to all outputs
19+
type Duplicator struct {
20+
inputs map[string]*modules.Input
21+
outputs map[string]*modules.Output
22+
23+
wg *sync.WaitGroup
24+
}
25+
26+
// NewDuplicator - constructor of Duplicator structure
27+
func NewDuplicator(inputsCount, outputsCount int) *Duplicator {
28+
d := &Duplicator{
29+
inputs: make(map[string]*modules.Input),
30+
outputs: make(map[string]*modules.Output),
31+
32+
wg: new(sync.WaitGroup),
33+
}
34+
35+
for i := 0; i < inputsCount; i++ {
36+
name := GetInputName(i)
37+
d.inputs[name] = modules.NewInput(name)
38+
}
39+
40+
for i := 0; i < outputsCount; i++ {
41+
name := GetOutputName(i)
42+
d.outputs[name] = modules.NewOutput(name)
43+
}
44+
45+
return d
46+
}
47+
48+
// Name -
49+
func (duplicator *Duplicator) Name() string {
50+
return "duplicator"
51+
}
52+
53+
// Input - returns input by name
54+
func (duplicator *Duplicator) Input(name string) (*modules.Input, error) {
55+
input, ok := duplicator.inputs[name]
56+
if !ok {
57+
return nil, errors.Wrap(modules.ErrUnknownInput, name)
58+
}
59+
return input, nil
60+
}
61+
62+
// Output - returns output by name
63+
func (duplicator *Duplicator) Output(name string) (*modules.Output, error) {
64+
output, ok := duplicator.outputs[name]
65+
if !ok {
66+
return nil, errors.Wrap(modules.ErrUnknownOutput, name)
67+
}
68+
return output, nil
69+
}
70+
71+
// AttachTo - attach input to output with name
72+
func (duplicator *Duplicator) AttachTo(name string, input *modules.Input) error {
73+
output, err := duplicator.Output(name)
74+
if err != nil {
75+
return err
76+
}
77+
output.Attach(input)
78+
return nil
79+
}
80+
81+
// Close - gracefully stops module
82+
func (duplicator *Duplicator) Close() error {
83+
duplicator.wg.Wait()
84+
85+
for _, i := range duplicator.inputs {
86+
if err := i.Close(); err != nil {
87+
return err
88+
}
89+
}
90+
91+
return nil
92+
}
93+
94+
// Start - starts module
95+
func (duplicator *Duplicator) Start(ctx context.Context) {
96+
duplicator.wg.Add(1)
97+
go duplicator.listen(ctx)
98+
}
99+
100+
func (duplicator *Duplicator) listen(ctx context.Context) {
101+
defer duplicator.wg.Done()
102+
103+
cases := []reflect.SelectCase{
104+
{
105+
Dir: reflect.SelectRecv,
106+
Chan: reflect.ValueOf(ctx.Done()),
107+
},
108+
}
109+
for _, input := range duplicator.inputs {
110+
cases = append(cases,
111+
reflect.SelectCase{
112+
Dir: reflect.SelectRecv,
113+
Chan: reflect.ValueOf(input.Listen()),
114+
},
115+
)
116+
}
117+
118+
for {
119+
_, value, ok := reflect.Select(cases)
120+
if !ok {
121+
return
122+
}
123+
for _, output := range duplicator.outputs {
124+
output.Push(value.Interface())
125+
}
126+
}
127+
}

0 commit comments

Comments
 (0)