Skip to content

Commit f106225

Browse files
authored
Merge pull request #15 from redhat-et/feature/a2a-agents
feat: A2A agent discovery and invocation
2 parents 5c63968 + 60680e3 commit f106225

30 files changed

+1865
-28
lines changed

agent-service/cmd/serve.go

Lines changed: 229 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/spf13/cobra"
1717

1818
"github.com/redhat-et/zero-trust-agent-demo/agent-service/internal/store"
19+
"github.com/redhat-et/zero-trust-agent-demo/pkg/a2abridge"
1920
"github.com/redhat-et/zero-trust-agent-demo/pkg/config"
2021
"github.com/redhat-et/zero-trust-agent-demo/pkg/logger"
2122
"github.com/redhat-et/zero-trust-agent-demo/pkg/metrics"
@@ -33,12 +34,24 @@ var serveCmd = &cobra.Command{
3334
func init() {
3435
rootCmd.AddCommand(serveCmd)
3536
serveCmd.Flags().String("document-service-url", "http://localhost:8084", "Document service URL")
37+
serveCmd.Flags().Bool("enable-discovery", false, "Enable Kubernetes-based A2A agent discovery")
38+
serveCmd.Flags().String("discovery-namespace", "spiffe-demo", "Namespace to discover A2A agents in")
39+
serveCmd.Flags().Duration("discovery-interval", 30*time.Second, "Interval between discovery scans")
40+
serveCmd.Flags().String("discovery-scheme", "https", "URL scheme for discovered agents (http or https)")
3641
v.BindPFlag("document_service_url", serveCmd.Flags().Lookup("document-service-url"))
42+
v.BindPFlag("enable_discovery", serveCmd.Flags().Lookup("enable-discovery"))
43+
v.BindPFlag("discovery_namespace", serveCmd.Flags().Lookup("discovery-namespace"))
44+
v.BindPFlag("discovery_interval", serveCmd.Flags().Lookup("discovery-interval"))
45+
v.BindPFlag("discovery_scheme", serveCmd.Flags().Lookup("discovery-scheme"))
3746
}
3847

3948
type Config struct {
4049
config.CommonConfig `mapstructure:",squash"`
41-
DocumentServiceURL string `mapstructure:"document_service_url"`
50+
DocumentServiceURL string `mapstructure:"document_service_url"`
51+
EnableDiscovery bool `mapstructure:"enable_discovery"`
52+
DiscoveryNamespace string `mapstructure:"discovery_namespace"`
53+
DiscoveryInterval time.Duration `mapstructure:"discovery_interval"`
54+
DiscoveryScheme string `mapstructure:"discovery_scheme"`
4255
}
4356

4457
// DelegatedAccessRequest represents a request from a user to access a document via agent
@@ -56,6 +69,7 @@ type AgentService struct {
5669
log *logger.Logger
5770
trustDomain string
5871
workloadClient *spiffe.WorkloadClient
72+
a2aClient *a2abridge.A2AClient
5973
}
6074

6175
func extractBearerToken(r *http.Request) string {
@@ -111,18 +125,39 @@ func runServe(cmd *cobra.Command, args []string) error {
111125
}
112126

113127
// Create mTLS HTTP client for outgoing requests
114-
httpClient := workloadClient.CreateMTLSClient(10 * time.Second)
128+
// Timeout must be long enough for A2A agent invocations that include LLM calls
129+
httpClient := workloadClient.CreateMTLSClient(120 * time.Second)
115130
if cfg.OTel.Enabled {
116131
httpClient.Transport = telemetry.WrapTransport(httpClient.Transport)
117132
}
118133

134+
a2aClient := a2abridge.NewA2AClient(httpClient, log.Logger)
135+
119136
svc := &AgentService{
120137
store: store.NewAgentStore(cfg.SPIFFE.TrustDomain),
121138
httpClient: httpClient,
122139
documentServiceURL: cfg.DocumentServiceURL,
123140
log: log,
124141
trustDomain: cfg.SPIFFE.TrustDomain,
125142
workloadClient: workloadClient,
143+
a2aClient: a2aClient,
144+
}
145+
146+
// Start A2A agent discovery loop if enabled
147+
if cfg.EnableDiscovery {
148+
discovery, err := a2abridge.NewAgentDiscovery(
149+
a2abridge.DiscoveryConfig{
150+
Namespace: cfg.DiscoveryNamespace,
151+
TrustDomain: cfg.SPIFFE.TrustDomain,
152+
Scheme: cfg.DiscoveryScheme,
153+
},
154+
httpClient,
155+
log.Logger,
156+
)
157+
if err != nil {
158+
return fmt.Errorf("failed to initialize agent discovery: %w", err)
159+
}
160+
go svc.runDiscoveryLoop(ctx, discovery, cfg.DiscoveryInterval)
126161
}
127162

128163
mux := http.NewServeMux()
@@ -144,7 +179,7 @@ func runServe(cmd *cobra.Command, args []string) error {
144179
server = workloadClient.CreateHTTPServer(cfg.Service.Addr(), handler)
145180
}
146181
server.ReadTimeout = 10 * time.Second
147-
server.WriteTimeout = 30 * time.Second
182+
server.WriteTimeout = 120 * time.Second
148183

149184
// Graceful shutdown
150185
done := make(chan bool)
@@ -174,6 +209,13 @@ func runServe(cmd *cobra.Command, args []string) error {
174209
log.Info("Loaded agents", "count", len(svc.store.List()))
175210
log.Info("mTLS mode", "enabled", !cfg.Service.MockSPIFFE && !cfg.Service.ListenPlainHTTP)
176211
log.Info("Plain HTTP mode", "enabled", cfg.Service.ListenPlainHTTP)
212+
log.Info("A2A agent discovery", "enabled", cfg.EnableDiscovery)
213+
if cfg.EnableDiscovery {
214+
log.Info("Discovery config",
215+
"namespace", cfg.DiscoveryNamespace,
216+
"interval", cfg.DiscoveryInterval,
217+
"scheme", cfg.DiscoveryScheme)
218+
}
177219

178220
for _, agent := range svc.store.List() {
179221
log.Info("Registered agent",
@@ -269,6 +311,16 @@ func (s *AgentService) handleAgentRoutes(w http.ResponseWriter, r *http.Request)
269311
return
270312
}
271313

314+
if parts[1] == "invoke" {
315+
// POST /agents/{id}/invoke
316+
if r.Method != http.MethodPost {
317+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
318+
return
319+
}
320+
s.handleInvoke(w, r, agent)
321+
return
322+
}
323+
272324
http.Error(w, "Not found", http.StatusNotFound)
273325
}
274326

@@ -441,3 +493,177 @@ func (s *AgentService) accessDocumentDelegated(ctx context.Context, agent *store
441493
User: userName,
442494
}, nil
443495
}
496+
497+
// InvokeRequest represents a request to invoke an A2A agent with delegation context.
498+
type InvokeRequest struct {
499+
UserSPIFFEID string `json:"user_spiffe_id"`
500+
DocumentID string `json:"document_id"`
501+
UserDepartments []string `json:"user_departments,omitempty"`
502+
ReviewType string `json:"review_type,omitempty"`
503+
}
504+
505+
// InvokeResponse represents the response from an A2A agent invocation.
506+
type InvokeResponse struct {
507+
Granted bool `json:"granted"`
508+
Reason string `json:"reason,omitempty"`
509+
Agent string `json:"agent"`
510+
User string `json:"user,omitempty"`
511+
Result string `json:"result,omitempty"`
512+
State string `json:"state,omitempty"`
513+
}
514+
515+
func (s *AgentService) handleInvoke(w http.ResponseWriter, r *http.Request, agent *store.Agent) {
516+
var req InvokeRequest
517+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
518+
s.log.Error("Invalid request body", "error", err)
519+
http.Error(w, "Invalid request body", http.StatusBadRequest)
520+
return
521+
}
522+
523+
// Only A2A agents can be invoked
524+
if agent.A2AURL == "" {
525+
w.Header().Set("Content-Type", "application/json")
526+
w.WriteHeader(http.StatusBadRequest)
527+
json.NewEncoder(w).Encode(map[string]any{
528+
"granted": false,
529+
"reason": "Agent does not support A2A invocation",
530+
"agent": agent.ID,
531+
})
532+
return
533+
}
534+
535+
if req.UserSPIFFEID == "" {
536+
w.Header().Set("Content-Type", "application/json")
537+
w.WriteHeader(http.StatusForbidden)
538+
json.NewEncoder(w).Encode(map[string]any{
539+
"granted": false,
540+
"reason": "Agent requests require user delegation context",
541+
"agent": agent.ID,
542+
})
543+
return
544+
}
545+
546+
ctx := r.Context()
547+
548+
s.log.Section("A2A AGENT INVOCATION")
549+
s.log.Info("Invoking A2A agent",
550+
"agent", agent.Name,
551+
"a2a_url", agent.A2AURL,
552+
"document_id", req.DocumentID,
553+
"user_spiffe_id", req.UserSPIFFEID)
554+
555+
// First check OPA authorization via document-service
556+
bearerToken := extractBearerToken(r)
557+
accessResult, err := s.accessDocumentDelegated(ctx, agent, req.UserSPIFFEID, req.DocumentID, req.UserDepartments, bearerToken)
558+
if err != nil {
559+
s.log.Error("Authorization check failed", "error", err)
560+
http.Error(w, err.Error(), http.StatusInternalServerError)
561+
return
562+
}
563+
564+
if !accessResult.Granted {
565+
s.log.Deny("A2A invocation denied - authorization failed")
566+
w.Header().Set("Content-Type", "application/json")
567+
w.WriteHeader(http.StatusForbidden)
568+
json.NewEncoder(w).Encode(&InvokeResponse{
569+
Granted: false,
570+
Reason: accessResult.Reason,
571+
Agent: agent.ID,
572+
User: accessResult.User,
573+
})
574+
return
575+
}
576+
577+
// Authorization passed - invoke the A2A agent
578+
s.log.Success("Authorization granted, forwarding to A2A agent")
579+
580+
invokeResult, err := s.a2aClient.Invoke(ctx, &a2abridge.InvokeRequest{
581+
AgentURL: agent.A2AURL,
582+
Card: agent.AgentCard,
583+
DocumentID: req.DocumentID,
584+
UserSPIFFEID: req.UserSPIFFEID,
585+
UserDepartments: req.UserDepartments,
586+
ReviewType: req.ReviewType,
587+
})
588+
if err != nil {
589+
s.log.Error("A2A invocation failed", "error", err)
590+
http.Error(w, fmt.Sprintf("A2A invocation failed: %v", err), http.StatusBadGateway)
591+
return
592+
}
593+
594+
userParts := strings.Split(req.UserSPIFFEID, "/")
595+
userName := userParts[len(userParts)-1]
596+
597+
s.log.Success("A2A agent invocation completed",
598+
"agent", agent.Name,
599+
"state", invokeResult.State)
600+
601+
w.Header().Set("Content-Type", "application/json")
602+
json.NewEncoder(w).Encode(&InvokeResponse{
603+
Granted: true,
604+
Reason: "A2A invocation completed",
605+
Agent: agent.ID,
606+
User: userName,
607+
Result: invokeResult.Text,
608+
State: invokeResult.State,
609+
})
610+
}
611+
612+
// runDiscoveryLoop periodically discovers A2A agents from Kubernetes.
613+
func (s *AgentService) runDiscoveryLoop(ctx context.Context, discovery *a2abridge.AgentDiscovery, interval time.Duration) {
614+
s.log.Info("Starting A2A agent discovery loop", "interval", interval)
615+
616+
// Run immediately on startup
617+
s.discoverAgents(ctx, discovery)
618+
619+
ticker := time.NewTicker(interval)
620+
defer ticker.Stop()
621+
622+
for {
623+
select {
624+
case <-ctx.Done():
625+
s.log.Info("Stopping A2A agent discovery loop")
626+
return
627+
case <-ticker.C:
628+
s.discoverAgents(ctx, discovery)
629+
}
630+
}
631+
}
632+
633+
func (s *AgentService) discoverAgents(ctx context.Context, discovery *a2abridge.AgentDiscovery) {
634+
agents, err := discovery.Discover(ctx)
635+
if err != nil {
636+
s.log.Error("Agent discovery failed", "error", err)
637+
return
638+
}
639+
640+
// Track which discovered agents are still present
641+
foundIDs := make(map[string]bool)
642+
643+
for _, discovered := range agents {
644+
foundIDs[discovered.ID] = true
645+
s.store.Register(&store.Agent{
646+
ID: discovered.ID,
647+
Name: discovered.Name,
648+
Capabilities: discovered.Capabilities,
649+
SPIFFEID: discovered.SPIFFEID,
650+
Description: discovered.Description,
651+
Source: store.SourceDiscovered,
652+
A2AURL: discovered.A2AURL,
653+
AgentCard: discovered.Card,
654+
})
655+
s.log.Info("Registered discovered agent",
656+
"id", discovered.ID,
657+
"name", discovered.Name,
658+
"capabilities", discovered.Capabilities,
659+
"a2a_url", discovered.A2AURL)
660+
}
661+
662+
// Remove previously discovered agents that no longer exist
663+
for _, id := range s.store.DiscoveredIDs() {
664+
if !foundIDs[id] {
665+
s.store.Remove(id)
666+
s.log.Info("Removed stale discovered agent", "id", id)
667+
}
668+
}
669+
}

0 commit comments

Comments
 (0)