Skip to content

Commit 839df3d

Browse files
committed
report-listener: add CleanAI intelligence endpoint with server-side limits
1 parent ccdfba0 commit 839df3d

File tree

7 files changed

+648
-2
lines changed

7 files changed

+648
-2
lines changed

report-listener/config/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ type Config struct {
3232
RabbitExchange string
3333
RabbitAnalysedReportRoutingKey string
3434
RabbitTwitterReplyRoutingKey string
35+
36+
// Intelligence chat configuration
37+
GeminiAPIKey string
38+
GeminiModel string
39+
IntelligenceFreeTierMaxTurn int
3540
}
3641

3742
// Load loads configuration from environment variables
@@ -61,6 +66,11 @@ func Load() *Config {
6166
RabbitExchange: getEnv("RABBITMQ_EXCHANGE", "cleanapp"),
6267
RabbitAnalysedReportRoutingKey: getEnv("RABBITMQ_ANALYSED_REPORT_ROUTING_KEY", "report.analysed"),
6368
RabbitTwitterReplyRoutingKey: getEnv("RABBITMQ_TWITTER_REPLY_ROUTING_KEY", "twitter.reply"),
69+
70+
// Intelligence defaults
71+
GeminiAPIKey: getEnv("GEMINI_API_KEY", ""),
72+
GeminiModel: getEnv("GEMINI_MODEL", "gemini-2.5-flash"),
73+
IntelligenceFreeTierMaxTurn: getIntEnv("INTELLIGENCE_FREE_MAX_TURNS", 5),
6474
}
6575

6676
return config
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"strings"
8+
"time"
9+
)
10+
11+
type NamedCount struct {
12+
Name string
13+
Count int
14+
}
15+
16+
type IntelligenceContext struct {
17+
OrgID string
18+
ReportsAnalyzed int
19+
ReportsThisMonth int
20+
HighPriorityCount int
21+
MediumPriorityCount int
22+
TopClassifications []NamedCount
23+
TopIssues []NamedCount
24+
RecentSummaries []string
25+
}
26+
27+
func (d *Database) EnsureIntelligenceTables(ctx context.Context) error {
28+
_, err := d.db.ExecContext(ctx, `
29+
CREATE TABLE IF NOT EXISTS intelligence_usage (
30+
session_id VARCHAR(128) PRIMARY KEY,
31+
turns_used INT NOT NULL DEFAULT 0,
32+
expires_at TIMESTAMP NOT NULL,
33+
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
34+
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
35+
)
36+
`)
37+
if err != nil {
38+
return fmt.Errorf("failed to ensure intelligence_usage table: %w", err)
39+
}
40+
return nil
41+
}
42+
43+
func (d *Database) GetAndIncrementIntelligenceUsage(ctx context.Context, sessionID string, maxTurns int, ttl time.Duration) (bool, int, error) {
44+
if strings.TrimSpace(sessionID) == "" {
45+
return false, 0, fmt.Errorf("session_id is required")
46+
}
47+
if maxTurns <= 0 {
48+
return true, 0, nil
49+
}
50+
51+
tx, err := d.db.BeginTx(ctx, nil)
52+
if err != nil {
53+
return false, 0, fmt.Errorf("failed to start tx: %w", err)
54+
}
55+
defer tx.Rollback()
56+
57+
now := time.Now().UTC()
58+
var turnsUsed int
59+
var expiresAt time.Time
60+
hasRow := true
61+
62+
rowErr := tx.QueryRowContext(ctx, `
63+
SELECT turns_used, expires_at
64+
FROM intelligence_usage
65+
WHERE session_id = ?
66+
FOR UPDATE
67+
`, sessionID).Scan(&turnsUsed, &expiresAt)
68+
if rowErr == sql.ErrNoRows {
69+
hasRow = false
70+
turnsUsed = 0
71+
expiresAt = now.Add(ttl)
72+
} else if rowErr != nil {
73+
return false, 0, fmt.Errorf("failed to read intelligence usage: %w", rowErr)
74+
}
75+
76+
expired := !hasRow || !expiresAt.After(now)
77+
if expired {
78+
turnsUsed = 0
79+
expiresAt = now.Add(ttl)
80+
}
81+
82+
if turnsUsed >= maxTurns {
83+
if err := tx.Commit(); err != nil {
84+
return false, turnsUsed, fmt.Errorf("failed to commit usage tx: %w", err)
85+
}
86+
return false, turnsUsed, nil
87+
}
88+
89+
turnsUsed++
90+
if _, err := tx.ExecContext(ctx, `
91+
INSERT INTO intelligence_usage (session_id, turns_used, expires_at)
92+
VALUES (?, ?, ?)
93+
ON DUPLICATE KEY UPDATE
94+
turns_used = VALUES(turns_used),
95+
expires_at = VALUES(expires_at)
96+
`, sessionID, turnsUsed, expiresAt); err != nil {
97+
return false, 0, fmt.Errorf("failed to upsert intelligence usage: %w", err)
98+
}
99+
100+
if err := tx.Commit(); err != nil {
101+
return false, 0, fmt.Errorf("failed to commit usage tx: %w", err)
102+
}
103+
return true, turnsUsed, nil
104+
}
105+
106+
func (d *Database) GetIntelligenceContext(ctx context.Context, orgID string) (*IntelligenceContext, error) {
107+
org := strings.ToLower(strings.TrimSpace(orgID))
108+
if org == "" {
109+
return nil, fmt.Errorf("org_id is required")
110+
}
111+
112+
total, high, medium, err := d.GetBrandPriorityCountsByBrandName(ctx, org)
113+
if err != nil {
114+
return nil, err
115+
}
116+
117+
result := &IntelligenceContext{
118+
OrgID: org,
119+
ReportsAnalyzed: total,
120+
HighPriorityCount: high,
121+
MediumPriorityCount: medium,
122+
}
123+
124+
_ = d.db.QueryRowContext(ctx, `
125+
SELECT COUNT(*)
126+
FROM reports r
127+
INNER JOIN report_analysis ra ON ra.seq = r.seq
128+
WHERE ra.brand_name = ?
129+
AND ra.is_valid = TRUE
130+
AND r.ts >= DATE_FORMAT(UTC_TIMESTAMP(), '%Y-%m-01 00:00:00')
131+
`, org).Scan(&result.ReportsThisMonth)
132+
133+
classRows, err := d.db.QueryContext(ctx, `
134+
SELECT COALESCE(classification, 'unknown') AS classification, COUNT(*) AS cnt
135+
FROM report_analysis
136+
WHERE brand_name = ?
137+
AND is_valid = TRUE
138+
GROUP BY classification
139+
ORDER BY cnt DESC
140+
LIMIT 5
141+
`, org)
142+
if err == nil {
143+
defer classRows.Close()
144+
for classRows.Next() {
145+
var item NamedCount
146+
if scanErr := classRows.Scan(&item.Name, &item.Count); scanErr == nil {
147+
result.TopClassifications = append(result.TopClassifications, item)
148+
}
149+
}
150+
}
151+
152+
issueRows, err := d.db.QueryContext(ctx, `
153+
SELECT title, COUNT(*) AS cnt
154+
FROM report_analysis
155+
WHERE brand_name = ?
156+
AND is_valid = TRUE
157+
AND title IS NOT NULL
158+
AND title != ''
159+
GROUP BY title
160+
ORDER BY cnt DESC
161+
LIMIT 5
162+
`, org)
163+
if err == nil {
164+
defer issueRows.Close()
165+
for issueRows.Next() {
166+
var item NamedCount
167+
if scanErr := issueRows.Scan(&item.Name, &item.Count); scanErr == nil {
168+
result.TopIssues = append(result.TopIssues, item)
169+
}
170+
}
171+
}
172+
173+
summaryRows, err := d.db.QueryContext(ctx, `
174+
SELECT summary
175+
FROM report_analysis
176+
WHERE brand_name = ?
177+
AND is_valid = TRUE
178+
AND summary IS NOT NULL
179+
AND summary != ''
180+
ORDER BY created_at DESC
181+
LIMIT 5
182+
`, org)
183+
if err == nil {
184+
defer summaryRows.Close()
185+
for summaryRows.Next() {
186+
var summary string
187+
if scanErr := summaryRows.Scan(&summary); scanErr == nil {
188+
result.RecentSummaries = append(result.RecentSummaries, summary)
189+
}
190+
}
191+
}
192+
193+
return result, nil
194+
}

report-listener/handlers/handlers.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
"sync"
1414
"time"
1515

16+
"report-listener/config"
1617
"report-listener/database"
18+
"report-listener/intelligence"
1719
"report-listener/models"
1820
"report-listener/rabbitmq"
1921
brandutil "report-listener/utils"
@@ -50,18 +52,30 @@ type Handlers struct {
5052
db *database.Database
5153
rabbitmqPublisher *rabbitmq.Publisher
5254
rabbitmqReplier *rabbitmq.Publisher
55+
cfg *config.Config
56+
geminiClient *intelligence.Client
5357

5458
brandCountsMu sync.RWMutex
5559
brandCountsCache map[string]brandCountsCacheEntry
5660
}
5761

5862
// NewHandlers creates a new handlers instance
59-
func NewHandlers(hub *ws.Hub, db *database.Database, pub *rabbitmq.Publisher, replyPub *rabbitmq.Publisher) *Handlers {
63+
func NewHandlers(
64+
cfg *config.Config,
65+
hub *ws.Hub,
66+
db *database.Database,
67+
pub *rabbitmq.Publisher,
68+
replyPub *rabbitmq.Publisher,
69+
) *Handlers {
70+
geminiClient := intelligence.NewClient(cfg.GeminiAPIKey, cfg.GeminiModel)
71+
6072
return &Handlers{
6173
hub: hub,
6274
db: db,
6375
rabbitmqPublisher: pub,
6476
rabbitmqReplier: replyPub,
77+
cfg: cfg,
78+
geminiClient: geminiClient,
6579
brandCountsCache: make(map[string]brandCountsCacheEntry),
6680
}
6781
}

0 commit comments

Comments
 (0)