Skip to content

Commit 0054184

Browse files
authored
Adding sensors and load balancing engine #393
1 parent a6d9605 commit 0054184

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3122
-47
lines changed

cli/node/start.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
httpService "github.com/taubyte/tau/pkg/http"
1414
auto "github.com/taubyte/tau/pkg/http-auto"
1515
"github.com/taubyte/tau/pkg/kvdb"
16+
"github.com/taubyte/tau/pkg/sensors"
1617
commonSpecs "github.com/taubyte/tau/pkg/specs/common"
1718
slices "github.com/taubyte/tau/utils/slices/string"
1819
)
@@ -37,6 +38,11 @@ func Start(ctx context.Context, serviceConfig *config.Node) error {
3738
return err
3839
}
3940

41+
// start sensors service
42+
if serviceConfig.Sensors, err = sensors.New(serviceConfig.Node); err != nil {
43+
return fmt.Errorf("new sensors service failed with: %s", err)
44+
}
45+
4046
serviceConfig.Databases = kvdb.New(serviceConfig.Node)
4147

4248
// Create httpNode if needed

clients/p2p/seer/client.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@ import (
1212
streamClient "github.com/taubyte/tau/p2p/streams/client"
1313
"github.com/taubyte/tau/p2p/streams/command"
1414
"github.com/taubyte/tau/p2p/streams/command/response"
15+
"github.com/taubyte/tau/pkg/sensors"
1516
"github.com/taubyte/tau/utils/maps"
1617

1718
"github.com/taubyte/tau/services/common"
1819
)
1920

20-
func New(ctx context.Context, node peer.Node) (client iface.Client, err error) {
21-
c := &Client{}
21+
func New(ctx context.Context, node peer.Node, registry *sensors.Registry) (client iface.Client, err error) {
22+
c := &Client{
23+
sensors: registry,
24+
}
2225
c.client, err = streamClient.New(node, common.SeerProtocol)
2326
if err != nil {
2427
logger.Error("API client creation failed: %s", err)

clients/p2p/seer/dream/init.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ func init() {
1515
}
1616

1717
func createSeerClient(node peer.Node, config *common.ClientConfig) (common.Client, error) {
18-
return seer.New(node.Context(), node)
18+
return seer.New(node.Context(), node, nil)
1919
}

clients/p2p/seer/heartbeat.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
)
1313

1414
func (u *Usage) Heartbeat(usage *iface.UsageData, hostname, nodeId, clientNodeId string, signature []byte) (response.Response, error) {
15+
logger.Debug("Heartbeat", "sending usage", usage)
16+
1517
usageData, err := cbor.Marshal(usage)
1618
if err != nil {
1719
return nil, err
@@ -21,6 +23,7 @@ func (u *Usage) Heartbeat(usage *iface.UsageData, hostname, nodeId, clientNodeId
2123
if err != nil {
2224
return nil, fmt.Errorf("calling heartbeat send failed with: %w", err)
2325
}
26+
2427
return resp, nil
2528
}
2629

clients/p2p/seer/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ import (
66
peerCore "github.com/libp2p/go-libp2p/core/peer"
77
iface "github.com/taubyte/tau/core/services/seer"
88
client "github.com/taubyte/tau/p2p/streams/client"
9+
"github.com/taubyte/tau/pkg/sensors"
910
)
1011

1112
var _ iface.Client = &Client{}
1213

1314
type Client struct {
1415
client *client.Client
1516
services iface.Services
17+
sensors *sensors.Registry
1618
peers []peerCore.ID
1719
}
1820

clients/p2p/seer/usage/all_usage.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ func GetUsage() (usage iface.UsageData, err error) {
2626
}
2727

2828
usage = iface.UsageData{
29-
Memory: memory,
30-
Cpu: cpu,
31-
Disk: disk,
29+
Memory: memory,
30+
Cpu: cpu,
31+
Disk: disk,
32+
CustomValues: make(map[string]float64),
3233
}
3334

3435
return

clients/p2p/seer/usage_client.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ func (u *Usage) updateUsage(hostname, nodeId, clientNodeId string, signature []b
4949
return nil, fmt.Errorf("getting usage of hostname `%s` failed with: %s", hostname, err)
5050
}
5151

52+
// get custom values
53+
if u.sensors != nil {
54+
for _, sensor := range u.sensors.List() {
55+
usageData.CustomValues[sensor.Name] = sensor.Value
56+
}
57+
}
58+
5259
resp, err := u.Heartbeat(&usageData, hostname, nodeId, clientNodeId, signature)
5360
if err != nil {
5461
return nil, err

config/services.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
seerIface "github.com/taubyte/tau/core/services/seer"
1212
"github.com/taubyte/tau/p2p/peer"
1313
http "github.com/taubyte/tau/pkg/http"
14+
"github.com/taubyte/tau/pkg/sensors"
1415
)
1516

1617
var (
@@ -56,6 +57,8 @@ type Node struct {
5657

5758
Http http.Service
5859

60+
Sensors *sensors.Service
61+
5962
EnableHTTPS bool
6063
Verbose bool
6164
DevMode bool
@@ -69,6 +72,13 @@ type DomainValidation struct {
6972
PublicKey []byte
7073
}
7174

75+
func (config *Node) SensorsRegistry() *sensors.Registry {
76+
if config.Sensors != nil {
77+
return config.Sensors.Registry()
78+
}
79+
return nil
80+
}
81+
7282
func (config *Node) Validate() error {
7383
if config == nil {
7484
config = &Node{}

core/services/seer/types.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type UsageReturn struct {
2929
FreeDisk int
3030
UsedDisk int
3131
AvailableDisk int
32+
CustomValues map[string]float64
3233
}
3334

3435
type Cpu struct {
@@ -48,9 +49,10 @@ type Cpu struct {
4849
}
4950

5051
type UsageData struct {
51-
Memory Memory `cbor:"3,keyasint"`
52-
Cpu Cpu `cbor:"5,keyasint"`
53-
Disk Disk `cbor:"7,keyasint"`
52+
Memory Memory `cbor:"3,keyasint"`
53+
Cpu Cpu `cbor:"5,keyasint"`
54+
Disk Disk `cbor:"7,keyasint"`
55+
CustomValues map[string]float64 `cbor:"9,keyasint,omitempty"`
5456
}
5557

5658
type Memory struct {

core/services/seer/usage_map.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package seer
2+
3+
import "strings"
4+
5+
func (u *UsageData) ToMap() map[string]any {
6+
result := map[string]any{
7+
"memory": map[string]any{
8+
"used": u.Memory.Used,
9+
"total": u.Memory.Total,
10+
"free": u.Memory.Free,
11+
},
12+
"cpu": map[string]any{
13+
"total": u.Cpu.Total,
14+
"count": u.Cpu.Count,
15+
"user": u.Cpu.User,
16+
"nice": u.Cpu.Nice,
17+
"system": u.Cpu.System,
18+
"idle": u.Cpu.Idle,
19+
"iowait": u.Cpu.Iowait,
20+
"irq": u.Cpu.Irq,
21+
"softirq": u.Cpu.Softirq,
22+
"steal": u.Cpu.Steal,
23+
"guest": u.Cpu.Guest,
24+
"guestNice": u.Cpu.GuestNice,
25+
"statCount": u.Cpu.StatCount,
26+
},
27+
"disk": map[string]any{
28+
"total": u.Disk.Total,
29+
"free": u.Disk.Free,
30+
"used": u.Disk.Used,
31+
"available": u.Disk.Available,
32+
},
33+
}
34+
35+
custom := convertCustomValuesToNestedMap(u.CustomValues)
36+
if custom != nil {
37+
result["custom"] = custom
38+
}
39+
40+
return result
41+
}
42+
43+
/*
44+
convertCustomValuesToNestedMap converts a flat map with dot-separated keys
45+
into a nested map structure where '.' is treated as a hierarchical separator.
46+
For example: {"sensor.network.bytes": 123.45} becomes
47+
{"sensor": {"network": {"bytes": 123.45}}}
48+
*/
49+
func convertCustomValuesToNestedMap(customValues map[string]float64) map[string]any {
50+
if len(customValues) == 0 {
51+
return nil
52+
}
53+
54+
result := make(map[string]any)
55+
for key, value := range customValues {
56+
parts := strings.Split(key, ".")
57+
cur := result
58+
for i, part := range parts {
59+
if i == len(parts)-1 {
60+
cur[part] = value
61+
} else {
62+
if next, ok := cur[part]; ok {
63+
if nextMap, ok := next.(map[string]any); ok {
64+
cur = nextMap
65+
} else {
66+
newMap := make(map[string]any)
67+
cur[part] = newMap
68+
cur = newMap
69+
}
70+
} else {
71+
newMap := make(map[string]any)
72+
cur[part] = newMap
73+
cur = newMap
74+
}
75+
}
76+
}
77+
}
78+
79+
return result
80+
}

0 commit comments

Comments
 (0)