Skip to content

Commit 886c845

Browse files
authored
Merge pull request #36 from blinklabs-io/feat/filter-chainsync
feat: chainsync filter support
2 parents a5144da + e04f65c commit 886c845

File tree

8 files changed

+332
-8
lines changed

8 files changed

+332
-8
lines changed

README.md

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,29 @@ plugins:
111111
level: info
112112
```
113113
114+
## Filtering
115+
116+
snek supports filtering events before they are output.
117+
118+
You can get a list of all available filter options by using the `-h`/`-help` flag.
119+
120+
```bash
121+
$ ./snek -h
122+
Usage of snek:
123+
...
124+
-filter-address string
125+
specifies address to filter on
126+
-filter-asset string
127+
specifies the asset fingerprint (asset1xxx) to filter on
128+
-filter-policy string
129+
specifies asset policy ID to filter on
130+
-filter-type string
131+
specifies event type to filter on
132+
...
133+
```
134+
135+
Multiple filter options can be used together, and only events matching all filters will be output.
136+
114137
## Example usage
115138

116139
### Native using remote node
@@ -138,3 +161,45 @@ docker run --rm -ti \
138161
-v node-ipc:/node-ipc \
139162
ghcr.io/blinklabs-io/snek:main
140163
```
164+
165+
### Filtering
166+
167+
#### Filtering on event type
168+
169+
Only output `chainsync.transaction` event types
170+
171+
```bash
172+
$ snek -filter-type chainsync.transaction
173+
```
174+
175+
#### Filtering on asset policy
176+
177+
Only output transactions involving an asset with a particular policy ID
178+
179+
```bash
180+
$ snek -filter-type chainsync.transaction -filter-policy 13aa2accf2e1561723aa26871e071fdf32c867cff7e7d50ad470d62f
181+
```
182+
183+
#### Filtering on asset fingerprint
184+
185+
Only output transactions involving a particular asset
186+
187+
```bash
188+
$ snek -filter-type chainsync.transaction -filter-asset asset108xu02ckwrfc8qs9d97mgyh4kn8gdu9w8f5sxk
189+
```
190+
191+
#### Filtering on a policy ID and asset fingerprint
192+
193+
Only output transactions involving both a particular policy ID and a particular asset (which do not need to be related)
194+
195+
```bash
196+
$ snek -filter-type chainsync.transaction -filter-asset asset108xu02ckwrfc8qs9d97mgyh4kn8gdu9w8f5sxk -filter-policy 13aa2accf2e1561723aa26871e071fdf32c867cff7e7d50ad470d62f
197+
```
198+
199+
#### Filtering on an address
200+
201+
Only output transactions with outputs matching a particular address
202+
203+
```bash
204+
$ snek -filter-type chainsync.transaction -filter-address addr1qyht4ja0zcn45qvyx477qlyp6j5ftu5ng0prt9608dxp6l2j2c79gy9l76sdg0xwhd7r0c0kna0tycz4y5s6mlenh8pq4jxtdy
205+
```

filter/chainsync/chainsync.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package chainsync
16+
17+
import (
18+
"github.com/blinklabs-io/gouroboros/ledger"
19+
"github.com/blinklabs-io/snek/event"
20+
"github.com/blinklabs-io/snek/input/chainsync"
21+
)
22+
23+
type ChainSync struct {
24+
errorChan chan error
25+
inputChan chan event.Event
26+
outputChan chan event.Event
27+
filterAddress string
28+
filterPolicyId string
29+
filterAssetFingerprint string
30+
}
31+
32+
// New returns a new ChainSync object with the specified options applied
33+
func New(options ...ChainSyncOptionFunc) *ChainSync {
34+
c := &ChainSync{
35+
errorChan: make(chan error),
36+
inputChan: make(chan event.Event, 10),
37+
outputChan: make(chan event.Event, 10),
38+
}
39+
for _, option := range options {
40+
option(c)
41+
}
42+
return c
43+
}
44+
45+
// Start the chain sync filter
46+
func (c *ChainSync) Start() error {
47+
go func() {
48+
// TODO: pre-process filter params to be more useful for direct comparison
49+
for {
50+
evt, ok := <-c.inputChan
51+
// Channel has been closed, which means we're shutting down
52+
if !ok {
53+
return
54+
}
55+
switch v := evt.Payload.(type) {
56+
case chainsync.TransactionEvent:
57+
// Check address filter
58+
if c.filterAddress != "" {
59+
foundMatch := false
60+
// TODO: extract and compare stake addresses when this is done
61+
// https://github.com/blinklabs-io/gouroboros/issues/302
62+
for _, output := range v.Outputs {
63+
if output.Address().String() == c.filterAddress {
64+
foundMatch = true
65+
break
66+
}
67+
}
68+
if !foundMatch {
69+
continue
70+
}
71+
}
72+
// Check policy ID filter
73+
if c.filterPolicyId != "" {
74+
foundMatch := false
75+
for _, output := range v.Outputs {
76+
if output.Assets() != nil {
77+
for _, policyId := range output.Assets().Policies() {
78+
if policyId.String() == c.filterPolicyId {
79+
foundMatch = true
80+
break
81+
}
82+
}
83+
}
84+
if foundMatch {
85+
break
86+
}
87+
}
88+
if !foundMatch {
89+
continue
90+
}
91+
}
92+
// Check asset fingerprint filter
93+
if c.filterAssetFingerprint != "" {
94+
foundMatch := false
95+
for _, output := range v.Outputs {
96+
if output.Assets() != nil {
97+
for _, policyId := range output.Assets().Policies() {
98+
for _, assetName := range output.Assets().Assets(policyId) {
99+
assetFp := ledger.NewAssetFingerprint(policyId.Bytes(), assetName)
100+
if assetFp.String() == c.filterAssetFingerprint {
101+
foundMatch = true
102+
}
103+
}
104+
if foundMatch {
105+
break
106+
}
107+
}
108+
if foundMatch {
109+
break
110+
}
111+
}
112+
}
113+
if !foundMatch {
114+
continue
115+
}
116+
}
117+
}
118+
c.outputChan <- evt
119+
}
120+
}()
121+
return nil
122+
}
123+
124+
// Stop the chain sync filter
125+
func (c *ChainSync) Stop() error {
126+
close(c.inputChan)
127+
close(c.outputChan)
128+
close(c.errorChan)
129+
return nil
130+
}
131+
132+
// ErrorChan returns the filter error channel
133+
func (c *ChainSync) ErrorChan() chan error {
134+
return c.errorChan
135+
}
136+
137+
// InputChan returns the input event channel
138+
func (c *ChainSync) InputChan() chan<- event.Event {
139+
return c.inputChan
140+
}
141+
142+
// OutputChan returns the output event channel
143+
func (c *ChainSync) OutputChan() <-chan event.Event {
144+
return c.outputChan
145+
}

filter/chainsync/option.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package chainsync
16+
17+
type ChainSyncOptionFunc func(*ChainSync)
18+
19+
// WithAddress specfies the address to filter on
20+
func WithAddress(address string) ChainSyncOptionFunc {
21+
return func(c *ChainSync) {
22+
c.filterAddress = address
23+
}
24+
}
25+
26+
// WithPolicy specfies the address to filter on
27+
func WithPolicy(policyId string) ChainSyncOptionFunc {
28+
return func(c *ChainSync) {
29+
c.filterPolicyId = policyId
30+
}
31+
}
32+
33+
//WithAssetFingerprint specifies the asset fingerprint (asset1xxx) to filter on
34+
func WithAssetFingerprint(assetFingerprint string) ChainSyncOptionFunc {
35+
return func(c *ChainSync) {
36+
c.filterAssetFingerprint = assetFingerprint
37+
}
38+
}

filter/chainsync/plugin.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// Copyright 2023 Blink Labs, LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package chainsync
16+
17+
import (
18+
"github.com/blinklabs-io/snek/plugin"
19+
)
20+
21+
var cmdlineOptions struct {
22+
address string
23+
policyId string
24+
asset string
25+
}
26+
27+
func init() {
28+
plugin.Register(
29+
plugin.PluginEntry{
30+
Type: plugin.PluginTypeFilter,
31+
Name: "chainsync",
32+
Description: "filters chainsync events",
33+
NewFromOptionsFunc: NewFromCmdlineOptions,
34+
Options: []plugin.PluginOption{
35+
{
36+
Name: "address",
37+
Type: plugin.PluginOptionTypeString,
38+
Description: "specifies address to filter on",
39+
DefaultValue: "",
40+
Dest: &(cmdlineOptions.address),
41+
CustomFlag: "address",
42+
},
43+
{
44+
Name: "policy",
45+
Type: plugin.PluginOptionTypeString,
46+
Description: "specifies asset policy ID to filter on",
47+
DefaultValue: "",
48+
Dest: &(cmdlineOptions.policyId),
49+
CustomFlag: "policy",
50+
},
51+
{
52+
Name: "asset",
53+
Type: plugin.PluginOptionTypeString,
54+
Description: "specifies the asset fingerprint (asset1xxx) to filter on",
55+
DefaultValue: "",
56+
Dest: &(cmdlineOptions.asset),
57+
CustomFlag: "asset",
58+
},
59+
},
60+
},
61+
)
62+
}
63+
64+
func NewFromCmdlineOptions() plugin.Plugin {
65+
p := New(
66+
WithAddress(cmdlineOptions.address),
67+
WithPolicy(cmdlineOptions.policyId),
68+
WithAssetFingerprint(cmdlineOptions.asset),
69+
)
70+
return p
71+
}

filter/filter.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515
package filter
1616

1717
import (
18+
_ "github.com/blinklabs-io/snek/filter/chainsync"
1819
_ "github.com/blinklabs-io/snek/filter/event"
1920
)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/blinklabs-io/snek
33
go 1.18
44

55
require (
6-
github.com/blinklabs-io/gouroboros v0.43.0
6+
github.com/blinklabs-io/gouroboros v0.44.0
77
github.com/kelseyhightower/envconfig v1.4.0
88
go.uber.org/zap v1.24.0
99
gopkg.in/yaml.v2 v2.4.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
2-
github.com/blinklabs-io/gouroboros v0.43.0 h1:XwZzyAEyhl3z2v3zZSQI41OFyqCU9u0JQp8hlRnAXK4=
3-
github.com/blinklabs-io/gouroboros v0.43.0/go.mod h1:YNHQqwU1yv620T5C+umkDmYf8BgBHlm6QpI9LUQ3Jhw=
2+
github.com/blinklabs-io/gouroboros v0.44.0 h1:bvhMAqU9ZUh1WYrx8lMUj87h4QeFCXLZJaZ4bLlfxlE=
3+
github.com/blinklabs-io/gouroboros v0.44.0/go.mod h1:YNHQqwU1yv620T5C+umkDmYf8BgBHlm6QpI9LUQ3Jhw=
44
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
55
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
66
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

input/chainsync/tx.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ import (
1919
)
2020

2121
type TransactionEvent struct {
22-
BlockNumber uint64 `json:"blockNumber"`
23-
BlockHash string `json:"blockHash"`
24-
SlotNumber uint64 `json:"slotNumber"`
25-
TransactionHash string `json:"transactionHash"`
26-
TransactionCbor byteSliceJsonHex `json:"transactionCbor,omitempty"`
22+
BlockNumber uint64 `json:"blockNumber"`
23+
BlockHash string `json:"blockHash"`
24+
SlotNumber uint64 `json:"slotNumber"`
25+
TransactionHash string `json:"transactionHash"`
26+
TransactionCbor byteSliceJsonHex `json:"transactionCbor,omitempty"`
27+
Inputs []ledger.TransactionInput `json:"inputs"`
28+
Outputs []ledger.TransactionOutput `json:"outputs"`
2729
}
2830

2931
func NewTransactionEvent(block ledger.Block, txBody ledger.TransactionBody, includeCbor bool) TransactionEvent {
@@ -32,6 +34,8 @@ func NewTransactionEvent(block ledger.Block, txBody ledger.TransactionBody, incl
3234
BlockHash: block.Hash(),
3335
SlotNumber: block.SlotNumber(),
3436
TransactionHash: txBody.Hash(),
37+
Inputs: txBody.Inputs(),
38+
Outputs: txBody.Outputs(),
3539
}
3640
if includeCbor {
3741
evt.TransactionCbor = txBody.Cbor()

0 commit comments

Comments
 (0)