Skip to content

Commit 3417085

Browse files
authored
feat(gossip, login): separate gossip pkg, implement new login/gossip operations (#641)
@eahydra @cardyok Now implementing very basic login + gossip. Needs a lot of cleanups later. Signed-off-by: Gyuho Lee <[email protected]>
1 parent 0f83fcb commit 3417085

File tree

19 files changed

+1238
-562
lines changed

19 files changed

+1238
-562
lines changed

api/v1/gossip.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package v1
2+
3+
// GossipRequest is the request for the gossip request.
4+
type GossipRequest struct {
5+
MachineID string `json:"machineID"`
6+
DaemonVersion string `json:"daemonVersion"`
7+
Components []string `json:"components"`
8+
}
9+
10+
// GossipResponse is the response for the gossip request.
11+
type GossipResponse struct {
12+
Error string `json:"error"`
13+
Status string `json:"status"`
14+
}

cmd/gpud/command/command.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package command
22

33
import (
4+
"errors"
45
"fmt"
56
"time"
67

@@ -23,7 +24,6 @@ var (
2324
logFile string
2425

2526
statusWatch bool
26-
uid string
2727

2828
annotations string
2929
listenAddress string
@@ -56,6 +56,10 @@ const (
5656
warningSign = "\033[31m✘\033[0m"
5757
)
5858

59+
var (
60+
ErrEmptyToken = errors.New("token is empty")
61+
)
62+
5963
func App() *cli.App {
6064
app := cli.NewApp()
6165

@@ -183,11 +187,6 @@ sudo rm /etc/systemd/system/gpud.service
183187
Usage: "set the annotations",
184188
Destination: &annotations,
185189
},
186-
cli.StringFlag{
187-
Name: "uid",
188-
Usage: "uid for this machine",
189-
Destination: &uid,
190-
},
191190
&cli.BoolFlag{
192191
Name: "pprof",
193192
Usage: "enable pprof (default: false)",
@@ -388,11 +387,6 @@ sudo rm /etc/systemd/system/gpud.service
388387
Usage: "endpoint for control plane",
389388
Value: "mothership-machine.app.lepton.ai",
390389
},
391-
cli.StringFlag{
392-
Name: "uid",
393-
Usage: "uid for this machine",
394-
Destination: &uid,
395-
},
396390
},
397391
Subcommands: []cli.Command{
398392
{

cmd/gpud/command/login.go

Lines changed: 37 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,31 @@ package command
33
import (
44
"context"
55
"fmt"
6-
"os"
76
"time"
87

8+
"github.com/urfave/cli"
9+
10+
apiv1 "github.com/leptonai/gpud/api/v1"
911
client "github.com/leptonai/gpud/client/v1"
1012
"github.com/leptonai/gpud/pkg/config"
11-
gpud_state "github.com/leptonai/gpud/pkg/gpud-state"
13+
gpudstate "github.com/leptonai/gpud/pkg/gpud-state"
1214
"github.com/leptonai/gpud/pkg/login"
1315
"github.com/leptonai/gpud/pkg/server"
1416
"github.com/leptonai/gpud/pkg/sqlite"
15-
16-
"github.com/urfave/cli"
1717
)
1818

1919
func cmdLogin(cliContext *cli.Context) error {
20+
token := cliContext.String("token")
21+
if token == "" {
22+
fmt.Print("Please visit https://dashboard.lepton.ai/ under Settings/Tokens to fetch your token\nPlease enter your token:")
23+
if _, err := fmt.Scanln(&token); err != nil && err.Error() != "unexpected newline" {
24+
return fmt.Errorf("failed reading input: %w", err)
25+
}
26+
}
27+
if token == "" {
28+
return ErrEmptyToken
29+
}
30+
2031
rootCtx, rootCancel := context.WithTimeout(context.Background(), 2*time.Minute)
2132
defer rootCancel()
2233

@@ -31,75 +42,54 @@ func cmdLogin(cliContext *cli.Context) error {
3142
if err != nil {
3243
return fmt.Errorf("failed to get state file: %w", err)
3344
}
34-
35-
dbRW, err := sqlite.Open(stateFile)
36-
if err != nil {
37-
return fmt.Errorf("failed to open state file: %w", err)
38-
}
39-
defer dbRW.Close()
40-
4145
dbRO, err := sqlite.Open(stateFile, sqlite.WithReadOnly(true))
4246
if err != nil {
4347
return fmt.Errorf("failed to open state file: %w", err)
4448
}
4549
defer dbRO.Close()
4650

47-
uid, err := gpud_state.CreateMachineIDIfNotExist(rootCtx, dbRW, dbRO, "")
51+
machineID, err := gpudstate.ReadMachineID(rootCtx, dbRO)
4852
if err != nil {
49-
return fmt.Errorf("failed to get machine uid: %w", err)
53+
return err
5054
}
51-
52-
components, err := gpud_state.GetComponents(rootCtx, dbRO, uid)
53-
if err != nil {
54-
return fmt.Errorf("failed to get components: %w", err)
55+
if machineID != "" {
56+
fmt.Printf("machine ID %s already assigned (skipping login)\n", machineID)
57+
return nil
5558
}
5659

57-
cliToken := cliContext.String("token")
5860
endpoint := cliContext.String("endpoint")
5961

60-
dbToken, _ := gpud_state.GetLoginInfo(rootCtx, dbRO, uid)
61-
token := dbToken
62-
if cliToken != "" {
63-
token = cliToken
64-
} else {
65-
fmt.Println("trying token from local store, if you want to override, use --token flag")
62+
// machine ID has not been assigned yet
63+
// thus request one and blocks until the login request is processed
64+
loginResp, err := login.SendRequest(rootCtx, endpoint, apiv1.LoginRequest{Token: token})
65+
if err != nil {
66+
return err
6667
}
68+
machineID = loginResp.MachineID
6769

68-
if token == "" {
69-
fmt.Print("Please visit https://dashboard.lepton.ai/ under Settings/Tokens to fetch your token\nPlease enter your token:")
70-
if _, err := fmt.Scanln(&token); err != nil && err.Error() != "unexpected newline" {
71-
return fmt.Errorf("failed reading input: %w", err)
72-
}
70+
// consume the login response to persist the machine ID
71+
dbRW, err := sqlite.Open(stateFile)
72+
if err != nil {
73+
return fmt.Errorf("failed to open state file: %w", err)
74+
}
75+
defer dbRW.Close()
76+
if err := gpudstate.RecordMachineID(rootCtx, dbRW, dbRO, machineID); err != nil {
77+
return fmt.Errorf("failed to record machine ID: %w", err)
7378
}
7479

7580
fifoFile, err := config.DefaultFifoFile()
7681
if err != nil {
7782
return fmt.Errorf("failed to get fifo file: %w", err)
7883
}
7984

80-
if token != "" && endpoint != "" {
81-
hostname, err := os.Hostname()
82-
if err != nil {
83-
hostname = "UnknownName"
84-
}
85-
if err := login.Login(hostname, token, endpoint, components, uid); err != nil {
86-
return err
87-
}
88-
} else {
89-
fmt.Println("login skipped since token or endpoint not provided...")
90-
return nil
91-
}
92-
9385
if err := server.WriteToken(token, fifoFile); err != nil {
9486
return fmt.Errorf("failed to write token: %v", err)
9587
}
9688

97-
if token != dbToken {
98-
if err = gpud_state.UpdateLoginInfo(rootCtx, dbRW, uid, token); err != nil {
99-
fmt.Println("machine logged in but failed to update token:", err)
100-
}
89+
if err = gpudstate.UpdateLoginInfo(rootCtx, dbRW, machineID, token); err != nil {
90+
fmt.Println("machine logged in but failed to update token:", err)
10191
}
10292

103-
fmt.Printf("%s successfully logged into lepton.ai\n", checkMark)
93+
fmt.Printf("%s successfully logged in with machine id %s\n", checkMark, loginResp.MachineID)
10494
return nil
10595
}

cmd/gpud/command/run.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ import (
1616

1717
"github.com/leptonai/gpud/pkg/config"
1818
gpud_manager "github.com/leptonai/gpud/pkg/gpud-manager"
19+
gpudstate "github.com/leptonai/gpud/pkg/gpud-state"
1920
"github.com/leptonai/gpud/pkg/log"
2021
lepServer "github.com/leptonai/gpud/pkg/server"
22+
"github.com/leptonai/gpud/pkg/sqlite"
2123
pkd_systemd "github.com/leptonai/gpud/pkg/systemd"
2224
"github.com/leptonai/gpud/version"
2325
)
@@ -97,6 +99,23 @@ func cmdRun(cliContext *cli.Context) error {
9799
}
98100
m.Start(rootCtx)
99101

102+
stateFile, err := config.DefaultStateFile()
103+
if err != nil {
104+
return fmt.Errorf("failed to get state file: %w", err)
105+
}
106+
dbRO, err := sqlite.Open(stateFile, sqlite.WithReadOnly(true))
107+
if err != nil {
108+
return fmt.Errorf("failed to open state file: %w", err)
109+
}
110+
defer dbRO.Close()
111+
uid, err := gpudstate.ReadMachineID(rootCtx, dbRO)
112+
if err != nil {
113+
return fmt.Errorf("failed to read machine ID: %w", err)
114+
}
115+
if uid == "" {
116+
log.Logger.Warnw("machine ID not found, running in local mode not connected to any control plane")
117+
}
118+
100119
server, err := lepServer.New(rootCtx, cfg, cliContext.String("endpoint"), uid, m)
101120
if err != nil {
102121
return err

cmd/gpud/command/utils.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ func GetUID(ctx context.Context) (string, error) {
2424
}
2525
defer dbRO.Close()
2626

27-
return gpud_state.GetMachineID(ctx, dbRO)
27+
return gpud_state.ReadMachineID(ctx, dbRO)
2828
}

pkg/config/default.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -244,14 +244,6 @@ func setupDefaultDir() (string, error) {
244244
return d, nil
245245
}
246246

247-
func DefaultConfigFile() (string, error) {
248-
dir, err := setupDefaultDir()
249-
if err != nil {
250-
return "", err
251-
}
252-
return filepath.Join(dir, "gpud.yaml"), nil
253-
}
254-
255247
func DefaultStateFile() (string, error) {
256248
dir, err := setupDefaultDir()
257249
if err != nil {

pkg/gossip/client.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package gossip
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"os"
11+
12+
apiv1 "github.com/leptonai/gpud/api/v1"
13+
"github.com/leptonai/gpud/pkg/log"
14+
)
15+
16+
// SendRequest sends a gossip request.
17+
func SendRequest(ctx context.Context, endpoint string, req apiv1.GossipRequest) (*apiv1.GossipResponse, error) {
18+
url := createURL(endpoint)
19+
return sendRequest(ctx, url, req)
20+
}
21+
22+
func sendRequest(ctx context.Context, url string, req apiv1.GossipRequest) (*apiv1.GossipResponse, error) {
23+
if os.Getenv("GPUD_NO_USAGE_STATS") == "true" {
24+
log.Logger.Debug("gossip skipped since GPUD_NO_USAGE_STATS=true specified")
25+
return nil, nil
26+
}
27+
28+
log.Logger.Infow("sending gossip request", "url", url)
29+
30+
b, err := json.Marshal(req)
31+
if err != nil {
32+
return nil, fmt.Errorf("error marshaling gossip request: %w", err)
33+
}
34+
35+
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(b))
36+
if err != nil {
37+
return nil, fmt.Errorf("error creating request: %w", err)
38+
}
39+
httpReq.Header.Set("Content-Type", "application/json")
40+
41+
client := &http.Client{}
42+
httpResp, err := client.Do(httpReq)
43+
if err != nil {
44+
return nil, err
45+
}
46+
defer httpResp.Body.Close()
47+
48+
body, err := io.ReadAll(httpResp.Body)
49+
if err != nil {
50+
return nil, fmt.Errorf("error reading response body: %w", err)
51+
}
52+
53+
var resp apiv1.GossipResponse
54+
if err := json.Unmarshal(body, &resp); err != nil {
55+
return nil, fmt.Errorf("error unmarshalling gossip response: %w", err)
56+
}
57+
58+
if httpResp.StatusCode != http.StatusOK {
59+
return &resp, fmt.Errorf("login request failed with status code %d (%+v)", httpResp.StatusCode, resp)
60+
}
61+
62+
log.Logger.Infow("gossip request processed", "url", url)
63+
return &resp, nil
64+
}
65+
66+
// createURL creates a URL for the gossip endpoint
67+
func createURL(endpoint string) string {
68+
return fmt.Sprintf("https://%s/api/v1/gossip", endpoint)
69+
}

0 commit comments

Comments
 (0)