Skip to content

Commit f998073

Browse files
committed
Refactor and optimize geo and TI processing.
Reorganized GeoIP database and threat intelligence loading into more modular functions for improved maintainability and code readability. Simplified caching, removed unused database function, and restructured rule-handling logic. Addressed minor variable renames and logging adjustments for consistency.
1 parent 5bbbf0d commit f998073

File tree

24 files changed

+518
-452
lines changed

24 files changed

+518
-452
lines changed

correlation/Dockerfile

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
FROM ubuntu:22.04
2-
RUN apt update
3-
RUN apt install -y ca-certificates git
1+
FROM ubuntu:24.04
2+
RUN apt-get update
3+
RUN apt-get install -y ca-certificates git wget
44
COPY correlation /app/
55
COPY docs/swagger.json /app/docs/
66
COPY docs/swagger.yaml /app/docs/
@@ -9,4 +9,13 @@ COPY run.sh /
99
RUN chmod +x /app/correlation
1010
RUN chmod +x /run.sh
1111
RUN update-ca-certificates
12+
RUN wget -O /app/asn-blocks-v4.csv https://cdn.utmstack.com/geoip/asn-blocks-v4.csv
13+
RUN wget -O /app/asn-blocks-v6.csv https://cdn.utmstack.com/geoip/asn-blocks-v6.csv
14+
RUN wget -O /app/blocks-v4.csv https://cdn.utmstack.com/geoip/blocks-v4.csv
15+
RUN wget -O /app/blocks-v6.csv https://cdn.utmstack.com/geoip/blocks-v6.csv
16+
RUN wget -O /app/locations-en.csv https://cdn.utmstack.com/geoip/locations-en.csv
17+
RUN wget -O /app/ip_blocklist.list https://intelligence.threatwinds.com/feeds/public/ip/cumulative.list
18+
RUN wget -O /app/domain_blocklist.list https://intelligence.threatwinds.com/feeds/public/domain/cumulative.list
19+
RUN wget -O /app/hostname_blocklist.list https://intelligence.threatwinds.com/feeds/public/hostname/cumulative.list
20+
1221
ENTRYPOINT [ "/run.sh" ]

correlation/api/newLogHandler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6+
"github.com/utmstack/UTMStack/correlation/ti"
67
"io"
78
"log"
89
"net/http"
@@ -74,6 +75,7 @@ func NewLog(c *gin.Context) {
7475
}
7576

7677
cache.AddToCache(l)
78+
ti.Enqueue(l)
7779
search.AddToQueue(l)
7880
response["status"] = "queued"
7981
c.JSON(http.StatusOK, response)

correlation/cache/cache.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import (
1313

1414
const bufferSize int = 1000000
1515

16-
var cacheStorageMutex = &sync.RWMutex{}
16+
var storageMutex = &sync.RWMutex{}
1717

18-
var CacheStorage []string
18+
var storage []string
1919

2020
func Status() {
2121
for {
22-
log.Printf("Logs in cache: %v", len(CacheStorage))
23-
if len(CacheStorage) != 0 {
24-
est := gjson.Get(CacheStorage[0], "@timestamp").String()
22+
log.Printf("Logs in cache: %v", len(storage))
23+
if len(storage) != 0 {
24+
est := gjson.Get(storage[0], "@timestamp").String()
2525
log.Printf("Old document in cache: %s", est)
2626
}
2727
time.Sleep(60 * time.Second)
@@ -31,8 +31,8 @@ func Status() {
3131

3232
func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
3333
var elements []string
34-
cacheStorageMutex.RLock()
35-
defer cacheStorageMutex.RUnlock()
34+
storageMutex.RLock()
35+
defer storageMutex.RUnlock()
3636

3737
cToBreak := 0
3838
ait := time.Now().UTC().Unix() - func() int64 {
@@ -43,8 +43,8 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
4343
return seconds
4444
}
4545
}()
46-
for i := len(CacheStorage) - 1; i >= 0; i-- {
47-
est := gjson.Get(CacheStorage[i], "@timestamp").String()
46+
for i := len(storage) - 1; i >= 0; i-- {
47+
est := gjson.Get(storage[i], "@timestamp").String()
4848
eit, err := time.Parse(time.RFC3339Nano, est)
4949
if err != nil {
5050
log.Printf("Could not parse @timestamp: %v", err)
@@ -61,23 +61,23 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
6161
var allCatch bool
6262
var oneCatch bool
6363
for _, of := range oneOf {
64-
oneCatch = evalElement(CacheStorage[i], of.Field, of.Operator, of.Value)
64+
oneCatch = evalElement(storage[i], of.Field, of.Operator, of.Value)
6565
if oneCatch {
6666
break
6767
}
6868
}
6969
for _, af := range allOf {
70-
allCatch = evalElement(CacheStorage[i], af.Field, af.Operator, af.Value)
70+
allCatch = evalElement(storage[i], af.Field, af.Operator, af.Value)
7171
if !allCatch {
7272
break
7373
}
7474
}
7575
if (len(allOf) == 0 || allCatch) && (len(oneOf) == 0 || oneCatch) {
76-
elements = append(elements, CacheStorage[i])
76+
elements = append(elements, storage[i])
7777
}
7878
}
7979
}
80-
80+
8181
return elements
8282
}
8383

@@ -97,9 +97,9 @@ func ProcessQueue() {
9797
go func() {
9898
for {
9999
l := <-logs
100-
cacheStorageMutex.Lock()
101-
CacheStorage = append(CacheStorage, l)
102-
cacheStorageMutex.Unlock()
100+
storageMutex.Lock()
101+
storage = append(storage, l)
102+
storageMutex.Unlock()
103103
}
104104
}()
105105
}
@@ -109,11 +109,11 @@ func Clean() {
109109
for {
110110
var clean bool
111111

112-
if len(CacheStorage) > 1 {
112+
if len(storage) > 1 {
113113
if utils.AssignedMemory >= 80 {
114114
clean = true
115115
} else {
116-
old := gjson.Get(CacheStorage[0], "@timestamp").String()
116+
old := gjson.Get(storage[0], "@timestamp").String()
117117
oldTime, err := time.Parse(time.RFC3339Nano, old)
118118
if err != nil {
119119
log.Printf("Could not parse old log timestamp. Cleaning up")
@@ -129,9 +129,9 @@ func Clean() {
129129
}
130130

131131
if clean {
132-
cacheStorageMutex.Lock()
133-
CacheStorage = CacheStorage[1:]
134-
cacheStorageMutex.Unlock()
132+
storageMutex.Lock()
133+
storage = storage[1:]
134+
storageMutex.Unlock()
135135
} else {
136136
time.Sleep(5 * time.Second)
137137
}

correlation/cache/cache_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
package cache_test
1+
package cache
22

33
import (
4-
"testing"
5-
6-
7-
"github.com/utmstack/UTMStack/correlation/cache"
84
"github.com/utmstack/UTMStack/correlation/rules"
5+
"testing"
96
)
107

118
func TestSearch(t *testing.T) {
@@ -16,7 +13,7 @@ func TestSearch(t *testing.T) {
1613
`{"@timestamp":"2022-01-01T00:00:03.000Z","field1":"value1","field2":"value2"}`,
1714
`{"@timestamp":"2022-01-01T00:00:04.000Z","field1":"value1","field2":"value2"}`,
1815
}
19-
cache.CacheStorage = cacheStorage
16+
storage = cacheStorage
2017
allOf := []rules.AllOf{
2118
{Field: "field1", Operator: "==", Value: "value1"},
2219
}
@@ -31,7 +28,7 @@ func TestSearch(t *testing.T) {
3128
`{"@timestamp":"2022-01-01T00:00:01.000Z","field1":"value1","field2":"value2"}`,
3229
`{"@timestamp":"2022-01-01T00:00:00.000Z","field1":"value1","field2":"value2"}`,
3330
}
34-
result := cache.Search(allOf, oneOf, int64(seconds))
31+
result := Search(allOf, oneOf, int64(seconds))
3532
if len(result) != len(expected) {
3633
t.Errorf("Expected %d elements, but got %d", len(expected), len(result))
3734
}
@@ -40,4 +37,4 @@ func TestSearch(t *testing.T) {
4037
t.Errorf("Expected %s, but got %s", expected[i], r)
4138
}
4239
}
43-
}
40+
}

correlation/correlation/analyzer.go

Lines changed: 7 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,85 +1,25 @@
11
package correlation
22

33
import (
4-
"net"
5-
6-
"github.com/utmstack/UTMStack/correlation/geo"
7-
"github.com/utmstack/UTMStack/correlation/rules"
84
"github.com/tidwall/gjson"
5+
"github.com/utmstack/UTMStack/correlation/rules"
6+
"github.com/utmstack/UTMStack/correlation/utils"
97
)
108

11-
func processResponse(logs []string, rule rules.Rule, save []rules.SavedField, tmpLogs *[20][]map[string]string,
9+
func processResponse(logs []string, rule rules.Rule, save []utils.SavedField, tmpLogs *[20][]map[string]string,
1210
steps, step, minCount int) {
13-
if len(logs) >= func()int{
14-
switch minCount{
11+
if len(logs) >= func() int {
12+
switch minCount {
1513
case 0:
1614
return 1
1715
default:
1816
return minCount
1917
}
2018
}() {
2119
for _, l := range logs {
22-
var fields = map[string]string{
23-
"id": gjson.Get(l, "id").String(),
24-
}
25-
//User saved fields
26-
for _, save := range save {
27-
fields[save.Alias] = gjson.Get(l, save.Field).String()
28-
}
29-
// Try to resolve SourceHost if SourceIP exists but not SourceHost
30-
if fields["SourceHost"] == "" && fields["SourceIP"] != "" {
31-
host, _ := net.LookupHost(fields["SourceIP"])
32-
fields["SourceHost"] = host[0]
33-
}
34-
// Try to resolve DestinationHost if DestinationIP exists but not DestinationHost
35-
if fields["DestinationHost"] == "" && fields["DestinationIP"] != "" {
36-
host, _ := net.LookupHost(fields["DestinationIP"])
37-
fields["DestinationHost"] = host[0]
38-
}
39-
// Try to resolve SourceIP if SourceHost exists but not SourceIP
40-
if fields["SourceHost"] != "" && fields["SourceIP"] == "" {
41-
ip, _ := net.LookupIP(fields["SourceHost"])
42-
if len(ip) != 0 && ip[0].String() != "<nil>" {
43-
fields["SourceIP"] = ip[0].String()
44-
}
45-
}
46-
// Try to resolve DestinationIP if DestinationHost exists but not DestinationIP
47-
if fields["DestinationHost"] != "" && fields["DestinationIP"] == "" {
48-
ip, _ := net.LookupIP(fields["DestinationHost"])
49-
if len(ip) != 0 && ip[0].String() != "<nil>" {
50-
fields["DestinationIP"] = ip[0].String()
51-
}
52-
}
53-
// Try to geolocate SourceIP if exists
54-
if fields["SourceIP"] != "" {
55-
location := geo.Geolocate(fields["SourceIP"])
56-
fields["SourceCountry"] = location["country"]
57-
fields["SourceCountryCode"] = location["countryCode"]
58-
fields["SourceCity"] = location["city"]
59-
fields["SourceLat"] = location["latitude"]
60-
fields["SourceLon"] = location["longitude"]
61-
fields["SourceAccuracyRadius"] = location["accuracyRadius"]
62-
fields["SourceASN"] = location["asn"]
63-
fields["SourceASO"] = location["aso"]
64-
fields["SourceIsSatelliteProvider"] = location["isSatelliteProvider"]
65-
fields["SourceIsAnonymousProxy"] = location["isAnonymousProxy"]
66-
}
67-
// Try to geolocate DetinationIP if exists
68-
if fields["DestinationIP"] != "" {
69-
location := geo.Geolocate(fields["DestinationIP"])
70-
fields["DestinationCountry"] = location["country"]
71-
fields["DestinationCountryCode"] = location["countryCode"]
72-
fields["DestinationCity"] = location["city"]
73-
fields["DestinationLat"] = location["latitude"]
74-
fields["DestinationLon"] = location["longitude"]
75-
fields["DestinationAccuracyRadius"] = location["accuracyRadius"]
76-
fields["DestinationASN"] = location["asn"]
77-
fields["DestinationASO"] = location["aso"]
78-
fields["DestinationIsSatelliteProvider"] = location["isSatelliteProvider"]
79-
fields["DestinationIsAnonymousProxy"] = location["isAnonymousProxy"]
80-
}
20+
fields := utils.ExtractDetails(save, l)
8121

82-
// Alert in the last step or save data to next cicle
22+
// Alert in the last step or save data to the next iteration
8323
if steps-1 == step {
8424
// Use content of AlertName as Name if exists
8525
var alertName string

correlation/correlation/reporter.go

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@ package correlation
22

33
import (
44
"encoding/json"
5-
"strconv"
6-
"strings"
7-
"time"
8-
9-
"log"
10-
115
"github.com/google/uuid"
126
"github.com/levigross/grequests"
7+
"github.com/utmstack/UTMStack/correlation/geo"
138
"github.com/utmstack/UTMStack/correlation/search"
149
"github.com/utmstack/UTMStack/correlation/utils"
10+
"log"
11+
"strconv"
12+
"strings"
13+
"time"
1514
)
1615

1716
type Host struct {
@@ -65,13 +64,47 @@ type AlertFields struct {
6564
}
6665

6766
func Alert(name, severity, description, solution, category, tactic string, reference []string, dataType, dataSource string,
68-
details map[string]string) {
67+
fields map[string]string) {
68+
69+
// Try to geolocate SourceIP if exists
70+
if fields["SourceIP"] != "" {
71+
location := geo.Geolocate(fields["SourceIP"])
72+
if len(location) != 0 {
73+
fields["SourceCountry"] = location["country"]
74+
fields["SourceCountryCode"] = location["countryCode"]
75+
fields["SourceCity"] = location["city"]
76+
fields["SourceLat"] = location["latitude"]
77+
fields["SourceLon"] = location["longitude"]
78+
fields["SourceAccuracyRadius"] = location["accuracyRadius"]
79+
fields["SourceASN"] = location["asn"]
80+
fields["SourceASO"] = location["aso"]
81+
fields["SourceIsSatelliteProvider"] = location["isSatelliteProvider"]
82+
fields["SourceIsAnonymousProxy"] = location["isAnonymousProxy"]
83+
}
84+
}
85+
86+
// Try to geolocate DestinationIP if exists
87+
if fields["DestinationIP"] != "" {
88+
location := geo.Geolocate(fields["DestinationIP"])
89+
if len(location) != 0 {
90+
fields["DestinationCountry"] = location["country"]
91+
fields["DestinationCountryCode"] = location["countryCode"]
92+
fields["DestinationCity"] = location["city"]
93+
fields["DestinationLat"] = location["latitude"]
94+
fields["DestinationLon"] = location["longitude"]
95+
fields["DestinationAccuracyRadius"] = location["accuracyRadius"]
96+
fields["DestinationASN"] = location["asn"]
97+
fields["DestinationASO"] = location["aso"]
98+
fields["DestinationIsSatelliteProvider"] = location["isSatelliteProvider"]
99+
fields["DestinationIsAnonymousProxy"] = location["isAnonymousProxy"]
100+
}
101+
}
69102

70103
log.Printf("Reporting alert: %s", name)
71104

72-
if !UpdateAlert(name, severity, details) {
105+
if !UpdateAlert(name, severity, fields) {
73106
NewAlert(name, severity, description, solution, category, tactic, reference, dataType, dataSource,
74-
details)
107+
fields)
75108
}
76109
}
77110

@@ -208,11 +241,12 @@ func UpdateAlert(name, severity string, details map[string]string) bool {
208241
},
209242
},
210243
})
211-
_ = r.Close()
212244
if err != nil {
213245
log.Printf("Could not update existent alert: %v", err)
214246
return false
215247
}
248+
249+
_ = r.Close()
216250
}
217251
}
218252
}

correlation/docs/docs.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ const docTemplate = `{
1010
"description": "{{escape .Description}}",
1111
"title": "{{.Title}}",
1212
"contact": {
13-
"name": "Osmany Montero",
14-
"email": "osmany@quantfall.com"
13+
"name": "UTMStack LLC",
14+
"email": "contact@utmstack.com"
1515
},
1616
"license": {
17-
"name": "Private"
17+
"name": "AGPLv3"
1818
},
1919
"version": "{{.Version}}"
2020
},
@@ -47,7 +47,7 @@ var SwaggerInfo = &swag.Spec{
4747
BasePath: "/v1",
4848
Schemes: []string{},
4949
Title: "UTMStack's Correlation Engine",
50-
Description: "Rules based correlation engine for UTMStack.",
50+
Description: "Rules-based correlation engine for UTMStack.",
5151
InfoInstanceName: "swagger",
5252
SwaggerTemplate: docTemplate,
5353
LeftDelim: "{{",

0 commit comments

Comments
 (0)