Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions internal/analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,28 @@ func (p *ThanosResourceProvider) GetHostServiceCPU(ctx context.Context, start, e
return serviceCPU, nil
}

func (p *ThanosResourceProvider) GetValueUnits(ctx context.Context, start, end time.Time) (map[string]float64, error) {
// 1. Define queries for all business value counters
queries := map[string]string{
"ingestion": "sum(increase(second_brain_sync_processed_total[15m])) + sum(increase(reading_sync_processed_total[15m]))",
"proxy": "sum(increase(proxy_webhook_received_total[15m])) + sum(increase(proxy_synthetic_request_total[15m]))",
}

units := make(map[string]float64)
for feature, query := range queries {
samples, err := p.Client.QueryRange(ctx, query, start, end, "1m")
if err != nil {
continue
}
if len(samples) > 0 {
last := samples[len(samples)-1]
val, _ := strconv.ParseFloat(fmt.Sprintf("%v", last.Payload["value"]), 64)
units[feature] = val
}
}
return units, nil
}

func (p *ThanosResourceProvider) GetCarbonIntensity(ctx context.Context) (float64, error) {

// Default: ~150g CO2 per kWh (Sample value for a "greenish" grid)
Expand Down
29 changes: 28 additions & 1 deletion internal/analytics/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type ResourceProvider interface {
GetEnergyJoules(ctx context.Context, start, end time.Time) (float64, error)
GetContainerEnergy(ctx context.Context, start, end time.Time) (map[string]float64, error)
GetHostServiceCPU(ctx context.Context, start, end time.Time) (map[string]float64, error)
GetValueUnits(ctx context.Context, start, end time.Time) (map[string]float64, error)
GetCarbonIntensity(ctx context.Context) (float64, error) // gCO2 per kWh
GetCostFactor(ctx context.Context) (float64, error) // CAD per Joule
}
Expand Down Expand Up @@ -147,7 +148,10 @@ func (s *Service) RunBatch(ctx context.Context) {
// 3. Resource Integration (Phase 3)
s.processResources(ctx, start, end, hostName, osName)

// 4. Fetch Tailscale State (Logs/Metrics only, no DB)
// 4. Value Integration (Phase 4: Business Value Ingestion)
s.processValueUnits(ctx, start, end, hostName, osName)

// 5. Fetch Tailscale State (Logs/Metrics only, no DB)
s.collectTailscale(ctx)

telemetry.Info("batch_complete")
Expand Down Expand Up @@ -203,6 +207,29 @@ func (s *Service) processResources(ctx context.Context, start, end time.Time, ho
}
}

func (s *Service) processValueUnits(ctx context.Context, start, end time.Time, hostName, osName string) {
if s.Resources == nil {
return
}

valueUnits, err := s.Resources.GetValueUnits(ctx, start, end)
if err != nil {
telemetry.Error("value_units_fetch_failed", "error", err)
return
}

for feature, count := range valueUnits {
if count > 0 {
metadata := map[string]interface{}{
"host": hostName,
"os": osName,
}
_ = s.Store.RecordAnalyticsMetric(ctx, end, feature, KindValueUnit, count, "count", metadata)
telemetry.Info("value_unit_recorded", "feature_id", feature, "count", count)
}
}
}

func (s *Service) recordMetricsForFeature(ctx context.Context, t time.Time, featureID string, joules, costFactor, carbonIntensity float64, hostName, osName string) {
metadata := map[string]interface{}{
"host": hostName,
Expand Down
38 changes: 38 additions & 0 deletions internal/analytics/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type MockResources struct {
GetEnergyJoulesFn func(ctx context.Context, start, end time.Time) (float64, error)
GetContainerEnergyFn func(ctx context.Context, start, end time.Time) (map[string]float64, error)
GetHostServiceCPUFn func(ctx context.Context, start, end time.Time) (map[string]float64, error)
GetValueUnitsFn func(ctx context.Context, start, end time.Time) (map[string]float64, error)
}

func (m *MockResources) GetEnergyJoules(ctx context.Context, start, end time.Time) (float64, error) {
Expand Down Expand Up @@ -81,6 +82,16 @@ func (m *MockResources) GetHostServiceCPU(ctx context.Context, start, end time.T
}, nil
}

func (m *MockResources) GetValueUnits(ctx context.Context, start, end time.Time) (map[string]float64, error) {
if m.GetValueUnitsFn != nil {
return m.GetValueUnitsFn(ctx, start, end)
}
return map[string]float64{
"ingestion": 10.0,
"proxy": 5.0,
}, nil
}

func (m *MockResources) GetCarbonIntensity(ctx context.Context) (float64, error) {
return 150.0, nil
}
Expand Down Expand Up @@ -286,6 +297,33 @@ func TestService_ProcessResources(t *testing.T) {
}
}

func TestService_ProcessValueUnits(t *testing.T) {
mockStore := &MockStore{}
mockResources := &MockResources{}

s := &Service{
Store: mockStore,
Resources: mockResources,
}

s.processValueUnits(context.Background(), time.Now().Add(-15*time.Minute), time.Now(), "test-host", "linux")

// 2 features (ingestion, proxy) = 2 recordings
expectedCount := 2
if len(mockStore.AnalyticsRecorded) != expectedCount {
t.Errorf("expected %d analytics recordings, got %d", expectedCount, len(mockStore.AnalyticsRecorded))
}

recordedKinds := make(map[string]bool)
for _, k := range mockStore.AnalyticsRecorded {
recordedKinds[k] = true
}

if !recordedKinds[string(KindValueUnit)] {
t.Errorf("expected KindValueUnit to be recorded")
}
}

func TestService_CollectTailscale(t *testing.T) {
// Just call it to see if it covers the lines.
// Real logic will fail but it handles errors gracefully.
Expand Down