Skip to content

Commit d07df62

Browse files
committed
procress log request through resource proxy
Signed-off-by: Mangaal <[email protected]>
1 parent 2a08301 commit d07df62

File tree

7 files changed

+339
-24
lines changed

7 files changed

+339
-24
lines changed

β€Žgo.modβ€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/alicebob/miniredis/v2 v2.35.0
1010
github.com/argoproj/argo-cd/v3 v3.1.5
1111
github.com/argoproj/gitops-engine v0.7.1-0.20250905160054-e48120133eec
12+
github.com/cenkalti/backoff/v4 v4.3.0
1213
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.16.1
1314
github.com/cloudevents/sdk-go/v2 v2.16.1
1415
github.com/go-redis/cache/v9 v9.0.0

β€Žgo.sumβ€Ž

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ github.com/casbin/casbin/v2 v2.107.0/go.mod h1:Ee33aqGrmES+GNL17L0h9X28wXuo829wn
6565
github.com/casbin/govaluate v1.3.0/go.mod h1:G/UnbIjZk/0uMNaLwZZmFQrR72tYRZWQkO70si/iR7A=
6666
github.com/casbin/govaluate v1.7.0 h1:Es2j2K2jv7br+QHJhxKcdoOa4vND0g0TqsO6rJeqJbA=
6767
github.com/casbin/govaluate v1.7.0/go.mod h1:G/UnbIjZk/0uMNaLwZZmFQrR72tYRZWQkO70si/iR7A=
68+
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
69+
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
6870
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
6971
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
7072
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=

β€Žinternal/event/event.goβ€Ž

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"fmt"
2121
"net/http"
22+
"strconv"
2223
"sync"
2324
"time"
2425

@@ -80,6 +81,7 @@ const (
8081
TargetResourceResync EventTarget = "resourceResync"
8182
TargetClusterCacheInfoUpdate EventTarget = "clusterCacheInfoUpdate"
8283
TargetRepository EventTarget = "repository"
84+
TargetContainerLog EventTarget = "containerlog"
8385
)
8486

8587
const (
@@ -406,9 +408,9 @@ func (evs EventSource) NewResourceRequestEvent(gvr v1.GroupVersionResource, name
406408
cev.SetSource(evs.source)
407409
cev.SetSpecVersion(cloudEventSpecVersion)
408410
cev.SetType(method)
409-
cev.SetDataSchema(TargetResource.String())
410411
cev.SetExtension(resourceID, reqUUID)
411412
cev.SetExtension(eventID, reqUUID)
413+
cev.SetDataSchema(TargetResource.String())
412414
err := cev.SetData(cloudevents.ApplicationJSON, rr)
413415
return &cev, err
414416
}
@@ -596,6 +598,8 @@ func Target(raw *cloudevents.Event) EventTarget {
596598
return TargetRedis
597599
case TargetClusterCacheInfoUpdate.String():
598600
return TargetClusterCacheInfoUpdate
601+
case TargetContainerLog.String():
602+
return TargetContainerLog
599603
}
600604
return ""
601605
}
@@ -950,3 +954,113 @@ func (ewm *EventWritersMap) Remove(agentName string) {
950954

951955
delete(ewm.eventWriters, agentName)
952956
}
957+
958+
type ContainerLogRequest struct {
959+
// UUID for request/response correlation
960+
UUID string `json:"uuid"`
961+
962+
// Pod identification
963+
Namespace string `json:"namespace"`
964+
PodName string `json:"podName"`
965+
966+
// Container specification
967+
Container string `json:"container,omitempty"` // Optional, defaults to first container
968+
969+
// Log streaming parameters
970+
Follow bool `json:"follow,omitempty"` // Stream continuously
971+
TailLines *int64 `json:"tailLines,omitempty"` // Number of lines from end
972+
SinceSeconds *int64 `json:"sinceSeconds,omitempty"` // Relative time
973+
SinceTime string `json:"sinceTime,omitempty"` // Absolute timestamp (RFC3339)
974+
Timestamps bool `json:"timestamps,omitempty"` // Include timestamps
975+
Previous bool `json:"previous,omitempty"` // Previous container logs
976+
977+
// Additional parameters from K8s logs API
978+
InsecureSkipTLSVerifyBackend bool `json:"insecureSkipTLSVerifyBackend,omitempty"` // Skip TLS verification
979+
LimitBytes *int64 `json:"limitBytes,omitempty"` // Limit output bytes
980+
Pretty bool `json:"pretty,omitempty"` // Pretty print output
981+
Stream string `json:"stream,omitempty"` // "All", "Stdout", or "Stderr"
982+
}
983+
984+
// NewLogRequestEvent creates a cloud event for requesting logs
985+
func (evs EventSource) NewLogRequestEvent(namespace, podName, method string, params map[string]string) (*cloudevents.Event, error) {
986+
reqUUID := uuid.NewString()
987+
988+
// Parse log-specific parameters
989+
logReq := &ContainerLogRequest{
990+
UUID: reqUUID,
991+
Namespace: namespace,
992+
PodName: podName,
993+
}
994+
995+
if container, ok := params["container"]; ok {
996+
logReq.Container = container
997+
}
998+
999+
// Parse query parameters
1000+
if follow := params["follow"]; follow == "true" {
1001+
logReq.Follow = true
1002+
}
1003+
1004+
if tailLines := params["tailLines"]; tailLines != "" {
1005+
if lines, err := strconv.ParseInt(tailLines, 10, 64); err == nil {
1006+
logReq.TailLines = &lines
1007+
}
1008+
}
1009+
1010+
if sinceSeconds := params["sinceSeconds"]; sinceSeconds != "" {
1011+
if seconds, err := strconv.ParseInt(sinceSeconds, 10, 64); err == nil {
1012+
logReq.SinceSeconds = &seconds
1013+
}
1014+
}
1015+
1016+
if sinceTime := params["sinceTime"]; sinceTime != "" {
1017+
logReq.SinceTime = sinceTime
1018+
}
1019+
1020+
if timestamps := params["timestamps"]; timestamps == "true" {
1021+
logReq.Timestamps = true
1022+
}
1023+
1024+
if previous := params["previous"]; previous == "true" {
1025+
logReq.Previous = true
1026+
}
1027+
1028+
// Parse additional K8s logs API parameters
1029+
if insecureSkipTLS := params["insecureSkipTLSVerifyBackend"]; insecureSkipTLS == "true" {
1030+
logReq.InsecureSkipTLSVerifyBackend = true
1031+
}
1032+
1033+
if limitBytes := params["limitBytes"]; limitBytes != "" {
1034+
if bytes, err := strconv.ParseInt(limitBytes, 10, 64); err == nil {
1035+
logReq.LimitBytes = &bytes
1036+
}
1037+
}
1038+
1039+
if pretty := params["pretty"]; pretty == "true" {
1040+
logReq.Pretty = true
1041+
}
1042+
1043+
if stream := params["stream"]; stream != "" {
1044+
// Validate stream parameter - must be "All", "Stdout", or "Stderr"
1045+
if stream == "All" || stream == "Stdout" || stream == "Stderr" {
1046+
logReq.Stream = stream
1047+
}
1048+
}
1049+
1050+
cev := cloudevents.NewEvent()
1051+
cev.SetSource(evs.source)
1052+
cev.SetSpecVersion(cloudEventSpecVersion)
1053+
cev.SetType(method) // HTTP method
1054+
cev.SetDataSchema(TargetContainerLog.String())
1055+
cev.SetExtension(resourceID, reqUUID)
1056+
cev.SetExtension(eventID, reqUUID)
1057+
err := cev.SetData(cloudevents.ApplicationJSON, logReq)
1058+
return &cev, err
1059+
}
1060+
1061+
// ContainerLogRequest extracts ContainerLogRequest data from event
1062+
func (ev *Event) ContainerLogRequest() (*ContainerLogRequest, error) {
1063+
logReq := &ContainerLogRequest{}
1064+
err := ev.event.DataAs(logReq)
1065+
return logReq, err
1066+
}

β€Žprincipal/listen.goβ€Ž

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/argoproj-labs/argocd-agent/internal/metrics"
3434
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/authapi"
3535
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/eventstreamapi"
36+
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/logstreamapi"
3637
"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/versionapi"
3738
"github.com/argoproj-labs/argocd-agent/principal/apis/auth"
3839
"github.com/argoproj-labs/argocd-agent/principal/apis/eventstream"
@@ -218,5 +219,7 @@ func (s *Server) registerGrpcServices(metrics *metrics.PrincipalMetrics) error {
218219
authapi.RegisterAuthenticationServer(s.grpcServer, authSrv)
219220
versionapi.RegisterVersionServer(s.grpcServer, version.NewServer(s.authenticate))
220221
eventstreamapi.RegisterEventStreamServer(s.grpcServer, eventstream.NewServer(s.queues, s.eventWriters, metrics, s.clusterMgr, eventstream.WithNotifyOnConnect(s.notifyOnConnect)))
222+
// Proposal: register LogStream gRPC service for data-plane (use singleton instance)
223+
logstreamapi.RegisterLogStreamServiceServer(s.grpcServer, s.logStream)
221224
return nil
222225
}

β€Žprincipal/log.goβ€Ž

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package principal
2+
3+
import (
4+
"net/http"
5+
"strings"
6+
"time"
7+
8+
"github.com/argoproj-labs/argocd-agent/internal/event"
9+
"github.com/argoproj-labs/argocd-agent/principal/resourceproxy"
10+
"github.com/sirupsen/logrus"
11+
)
12+
13+
const containerLogRequestRegexp = `^/api/v1/namespaces/(?P<namespace>[^/]+)/pods/(?P<pod>[^/]+)/log(\?.*)?$`
14+
15+
// processContainerLogRequest handles container log requests from ArgoCD
16+
func (s *Server) processContainerLogRequest(w http.ResponseWriter, r *http.Request, params resourceproxy.Params) {
17+
logCtx := log().WithField("function", "processContainerLogRequest")
18+
19+
// πŸ” LOG: Container log request handler called
20+
logCtx.WithFields(logrus.Fields{
21+
"url": r.URL.String(),
22+
"method": r.Method,
23+
"remote_addr": r.RemoteAddr,
24+
"accept": r.Header.Get("Accept"),
25+
"user_agent": r.Header.Get("User-Agent"),
26+
}).Info("πŸ” CONTAINER_LOG_HANDLER: Called")
27+
28+
// 1. TLS auth
29+
if r.TLS == nil || len(r.TLS.PeerCertificates) < 1 {
30+
logCtx.Errorf("Unauthenticated request from client %s", r.RemoteAddr)
31+
http.Error(w, "Client certificate required", http.StatusUnauthorized)
32+
return
33+
}
34+
cert := r.TLS.PeerCertificates[0]
35+
agentName := cert.Subject.CommonName
36+
37+
// 2. Agent connection
38+
if !s.queues.HasQueuePair(agentName) {
39+
logCtx.Debugf("Agent %s is not connected", agentName)
40+
http.Error(w, "Agent not connected", http.StatusBadGateway)
41+
return
42+
}
43+
q := s.queues.SendQ(agentName)
44+
45+
// 3. Params
46+
namespace := params.Get("namespace")
47+
podName := params.Get("pod")
48+
queryParams := r.URL.Query()
49+
container := queryParams.Get("container")
50+
51+
// Validate required parameters
52+
if namespace == "" || podName == "" {
53+
logCtx.Error("Missing required parameters: namespace and pod are required")
54+
http.Error(w, "Missing required parameters: namespace and pod", http.StatusBadRequest)
55+
return
56+
}
57+
58+
// Extract log parameters more efficiently
59+
logParams := make(map[string]string)
60+
supportedParams := []string{
61+
"follow", "tailLines", "sinceSeconds", "sinceTime",
62+
"timestamps", "previous", "insecureSkipTLSVerifyBackend",
63+
"limitBytes", "pretty", "stream",
64+
}
65+
for _, param := range supportedParams {
66+
if v := queryParams.Get(param); v != "" {
67+
logParams[param] = v
68+
}
69+
}
70+
71+
logCtx = logCtx.WithFields(logrus.Fields{
72+
"namespace": namespace,
73+
"pod": podName,
74+
"container": container,
75+
"agent": agentName,
76+
"parameters": logParams,
77+
})
78+
logCtx.Infof("Processing container log request with %d parameters", len(logParams))
79+
80+
// 4. Create event
81+
sentEv, err := s.events.NewLogRequestEvent(namespace, podName, r.Method, logParams)
82+
if err != nil {
83+
logCtx.Errorf("Could not create container log event: %v", err)
84+
http.Error(w, "Internal server error", http.StatusInternalServerError)
85+
return
86+
}
87+
sentUUID := event.EventID(sentEv)
88+
89+
// 5. Check if this is an EventSource request
90+
acceptHeader := r.Header.Get("Accept")
91+
isEventSource := strings.Contains(acceptHeader, "text/event-stream")
92+
93+
logCtx.WithFields(logrus.Fields{
94+
"accept_header": acceptHeader,
95+
"reqyest_url": r.URL.String(),
96+
"is_eventsource": isEventSource,
97+
"user_agent": r.Header.Get("User-Agent"),
98+
}).Info("πŸ” DEBUG: Request type detection")
99+
100+
// 5. Register HTTP writer *before* submitting to agent
101+
if err := s.logStream.RegisterHTTP(sentUUID, w, r); err != nil {
102+
logCtx.Errorf("Could not register HTTP writer for log streaming: %v", err)
103+
http.Error(w, "Internal server error", http.StatusInternalServerError)
104+
return
105+
}
106+
107+
// 6. Submit to agent with cleanup on failure
108+
logCtx.Infof("Submitting container log request for pod %s/%s to agent %s", namespace, podName, agentName)
109+
q.Add(sentEv)
110+
111+
// 7. Wait: static completion or client disconnect
112+
logCtx.WithField("request_id", sentUUID).Info("πŸ” DEBUG: Waiting for LogStream to complete...")
113+
114+
// Check if this is a streaming request (follow=true)
115+
isStreaming := logParams["follow"] == "true"
116+
117+
if isStreaming {
118+
// For streaming logs: keep handler alive until client disconnects
119+
logCtx.WithField("request_id", sentUUID).Info("πŸ” DEBUG: Streaming logs - keeping HTTP handler alive until client disconnect")
120+
<-r.Context().Done()
121+
logCtx.WithField("request_id", sentUUID).Info("πŸ” DEBUG: HTTP client disconnected - streaming handler ends")
122+
} else {
123+
// For static logs: wait for completion with configurable timeout
124+
logCtx.WithField("request_id", sentUUID).Info("πŸ” DEBUG: Static logs - waiting for completion")
125+
completed := s.logStream.WaitForCompletion(sentUUID, 2*time.Minute)
126+
if completed {
127+
logCtx.WithField("request_id", sentUUID).Info("πŸ” DEBUG: Static logs completed via LogStream")
128+
} else {
129+
logCtx.WithField("request_id", sentUUID).Warn("πŸ” DEBUG: Static logs timeout - handler ends")
130+
}
131+
}
132+
}

0 commit comments

Comments
Β (0)