-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstats.go
More file actions
198 lines (166 loc) · 5.16 KB
/
stats.go
File metadata and controls
198 lines (166 loc) · 5.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package main
import (
	"database/sql"
	"errors"
	"fmt"
	"log"
	"time"
)
// activityRetentionMonths is the age, in calendar months, beyond which
// cleanupOldActivity deletes rows from user_activity.
const activityRetentionMonths = 6
// initActivityDatabase prepares the schema used for activity tracking.
//
// It runs three IF NOT EXISTS statements through the package-level db handle,
// in order: the user_activity table, an index on its hour_bucket column, and
// the stat_rate_limit table. The first statement that fails stops the setup
// and its error is returned unchanged. On success an info line is logged and
// nil is returned.
func initActivityDatabase() error {
	schema := []string{
		// Message counters keyed by group, user and hour bucket.
		`CREATE TABLE IF NOT EXISTS user_activity (
group_id INTEGER NOT NULL,
user_name TEXT NOT NULL,
hour_bucket TIMESTAMP NOT NULL,
message_count INTEGER DEFAULT 1,
PRIMARY KEY (group_id, user_name, hour_bucket)
)`,
		// Index on hour_bucket for fast cleanup and queries.
		`CREATE INDEX IF NOT EXISTS idx_activity_hour_bucket
ON user_activity(hour_bucket)`,
		// One row per group holding the last time /stat was run.
		`CREATE TABLE IF NOT EXISTS stat_rate_limit (
group_id INTEGER PRIMARY KEY,
last_run_time TIMESTAMP NOT NULL
)`,
	}

	for _, stmt := range schema {
		if _, err := db.Exec(stmt); err != nil {
			return err
		}
	}

	log.Println("[INFO] Activity tracking database initialized")
	return nil
}
// trackUserActivity counts one message for userName in groupID.
//
// The current time is cut down to the start of its hour and used as the
// bucket key. A new user_activity row is inserted with message_count 1, or,
// when a row for the same group, user and bucket already exists, its
// message_count is incremented. A database error is logged and returned.
func trackUserActivity(groupID int64, userName string) error {
	const upsert = `INSERT INTO user_activity (group_id, user_name, hour_bucket, message_count)
VALUES (?, ?, ?, 1)
ON CONFLICT(group_id, user_name, hour_bucket)
DO UPDATE SET message_count = message_count + 1`

	// Rebuild the timestamp with minutes, seconds and nanoseconds zeroed,
	// keeping the location reported by time.Now.
	ts := time.Now()
	year, month, day := ts.Date()
	bucket := time.Date(year, month, day, ts.Hour(), 0, 0, 0, ts.Location())

	if _, err := db.Exec(upsert, groupID, userName, bucket); err != nil {
		log.Printf("[ERROR] Failed to track user activity: %v", err)
		return err
	}
	return nil
}
// cleanupOldActivity deletes user_activity rows whose hour_bucket is more
// than activityRetentionMonths calendar months in the past.
//
// The error from the DELETE is returned unchanged. When at least one row was
// removed, the count is written to the log.
func cleanupOldActivity() error {
	cutoff := time.Now().AddDate(0, -activityRetentionMonths, 0)

	res, err := db.Exec("DELETE FROM user_activity WHERE hour_bucket < ?", cutoff)
	if err != nil {
		return err
	}

	// The row count only feeds the log line below, so a failure to obtain it
	// is deliberately ignored rather than reported as a cleanup failure.
	if deleted, _ := res.RowsAffected(); deleted > 0 {
		log.Printf("[INFO] Cleaned up %d old activity records (older than %d months)",
			deleted, activityRetentionMonths)
	}
	return nil
}
// ActivityStats holds aggregated statistics for a group
type ActivityStats struct {
GroupID int64
StartTime time.Time
EndTime time.Time
UserStats []UserActivitySummary
}
// UserActivitySummary holds per-user statistics
type UserActivitySummary struct {
UserName string
TotalMessages int
HourlyData map[time.Time]int // hour bucket -> message count
}
// getGroupActivityStats collects per-user message counts for groupID over the
// last `days` days, ending at the time of the call.
//
// Rows of user_activity whose hour_bucket lies between now-days and now
// (both inclusive) are read in ascending hour_bucket order and folded into
// one UserActivitySummary per user. UserStats lists users in the order in
// which they first appear in the query result, instead of Go's randomized
// map iteration order; it is nil when no row matches.
//
// A row that fails to scan is logged and skipped. If the query cannot be run,
// or rows.Err reports an iteration failure, the error is returned wrapped
// with context and the *ActivityStats result is nil.
func getGroupActivityStats(groupID int64, days int) (*ActivityStats, error) {
	endTime := time.Now()
	startTime := endTime.AddDate(0, 0, -days)

	rows, err := db.Query(`SELECT user_name, hour_bucket, message_count
FROM user_activity
WHERE group_id = ? AND hour_bucket >= ? AND hour_bucket <= ?
ORDER BY hour_bucket ASC`,
		groupID, startTime, endTime)
	if err != nil {
		return nil, fmt.Errorf("querying activity for group %d: %w", groupID, err)
	}
	defer rows.Close()

	// byUser gives O(1) access to a user's summary; ordered remembers the
	// sequence in which users were first seen so the output order is stable.
	byUser := make(map[string]*UserActivitySummary)
	var ordered []*UserActivitySummary

	for rows.Next() {
		var (
			userName     string
			hourBucket   time.Time
			messageCount int
		)
		if err := rows.Scan(&userName, &hourBucket, &messageCount); err != nil {
			// Best effort: one unreadable row should not discard the rest.
			log.Printf("[ERROR] Failed to scan activity row: %v", err)
			continue
		}

		summary, seen := byUser[userName]
		if !seen {
			summary = &UserActivitySummary{
				UserName:   userName,
				HourlyData: make(map[time.Time]int),
			}
			byUser[userName] = summary
			ordered = append(ordered, summary)
		}
		summary.HourlyData[hourBucket] = messageCount
		summary.TotalMessages += messageCount
	}

	// An iteration error means the data may be incomplete; report it the same
	// way as a failed query instead of handing back partial statistics.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("reading activity rows for group %d: %w", groupID, err)
	}

	// Leave userStats nil when there is nothing to report.
	var userStats []UserActivitySummary
	if len(ordered) > 0 {
		userStats = make([]UserActivitySummary, 0, len(ordered))
		for _, summary := range ordered {
			userStats = append(userStats, *summary)
		}
	}

	return &ActivityStats{
		GroupID:   groupID,
		StartTime: startTime,
		EndTime:   endTime,
		UserStats: userStats,
	}, nil
}
// checkStatRateLimit reports whether the /stat command may run for groupID
// right now. The limit is one run per hour per group.
//
// Results:
//   - (true, nil): no previous run is recorded for the group, or the last
//     recorded run is at least one hour old.
//   - (false, error): the last run was less than an hour ago; the error text
//     starts with "rate limited" and states how long ago that run was.
//   - (false, error): the lookup failed for any reason other than a missing
//     row; the underlying error is wrapped with %w.
//
// A false result is always accompanied by a non-nil error.
func checkStatRateLimit(groupID int64) (bool, error) {
	var lastRunTime time.Time
	err := db.QueryRow("SELECT last_run_time FROM stat_rate_limit WHERE group_id = ?", groupID).Scan(&lastRunTime)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// No record found - first time running.
		return true, nil
	case err != nil:
		// A genuine lookup failure must not be mistaken for "never ran",
		// otherwise a broken database would silently disable the limit.
		return false, fmt.Errorf("checking stat rate limit for group %d: %w", groupID, err)
	}

	// Measure once so the comparison and the message use the same duration.
	if elapsed := time.Since(lastRunTime); elapsed < time.Hour {
		return false, fmt.Errorf("rate limited: last run was %v ago", elapsed.Round(time.Minute))
	}
	return true, nil
}
// updateStatRateLimit records the current time as the last run time for
// groupID in stat_rate_limit.
//
// A row is inserted for the group; if one already exists, its last_run_time
// is overwritten instead. The error from db.Exec is returned unchanged.
func updateStatRateLimit(groupID int64) error {
	const upsert = `INSERT INTO stat_rate_limit (group_id, last_run_time)
VALUES (?, ?)
ON CONFLICT(group_id)
DO UPDATE SET last_run_time = ?`

	// Placeholders, in order: group_id, the timestamp for a fresh row, and
	// the timestamp applied when the row already exists.
	_, execErr := db.Exec(upsert, groupID, time.Now(), time.Now())
	return execErr
}
// scheduleActivityCleanup runs cleanupOldActivity once immediately and then
// again every 24 hours. Cleanup failures are logged and otherwise ignored.
//
// The loop has no exit condition, so this call does not return.
func scheduleActivityCleanup() {
	// Startup pass.
	if err := cleanupOldActivity(); err != nil {
		log.Printf("[ERROR] Failed initial activity cleanup: %v", err)
	}

	// Daily passes.
	ticker := time.NewTicker(24 * time.Hour)
	defer ticker.Stop()

	for {
		<-ticker.C
		if err := cleanupOldActivity(); err != nil {
			log.Printf("[ERROR] Failed scheduled activity cleanup: %v", err)
		}
	}
}