Skip to content

Commit 3b8bd11

Browse files
authored
feat(indexer-gateway): add new rest api (#72)
- add new rest api to get file info and node status - improve storage nodes selection for file upload and download
1 parent e8e028d commit 3b8bd11

File tree

8 files changed

+304
-140
lines changed

8 files changed

+304
-140
lines changed

cmd/indexer.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ var (
1717
locations indexer.IPLocationConfig
1818
locationCache indexer.FileLocationCacheConfig
1919
maxDownloadFileSize uint64
20-
ExpectedReplica uint
2120
}
2221

2322
indexerCmd = &cobra.Command{
@@ -45,7 +44,6 @@ func init() {
4544
indexerCmd.Flags().IntVar(&indexerArgs.locationCache.CacheSize, "file-location-cache-size", 100000, "size of file location cache")
4645

4746
indexerCmd.Flags().Uint64Var(&indexerArgs.maxDownloadFileSize, "max-download-file-size", 100*1024*1024, "Maximum file size in bytes to download")
48-
indexerCmd.Flags().UintVar(&indexerArgs.ExpectedReplica, "expected-replica", 1, "Expected number of replications to upload")
4947

5048
indexerCmd.MarkFlagsOneRequired("trusted", "node")
5149

@@ -58,17 +56,17 @@ func startIndexer(*cobra.Command, []string) {
5856

5957
indexer.InitDefaultIPLocationManager(indexerArgs.locations)
6058

61-
nodeManagerClosable, err := indexer.InitDefaultNodeManager(indexerArgs.nodes)
59+
nodeManager, err := indexer.InitDefaultNodeManager(indexerArgs.nodes)
6260
if err != nil {
6361
logrus.WithError(err).Fatal("Failed to initialize the default node manager")
6462
}
65-
defer nodeManagerClosable()
63+
defer nodeManager.Close()
6664

67-
fileLocationCacheClosable, err := indexer.InitFileLocationCache(indexerArgs.locationCache)
65+
fileLocationCache, err := indexer.InitFileLocationCache(indexerArgs.locationCache)
6866
if err != nil {
6967
logrus.WithError(err).Fatal("Failed to initialize the default file location cache")
7068
}
71-
defer fileLocationCacheClosable()
69+
defer fileLocationCache.Close()
7270

7371
api := indexer.NewIndexerApi()
7472

@@ -77,11 +75,9 @@ func startIndexer(*cobra.Command, []string) {
7775
"discover": len(indexerArgs.nodes.DiscoveryNode) > 0,
7876
}).Info("Starting indexer service ...")
7977

80-
gateway.MustServeWithRPC(gateway.Config{
78+
gateway.MustServeWithRPC(nodeManager, fileLocationCache, gateway.Config{
8179
Endpoint: indexerArgs.endpoint,
82-
Nodes: indexerArgs.nodes.TrustedNodes,
8380
MaxDownloadFileSize: indexerArgs.maxDownloadFileSize,
84-
ExpectedReplica: indexerArgs.ExpectedReplica,
8581
RPCHandler: rpc.MustNewHandler(map[string]interface{}{
8682
api.Namespace: api,
8783
}),

indexer/file_location_cache.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,18 @@ type FileLocationCache struct {
4141

4242
var defaultFileLocationCache FileLocationCache
4343

44-
func InitFileLocationCache(config FileLocationCacheConfig) (closable func(), err error) {
44+
func InitFileLocationCache(config FileLocationCacheConfig) (cache *FileLocationCache, err error) {
4545
if len(config.DiscoveryNode) > 0 {
4646
if defaultFileLocationCache.discoverNode, err = node.NewAdminClient(config.DiscoveryNode, defaultZgsClientOpt); err != nil {
4747
return nil, errors.WithMessage(err, "Failed to create admin client to discover peers")
4848
}
4949
}
5050
defaultFileLocationCache.cache = expirable.NewLRU[uint64, []*shard.ShardedNode](config.CacheSize, nil, config.Expiry)
5151
defaultFileLocationCache.discoveryPorts = config.DiscoveryPorts
52-
return defaultFileLocationCache.close, nil
52+
return &defaultFileLocationCache, nil
5353
}
5454

55-
func (c *FileLocationCache) close() {
55+
func (c *FileLocationCache) Close() {
5656
if c.discoverNode != nil {
5757
c.discoverNode.Close()
5858
}

indexer/gateway/controller.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package gateway
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
8+
"github.com/0glabs/0g-storage-client/common/shard"
9+
"github.com/0glabs/0g-storage-client/indexer"
10+
"github.com/0glabs/0g-storage-client/node"
11+
eth_common "github.com/ethereum/go-ethereum/common"
12+
"github.com/pkg/errors"
13+
)
14+
15+
type Cid struct {
16+
Root string `form:"root" json:"root"`
17+
TxSeq *uint64 `form:"txSeq" json:"txSeq"`
18+
}
19+
20+
// NewCid parsing the CID from the input string.
21+
func NewCid(cidStr string) Cid {
22+
var cid Cid
23+
if v, err := strconv.ParseUint(cidStr, 10, 64); err == nil { // TxnSeq is used as CID
24+
cid.TxSeq = &v
25+
} else {
26+
cid.Root = cidStr
27+
}
28+
return cid
29+
}
30+
31+
type RestController struct {
32+
nodeManager *indexer.NodeManager
33+
fileLocationCache *indexer.FileLocationCache
34+
35+
maxDownloadFileSize uint64 // max download file size
36+
}
37+
38+
func NewRestController(nodeManager *indexer.NodeManager, locationCache *indexer.FileLocationCache, maxDownloadFileSize uint64) *RestController {
39+
return &RestController{
40+
nodeManager: nodeManager,
41+
fileLocationCache: locationCache,
42+
maxDownloadFileSize: maxDownloadFileSize,
43+
}
44+
}
45+
46+
// getAvailableFileLocations returns a list of available file locations for a file with the given CID.
47+
func (ctrl *RestController) getAvailableFileLocations(ctx context.Context, cid Cid) ([]*shard.ShardedNode, error) {
48+
if cid.TxSeq != nil {
49+
return ctrl.fileLocationCache.GetFileLocations(ctx, *cid.TxSeq)
50+
}
51+
52+
// find corresponding tx sequence
53+
hash := eth_common.HexToHash(cid.Root)
54+
for _, client := range ctrl.nodeManager.TrustedClients() {
55+
info, err := client.GetFileInfo(ctx, hash)
56+
if err == nil && info != nil {
57+
return ctrl.fileLocationCache.GetFileLocations(ctx, info.Tx.Seq)
58+
}
59+
}
60+
61+
return nil, nil
62+
}
63+
64+
// getAvailableStorageNodes returns a list of available storage nodes for a file with the given CID.
65+
func (ctrl *RestController) getAvailableStorageNodes(ctx context.Context, cid Cid) ([]*node.ZgsClient, error) {
66+
nodes, err := ctrl.getAvailableFileLocations(ctx, cid)
67+
if err != nil {
68+
return nil, errors.WithMessage(err, "failed to get file locations")
69+
}
70+
71+
var clients []*node.ZgsClient
72+
for i := range nodes {
73+
client, err := node.NewZgsClient(nodes[i].URL)
74+
if err != nil {
75+
return nil, errors.WithMessage(err, "failed to create zgs client")
76+
}
77+
78+
clients = append(clients, client)
79+
}
80+
81+
return clients, nil
82+
}
83+
84+
// fetchFileInfo encapsulates the logic for attempting to retrieve file info from storage nodes.
85+
func (ctrl *RestController) fetchFileInfo(ctx context.Context, cid Cid) (*node.FileInfo, error) {
86+
clients, err := ctrl.getAvailableStorageNodes(ctx, cid)
87+
if err != nil {
88+
return nil, fmt.Errorf("failed to get available storage nodes: %v", err)
89+
}
90+
91+
fileInfo, err := getOverallFileInfo(ctx, clients, cid)
92+
if err != nil {
93+
return nil, fmt.Errorf("failed to retrieve file info from storage nodes: %v", err)
94+
}
95+
96+
if fileInfo != nil {
97+
return fileInfo, nil
98+
}
99+
100+
// Attempt retrieval from trusted clients as a fallback
101+
fileInfo, err = getOverallFileInfo(ctx, ctrl.nodeManager.TrustedClients(), cid)
102+
if err != nil {
103+
return nil, fmt.Errorf("failed to retrieve file info from trusted clients: %v", err)
104+
}
105+
106+
return fileInfo, nil
107+
}
108+
109+
func getOverallFileInfo(ctx context.Context, clients []*node.ZgsClient, cid Cid) (info *node.FileInfo, err error) {
110+
var rootHash eth_common.Hash
111+
if cid.TxSeq == nil {
112+
rootHash = eth_common.HexToHash(cid.Root)
113+
}
114+
115+
var finalInfo *node.FileInfo
116+
for _, client := range clients {
117+
if cid.TxSeq != nil {
118+
info, err = client.GetFileInfoByTxSeq(ctx, *cid.TxSeq)
119+
} else {
120+
info, err = client.GetFileInfo(ctx, rootHash)
121+
}
122+
123+
if err != nil {
124+
return nil, err
125+
}
126+
127+
if info == nil {
128+
return nil, nil
129+
}
130+
131+
if finalInfo == nil {
132+
finalInfo = info
133+
continue
134+
}
135+
finalInfo.Finalized = finalInfo.Finalized && info.Finalized
136+
finalInfo.IsCached = finalInfo.IsCached && info.IsCached
137+
finalInfo.Pruned = finalInfo.Pruned || info.Pruned
138+
finalInfo.UploadedSegNum = min(finalInfo.UploadedSegNum, info.UploadedSegNum)
139+
}
140+
141+
return finalInfo, nil
142+
}

0 commit comments

Comments
 (0)