Skip to content

Commit c1c2fb5

Browse files
authored
Merge pull request #141 from thin-edge/refactor-entity-store-api
refactor: use tedge entity store api for registration
2 parents 5f2f4bc + ab06921 commit c1c2fb5

File tree

7 files changed

+276
-71
lines changed

7 files changed

+276
-71
lines changed

.goreleaser.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,17 @@ nfpms:
8585
- rpm
8686
- apk
8787

88+
overrides:
89+
# Dependency on the Entity HTTP API which was only added in tedge >= 1.5.0
90+
# Note: Don't add explicit dependency for apk (Alpine Linux)
91+
# as generally only the tedge is just installed via a binary and not the apk package
92+
deb:
93+
dependencies:
94+
- tedge (>= 1.5.0)
95+
rpm:
96+
dependencies:
97+
- tedge >= 1.5.0-1
98+
8899
# FIXME: Remove for official release, as the package can be called "tedge-container-plugin" instead of "tedge-container-plugin-ng"
89100
replaces:
90101
- tedge-container-plugin

cli/run/cmd.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ func NewRunCommand(cliContext cli.Cli) *cobra.Command {
5050
DeleteFromCloud: cliContext.DeleteFromCloud(),
5151
EnableEngineEvents: cliContext.EngineEventsEnabled(),
5252

53+
HTTPHost: cliContext.GetHTTPHost(),
54+
HTTPPort: cliContext.GetHTTPPort(),
5355
MQTTHost: cliContext.GetMQTTHost(),
5456
MQTTPort: cliContext.GetMQTTPort(),
5557
CumulocityHost: cliContext.GetCumulocityHost(),
@@ -158,6 +160,8 @@ func NewRunCommand(cliContext cli.Cli) *cobra.Command {
158160
viper.SetDefault("delete_from_cloud.enabled", true)
159161

160162
// thin-edge.io services
163+
viper.SetDefault("client.http.host", "127.0.0.1")
164+
viper.SetDefault("client.http.port", 8000)
161165
viper.SetDefault("client.mqtt.host", "127.0.0.1")
162166
// client.mqtt.port: 0 = auto-detection, where 8883 is used when the cert files exist, or 1883 otherwise
163167
viper.SetDefault("client.mqtt.port", 0)

packaging/config.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ labels = [ "tedge.ignore" ]
3030
key = "/etc/tedge/device-certs/local-tedge.key"
3131
# Certificate file used to connect to local thin-edge.io services (only used if the file exists)
3232
cert_file = "/etc/tedge/device-certs/local-tedge.crt"
33+
[client.http]
34+
# thin-edge.io HTTP host operated by the tedge-agent on the main device
35+
host = "127.0.0.1"
36+
# thin-edge.io HTTP port operated by the tedge-agent on the main device
37+
port = 8000
3338

3439
[client.mqtt]
3540
# thin-edge.io MQTT host

pkg/app/app.go

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ type Config struct {
6969
EnableEngineEvents bool
7070
DeleteFromCloud bool
7171

72+
HTTPHost string
73+
HTTPPort uint16
74+
7275
MQTTHost string
7376
MQTTPort uint16
7477

@@ -79,6 +82,8 @@ type Config struct {
7982
func NewApp(device tedge.Target, config Config) (*App, error) {
8083
serviceTarget := device.Service(config.ServiceName)
8184
tedgeOpts := &tedge.ClientConfig{
85+
HTTPHost: config.HTTPHost,
86+
HTTPPort: config.HTTPPort,
8287
MqttHost: config.MQTTHost,
8388
MqttPort: config.MQTTPort,
8489
C8yHost: config.CumulocityHost,
@@ -94,6 +99,18 @@ func NewApp(device tedge.Target, config Config) (*App, error) {
9499
return nil, err
95100
}
96101

102+
// Register via http interface
103+
_, registrationErr := tedgeClient.TedgeAPI.CreateEntity(context.Background(), tedge.Entity{
104+
TedgeType: tedge.EntityTypeService,
105+
Name: serviceTarget.Name,
106+
TedgeTopicID: serviceTarget.TopicID,
107+
})
108+
if registrationErr == nil {
109+
slog.Info("Registered service", "topic", serviceTarget.Topic())
110+
} else {
111+
slog.Error("Could not register tedge entity.", "err", registrationErr)
112+
}
113+
97114
if err := tedgeClient.Connect(); err != nil {
98115
return nil, err
99116
}
@@ -137,13 +154,10 @@ func (a *App) DeleteLegacyService(deleteFromCloud bool) {
137154
target := a.client.Target.Service("tedge-container-monitor")
138155
slog.Info("Removing legacy service from the cloud", "topic", target.Topic())
139156

140-
if err := a.client.Publish(tedge.GetHealthTopic(*target), 1, true, ""); err != nil {
141-
slog.Warn("Failed to clear health status.", "topic", tedge.GetHealthTopic(*target))
142-
}
143-
time.Sleep(500 * time.Millisecond)
144-
if err := a.client.Publish(tedge.GetTopic(*target), 1, true, ""); err != nil {
145-
slog.Warn("Failed to clear registration message.", "topic", tedge.GetHealthTopic(*target))
157+
if _, err := a.client.TedgeAPI.DeleteEntity(context.Background(), *target); err != nil {
158+
slog.Warn("Failed to clear registration message.", "topic-id", target.TopicID)
146159
}
160+
147161
time.Sleep(500 * time.Millisecond)
148162

149163
if target.CloudIdentity != "" && deleteFromCloud {
@@ -373,10 +387,16 @@ func (a *App) updateMetrics(items []container.TedgeContainer) error {
373387
doWork := func(jobs <-chan container.TedgeContainer, results chan<- error) {
374388
for j := range jobs {
375389
var jobErr error
376-
stats, jobErr := a.ContainerClient.GetStats(context.Background(), j.Container.Id)
390+
// TODO: Check if container exists, if not then do nothing
391+
target := a.Device.Service(j.Name)
392+
if _, entityErr := a.client.TedgeAPI.GetEntity(context.Background(), *target); entityErr != nil {
393+
slog.Info("Entity has not be registered yet, skipping metric for it.", "topic-id", target.TopicID)
394+
results <- jobErr
395+
continue
396+
}
377397

398+
stats, jobErr := a.ContainerClient.GetStats(context.Background(), j.Container.Id)
378399
if jobErr == nil {
379-
target := a.Device.Service(j.Name)
380400
topic := tedge.GetTopic(*target, "m", "resource_usage")
381401
payload, err := json.Marshal(stats)
382402
if err == nil {
@@ -443,25 +463,26 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
443463
target := a.Device.Service(item.Name)
444464

445465
// Skip registration message if it already exists
446-
if _, ok := existingServices[target.Topic()]; ok {
447-
slog.Debug("Container is already registered", "topic", target.Topic())
448-
delete(existingServices, target.Topic())
449-
continue
450-
}
466+
// if _, ok := existingServices[target.Topic()]; ok {
467+
// slog.Info("Container is already registered", "topic", target.Topic())
468+
// delete(existingServices, target.Topic())
469+
// continue
470+
// }
451471
delete(existingServices, target.Topic())
452472

453-
payload := map[string]any{
454-
"@type": "service",
455-
"name": item.Name,
456-
"type": item.ServiceType,
457-
}
458-
b, err := json.Marshal(payload)
459-
if err != nil {
460-
slog.Warn("Could not marshal registration message", "err", err)
461-
continue
462-
}
463-
if err := tedgeClient.Publish(target.Topic(), 1, true, b); err != nil {
464-
slog.Error("Failed to register container", "target", target.Topic(), "err", err)
473+
// Register using HTTP API
474+
resp, err := tedgeClient.TedgeAPI.CreateEntity(context.Background(), tedge.Entity{
475+
TedgeType: tedge.EntityTypeService,
476+
TedgeTopicID: target.TopicID,
477+
Name: item.Name,
478+
Type: item.ServiceType,
479+
TedgeParentID: tedgeClient.Parent.TopicID,
480+
})
481+
482+
if err == nil {
483+
slog.Info("Registered container.", "topic", target.Topic(), "url", resp.RawResponse.Request.URL.String(), "status_code", resp.RawResponse.Status)
484+
} else {
485+
slog.Error("Failed to register container.", "topic", target.Topic(), "err", err)
465486
}
466487
}
467488

@@ -490,18 +511,16 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
490511
for _, item := range items {
491512
target := a.Device.Service(item.Name)
492513

493-
topic := tedge.GetTopic(*target, "twin", "container")
494-
495514
// Create status
496-
payload, err := json.Marshal(item.Container)
497-
515+
_, err := tedgeClient.TedgeAPI.UpdateTwin(
516+
context.Background(),
517+
tedge.Entity{
518+
TedgeTopicID: target.TopicID,
519+
},
520+
"container",
521+
item.Container,
522+
)
498523
if err != nil {
499-
slog.Error("Failed to convert payload to json", "err", err)
500-
continue
501-
}
502-
503-
slog.Info("Publishing container status", "topic", topic, "payload", payload)
504-
if err := tedgeClient.Publish(topic, 1, true, payload); err != nil {
505524
slog.Error("Could not publish container status", "err", err)
506525
}
507526
}
@@ -518,7 +537,7 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
518537
continue
519538
}
520539

521-
if err := tedgeClient.DeregisterEntity(*target, "twin/container"); err != nil {
540+
if err := tedgeClient.DeregisterEntity(*target); err != nil {
522541
slog.Warn("Failed to deregister entity.", "err", err)
523542
}
524543

pkg/cli/cli.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,18 @@ func (c *Cli) DeleteFromCloud() bool {
155155
return viper.GetBool("delete_from_cloud.enabled")
156156
}
157157

158+
func (c *Cli) GetHTTPHost() string {
159+
return viper.GetString("client.http.host")
160+
}
161+
162+
func (c *Cli) GetHTTPPort() uint16 {
163+
v := viper.GetUint16("client.http.port")
164+
if v == 0 {
165+
return 8000
166+
}
167+
return v
168+
}
169+
158170
func (c *Cli) GetMQTTHost() string {
159171
return viper.GetString("client.mqtt.host")
160172
}

pkg/tedge/target.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@ import (
55
"strings"
66
)
77

8+
var EntityTypeService = "service"
9+
var EntityTypeChildDevice = "child-device"
10+
811
type Target struct {
912
RootPrefix string
1013
TopicID string
1114
CloudIdentity string
15+
Name string
1216
}
1317

1418
func (t *Target) ExternalID() string {
@@ -25,6 +29,7 @@ func (t *Target) Topic() string {
2529
func (t *Target) Service(name string) *Target {
2630
target := NewTarget(t.RootPrefix, strings.Join(strings.Split(t.TopicID, "/")[0:2], "/")+"/service/"+name)
2731
target.CloudIdentity = t.CloudIdentity
32+
target.Name = name
2833
return target
2934
}
3035

0 commit comments

Comments
 (0)