Skip to content

Commit 9f269cb

Browse files
authored
Merge pull request #30 from filecoin-project/feat/index-service
feat: add index service to resolve latest snapshot to static resource
2 parents 43b733e + 60ab50f commit 9f269cb

File tree

8 files changed

+519
-10
lines changed

8 files changed

+519
-10
lines changed

cmd/filecoin-chain-archiver/cmds/cmds.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
var logger = log.Logger("filecoin-chain-archiver/cmds")
1111

12-
var Commands = []*cli.Command{cmdCreate, cmdDefaultConfig, cmdService}
12+
var Commands = []*cli.Command{cmdCreate, cmdDefaultConfig, cmdService, cmdIndexService}
1313

1414
func TrimDescription(desc string) string {
1515
lines := strings.Split(desc, "\n")

cmd/filecoin-chain-archiver/cmds/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ var cmdDefaultConfig = &cli.Command{
1515
Action: func(cctx *cli.Context) error {
1616
var icfg interface{}
1717

18-
cfg := config.DefaultConfig()
18+
cfg := config.DefaultExportWorkerConfig()
1919
cfg.Nodes = append(cfg.Nodes, config.Node{
2020
Address: "/ip4/127.0.0.1/1234",
2121
TokenPath: "/path/to/token",

cmd/filecoin-chain-archiver/cmds/create.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,12 +153,12 @@ var cmdCreate = &cli.Command{
153153
return err
154154
}
155155

156-
icfg, err := config.FromFile(flagConfigPath, &config.Config{})
156+
icfg, err := config.FromFile(flagConfigPath, &config.ExportWorkerConfig{})
157157
if err != nil {
158158
return err
159159
}
160160

161-
cfg := icfg.(*config.Config)
161+
cfg := icfg.(*config.ExportWorkerConfig)
162162

163163
addrs, err := NodeMultiaddrs(cfg)
164164
if err != nil {
@@ -338,7 +338,7 @@ var cmdCreate = &cli.Command{
338338
ContentType: "application/octet-stream",
339339
})
340340
if err != nil {
341-
return fmt.Errorf("failed to upload", "object", fmt.Sprintf("%s%s.car", flagNamePrefix, name), "err", err)
341+
return fmt.Errorf("failed to upload object (%s): %w", fmt.Sprintf("%s%s.car", flagNamePrefix, name), err)
342342
}
343343

344344
logger.Infow("upload",
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package cmds
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"net/http"
8+
"os"
9+
"os/signal"
10+
"syscall"
11+
"time"
12+
13+
"github.com/urfave/cli/v2"
14+
15+
"github.com/filecoin-project/filecoin-chain-archiver/build"
16+
"github.com/filecoin-project/filecoin-chain-archiver/pkg/config"
17+
"github.com/filecoin-project/filecoin-chain-archiver/pkg/index/service"
18+
)
19+
20+
var cmdIndexService = &cli.Command{
21+
Name: "index-resolver-service",
22+
Usage: "Commands for the index resolver service",
23+
Description: "The index resolver service provides a way to resolve snapshots",
24+
Flags: []cli.Flag{},
25+
Subcommands: []*cli.Command{
26+
{
27+
Name: "default-config",
28+
Usage: "prints the default configuration",
29+
Description: TrimDescription(``),
30+
Flags: []cli.Flag{},
31+
Action: func(cctx *cli.Context) error {
32+
var icfg interface{}
33+
34+
cfg := config.DefaultIndexServiceConfig()
35+
icfg = cfg
36+
37+
bs, err := config.ConfigComment(icfg)
38+
if err != nil {
39+
return err
40+
}
41+
42+
fmt.Printf("%s", string(bs))
43+
44+
return nil
45+
},
46+
},
47+
{
48+
Name: "run",
49+
Usage: "start the service",
50+
Flags: []cli.Flag{
51+
&cli.StringFlag{
52+
Name: "service-listen",
53+
Usage: "host and port to listen on",
54+
EnvVars: []string{"FCA_INDEX_RESOLVER_SERVICE_LISTEN"},
55+
Value: "localhost:5200",
56+
},
57+
&cli.StringFlag{
58+
Name: "operator-listen",
59+
Usage: "host and port to listen on",
60+
EnvVars: []string{"FCA_INDEX_RESOLVER_OPERATOR_LISTEN"},
61+
Value: "localhost:5201",
62+
},
63+
&cli.StringFlag{
64+
Name: "config-path",
65+
Usage: "path to configuration file",
66+
EnvVars: []string{"FCA_INDEX_RESOLVER_CONFIG_PATH"},
67+
Value: "./config.toml",
68+
},
69+
},
70+
Action: func(cctx *cli.Context) error {
71+
ctx, cancelFunc := context.WithCancel(context.Background())
72+
ctx = context.WithValue(ctx, versionKey{}, build.Version())
73+
74+
signalChan := make(chan os.Signal, 1)
75+
signal.Notify(signalChan, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM)
76+
77+
s := service.NewIndexService(ctx)
78+
79+
if err := s.SetupService(cctx.String("config-path")); err != nil {
80+
return err
81+
}
82+
83+
svr := &http.Server{
84+
Addr: cctx.String("service-listen"),
85+
Handler: s.ServiceRouter,
86+
BaseContext: func(listener net.Listener) context.Context {
87+
return context.Background()
88+
},
89+
}
90+
91+
go func() {
92+
logger.Debugw("service running")
93+
err := svr.ListenAndServe()
94+
switch err {
95+
case nil:
96+
case http.ErrServerClosed:
97+
logger.Infow("server closed")
98+
case context.Canceled:
99+
logger.Infow("context cancled")
100+
default:
101+
logger.Errorw("error shutting down service server", "err", err)
102+
}
103+
}()
104+
105+
if err := s.SetupOperator(); err != nil {
106+
return err
107+
}
108+
109+
osvr := http.Server{
110+
Addr: cctx.String("operator-listen"),
111+
Handler: s.OperatorRouter,
112+
BaseContext: func(listener net.Listener) context.Context {
113+
return context.Background()
114+
},
115+
}
116+
117+
go func() {
118+
logger.Debugw("operator running")
119+
err := osvr.ListenAndServe()
120+
switch err {
121+
case nil:
122+
case http.ErrServerClosed:
123+
logger.Infow("server closed")
124+
case context.Canceled:
125+
logger.Infow("context cancled")
126+
default:
127+
logger.Errorw("error shutting down internal server", "err", err)
128+
}
129+
}()
130+
131+
logger.Infow("waiting for signal")
132+
<-signalChan
133+
s.Shutdown()
134+
135+
t := time.NewTimer(svrShutdownTimeout)
136+
137+
shutdownChan := make(chan error)
138+
go func() {
139+
shutdownChan <- svr.Shutdown(ctx)
140+
}()
141+
142+
select {
143+
case err := <-shutdownChan:
144+
if err != nil {
145+
logger.Errorw("shutdown finished with an error", "err", err)
146+
} else {
147+
logger.Infow("shutdown finished successfully")
148+
}
149+
case <-t.C:
150+
logger.Warnw("shutdown timed out")
151+
}
152+
153+
cancelFunc()
154+
time.Sleep(ctxCancelWait)
155+
156+
logger.Infow("closing down database connections")
157+
s.Close()
158+
159+
if err := osvr.Shutdown(ctx); err != nil {
160+
switch err {
161+
case nil:
162+
case http.ErrServerClosed:
163+
logger.Infow("server closed")
164+
case context.Canceled:
165+
logger.Infow("context cancled")
166+
default:
167+
logger.Errorw("error shutting down operator server", "err", err)
168+
}
169+
}
170+
171+
logger.Infow("existing")
172+
173+
return nil
174+
},
175+
},
176+
},
177+
}

cmd/filecoin-chain-archiver/cmds/utils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ import (
1010

1111
"github.com/filecoin-project/go-jsonrpc"
1212

13-
cliutil "github.com/filecoin-project/lotus/cli/util"
1413
"github.com/filecoin-project/filecoin-chain-archiver/pkg/config"
14+
cliutil "github.com/filecoin-project/lotus/cli/util"
1515
)
1616

17-
func NodeMultiaddrs(cfg *config.Config) ([]string, error) {
17+
func NodeMultiaddrs(cfg *config.ExportWorkerConfig) ([]string, error) {
1818
var multiaddrs []string
1919

2020
for _, node := range cfg.Nodes {

pkg/config/config.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ import (
1010
"golang.org/x/xerrors"
1111
)
1212

13-
func DefaultConfig() *Config {
14-
return &Config{}
13+
func DefaultIndexServiceConfig() *IndexServiceConfig {
14+
return &IndexServiceConfig{}
15+
}
16+
17+
func DefaultExportWorkerConfig() *ExportWorkerConfig {
18+
return &ExportWorkerConfig{}
1519
}
1620

1721
type URL url.URL
@@ -36,10 +40,21 @@ type Node struct {
3640
TokenPath string
3741
}
3842

39-
type Config struct {
43+
type ExportWorkerConfig struct {
4044
Nodes []Node
4145
}
4246

47+
type S3ResolverConfig struct {
48+
Endpoint string
49+
Bucket string
50+
AccessKeyPath string
51+
SecretKeyPath string
52+
}
53+
54+
type IndexServiceConfig struct {
55+
S3Resolver S3ResolverConfig
56+
}
57+
4358
func FromFile(path string, def interface{}) (interface{}, error) {
4459
file, err := os.Open(path)
4560
switch {

pkg/index/index.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package index
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/minio/minio-go/v7"
10+
11+
"github.com/ipfs/go-log/v2"
12+
)
13+
14+
var (
15+
expiryLength = 5 * time.Minute
16+
)
17+
18+
var logger = log.Logger("filecoin-chain-archiver/pkg/index-resolver")
19+
20+
type IndexS3Resolver struct {
21+
client s3ClientInterface
22+
bucket string
23+
}
24+
25+
type s3ClientInterface interface {
26+
StatObject(ctx context.Context, bucketName, objectName string, opts minio.StatObjectOptions) (minio.ObjectInfo, error)
27+
}
28+
29+
type Resolver interface {
30+
Resolve(context.Context, string) (string, error)
31+
}
32+
33+
func NewIndexS3Resolver(client s3ClientInterface, bucket string) *IndexS3Resolver {
34+
return &IndexS3Resolver{
35+
client: client,
36+
bucket: bucket,
37+
}
38+
}
39+
40+
func (i *IndexS3Resolver) Resolve(ctx context.Context, obj string) (string, error) {
41+
objInfo, err := i.client.StatObject(ctx, i.bucket, obj, minio.StatObjectOptions{})
42+
if err != nil {
43+
return "", err
44+
}
45+
46+
if v, ok := objInfo.Metadata["X-Amz-Website-Redirect-Location"]; ok {
47+
return v[0], nil
48+
}
49+
50+
return "", fmt.Errorf("failed to resolve link")
51+
}
52+
53+
type cacheMetadata struct {
54+
value string
55+
expiry time.Time
56+
}
57+
58+
type CachedResolver struct {
59+
resolver Resolver
60+
61+
cacheMu sync.Mutex
62+
cache map[string]cacheMetadata
63+
}
64+
65+
func NewCachedResolver(resolver Resolver) *CachedResolver {
66+
return &CachedResolver{
67+
resolver: resolver,
68+
cache: make(map[string]cacheMetadata),
69+
}
70+
}
71+
72+
func (i *CachedResolver) read(obj string) (string, bool) {
73+
i.cacheMu.Lock()
74+
defer i.cacheMu.Unlock()
75+
if v, ok := i.cache[obj]; ok {
76+
if time.Now().Before(v.expiry) {
77+
return v.value, true
78+
}
79+
}
80+
81+
return "", false
82+
}
83+
84+
func (i *CachedResolver) set(obj, value string, expiry time.Time) {
85+
i.cacheMu.Lock()
86+
defer i.cacheMu.Unlock()
87+
i.cache[obj] = cacheMetadata{
88+
expiry: expiry,
89+
value: value,
90+
}
91+
}
92+
93+
func (i *CachedResolver) Resolve(ctx context.Context, obj string) (string, error) {
94+
if v, ok := i.read(obj); ok {
95+
logger.Debugw("cache hit")
96+
return v, nil
97+
}
98+
99+
value, err := i.resolver.Resolve(ctx, obj)
100+
if err != nil {
101+
return "", err
102+
}
103+
104+
logger.Debugw("cache miss")
105+
i.set(obj, value, time.Now().Add(expiryLength))
106+
107+
return value, nil
108+
}

0 commit comments

Comments
 (0)