Skip to content

Commit 4493043

Browse files
Denis TuDenis Tu
authored andcommitted
Feat: Embedded HTTP API Server #7
1 parent c4680b8 commit 4493043

File tree

5 files changed

+804
-6
lines changed

5 files changed

+804
-6
lines changed

cmd/main.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
kagentv1alpha2 "github.com/kagent-dev/khook/api/v1alpha2"
1818
kclient "github.com/kagent-dev/khook/internal/client"
1919
"github.com/kagent-dev/khook/internal/config"
20+
"github.com/kagent-dev/khook/internal/sre"
2021
"github.com/kagent-dev/khook/internal/workflow"
2122
)
2223

@@ -76,8 +77,17 @@ func main() {
7677
os.Exit(1)
7778
}
7879

80+
// Start SRE-IDE server
81+
sreServer := sre.NewServer(8082, mgr.GetClient())
82+
go func() {
83+
ctx := context.Background()
84+
if err := sreServer.Start(ctx); err != nil {
85+
setupLog.Error(err, "problem running SRE-IDE server")
86+
}
87+
}()
88+
7989
// Add workflow coordinator to manage hooks and event processing
80-
if err := mgr.Add(newWorkflowCoordinator(mgr)); err != nil {
90+
if err := mgr.Add(newWorkflowCoordinator(mgr, sreServer)); err != nil {
8191
setupLog.Error(err, "unable to add workflow coordinator")
8292
os.Exit(1)
8393
}
@@ -91,11 +101,15 @@ func main() {
91101

92102
// workflowCoordinator manages the complete workflow lifecycle using proper services
93103
type workflowCoordinator struct {
94-
mgr ctrl.Manager
104+
mgr ctrl.Manager
105+
sreServer *sre.Server
95106
}
96107

97-
func newWorkflowCoordinator(mgr ctrl.Manager) *workflowCoordinator {
98-
return &workflowCoordinator{mgr: mgr}
108+
func newWorkflowCoordinator(mgr ctrl.Manager, sreServer *sre.Server) *workflowCoordinator {
109+
return &workflowCoordinator{
110+
mgr: mgr,
111+
sreServer: sreServer,
112+
}
99113
}
100114

101115
func (w *workflowCoordinator) NeedLeaderElection() bool { return true }
@@ -121,7 +135,7 @@ func (w *workflowCoordinator) Start(ctx context.Context) error {
121135

122136
// Create workflow coordinator
123137
eventRecorder := w.mgr.GetEventRecorderFor("khook")
124-
coordinator := workflow.NewCoordinator(k8s, w.mgr.GetClient(), kagentCli, eventRecorder)
138+
coordinator := workflow.NewCoordinator(k8s, w.mgr.GetClient(), kagentCli, eventRecorder, w.sreServer)
125139

126140
// Start the coordinator
127141
return coordinator.Start(ctx)

internal/pipeline/processor.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/kagent-dev/khook/api/v1alpha2"
1616
"github.com/kagent-dev/khook/internal/interfaces"
17+
"github.com/kagent-dev/khook/internal/sre"
1718
)
1819

1920
// Processor handles the complete event processing pipeline
@@ -22,6 +23,7 @@ type Processor struct {
2223
deduplicationManager interfaces.DeduplicationManager
2324
kagentClient interfaces.KagentClient
2425
statusManager interfaces.StatusManager
26+
sreServer interface{}
2527
logger logr.Logger
2628
}
2729

@@ -31,12 +33,14 @@ func NewProcessor(
3133
deduplicationManager interfaces.DeduplicationManager,
3234
kagentClient interfaces.KagentClient,
3335
statusManager interfaces.StatusManager,
36+
sreServer interface{},
3437
) *Processor {
3538
return &Processor{
3639
eventWatcher: eventWatcher,
3740
deduplicationManager: deduplicationManager,
3841
kagentClient: kagentClient,
3942
statusManager: statusManager,
43+
sreServer: sreServer,
4044
logger: log.Log.WithName("event-processor"),
4145
}
4246
}
@@ -167,6 +171,19 @@ func (p *Processor) processEventMatch(ctx context.Context, match EventMatch) err
167171
// Continue even if status recording fails
168172
}
169173

174+
// Add alert to SRE server if available
175+
p.logger.Info("Checking SRE server integration", "sreServer", p.sreServer != nil)
176+
if p.sreServer != nil {
177+
if sreServer, ok := p.sreServer.(*sre.Server); ok {
178+
// Convert event to alert and add to SRE server
179+
alert := sre.ConvertEventToAlert(match.Event, match.Hook, agentRef, response)
180+
sreServer.AddAlert(alert)
181+
p.logger.Info("Added alert to SRE server", "alertId", alert.ID)
182+
} else {
183+
p.logger.Error(nil, "Type assertion failed for SRE server", "sreServerType", fmt.Sprintf("%T", p.sreServer))
184+
}
185+
}
186+
170187
// Mark event as notified to suppress re-sending within suppression window
171188
p.deduplicationManager.MarkNotified(hookRef, match.Event)
172189

0 commit comments

Comments
 (0)