Skip to content

Commit a5144da

Browse files
authored
Merge pull request #34 from blinklabs-io/feat/filter
feat: basic filter support
2 parents 43cd1fb + 80d027c commit a5144da

File tree

9 files changed

+236
-10
lines changed

9 files changed

+236
-10
lines changed

cmd/snek/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"net/http"
2020
"os"
2121

22+
_ "github.com/blinklabs-io/snek/filter"
2223
_ "github.com/blinklabs-io/snek/input"
2324
"github.com/blinklabs-io/snek/internal/config"
2425
"github.com/blinklabs-io/snek/internal/logging"
@@ -112,6 +113,12 @@ func main() {
112113
}
113114
pipe.AddInput(input)
114115

116+
// Configure filters
117+
for _, filterEntry := range plugin.GetPlugins(plugin.PluginTypeFilter) {
118+
filter := plugin.GetPlugin(plugin.PluginTypeFilter, filterEntry.Name)
119+
pipe.AddFilter(filter)
120+
}
121+
115122
// Configure output
116123
output := plugin.GetPlugin(plugin.PluginTypeOutput, cfg.Output)
117124
if output == nil {

filter/event/event.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package event
16+
17+
import (
18+
"github.com/blinklabs-io/snek/event"
19+
)
20+
21+
type Event struct {
22+
errorChan chan error
23+
inputChan chan event.Event
24+
outputChan chan event.Event
25+
filterType string
26+
}
27+
28+
// New returns a new Event object with the specified options applied
29+
func New(options ...EventOptionFunc) *Event {
30+
e := &Event{
31+
errorChan: make(chan error),
32+
inputChan: make(chan event.Event, 10),
33+
outputChan: make(chan event.Event, 10),
34+
}
35+
for _, option := range options {
36+
option(e)
37+
}
38+
return e
39+
}
40+
41+
// Start the event filter
42+
func (e *Event) Start() error {
43+
go func() {
44+
// TODO: pre-process filter params to be more useful for direct comparison
45+
for {
46+
evt, ok := <-e.inputChan
47+
// Channel has been closed, which means we're shutting down
48+
if !ok {
49+
return
50+
}
51+
// Drop events if we have a type filter configured and the event doesn't match
52+
if e.filterType != "" {
53+
if evt.Type != e.filterType {
54+
continue
55+
}
56+
}
57+
// Send event along
58+
e.outputChan <- evt
59+
}
60+
}()
61+
return nil
62+
}
63+
64+
// Stop the event filter
65+
func (e *Event) Stop() error {
66+
close(e.inputChan)
67+
close(e.outputChan)
68+
close(e.errorChan)
69+
return nil
70+
}
71+
72+
// ErrorChan returns the filter error channel
73+
func (e *Event) ErrorChan() chan error {
74+
return e.errorChan
75+
}
76+
77+
// InputChan returns the input event channel
78+
func (e *Event) InputChan() chan<- event.Event {
79+
return e.inputChan
80+
}
81+
82+
// OutputChan returns the output event channel
83+
func (e *Event) OutputChan() <-chan event.Event {
84+
return e.outputChan
85+
}

filter/event/option.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package event
16+
17+
type EventOptionFunc func(*Event)
18+
19+
// WithType specfies the event type to filter on
20+
func WithType(eventType string) EventOptionFunc {
21+
return func(e *Event) {
22+
e.filterType = eventType
23+
}
24+
}

filter/event/plugin.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package event
16+
17+
import (
18+
"github.com/blinklabs-io/snek/plugin"
19+
)
20+
21+
var cmdlineOptions struct {
22+
eventType string
23+
}
24+
25+
func init() {
26+
plugin.Register(
27+
plugin.PluginEntry{
28+
Type: plugin.PluginTypeFilter,
29+
Name: "event",
30+
Description: "filters events based on top-level event attributes",
31+
NewFromOptionsFunc: NewFromCmdlineOptions,
32+
Options: []plugin.PluginOption{
33+
{
34+
Name: "type",
35+
Type: plugin.PluginOptionTypeString,
36+
Description: "specifies event type to filter on",
37+
DefaultValue: "",
38+
Dest: &(cmdlineOptions.eventType),
39+
CustomFlag: "type",
40+
},
41+
},
42+
},
43+
)
44+
}
45+
46+
func NewFromCmdlineOptions() plugin.Plugin {
47+
p := New(
48+
WithType(cmdlineOptions.eventType),
49+
)
50+
return p
51+
}

filter/filter.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package filter
16+
17+
import (
18+
_ "github.com/blinklabs-io/snek/filter/event"
19+
)

input/chainsync/chainsync.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,15 @@ func (c *ChainSync) setupConnection() error {
168168
}
169169

170170
func (c *ChainSync) handleRollBackward(point ocommon.Point, tip ochainsync.Tip) error {
171-
evt := event.New("rollback", time.Now(), NewRollbackEvent(point))
171+
evt := event.New("chainsync.rollback", time.Now(), NewRollbackEvent(point))
172172
c.eventChan <- evt
173173
return nil
174174
}
175175

176176
func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip ochainsync.Tip) error {
177177
switch v := blockData.(type) {
178178
case ledger.Block:
179-
evt := event.New("block", time.Now(), NewBlockEvent(v, c.includeCbor))
179+
evt := event.New("chainsync.block", time.Now(), NewBlockEvent(v, c.includeCbor))
180180
c.eventChan <- evt
181181
case ledger.BlockHeader:
182182
var blockSlot uint64
@@ -202,10 +202,10 @@ func (c *ChainSync) handleRollForward(blockType uint, blockData interface{}, tip
202202
if err != nil {
203203
return err
204204
}
205-
blockEvt := event.New("block", time.Now(), NewBlockEvent(block, c.includeCbor))
205+
blockEvt := event.New("chainsync.block", time.Now(), NewBlockEvent(block, c.includeCbor))
206206
c.eventChan <- blockEvt
207207
for _, transaction := range block.Transactions() {
208-
txEvt := event.New("transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor))
208+
txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionEvent(block, transaction, c.includeCbor))
209209
c.eventChan <- txEvt
210210
}
211211
}

pipeline/pipeline.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@ import (
2323

2424
type Pipeline struct {
2525
inputs []plugin.Plugin
26+
filters []plugin.Plugin
2627
outputs []plugin.Plugin
28+
filterChan chan event.Event
2729
outputChan chan event.Event
2830
errorChan chan error
2931
doneChan chan bool
3032
}
3133

3234
func New() *Pipeline {
3335
p := &Pipeline{
36+
filterChan: make(chan event.Event),
3437
outputChan: make(chan event.Event),
3538
errorChan: make(chan error),
3639
doneChan: make(chan bool),
@@ -42,6 +45,10 @@ func (p *Pipeline) AddInput(input plugin.Plugin) {
4245
p.inputs = append(p.inputs, input)
4346
}
4447

48+
func (p *Pipeline) AddFilter(filter plugin.Plugin) {
49+
p.filters = append(p.filters, filter)
50+
}
51+
4552
func (p *Pipeline) AddOutput(output plugin.Plugin) {
4653
p.outputs = append(p.outputs, output)
4754
}
@@ -57,11 +64,35 @@ func (p *Pipeline) Start() error {
5764
if err := input.Start(); err != nil {
5865
return fmt.Errorf("failed to start input: %s", err)
5966
}
60-
// Start background process to send input events to combined output channel
61-
go p.chanCopyLoop(input.OutputChan(), p.outputChan)
67+
// Start background process to send input events to combined filter channel
68+
go p.chanCopyLoop(input.OutputChan(), p.filterChan)
6269
// Start background error listener
6370
go p.errorChanWait(input.ErrorChan())
6471
}
72+
// Start filters
73+
for idx, filter := range p.filters {
74+
if err := filter.Start(); err != nil {
75+
return fmt.Errorf("failed to start input: %s", err)
76+
}
77+
if idx == 0 {
78+
// Start background process to send events from combined filter channel to first filter plugin
79+
go p.chanCopyLoop(p.filterChan, filter.InputChan())
80+
} else {
81+
// Start background process to send events from previous filter plugin to current filter plugin
82+
go p.chanCopyLoop(p.filters[idx-1].OutputChan(), filter.InputChan())
83+
}
84+
if idx == len(p.filters)-1 {
85+
// Start background process to send events from last filter to combined output channel
86+
go p.chanCopyLoop(filter.OutputChan(), p.outputChan)
87+
}
88+
// Start background error listener
89+
go p.errorChanWait(filter.ErrorChan())
90+
}
91+
if len(p.filters) == 0 {
92+
// Start background process to send events from combined filter channel to combined output channel if
93+
// there are no filter plugins
94+
go p.chanCopyLoop(p.filterChan, p.outputChan)
95+
}
6596
// Start outputs
6697
for _, output := range p.outputs {
6798
if err := output.Start(); err != nil {
@@ -78,6 +109,7 @@ func (p *Pipeline) Start() error {
78109
func (p *Pipeline) Stop() error {
79110
close(p.doneChan)
80111
close(p.errorChan)
112+
close(p.filterChan)
81113
close(p.outputChan)
82114
// Stop inputs
83115
for _, input := range p.inputs {

plugin/option.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,19 @@ type PluginOption struct {
3636
Name string
3737
Type PluginOptionType
3838
CustomEnvVar string
39+
CustomFlag string
3940
Description string
4041
DefaultValue interface{}
4142
Dest interface{}
4243
}
4344

44-
func (p *PluginOption) AddToFlagSet(fs *flag.FlagSet, flagPrefix string) error {
45-
flagName := fmt.Sprintf("%s%s", flagPrefix, p.Name)
45+
func (p *PluginOption) AddToFlagSet(fs *flag.FlagSet, pluginType string, pluginName string) error {
46+
var flagName string
47+
if p.CustomFlag != "" {
48+
flagName = fmt.Sprintf("%s-%s", pluginType, p.CustomFlag)
49+
} else {
50+
flagName = fmt.Sprintf("%s-%s-%s", pluginType, pluginName, p.Name)
51+
}
4652
switch p.Type {
4753
case PluginOptionTypeString:
4854
fs.StringVar(p.Dest.(*string), flagName, p.DefaultValue.(string), p.Description)

plugin/register.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type PluginType int
2424
const (
2525
PluginTypeInput PluginType = 1
2626
PluginTypeOutput PluginType = 2
27+
PluginTypeFilter PluginType = 3
2728
)
2829

2930
func PluginTypeName(pluginType PluginType) string {
@@ -32,6 +33,8 @@ func PluginTypeName(pluginType PluginType) string {
3233
return "input"
3334
case PluginTypeOutput:
3435
return "output"
36+
case PluginTypeFilter:
37+
return "filter"
3538
default:
3639
return ""
3740
}
@@ -53,9 +56,8 @@ func Register(pluginEntry PluginEntry) {
5356

5457
func PopulateCmdlineOptions(fs *flag.FlagSet) error {
5558
for _, plugin := range pluginEntries {
56-
flagPrefix := fmt.Sprintf("%s-%s-", PluginTypeName(plugin.Type), plugin.Name)
5759
for _, option := range plugin.Options {
58-
if err := option.AddToFlagSet(fs, flagPrefix); err != nil {
60+
if err := option.AddToFlagSet(fs, PluginTypeName(plugin.Type), plugin.Name); err != nil {
5961
return err
6062
}
6163
}

0 commit comments

Comments
 (0)