Skip to content

Commit ae38b7a

Browse files
authored
Merge pull request #1035 from utmstack/bugfix/10.5.20/update-agent-hostname
Reverting correlation changes
2 parents 5595ed0 + d310c9a commit ae38b7a

File tree

27 files changed

+459
-520
lines changed

27 files changed

+459
-520
lines changed

correlation/Dockerfile

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
FROM ubuntu:24.04
2-
RUN apt-get update
3-
RUN apt-get install -y ca-certificates git wget
1+
FROM ubuntu:22.04
2+
RUN apt update
3+
RUN apt install -y ca-certificates git
44
COPY correlation /app/
55
COPY docs/swagger.json /app/docs/
66
COPY docs/swagger.yaml /app/docs/
@@ -9,13 +9,4 @@ COPY run.sh /
99
RUN chmod +x /app/correlation
1010
RUN chmod +x /run.sh
1111
RUN update-ca-certificates
12-
RUN wget -O /app/asn-blocks-v4.csv https://cdn.utmstack.com/geoip/asn-blocks-v4.csv
13-
RUN wget -O /app/asn-blocks-v6.csv https://cdn.utmstack.com/geoip/asn-blocks-v6.csv
14-
RUN wget -O /app/blocks-v4.csv https://cdn.utmstack.com/geoip/blocks-v4.csv
15-
RUN wget -O /app/blocks-v6.csv https://cdn.utmstack.com/geoip/blocks-v6.csv
16-
RUN wget -O /app/locations-en.csv https://cdn.utmstack.com/geoip/locations-en.csv
17-
RUN wget -O /app/ip_blocklist.list https://intelligence.threatwinds.com/feeds/public/ip/cumulative.list
18-
RUN wget -O /app/domain_blocklist.list https://intelligence.threatwinds.com/feeds/public/domain/cumulative.list
19-
RUN wget -O /app/hostname_blocklist.list https://intelligence.threatwinds.com/feeds/public/hostname/cumulative.list
20-
2112
ENTRYPOINT [ "/run.sh" ]

correlation/api/newLogHandler.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6-
"github.com/utmstack/UTMStack/correlation/ti"
76
"io"
87
"log"
98
"net/http"
@@ -75,7 +74,6 @@ func NewLog(c *gin.Context) {
7574
}
7675

7776
cache.AddToCache(l)
78-
ti.Enqueue(l)
7977
search.AddToQueue(l)
8078
response["status"] = "queued"
8179
c.JSON(http.StatusOK, response)

correlation/cache/cache.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import (
1313

1414
const bufferSize int = 1000000
1515

16-
var storageMutex = &sync.RWMutex{}
16+
var cacheStorageMutex = &sync.RWMutex{}
1717

18-
var storage []string
18+
var CacheStorage []string
1919

2020
func Status() {
2121
for {
22-
log.Printf("Logs in cache: %v", len(storage))
23-
if len(storage) != 0 {
24-
est := gjson.Get(storage[0], "@timestamp").String()
22+
log.Printf("Logs in cache: %v", len(CacheStorage))
23+
if len(CacheStorage) != 0 {
24+
est := gjson.Get(CacheStorage[0], "@timestamp").String()
2525
log.Printf("Old document in cache: %s", est)
2626
}
2727
time.Sleep(60 * time.Second)
@@ -31,8 +31,8 @@ func Status() {
3131

3232
func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
3333
var elements []string
34-
storageMutex.RLock()
35-
defer storageMutex.RUnlock()
34+
cacheStorageMutex.RLock()
35+
defer cacheStorageMutex.RUnlock()
3636

3737
cToBreak := 0
3838
ait := time.Now().UTC().Unix() - func() int64 {
@@ -43,8 +43,8 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
4343
return seconds
4444
}
4545
}()
46-
for i := len(storage) - 1; i >= 0; i-- {
47-
est := gjson.Get(storage[i], "@timestamp").String()
46+
for i := len(CacheStorage) - 1; i >= 0; i-- {
47+
est := gjson.Get(CacheStorage[i], "@timestamp").String()
4848
eit, err := time.Parse(time.RFC3339Nano, est)
4949
if err != nil {
5050
log.Printf("Could not parse @timestamp: %v", err)
@@ -61,23 +61,23 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
6161
var allCatch bool
6262
var oneCatch bool
6363
for _, of := range oneOf {
64-
oneCatch = evalElement(storage[i], of.Field, of.Operator, of.Value)
64+
oneCatch = evalElement(CacheStorage[i], of.Field, of.Operator, of.Value)
6565
if oneCatch {
6666
break
6767
}
6868
}
6969
for _, af := range allOf {
70-
allCatch = evalElement(storage[i], af.Field, af.Operator, af.Value)
70+
allCatch = evalElement(CacheStorage[i], af.Field, af.Operator, af.Value)
7171
if !allCatch {
7272
break
7373
}
7474
}
7575
if (len(allOf) == 0 || allCatch) && (len(oneOf) == 0 || oneCatch) {
76-
elements = append(elements, storage[i])
76+
elements = append(elements, CacheStorage[i])
7777
}
7878
}
7979
}
80-
80+
8181
return elements
8282
}
8383

@@ -97,9 +97,9 @@ func ProcessQueue() {
9797
go func() {
9898
for {
9999
l := <-logs
100-
storageMutex.Lock()
101-
storage = append(storage, l)
102-
storageMutex.Unlock()
100+
cacheStorageMutex.Lock()
101+
CacheStorage = append(CacheStorage, l)
102+
cacheStorageMutex.Unlock()
103103
}
104104
}()
105105
}
@@ -109,11 +109,11 @@ func Clean() {
109109
for {
110110
var clean bool
111111

112-
if len(storage) > 1 {
112+
if len(CacheStorage) > 1 {
113113
if utils.AssignedMemory >= 80 {
114114
clean = true
115115
} else {
116-
old := gjson.Get(storage[0], "@timestamp").String()
116+
old := gjson.Get(CacheStorage[0], "@timestamp").String()
117117
oldTime, err := time.Parse(time.RFC3339Nano, old)
118118
if err != nil {
119119
log.Printf("Could not parse old log timestamp. Cleaning up")
@@ -129,9 +129,9 @@ func Clean() {
129129
}
130130

131131
if clean {
132-
storageMutex.Lock()
133-
storage = storage[1:]
134-
storageMutex.Unlock()
132+
cacheStorageMutex.Lock()
133+
CacheStorage = CacheStorage[1:]
134+
cacheStorageMutex.Unlock()
135135
} else {
136136
time.Sleep(5 * time.Second)
137137
}

correlation/cache/cache_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
package cache
1+
package cache_test
22

33
import (
4-
"github.com/utmstack/UTMStack/correlation/rules"
54
"testing"
5+
6+
7+
"github.com/utmstack/UTMStack/correlation/cache"
8+
"github.com/utmstack/UTMStack/correlation/rules"
69
)
710

811
func TestSearch(t *testing.T) {
@@ -13,7 +16,7 @@ func TestSearch(t *testing.T) {
1316
`{"@timestamp":"2022-01-01T00:00:03.000Z","field1":"value1","field2":"value2"}`,
1417
`{"@timestamp":"2022-01-01T00:00:04.000Z","field1":"value1","field2":"value2"}`,
1518
}
16-
storage = cacheStorage
19+
cache.CacheStorage = cacheStorage
1720
allOf := []rules.AllOf{
1821
{Field: "field1", Operator: "==", Value: "value1"},
1922
}
@@ -28,7 +31,7 @@ func TestSearch(t *testing.T) {
2831
`{"@timestamp":"2022-01-01T00:00:01.000Z","field1":"value1","field2":"value2"}`,
2932
`{"@timestamp":"2022-01-01T00:00:00.000Z","field1":"value1","field2":"value2"}`,
3033
}
31-
result := Search(allOf, oneOf, int64(seconds))
34+
result := cache.Search(allOf, oneOf, int64(seconds))
3235
if len(result) != len(expected) {
3336
t.Errorf("Expected %d elements, but got %d", len(expected), len(result))
3437
}
@@ -37,4 +40,4 @@ func TestSearch(t *testing.T) {
3740
t.Errorf("Expected %s, but got %s", expected[i], r)
3841
}
3942
}
40-
}
43+
}

correlation/correlation/analyzer.go

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,85 @@
11
package correlation
22

33
import (
4-
"github.com/tidwall/gjson"
4+
"net"
5+
6+
"github.com/utmstack/UTMStack/correlation/geo"
57
"github.com/utmstack/UTMStack/correlation/rules"
6-
"github.com/utmstack/UTMStack/correlation/utils"
8+
"github.com/tidwall/gjson"
79
)
810

9-
func processResponse(logs []string, rule rules.Rule, save []utils.SavedField, tmpLogs *[20][]map[string]string,
11+
func processResponse(logs []string, rule rules.Rule, save []rules.SavedField, tmpLogs *[20][]map[string]string,
1012
steps, step, minCount int) {
11-
if len(logs) >= func() int {
12-
switch minCount {
13+
if len(logs) >= func()int{
14+
switch minCount{
1315
case 0:
1416
return 1
1517
default:
1618
return minCount
1719
}
1820
}() {
1921
for _, l := range logs {
20-
fields := utils.ExtractDetails(save, l)
22+
var fields = map[string]string{
23+
"id": gjson.Get(l, "id").String(),
24+
}
25+
//User saved fields
26+
for _, save := range save {
27+
fields[save.Alias] = gjson.Get(l, save.Field).String()
28+
}
29+
// Try to resolve SourceHost if SourceIP exists but not SourceHost
30+
if fields["SourceHost"] == "" && fields["SourceIP"] != "" {
31+
host, _ := net.LookupHost(fields["SourceIP"])
32+
fields["SourceHost"] = host[0]
33+
}
34+
// Try to resolve DestinationHost if DestinationIP exists but not DestinationHost
35+
if fields["DestinationHost"] == "" && fields["DestinationIP"] != "" {
36+
host, _ := net.LookupHost(fields["DestinationIP"])
37+
fields["DestinationHost"] = host[0]
38+
}
39+
// Try to resolve SourceIP if SourceHost exists but not SourceIP
40+
if fields["SourceHost"] != "" && fields["SourceIP"] == "" {
41+
ip, _ := net.LookupIP(fields["SourceHost"])
42+
if len(ip) != 0 && ip[0].String() != "<nil>" {
43+
fields["SourceIP"] = ip[0].String()
44+
}
45+
}
46+
// Try to resolve DestinationIP if DestinationHost exists but not DestinationIP
47+
if fields["DestinationHost"] != "" && fields["DestinationIP"] == "" {
48+
ip, _ := net.LookupIP(fields["DestinationHost"])
49+
if len(ip) != 0 && ip[0].String() != "<nil>" {
50+
fields["DestinationIP"] = ip[0].String()
51+
}
52+
}
53+
// Try to geolocate SourceIP if exists
54+
if fields["SourceIP"] != "" {
55+
location := geo.Geolocate(fields["SourceIP"])
56+
fields["SourceCountry"] = location["country"]
57+
fields["SourceCountryCode"] = location["countryCode"]
58+
fields["SourceCity"] = location["city"]
59+
fields["SourceLat"] = location["latitude"]
60+
fields["SourceLon"] = location["longitude"]
61+
fields["SourceAccuracyRadius"] = location["accuracyRadius"]
62+
fields["SourceASN"] = location["asn"]
63+
fields["SourceASO"] = location["aso"]
64+
fields["SourceIsSatelliteProvider"] = location["isSatelliteProvider"]
65+
fields["SourceIsAnonymousProxy"] = location["isAnonymousProxy"]
66+
}
67+
// Try to geolocate DetinationIP if exists
68+
if fields["DestinationIP"] != "" {
69+
location := geo.Geolocate(fields["DestinationIP"])
70+
fields["DestinationCountry"] = location["country"]
71+
fields["DestinationCountryCode"] = location["countryCode"]
72+
fields["DestinationCity"] = location["city"]
73+
fields["DestinationLat"] = location["latitude"]
74+
fields["DestinationLon"] = location["longitude"]
75+
fields["DestinationAccuracyRadius"] = location["accuracyRadius"]
76+
fields["DestinationASN"] = location["asn"]
77+
fields["DestinationASO"] = location["aso"]
78+
fields["DestinationIsSatelliteProvider"] = location["isSatelliteProvider"]
79+
fields["DestinationIsAnonymousProxy"] = location["isAnonymousProxy"]
80+
}
2181

22-
// Alert in the last step or save data to the next iteration
82+
// Alert in the last step or save data to next cicle
2383
if steps-1 == step {
2484
// Use content of AlertName as Name if exists
2585
var alertName string

correlation/correlation/reporter.go

Lines changed: 10 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ package correlation
22

33
import (
44
"encoding/json"
5+
"strconv"
6+
"strings"
7+
"time"
8+
9+
"log"
10+
511
"github.com/google/uuid"
612
"github.com/levigross/grequests"
7-
"github.com/utmstack/UTMStack/correlation/geo"
813
"github.com/utmstack/UTMStack/correlation/search"
914
"github.com/utmstack/UTMStack/correlation/utils"
10-
"log"
11-
"strconv"
12-
"strings"
13-
"time"
1415
)
1516

1617
type Host struct {
@@ -64,47 +65,13 @@ type AlertFields struct {
6465
}
6566

6667
func Alert(name, severity, description, solution, category, tactic string, reference []string, dataType, dataSource string,
67-
fields map[string]string) {
68-
69-
// Try to geolocate SourceIP if exists
70-
if fields["SourceIP"] != "" {
71-
location := geo.Geolocate(fields["SourceIP"])
72-
if len(location) != 0 {
73-
fields["SourceCountry"] = location["country"]
74-
fields["SourceCountryCode"] = location["countryCode"]
75-
fields["SourceCity"] = location["city"]
76-
fields["SourceLat"] = location["latitude"]
77-
fields["SourceLon"] = location["longitude"]
78-
fields["SourceAccuracyRadius"] = location["accuracyRadius"]
79-
fields["SourceASN"] = location["asn"]
80-
fields["SourceASO"] = location["aso"]
81-
fields["SourceIsSatelliteProvider"] = location["isSatelliteProvider"]
82-
fields["SourceIsAnonymousProxy"] = location["isAnonymousProxy"]
83-
}
84-
}
85-
86-
// Try to geolocate DestinationIP if exists
87-
if fields["DestinationIP"] != "" {
88-
location := geo.Geolocate(fields["DestinationIP"])
89-
if len(location) != 0 {
90-
fields["DestinationCountry"] = location["country"]
91-
fields["DestinationCountryCode"] = location["countryCode"]
92-
fields["DestinationCity"] = location["city"]
93-
fields["DestinationLat"] = location["latitude"]
94-
fields["DestinationLon"] = location["longitude"]
95-
fields["DestinationAccuracyRadius"] = location["accuracyRadius"]
96-
fields["DestinationASN"] = location["asn"]
97-
fields["DestinationASO"] = location["aso"]
98-
fields["DestinationIsSatelliteProvider"] = location["isSatelliteProvider"]
99-
fields["DestinationIsAnonymousProxy"] = location["isAnonymousProxy"]
100-
}
101-
}
68+
details map[string]string) {
10269

10370
log.Printf("Reporting alert: %s", name)
10471

105-
if !UpdateAlert(name, severity, fields) {
72+
if !UpdateAlert(name, severity, details) {
10673
NewAlert(name, severity, description, solution, category, tactic, reference, dataType, dataSource,
107-
fields)
74+
details)
10875
}
10976
}
11077

@@ -241,12 +208,11 @@ func UpdateAlert(name, severity string, details map[string]string) bool {
241208
},
242209
},
243210
})
211+
_ = r.Close()
244212
if err != nil {
245213
log.Printf("Could not update existent alert: %v", err)
246214
return false
247215
}
248-
249-
_ = r.Close()
250216
}
251217
}
252218
}

correlation/docs/docs.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ const docTemplate = `{
1010
"description": "{{escape .Description}}",
1111
"title": "{{.Title}}",
1212
"contact": {
13-
"name": "UTMStack LLC",
14-
"email": "contact@utmstack.com"
13+
"name": "Osmany Montero",
14+
"email": "osmany@quantfall.com"
1515
},
1616
"license": {
17-
"name": "AGPLv3"
17+
"name": "Private"
1818
},
1919
"version": "{{.Version}}"
2020
},
@@ -47,7 +47,7 @@ var SwaggerInfo = &swag.Spec{
4747
BasePath: "/v1",
4848
Schemes: []string{},
4949
Title: "UTMStack's Correlation Engine",
50-
Description: "Rules-based correlation engine for UTMStack.",
50+
Description: "Rules based correlation engine for UTMStack.",
5151
InfoInstanceName: "swagger",
5252
SwaggerTemplate: docTemplate,
5353
LeftDelim: "{{",

0 commit comments

Comments
 (0)