Skip to content

Commit 80eedc7

Browse files
authored
Merge pull request #589 from sahandilshan/analytics
Add initial observability component
2 parents f31c7ae + acaa5d4 commit 80eedc7

35 files changed

+7668
-6
lines changed

backend/cmd/server/main.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,13 @@ import (
3131
"syscall"
3232
"time"
3333

34+
"github.com/asgardeo/thunder/internal/observability"
3435
"github.com/asgardeo/thunder/internal/system/cache"
3536
"github.com/asgardeo/thunder/internal/system/cert"
3637
"github.com/asgardeo/thunder/internal/system/config"
3738
"github.com/asgardeo/thunder/internal/system/database/provider"
3839
"github.com/asgardeo/thunder/internal/system/log"
40+
"github.com/asgardeo/thunder/internal/system/middleware"
3941
)
4042

4143
// shutdownTimeout defines the timeout duration for graceful shutdown.
@@ -54,6 +56,9 @@ func main() {
5456
// Initialize the cache manager.
5557
initCacheManager(logger)
5658

59+
// Initialize observability with console adapter and JSON format
60+
initObservability(logger)
61+
5762
// Create a new HTTP multiplexer.
5863
mux := http.NewServeMux()
5964
if mux == nil {
@@ -136,6 +141,34 @@ func initCacheManager(logger *log.Logger) {
136141
cm.Init()
137142
}
138143

144+
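// initObservability initializes the observability service with the console adapter and JSON format.
// The settings are hard-coded here (enabled, console output, metrics on, graceful failure mode);
// this function does not read the observability section of the deployment configuration.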
145+
func initObservability(logger *log.Logger) {
146+
// Configure observability to use console adapter with JSON format
147+
observabilityCfg := &observability.Config{
148+
Enabled: true,
149+
Output: observability.OutputConfig{
150+
Type: "console", // Output to stdout
151+
Format: "json",
152+
},
153+
Metrics: observability.MetricsConfig{
154+
Enabled: true,
155+
},
156+
FailureMode: "graceful", // Don't fail if observability has issues
157+
}
158+
159+
svc, err := observability.InitializeWithConfig(observabilityCfg)
160+
if err != nil {
161+
logger.Error("Failed to initialize observability service", log.Error(err))
162+
return
163+
}
164+
165+
if svc.IsEnabled() {
166+
logger.Debug("Observability service initialized successfully with console adapter and JSON format")
167+
} else {
168+
logger.Warn("Observability service is disabled")
169+
}
170+
}
171+
139172
// loadCertConfig loads the certificate configuration and extracts the Key ID (kid).
140173
func loadCertConfig(logger *log.Logger, cfg *config.Config, thunderHome string) *tls.Config {
141174
sysCertSvc := cert.NewSystemCertificateService()
@@ -198,15 +231,15 @@ func startHTTPServer(logger *log.Logger, cfg *config.Config, mux *http.ServeMux)
198231

199232
// createHTTPServer creates and configures an HTTP server with common settings.
200233
func createHTTPServer(logger *log.Logger, cfg *config.Config, mux *http.ServeMux) (*http.Server, string) {
201-
// Wrap the multiplexer with AccessLogHandler.
202-
wrappedMux := log.AccessLogHandler(logger, mux)
234+
handler := middleware.CorrelationIDMiddleware(mux)
235+
handler = log.AccessLogHandler(logger, handler)
203236

204237
// Build the server address using hostname and port from the configurations.
205238
serverAddr := fmt.Sprintf("%s:%d", cfg.Server.Hostname, cfg.Server.Port)
206239

207240
server := &http.Server{
208241
Addr: serverAddr,
209-
Handler: wrappedMux,
242+
Handler: handler,
210243
ReadHeaderTimeout: 10 * time.Second, // Mitigate Slowloris attacks
211244
WriteTimeout: 10 * time.Second,
212245
IdleTimeout: 120 * time.Second,
@@ -227,6 +260,13 @@ func gracefulShutdown(logger *log.Logger, server *http.Server) {
227260
logger.Debug("HTTP server shutdown completed")
228261
}
229262

263+
// Shutdown observability service
264+
observabilitySvc := observability.GetService()
265+
if observabilitySvc != nil {
266+
observabilitySvc.Shutdown()
267+
logger.Debug("Observability service shutdown completed")
268+
}
269+
230270
// Close database connections
231271
dbCloser := provider.GetDBProviderCloser()
232272
if err := dbCloser.Close(); err != nil {

backend/cmd/server/repository/conf/deployment.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,12 @@ cors:
2929

3030
immutable_resources:
3131
enabled: false
32+
33+
observability:
34+
enabled: false
35+
output:
36+
type: "console"
37+
format: "json"
38+
metrics:
39+
enabled: true
40+
failure_mode: "graceful"
Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
# Observability System
2+
3+
A comprehensive observability platform for Thunder providing event logging, metrics collection, distributed tracing, and health monitoring for authentication and authorization flows.
4+
5+
## Architecture
6+
7+
The observability system uses an **Event Bus** pattern to capture and process events throughout the auth/authz flows with minimal performance impact.
8+
9+
**Key Features:**
10+
- **Async & Non-blocking** - Events published in goroutines, main thread never blocks
11+
- **No Queue** - Direct event delivery to subscribers
12+
- **Category-based routing** - Subscribers filter by event categories
13+
- **Isolated failures** - Subscriber panics don't affect others
14+
15+
```
16+
Auth/AuthZ Components → Event Bus (Publisher) → Subscribers (async) → Formatters → Output Adapters
17+
18+
Each subscriber in own goroutine
19+
```
20+
21+
## Quick Start
22+
23+
### 1. Publishing Events
24+
25+
```go
26+
import (
27+
"github.com/asgardeo/thunder/internal/observability"
28+
"github.com/asgardeo/thunder/internal/observability/event"
29+
)
30+
31+
func handleRequest(ctx context.Context) {
32+
// Ensure trace ID exists
33+
ctx = observability.EnsureTraceID(ctx)
34+
traceID := observability.GetTraceID(ctx)
35+
36+
// Create and publish event
37+
evt := event.NewEvent(traceID, string(event.EventTypeAuthenticationStarted), "MyComponent")
38+
evt.WithData("user_id", "user123").
39+
WithStatus(event.StatusInProgress).
40+
WithData("message", "Authentication started")
41+
42+
observability.GetService().PublishEvent(evt)
43+
}
44+
```
45+
46+
### 2. Custom Subscribers
47+
48+
```go
49+
import (
50+
"github.com/asgardeo/thunder/internal/observability/publisher"
51+
"github.com/asgardeo/thunder/internal/observability/subscriber/defaultsubscriber"
52+
"github.com/asgardeo/thunder/internal/observability/formatter/json"
53+
"github.com/asgardeo/thunder/internal/observability/adapter/file"
54+
"github.com/asgardeo/thunder/internal/observability/event"
55+
)
56+
57+
// Subscribe to specific event types only
58+
jsonFormatter := jsonformatter.NewJSONFormatter()
59+
fileAdapter, _ := file.NewFileAdapter("/path/to/custom-events.log")
60+
subscriber := defaultsubscriber.NewDefaultSubscriber(jsonFormatter, fileAdapter)
61+
62+
// Option 1: Subscribe to specific event types
63+
publisher.GetPublisher().Subscribe(subscriber,
64+
event.EventTypeAuthenticationStarted,
65+
event.EventTypeAuthenticationCompleted,
66+
event.EventTypeAuthenticationFailed,
67+
)
68+
69+
// Option 2: Subscribe to all events
70+
publisher.GetPublisher().SubscribeAll(subscriber)
71+
```
72+
73+
## Package Structure
74+
75+
```
76+
observability/
77+
├── event/ # Event model and types
78+
│ ├── event.go # Core event structure with fluent API
79+
│ └── constants.go # Event types and component names
80+
├── publisher/ # Event publisher
81+
│ └── publisher.go # Singleton publisher with direct (queue-less) delivery
82+
├── subscriber/ # Subscriber interfaces and implementations
83+
│ ├── subscriber.go # Subscriber interface
84+
│ └── defaultsubscriber/ # Default subscriber implementation
85+
│ └── default_subscriber.go
86+
├── formatter/ # Event formatters
87+
│ ├── formatter.go # Formatter interface
88+
│ ├── json/ # JSON formatter
89+
│ │ └── json_formatter.go
90+
│ └── csv/ # CSV formatter
91+
│ └── csv_formatter.go
92+
├── adapter/ # Output adapters
93+
│ ├── adapter.go # OutputAdapter interface
94+
│ ├── file/ # File output
95+
│ │ └── file_adapter.go
96+
│ └── console/ # Console output
97+
│ └── console_adapter.go
98+
├── examples/ # Integration examples
99+
│ └── integration_examples.go
100+
├── context.go # Context utilities for trace ID
101+
├── service.go # Main observability service
102+
└── README.md # This file
103+
```
104+
105+
## Event Types
106+
107+
### Authorization Events
108+
- `AUTHORIZATION_STARTED` - Authorization request received
109+
- `AUTHORIZATION_VALIDATED` - Request validated
110+
- `AUTHORIZATION_CODE_GENERATED` - Auth code created
111+
- `AUTHORIZATION_COMPLETED` - Flow complete
112+
- `AUTHORIZATION_FAILED` - Flow failed
113+
114+
### Authentication Events
115+
- `AUTHENTICATION_STARTED` - Auth flow begins
116+
- `CREDENTIALS_AUTH_STARTED` - Username/password auth
117+
- `CREDENTIALS_AUTH_COMPLETED` - Credentials verified
118+
- `CREDENTIALS_AUTH_FAILED` - Credentials invalid
119+
- `OTP_SENT` - OTP sent to user
120+
- `OTP_VERIFIED` - OTP validated
121+
- `SOCIAL_AUTH_STARTED` - Social login begins
122+
- `SOCIAL_AUTH_COMPLETED` - Social login succeeds
123+
- `AUTHENTICATION_COMPLETED` - Auth complete
124+
- `AUTHENTICATION_FAILED` - Auth failed
125+
126+
### Token Events
127+
- `TOKEN_REQUEST_RECEIVED` - Token request received
128+
- `TOKEN_REQUEST_VALIDATED` - Request validated
129+
- `PKCE_VALIDATED` - PKCE validation successful
130+
- `ACCESS_TOKEN_GENERATED` - Access token created
131+
- `ID_TOKEN_GENERATED` - ID token created
132+
- `REFRESH_TOKEN_GENERATED` - Refresh token created
133+
- `TOKEN_ISSUED` - Tokens issued
134+
- `TOKEN_REQUEST_FAILED` - Request failed
135+
136+
### Flow Events
137+
- `FLOW_STARTED` - Flow execution begins
138+
- `FLOW_NODE_EXECUTION_STARTED` - Node execution starts
139+
- `FLOW_NODE_EXECUTION_COMPLETED` - Node completes
140+
- `FLOW_USER_INPUT_REQUIRED` - User input needed
141+
- `FLOW_COMPLETED` - Flow succeeds
142+
- `FLOW_FAILED` - Flow fails
143+
144+
## Integration Examples
145+
146+
See `examples/integration_examples.go` for complete examples of:
147+
- Authorization handler integration
148+
- Token handler integration
149+
- Flow execution integration
150+
- Authentication service integration
151+
152+
## Output Format
153+
154+
### JSON (Default)
155+
```json
156+
{
157+
"trace_id": "550e8400-e29b-41d4-a716-446655440000",
158+
"event_id": "123e4567-e89b-12d3-a456-426614174000",
159+
"event_type": "AUTHENTICATION_COMPLETED",
160+
"timestamp": "2025-10-21T10:15:30Z",
161+
"component": "AuthenticationService",
162+
"user_id": "user_123",
163+
"client_id": "client_456",
164+
"status": "SUCCESS",
165+
"duration_ms": 245
166+
}
167+
```
168+
169+
### CSV
170+
```csv
171+
TraceID,EventID,EventType,Timestamp,Component,UserID,ClientID,Status,DurationMS
172+
550e8400...,123e4567...,AUTHENTICATION_COMPLETED,2025-10-21T10:15:30Z,AuthenticationService,user_123,client_456,SUCCESS,245
173+
```
174+
175+
## Querying Events
176+
177+
### Filter by Trace ID
178+
```bash
179+
# Using jq
180+
cat analytics.log | jq 'select(.trace_id == "550e8400-e29b-41d4-a716-446655440000")'
181+
182+
# Get event timeline
183+
cat analytics.log | jq 'select(.trace_id == "550e8400...") | {timestamp, event_type, status}'
184+
```
185+
186+
### Get Failed Authentications
187+
```bash
188+
cat analytics.log | jq 'select(.event_type | contains("FAILED"))'
189+
```
190+
191+
### Calculate Average Duration
192+
```bash
193+
cat analytics.log | jq -s '[.[] | select(.event_type == "TOKEN_ISSUED") | .duration_ms] | add / length'
194+
```
195+
196+
## Performance
197+
198+
- **Non-blocking**: Event publishing returns immediately, never blocks the main thread
199+
- **Async processing**: Each subscriber runs in its own goroutine
200+
- **No queue overhead**: Direct event delivery without buffering
201+
- **Graceful degradation**: Events skipped if no subscribers are interested
202+
- **Parallel processing**: All subscribers process events simultaneously
203+
- **Isolated failures**: One subscriber's failure doesn't affect others
204+
205+
## Extending
206+
207+
### Custom Formatter
208+
209+
```go
210+
type MyFormatter struct{}
211+
212+
func (mf *MyFormatter) Format(evt *event.Event) ([]byte, error) {
213+
// Custom formatting logic
214+
return []byte(fmt.Sprintf("%s: %s", evt.EventType, evt.Message)), nil
215+
}
216+
217+
func (mf *MyFormatter) GetName() string {
218+
return "MyFormatter"
219+
}
220+
```
221+
222+
### Custom Output Adapter
223+
224+
```go
225+
type MyAdapter struct{}
226+
227+
func (ma *MyAdapter) Write(data []byte) error {
228+
// Custom output logic (send to external service, database, etc.)
229+
return nil
230+
}
231+
232+
func (ma *MyAdapter) Flush() error {
233+
return nil
234+
}
235+
236+
func (ma *MyAdapter) Close() error {
237+
return nil
238+
}
239+
240+
func (ma *MyAdapter) GetName() string {
241+
return "MyAdapter"
242+
}
243+
```
244+
245+
## Shutdown
246+
247+
Always shut down the observability service gracefully:
248+
249+
```go
250+
// In main.go or shutdown handler
251+
defer observability.GetService().Shutdown()
252+
```
253+
254+
This ensures:
255+
- Pending events are processed
256+
- Buffers are flushed
257+
- Files are closed properly
258+
- Resources are released
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (c) 2025, WSO2 LLC. (https://www.wso2.com).
3+
*
4+
* WSO2 LLC. licenses this file to you under the Apache License,
5+
* Version 2.0 (the "License"); you may not use this file except
6+
* in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*/
18+
19+
// Package adapter provides output adapter interfaces and implementations.
20+
package adapter
21+
22+
// OutputAdapter is the interface for writing formatted events to various destinations.
23+
type OutputAdapter interface {
24+
// Write writes formatted event data to the output destination.
25+
Write(data []byte) error
26+
27+
// Flush ensures all buffered data is written.
28+
Flush() error
29+
30+
// Close closes the adapter and releases resources.
31+
Close() error
32+
33+
// GetName returns the name of this adapter.
34+
GetName() string
35+
}

0 commit comments

Comments
 (0)