Skip to content

Commit 61743a9

Browse files
authored
feat: add jobs to heartbeat (#252)
1 parent 1294186 commit 61743a9

File tree

20 files changed

+1126
-37
lines changed

20 files changed

+1126
-37
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@ diode/
22
orb-configs/
33
build/
44
.coverage/
5-
.vscode/
5+
.vscode/
6+
7+
docs/.cursor

agent/agent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func New(logger *slog.Logger, c config.Config) (Agent, error) {
6666

6767
restartBackendChan := make(chan string, restartBackendChanSize)
6868

69-
backendStateManager := backend.NewStateManager(c.OrbAgent.ConfigManager.Active, logger, restartBackendChan)
69+
backendStateManager := backend.NewStateManager(c.OrbAgent.ConfigManager.Active, logger, restartBackendChan, pm.GetRepo())
7070
// Pass a background context to the config manager at construction time. The
7171
// manager keeps its own copy and later derives child contexts from the
7272
// runtime context supplied in Agent.Start.

agent/backend/backend.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,36 @@ import (
1010
"github.com/netboxlabs/orb-agent/agent/policies"
1111
)
1212

13+
// PolicyStatusJob represents a job in the backend status response
14+
type PolicyStatusJob struct {
15+
ID string `json:"id"`
16+
Status string `json:"status"`
17+
Reason string `json:"reason"`
18+
EntityCount *int64 `json:"entity_count,omitempty"`
19+
CreatedAt time.Time `json:"created_at"`
20+
UpdatedAt time.Time `json:"updated_at"`
21+
}
22+
23+
// PolicyStatus represents policy status from backend status endpoint
24+
type PolicyStatus struct {
25+
Name string `json:"name"`
26+
Status string `json:"status"`
27+
Jobs []PolicyStatusJob `json:"jobs"`
28+
}
29+
30+
// StatusResponse represents the full status response from backend
31+
type StatusResponse struct {
32+
Version string `json:"version"`
33+
StartTime time.Time `json:"start_time"`
34+
UpTimeSeconds float64 `json:"up_time_seconds"`
35+
Policies []PolicyStatus `json:"policies"`
36+
}
37+
38+
// PolicyStatusProvider is an optional interface for backends that support policy status polling
39+
type PolicyStatusProvider interface {
40+
GetPolicyStatus() ([]PolicyStatus, error)
41+
}
42+
1343
// Running Status types
1444
const (
1545
Unknown RunningStatus = iota

agent/backend/backend_state.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"log/slog"
66
"sync"
77
"time"
8+
9+
"github.com/netboxlabs/orb-agent/agent/policies"
810
)
911

1012
// MinRestartTime is the minimum time to wait between restarts
@@ -33,16 +35,18 @@ type stateManager struct {
3335
ticker *time.Ticker
3436
logger *slog.Logger
3537
restartBackendChan chan string
38+
policyRepo policies.PolicyRepo
3639
}
3740

3841
// NewStateManager creates a new StateManager with the given logger and restart channel
39-
func NewStateManager(activeConfigMgr string, logger *slog.Logger, restartBackendChan chan string) StateManager {
42+
func NewStateManager(activeConfigMgr string, logger *slog.Logger, restartBackendChan chan string, policyRepo policies.PolicyRepo) StateManager {
4043
if configMgrSupportsStateMonitoring(activeConfigMgr) {
4144
return &stateManager{
4245
backendState: make(map[string]*State),
4346
ticker: time.NewTicker(BackendMonitorInterval),
4447
logger: logger,
4548
restartBackendChan: restartBackendChan,
49+
policyRepo: policyRepo,
4650
}
4751
}
4852
return nullStateManager{}
@@ -99,6 +103,21 @@ func (manager *stateManager) StartBackendMonitor(name string, be Backend) {
99103
}
100104
}
101105
manager.mu.Unlock()
106+
107+
// Poll policy status if backend supports it
108+
if provider, ok := be.(PolicyStatusProvider); ok && manager.policyRepo != nil {
109+
statuses, err := provider.GetPolicyStatus()
110+
if err != nil {
111+
manager.logger.Debug("failed to get policy status", "backend", name, "error", err)
112+
} else {
113+
for _, ps := range statuses {
114+
jobs := convertToJobData(ps.Jobs)
115+
if err := manager.policyRepo.UpdateJobs(ps.Name, jobs); err != nil {
116+
manager.logger.Debug("failed to update jobs for policy", "policy", ps.Name, "error", err)
117+
}
118+
}
119+
}
120+
}
102121
}
103122
}()
104123
}
@@ -138,3 +157,19 @@ func (manager *stateManager) Get() map[string]*State {
138157
}
139158
return result
140159
}
160+
161+
// convertToJobData converts backend PolicyStatusJob to policies.JobData
162+
func convertToJobData(statusJobs []PolicyStatusJob) []policies.JobData {
163+
jobs := make([]policies.JobData, len(statusJobs))
164+
for i, sj := range statusJobs {
165+
jobs[i] = policies.JobData{
166+
ID: sj.ID,
167+
Status: sj.Status,
168+
Reason: sj.Reason,
169+
EntityCount: sj.EntityCount,
170+
CreatedAt: sj.CreatedAt,
171+
UpdatedAt: sj.UpdatedAt,
172+
}
173+
}
174+
return jobs
175+
}

0 commit comments

Comments
 (0)