Skip to content

Commit 018ac74

Browse files
committed
akash provider: multipl fixes
* fix required flags * show error if api host is not expected provider * actually show all providers
1 parent 46679c2 commit 018ac74

File tree

8 files changed

+448
-374
lines changed

8 files changed

+448
-374
lines changed

cmd/akash/provider.go

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ func providerCommand() *cobra.Command {
3333
}
3434

3535
session.AddFlagNode(cmd, cmd.PersistentFlags())
36-
session.AddFlagKey(cmd, cmd.PersistentFlags())
37-
session.AddFlagNonce(cmd, cmd.PersistentFlags())
3836

3937
cmd.AddCommand(createProviderCommand())
4038
cmd.AddCommand(runCommand())
@@ -54,6 +52,8 @@ func createProviderCommand() *cobra.Command {
5452
RunE: session.WithSession(session.RequireNode(doCreateProviderCommand)),
5553
}
5654

55+
session.AddFlagKey(cmd, cmd.Flags())
56+
session.AddFlagNonce(cmd, cmd.Flags())
5757
session.AddFlagKeyType(cmd, cmd.Flags())
5858

5959
return cmd
@@ -132,6 +132,8 @@ func runCommand() *cobra.Command {
132132
RunE: session.WithSession(session.RequireNode(session.RequireHost(doProviderRunCommand))),
133133
}
134134

135+
session.AddFlagKey(cmd, cmd.Flags())
136+
session.AddFlagNonce(cmd, cmd.Flags())
135137
session.AddFlagHost(cmd, cmd.PersistentFlags())
136138

137139
cmd.Flags().Bool("kube", false, "use kubernetes cluster")
@@ -209,7 +211,7 @@ func doProviderRunCommand(session session.Session, cmd *cobra.Command, args []st
209211

210212
go func() {
211213
defer cancel()
212-
errch <- grpc.RunServer(ctx, session.Log(), "tcp", "9090", service.ManifestHandler(), cclient, service)
214+
errch <- grpc.Run(ctx, ":9090", psession, cclient, service, service.ManifestHandler())
213215
}()
214216

215217
go func() {
@@ -245,7 +247,7 @@ func doProviderStatusCommand(session session.Session, cmd *cobra.Command, args [
245247
return err
246248
}
247249

248-
var providers []types.Provider
250+
var providers []*types.Provider
249251

250252
if len(args) == 0 {
251253
providers = plist.Providers
@@ -265,20 +267,34 @@ func doProviderStatusCommand(session session.Session, cmd *cobra.Command, args [
265267
}
266268

267269
type outputItem struct {
268-
Provider *types.Provider
269-
Status *types.ServerStatusParseable
270-
Error string `json:",omitempty"`
270+
Provider *types.Provider `json:"provider,omitempty"`
271+
Status *types.ServerStatusParseable `json:"status,omitempty"`
272+
Error string `json:"error,omitempty"`
271273
}
272274

273275
output := []outputItem{}
274276

275277
for _, provider := range providers {
276-
status, err := http.Status(session.Ctx(), &provider)
278+
279+
status, err := http.Status(session.Ctx(), provider)
277280
if err != nil {
278-
output = append(output, outputItem{Provider: &provider, Error: err.Error()})
281+
output = append(output, outputItem{Provider: provider, Error: err.Error()})
282+
continue
283+
}
284+
285+
if !bytes.Equal(status.Provider, provider.Address) {
286+
output = append(output, outputItem{
287+
Provider: provider,
288+
Status: status,
289+
Error: "Status received from incorrect provider",
290+
})
279291
continue
280292
}
281-
output = append(output, outputItem{Provider: &provider, Status: status})
293+
294+
output = append(output, outputItem{
295+
Provider: provider,
296+
Status: status,
297+
})
282298
}
283299

284300
buf, err := json.MarshalIndent(output, "", " ")
@@ -298,7 +314,8 @@ func closeFulfillmentCommand() *cobra.Command {
298314
RunE: session.WithSession(session.RequireNode(doCloseFulfillmentCommand)),
299315
}
300316

301-
session.AddFlagKeyType(cmd, cmd.Flags())
317+
session.AddFlagKey(cmd, cmd.Flags())
318+
session.AddFlagNonce(cmd, cmd.Flags())
302319

303320
return cmd
304321
}
@@ -329,8 +346,6 @@ func closeLeaseCommand() *cobra.Command {
329346
RunE: session.WithSession(session.RequireNode(doCloseLeaseCommand)),
330347
}
331348

332-
session.AddFlagKeyType(cmd, cmd.Flags())
333-
334349
return cmd
335350
}
336351

provider/grpc/client_test.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,17 @@ package grpc
22

33
import (
44
"context"
5-
"os"
65
"testing"
7-
"time"
86

97
"github.com/ovrclk/akash/manifest"
10-
kmocks "github.com/ovrclk/akash/provider/cluster/kube/mocks"
8+
cmocks "github.com/ovrclk/akash/provider/cluster/mocks"
119
"github.com/ovrclk/akash/provider/manifest/mocks"
12-
pmocks "github.com/ovrclk/akash/provider/mocks"
10+
"github.com/ovrclk/akash/provider/session"
1311
"github.com/ovrclk/akash/sdl"
1412
"github.com/ovrclk/akash/testutil"
1513
"github.com/stretchr/testify/assert"
1614
"github.com/stretchr/testify/mock"
1715
"github.com/stretchr/testify/require"
18-
"github.com/tendermint/tmlibs/log"
1916
)
2017

2118
func TestSendManifest(t *testing.T) {
@@ -28,29 +25,37 @@ func TestSendManifest(t *testing.T) {
2825
mani, err := sdl.Manifest()
2926
require.NoError(t, err)
3027

31-
_, kmgr := testutil.NewNamedKey(t)
28+
key, kmgr := testutil.NewNamedKey(t)
3229
signer := testutil.Signer(t, kmgr)
3330

31+
provider := testutil.Provider(key.Address(), 1)
32+
session := session.New(testutil.Logger(), provider, nil, nil)
33+
3434
deployment := testutil.DeploymentAddress(t)
3535

3636
req, _, err := manifest.SignManifest(mani, signer, deployment)
3737
assert.NoError(t, err)
3838

39-
sclient := &pmocks.StatusClient{}
40-
4139
handler := &mocks.Handler{}
4240
handler.On("HandleManifest", mock.Anything, mock.Anything).Return(nil)
4341

44-
client := &kmocks.Client{}
42+
client := &cmocks.Client{}
43+
44+
donech := make(chan struct{})
45+
ctx, cancel := context.WithCancel(context.Background())
4546

46-
server := newServer(log.NewTMLogger(os.Stdout), "tcp", ":3001", handler, client, sclient)
4747
go func() {
48-
err := server.listenAndServe()
49-
require.NoError(t, err)
48+
defer close(donech)
49+
assert.NoError(t, Run(ctx, ":3001", session, client, nil, handler))
5050
}()
5151

52-
time.Sleep(1 * time.Second)
52+
testutil.SleepForThreadStart(t)
5353

5454
_, err = c.Deploy(context.TODO(), req)
5555
assert.NoError(t, err)
56+
57+
testutil.SleepForThreadStart(t)
58+
59+
cancel()
60+
<-donech
5661
}

provider/grpc/server.go

Lines changed: 64 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package grpc
22

33
import (
4-
"fmt"
54
"net"
65
"net/http"
76
"strings"
@@ -10,67 +9,46 @@ import (
109
"github.com/ovrclk/akash/keys"
1110
"github.com/ovrclk/akash/provider"
1211
"github.com/ovrclk/akash/provider/cluster"
13-
"github.com/ovrclk/akash/provider/cluster/kube"
1412
"github.com/ovrclk/akash/provider/manifest"
13+
"github.com/ovrclk/akash/provider/session"
1514
"github.com/ovrclk/akash/types"
1615
"github.com/ovrclk/akash/version"
1716
"github.com/tendermint/tmlibs/log"
1817
"golang.org/x/net/context"
19-
"golang.org/x/net/netutil"
2018
"google.golang.org/grpc"
2119
)
2220

2321
type server struct {
24-
cluster.Client
25-
*grpc.Server
22+
session session.Session
23+
client cluster.Client
2624
status provider.StatusClient
2725
handler manifest.Handler
28-
network string
29-
port string
3026
log log.Logger
3127
}
3228

33-
func RunServer(ctx context.Context, log log.Logger, network,
34-
port string, handler manifest.Handler, client kube.Client,
35-
status provider.StatusClient) error {
36-
37-
address := fmt.Sprintf(":%v", port)
38-
39-
server := newServer(log, network, address, handler, client, status)
40-
41-
ctx, cancel := context.WithCancel(ctx)
42-
43-
donech := make(chan struct{})
44-
45-
go func() {
46-
defer close(donech)
47-
<-ctx.Done()
48-
log.Info("Shutting down server")
49-
server.GracefulStop()
50-
}()
51-
52-
log.Info("Starting GRPC server", "address", address)
53-
err := server.listenAndServe()
54-
cancel()
55-
56-
<-donech
57-
58-
log.Info("GRPC server shutdown")
59-
60-
return err
29+
func Run(
30+
ctx context.Context,
31+
address string,
32+
session session.Session,
33+
client cluster.Client,
34+
status provider.StatusClient,
35+
handler manifest.Handler) error {
36+
server := create(session, client, status, handler)
37+
return run(ctx, server, address)
6138
}
6239

63-
func (s server) Status(ctx context.Context, req *types.Empty) (*types.ServerStatus, error) {
40+
func (s *server) Status(ctx context.Context, req *types.Empty) (*types.ServerStatus, error) {
6441
status, err := s.status.Status(ctx)
6542
if err != nil {
6643
return nil, err
6744
}
6845

6946
vsn := version.Get()
7047
return &types.ServerStatus{
71-
Code: http.StatusOK,
48+
Provider: s.session.Provider().Address,
7249
Version: &vsn,
73-
Provider: status,
50+
Status: status,
51+
Code: http.StatusOK,
7452
Message: "OK",
7553
}, nil
7654
}
@@ -85,7 +63,7 @@ func (s server) Deploy(ctx context.Context, req *types.ManifestRequest) (*types.
8563
}
8664

8765
// Lease status will retry for one minute
88-
func (s server) LeaseStatus(ctx context.Context, req *types.LeaseStatusRequest) (*types.LeaseStatusResponse, error) {
66+
func (s *server) LeaseStatus(ctx context.Context, req *types.LeaseStatusRequest) (*types.LeaseStatusResponse, error) {
8967
attempts := 12
9068

9169
lease, err := keys.ParseLeasePath(strings.Join([]string{req.Deployment, req.Group, req.Order, req.Provider}, "/"))
@@ -94,14 +72,14 @@ func (s server) LeaseStatus(ctx context.Context, req *types.LeaseStatusRequest)
9472
return nil, types.ErrInternalError{Message: "internal error"}
9573
}
9674

97-
response, err := s.Client.LeaseStatus(lease.LeaseID)
75+
response, err := s.client.LeaseStatus(lease.LeaseID)
9876
if err == nil {
9977
return response, err
10078
}
10179

10280
for i := 0; i < attempts; i++ {
10381
time.Sleep(time.Second * 5)
104-
response, err = s.Client.LeaseStatus(lease.LeaseID)
82+
response, err = s.client.LeaseStatus(lease.LeaseID)
10583
if err != cluster.ErrNoDeployments {
10684
break
10785
}
@@ -110,23 +88,23 @@ func (s server) LeaseStatus(ctx context.Context, req *types.LeaseStatusRequest)
11088
return response, err
11189
}
11290

113-
func (s server) ServiceStatus(ctx context.Context,
91+
func (s *server) ServiceStatus(ctx context.Context,
11492
req *types.ServiceStatusRequest) (*types.ServiceStatusResponse, error) {
11593
lease, err := keys.ParseLeasePath(strings.Join([]string{req.Deployment, req.Group, req.Order, req.Provider}, "/"))
11694
if err != nil {
11795
s.log.Error(err.Error())
11896
return nil, types.ErrInternalError{Message: "internal error"}
11997
}
120-
return s.Client.ServiceStatus(lease.LeaseID, req.Name)
98+
return s.client.ServiceStatus(lease.LeaseID, req.Name)
12199
}
122100

123-
func (s server) ServiceLogs(req *types.LogRequest, server types.Cluster_ServiceLogsServer) error {
101+
func (s *server) ServiceLogs(req *types.LogRequest, server types.Cluster_ServiceLogsServer) error {
124102
lease, err := keys.ParseLeasePath(strings.Join([]string{req.Deployment, req.Group, req.Order, req.Provider}, "/"))
125103
if err != nil {
126104
s.log.Error(err.Error())
127105
return types.ErrInternalError{Message: "internal error"}
128106
}
129-
logs, err := s.Client.ServiceLogs(server.Context(), lease.LeaseID, req.Options.TailLines, req.Options.Follow)
107+
logs, err := s.client.ServiceLogs(server.Context(), lease.LeaseID, req.Options.TailLines, req.Options.Follow)
130108
if err != nil {
131109
s.log.Error(err.Error())
132110
return types.ErrInternalError{Message: "internal error"}
@@ -165,28 +143,54 @@ func (s server) ServiceLogs(req *types.LogRequest, server types.Cluster_ServiceL
165143
return nil
166144
}
167145

168-
// NewServer network can be "tcp", "tcp4", "tcp6", "unix" or "unixpacket". phandler is the provider cluster handler
169-
func newServer(log log.Logger, network, port string, handler manifest.Handler,
170-
client kube.Client, status provider.StatusClient) *server {
171-
s := &server{
146+
func create(
147+
session session.Session,
148+
client cluster.Client,
149+
status provider.StatusClient,
150+
handler manifest.Handler) *server {
151+
152+
log := session.Log().With("cmp", "grpc-server")
153+
154+
return &server{
155+
session: session,
156+
client: client,
172157
status: status,
173158
handler: handler,
174-
network: network,
175-
port: port,
176-
Server: grpc.NewServer(grpc.MaxConcurrentStreams(2), grpc.MaxRecvMsgSize(500000)),
177159
log: log,
178-
Client: client,
179160
}
180-
types.RegisterClusterServer(s.Server, s)
181-
return s
182161
}
183162

184-
func (s *server) listenAndServe() error {
185-
l, err := net.Listen(s.network, s.port)
163+
func run(ctx context.Context, server *server, address string) error {
164+
165+
fd, err := net.Listen("tcp4", address)
186166
if err != nil {
187167
return err
188168
}
189-
l = netutil.LimitListener(l, 10)
190-
s.log.Info("Running manifest server", "port", s.port, "network", s.network)
191-
return s.Server.Serve(l)
169+
170+
gserver := grpc.NewServer(grpc.MaxConcurrentStreams(2), grpc.MaxRecvMsgSize(500000))
171+
types.RegisterClusterServer(gserver, server)
172+
173+
ctx, cancel := context.WithCancel(ctx)
174+
donech := make(chan struct{})
175+
176+
go func() {
177+
defer close(donech)
178+
<-ctx.Done()
179+
server.log.Info("Shutting down server")
180+
gserver.GracefulStop()
181+
}()
182+
183+
server.log.Info("Starting GRPC server", "address", address)
184+
err = gserver.Serve(fd)
185+
server.log.Info("GRPC server shutdown.")
186+
if ctx.Err() == context.Canceled {
187+
err = nil
188+
}
189+
cancel()
190+
191+
<-donech
192+
193+
server.log.Info("GRPC done.")
194+
195+
return err
192196
}

0 commit comments

Comments
 (0)