Skip to content

Commit a9df5cf

Browse files
committed
Refactor geo updates and add new Threat Intelligence logic
Reorganized the GeoIP update logic into a new `Load` function, improving code maintainability and clarity. Introduced Threat Intelligence feeds loading and blocklist handling via new methods in the `ti` package. Updated storage handling and various utility methods to streamline operations and improve consistency across modules.
1 parent 01901db commit a9df5cf

File tree

24 files changed

+518
-452
lines changed

24 files changed

+518
-452
lines changed

correlation/Dockerfile

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
FROM ubuntu:22.04
2-
RUN apt update
3-
RUN apt install -y ca-certificates git
1+
FROM ubuntu:24.04
2+
RUN apt-get update
3+
RUN apt-get install -y ca-certificates git wget
44
COPY correlation /app/
55
COPY docs/swagger.json /app/docs/
66
COPY docs/swagger.yaml /app/docs/
@@ -9,4 +9,13 @@ COPY run.sh /
99
RUN chmod +x /app/correlation
1010
RUN chmod +x /run.sh
1111
RUN update-ca-certificates
12+
RUN wget -O /app/asn-blocks-v4.csv https://cdn.utmstack.com/geoip/asn-blocks-v4.csv
13+
RUN wget -O /app/asn-blocks-v6.csv https://cdn.utmstack.com/geoip/asn-blocks-v6.csv
14+
RUN wget -O /app/blocks-v4.csv https://cdn.utmstack.com/geoip/blocks-v4.csv
15+
RUN wget -O /app/blocks-v6.csv https://cdn.utmstack.com/geoip/blocks-v6.csv
16+
RUN wget -O /app/locations-en.csv https://cdn.utmstack.com/geoip/locations-en.csv
17+
RUN wget -O /app/ip_blocklist.list https://intelligence.threatwinds.com/feeds/public/ip/cumulative.list
18+
RUN wget -O /app/domain_blocklist.list https://intelligence.threatwinds.com/feeds/public/domain/cumulative.list
19+
RUN wget -O /app/hostname_blocklist.list https://intelligence.threatwinds.com/feeds/public/hostname/cumulative.list
20+
1221
ENTRYPOINT [ "/run.sh" ]

correlation/api/newLogHandler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6+
"github.com/utmstack/UTMStack/correlation/ti"
67
"io"
78
"log"
89
"net/http"
@@ -74,6 +75,7 @@ func NewLog(c *gin.Context) {
7475
}
7576

7677
cache.AddToCache(l)
78+
ti.Enqueue(l)
7779
search.AddToQueue(l)
7880
response["status"] = "queued"
7981
c.JSON(http.StatusOK, response)

correlation/cache/cache.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import (
1313

1414
const bufferSize int = 1000000
1515

16-
var cacheStorageMutex = &sync.RWMutex{}
16+
var storageMutex = &sync.RWMutex{}
1717

18-
var CacheStorage []string
18+
var storage []string
1919

2020
func Status() {
2121
for {
22-
log.Printf("Logs in cache: %v", len(CacheStorage))
23-
if len(CacheStorage) != 0 {
24-
est := gjson.Get(CacheStorage[0], "@timestamp").String()
22+
log.Printf("Logs in cache: %v", len(storage))
23+
if len(storage) != 0 {
24+
est := gjson.Get(storage[0], "@timestamp").String()
2525
log.Printf("Old document in cache: %s", est)
2626
}
2727
time.Sleep(60 * time.Second)
@@ -31,8 +31,8 @@ func Status() {
3131

3232
func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
3333
var elements []string
34-
cacheStorageMutex.RLock()
35-
defer cacheStorageMutex.RUnlock()
34+
storageMutex.RLock()
35+
defer storageMutex.RUnlock()
3636

3737
cToBreak := 0
3838
ait := time.Now().UTC().Unix() - func() int64 {
@@ -43,8 +43,8 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
4343
return seconds
4444
}
4545
}()
46-
for i := len(CacheStorage) - 1; i >= 0; i-- {
47-
est := gjson.Get(CacheStorage[i], "@timestamp").String()
46+
for i := len(storage) - 1; i >= 0; i-- {
47+
est := gjson.Get(storage[i], "@timestamp").String()
4848
eit, err := time.Parse(time.RFC3339Nano, est)
4949
if err != nil {
5050
log.Printf("Could not parse @timestamp: %v", err)
@@ -61,23 +61,23 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
6161
var allCatch bool
6262
var oneCatch bool
6363
for _, of := range oneOf {
64-
oneCatch = evalElement(CacheStorage[i], of.Field, of.Operator, of.Value)
64+
oneCatch = evalElement(storage[i], of.Field, of.Operator, of.Value)
6565
if oneCatch {
6666
break
6767
}
6868
}
6969
for _, af := range allOf {
70-
allCatch = evalElement(CacheStorage[i], af.Field, af.Operator, af.Value)
70+
allCatch = evalElement(storage[i], af.Field, af.Operator, af.Value)
7171
if !allCatch {
7272
break
7373
}
7474
}
7575
if (len(allOf) == 0 || allCatch) && (len(oneOf) == 0 || oneCatch) {
76-
elements = append(elements, CacheStorage[i])
76+
elements = append(elements, storage[i])
7777
}
7878
}
7979
}
80-
80+
8181
return elements
8282
}
8383

@@ -97,9 +97,9 @@ func ProcessQueue() {
9797
go func() {
9898
for {
9999
l := <-logs
100-
cacheStorageMutex.Lock()
101-
CacheStorage = append(CacheStorage, l)
102-
cacheStorageMutex.Unlock()
100+
storageMutex.Lock()
101+
storage = append(storage, l)
102+
storageMutex.Unlock()
103103
}
104104
}()
105105
}
@@ -109,11 +109,11 @@ func Clean() {
109109
for {
110110
var clean bool
111111

112-
if len(CacheStorage) > 1 {
112+
if len(storage) > 1 {
113113
if utils.AssignedMemory >= 80 {
114114
clean = true
115115
} else {
116-
old := gjson.Get(CacheStorage[0], "@timestamp").String()
116+
old := gjson.Get(storage[0], "@timestamp").String()
117117
oldTime, err := time.Parse(time.RFC3339Nano, old)
118118
if err != nil {
119119
log.Printf("Could not parse old log timestamp. Cleaning up")
@@ -129,9 +129,9 @@ func Clean() {
129129
}
130130

131131
if clean {
132-
cacheStorageMutex.Lock()
133-
CacheStorage = CacheStorage[1:]
134-
cacheStorageMutex.Unlock()
132+
storageMutex.Lock()
133+
storage = storage[1:]
134+
storageMutex.Unlock()
135135
} else {
136136
time.Sleep(5 * time.Second)
137137
}

correlation/cache/cache_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
package cache_test
1+
package cache
22

33
import (
4-
"testing"
5-
6-
7-
"github.com/utmstack/UTMStack/correlation/cache"
84
"github.com/utmstack/UTMStack/correlation/rules"
5+
"testing"
96
)
107

118
func TestSearch(t *testing.T) {
@@ -16,7 +13,7 @@ func TestSearch(t *testing.T) {
1613
`{"@timestamp":"2022-01-01T00:00:03.000Z","field1":"value1","field2":"value2"}`,
1714
`{"@timestamp":"2022-01-01T00:00:04.000Z","field1":"value1","field2":"value2"}`,
1815
}
19-
cache.CacheStorage = cacheStorage
16+
storage = cacheStorage
2017
allOf := []rules.AllOf{
2118
{Field: "field1", Operator: "==", Value: "value1"},
2219
}
@@ -31,7 +28,7 @@ func TestSearch(t *testing.T) {
3128
`{"@timestamp":"2022-01-01T00:00:01.000Z","field1":"value1","field2":"value2"}`,
3229
`{"@timestamp":"2022-01-01T00:00:00.000Z","field1":"value1","field2":"value2"}`,
3330
}
34-
result := cache.Search(allOf, oneOf, int64(seconds))
31+
result := Search(allOf, oneOf, int64(seconds))
3532
if len(result) != len(expected) {
3633
t.Errorf("Expected %d elements, but got %d", len(expected), len(result))
3734
}
@@ -40,4 +37,4 @@ func TestSearch(t *testing.T) {
4037
t.Errorf("Expected %s, but got %s", expected[i], r)
4138
}
4239
}
43-
}
40+
}

correlation/correlation/analyzer.go

Lines changed: 7 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,85 +1,25 @@
11
package correlation
22

33
import (
4-
"net"
5-
6-
"github.com/utmstack/UTMStack/correlation/geo"
7-
"github.com/utmstack/UTMStack/correlation/rules"
84
"github.com/tidwall/gjson"
5+
"github.com/utmstack/UTMStack/correlation/rules"
6+
"github.com/utmstack/UTMStack/correlation/utils"
97
)
108

11-
func processResponse(logs []string, rule rules.Rule, save []rules.SavedField, tmpLogs *[20][]map[string]string,
9+
func processResponse(logs []string, rule rules.Rule, save []utils.SavedField, tmpLogs *[20][]map[string]string,
1210
steps, step, minCount int) {
13-
if len(logs) >= func()int{
14-
switch minCount{
11+
if len(logs) >= func() int {
12+
switch minCount {
1513
case 0:
1614
return 1
1715
default:
1816
return minCount
1917
}
2018
}() {
2119
for _, l := range logs {
22-
var fields = map[string]string{
23-
"id": gjson.Get(l, "id").String(),
24-
}
25-
//User saved fields
26-
for _, save := range save {
27-
fields[save.Alias] = gjson.Get(l, save.Field).String()
28-
}
29-
// Try to resolve SourceHost if SourceIP exists but not SourceHost
30-
if fields["SourceHost"] == "" && fields["SourceIP"] != "" {
31-
host, _ := net.LookupHost(fields["SourceIP"])
32-
fields["SourceHost"] = host[0]
33-
}
34-
// Try to resolve DestinationHost if DestinationIP exists but not DestinationHost
35-
if fields["DestinationHost"] == "" && fields["DestinationIP"] != "" {
36-
host, _ := net.LookupHost(fields["DestinationIP"])
37-
fields["DestinationHost"] = host[0]
38-
}
39-
// Try to resolve SourceIP if SourceHost exists but not SourceIP
40-
if fields["SourceHost"] != "" && fields["SourceIP"] == "" {
41-
ip, _ := net.LookupIP(fields["SourceHost"])
42-
if len(ip) != 0 && ip[0].String() != "<nil>" {
43-
fields["SourceIP"] = ip[0].String()
44-
}
45-
}
46-
// Try to resolve DestinationIP if DestinationHost exists but not DestinationIP
47-
if fields["DestinationHost"] != "" && fields["DestinationIP"] == "" {
48-
ip, _ := net.LookupIP(fields["DestinationHost"])
49-
if len(ip) != 0 && ip[0].String() != "<nil>" {
50-
fields["DestinationIP"] = ip[0].String()
51-
}
52-
}
53-
// Try to geolocate SourceIP if exists
54-
if fields["SourceIP"] != "" {
55-
location := geo.Geolocate(fields["SourceIP"])
56-
fields["SourceCountry"] = location["country"]
57-
fields["SourceCountryCode"] = location["countryCode"]
58-
fields["SourceCity"] = location["city"]
59-
fields["SourceLat"] = location["latitude"]
60-
fields["SourceLon"] = location["longitude"]
61-
fields["SourceAccuracyRadius"] = location["accuracyRadius"]
62-
fields["SourceASN"] = location["asn"]
63-
fields["SourceASO"] = location["aso"]
64-
fields["SourceIsSatelliteProvider"] = location["isSatelliteProvider"]
65-
fields["SourceIsAnonymousProxy"] = location["isAnonymousProxy"]
66-
}
67-
// Try to geolocate DetinationIP if exists
68-
if fields["DestinationIP"] != "" {
69-
location := geo.Geolocate(fields["DestinationIP"])
70-
fields["DestinationCountry"] = location["country"]
71-
fields["DestinationCountryCode"] = location["countryCode"]
72-
fields["DestinationCity"] = location["city"]
73-
fields["DestinationLat"] = location["latitude"]
74-
fields["DestinationLon"] = location["longitude"]
75-
fields["DestinationAccuracyRadius"] = location["accuracyRadius"]
76-
fields["DestinationASN"] = location["asn"]
77-
fields["DestinationASO"] = location["aso"]
78-
fields["DestinationIsSatelliteProvider"] = location["isSatelliteProvider"]
79-
fields["DestinationIsAnonymousProxy"] = location["isAnonymousProxy"]
80-
}
20+
fields := utils.ExtractDetails(save, l)
8121

82-
// Alert in the last step or save data to next cicle
22+
// Alert in the last step or save data to the next iteration
8323
if steps-1 == step {
8424
// Use content of AlertName as Name if exists
8525
var alertName string

correlation/correlation/reporter.go

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@ package correlation
22

33
import (
44
"encoding/json"
5-
"strconv"
6-
"strings"
7-
"time"
8-
9-
"log"
10-
115
"github.com/google/uuid"
126
"github.com/levigross/grequests"
7+
"github.com/utmstack/UTMStack/correlation/geo"
138
"github.com/utmstack/UTMStack/correlation/search"
149
"github.com/utmstack/UTMStack/correlation/utils"
10+
"log"
11+
"strconv"
12+
"strings"
13+
"time"
1514
)
1615

1716
type Host struct {
@@ -65,13 +64,47 @@ type AlertFields struct {
6564
}
6665

6766
func Alert(name, severity, description, solution, category, tactic string, reference []string, dataType, dataSource string,
68-
details map[string]string) {
67+
fields map[string]string) {
68+
69+
// Try to geolocate SourceIP if exists
70+
if fields["SourceIP"] != "" {
71+
location := geo.Geolocate(fields["SourceIP"])
72+
if len(location) != 0 {
73+
fields["SourceCountry"] = location["country"]
74+
fields["SourceCountryCode"] = location["countryCode"]
75+
fields["SourceCity"] = location["city"]
76+
fields["SourceLat"] = location["latitude"]
77+
fields["SourceLon"] = location["longitude"]
78+
fields["SourceAccuracyRadius"] = location["accuracyRadius"]
79+
fields["SourceASN"] = location["asn"]
80+
fields["SourceASO"] = location["aso"]
81+
fields["SourceIsSatelliteProvider"] = location["isSatelliteProvider"]
82+
fields["SourceIsAnonymousProxy"] = location["isAnonymousProxy"]
83+
}
84+
}
85+
86+
// Try to geolocate DestinationIP if exists
87+
if fields["DestinationIP"] != "" {
88+
location := geo.Geolocate(fields["DestinationIP"])
89+
if len(location) != 0 {
90+
fields["DestinationCountry"] = location["country"]
91+
fields["DestinationCountryCode"] = location["countryCode"]
92+
fields["DestinationCity"] = location["city"]
93+
fields["DestinationLat"] = location["latitude"]
94+
fields["DestinationLon"] = location["longitude"]
95+
fields["DestinationAccuracyRadius"] = location["accuracyRadius"]
96+
fields["DestinationASN"] = location["asn"]
97+
fields["DestinationASO"] = location["aso"]
98+
fields["DestinationIsSatelliteProvider"] = location["isSatelliteProvider"]
99+
fields["DestinationIsAnonymousProxy"] = location["isAnonymousProxy"]
100+
}
101+
}
69102

70103
log.Printf("Reporting alert: %s", name)
71104

72-
if !UpdateAlert(name, severity, details) {
105+
if !UpdateAlert(name, severity, fields) {
73106
NewAlert(name, severity, description, solution, category, tactic, reference, dataType, dataSource,
74-
details)
107+
fields)
75108
}
76109
}
77110

@@ -208,11 +241,12 @@ func UpdateAlert(name, severity string, details map[string]string) bool {
208241
},
209242
},
210243
})
211-
_ = r.Close()
212244
if err != nil {
213245
log.Printf("Could not update existent alert: %v", err)
214246
return false
215247
}
248+
249+
_ = r.Close()
216250
}
217251
}
218252
}

correlation/docs/docs.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ const docTemplate = `{
1010
"description": "{{escape .Description}}",
1111
"title": "{{.Title}}",
1212
"contact": {
13-
"name": "Osmany Montero",
14-
"email": "osmany@quantfall.com"
13+
"name": "UTMStack LLC",
14+
"email": "contact@utmstack.com"
1515
},
1616
"license": {
17-
"name": "Private"
17+
"name": "AGPLv3"
1818
},
1919
"version": "{{.Version}}"
2020
},
@@ -47,7 +47,7 @@ var SwaggerInfo = &swag.Spec{
4747
BasePath: "/v1",
4848
Schemes: []string{},
4949
Title: "UTMStack's Correlation Engine",
50-
Description: "Rules based correlation engine for UTMStack.",
50+
Description: "Rules-based correlation engine for UTMStack.",
5151
InfoInstanceName: "swagger",
5252
SwaggerTemplate: docTemplate,
5353
LeftDelim: "{{",

0 commit comments

Comments
 (0)