Skip to content

Commit f628b12

Browse files
authored
Merge pull request #1036 from utmstack/bugfix/10.5.20/update-agent-hostname
2 parents ae38b7a + 4f4db77 commit f628b12

32 files changed

+581
-528
lines changed

correlation/Dockerfile

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
FROM ubuntu:22.04
2-
RUN apt update
3-
RUN apt install -y ca-certificates git
1+
FROM ubuntu:24.04
2+
RUN apt-get update
3+
RUN apt-get install -y ca-certificates git wget
44
COPY correlation /app/
55
COPY docs/swagger.json /app/docs/
66
COPY docs/swagger.yaml /app/docs/
@@ -9,4 +9,13 @@ COPY run.sh /
99
RUN chmod +x /app/correlation
1010
RUN chmod +x /run.sh
1111
RUN update-ca-certificates
12+
RUN wget -O /app/asn-blocks-v4.csv https://cdn.utmstack.com/geoip/asn-blocks-v4.csv
13+
RUN wget -O /app/asn-blocks-v6.csv https://cdn.utmstack.com/geoip/asn-blocks-v6.csv
14+
RUN wget -O /app/blocks-v4.csv https://cdn.utmstack.com/geoip/blocks-v4.csv
15+
RUN wget -O /app/blocks-v6.csv https://cdn.utmstack.com/geoip/blocks-v6.csv
16+
RUN wget -O /app/locations-en.csv https://cdn.utmstack.com/geoip/locations-en.csv
17+
RUN wget -O /app/ip_blocklist.list https://intelligence.threatwinds.com/feeds/public/ip/cumulative.list
18+
RUN wget -O /app/domain_blocklist.list https://intelligence.threatwinds.com/feeds/public/domain/cumulative.list
19+
RUN wget -O /app/hostname_blocklist.list https://intelligence.threatwinds.com/feeds/public/hostname/cumulative.list
20+
1221
ENTRYPOINT [ "/run.sh" ]

correlation/api/newLogHandler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package api
33
import (
44
"encoding/json"
55
"fmt"
6+
"github.com/utmstack/UTMStack/correlation/ti"
67
"io"
78
"log"
89
"net/http"
@@ -74,6 +75,7 @@ func NewLog(c *gin.Context) {
7475
}
7576

7677
cache.AddToCache(l)
78+
ti.Enqueue(l)
7779
search.AddToQueue(l)
7880
response["status"] = "queued"
7981
c.JSON(http.StatusOK, response)

correlation/cache/cache.go

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,15 @@ import (
1313

1414
const bufferSize int = 1000000
1515

16-
var cacheStorageMutex = &sync.RWMutex{}
16+
var storageMutex = &sync.RWMutex{}
1717

18-
var CacheStorage []string
18+
var storage []string
1919

2020
func Status() {
2121
for {
22-
log.Printf("Logs in cache: %v", len(CacheStorage))
23-
if len(CacheStorage) != 0 {
24-
est := gjson.Get(CacheStorage[0], "@timestamp").String()
22+
log.Printf("Logs in cache: %v", len(storage))
23+
if len(storage) != 0 {
24+
est := gjson.Get(storage[0], "@timestamp").String()
2525
log.Printf("Old document in cache: %s", est)
2626
}
2727
time.Sleep(60 * time.Second)
@@ -31,8 +31,8 @@ func Status() {
3131

3232
func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
3333
var elements []string
34-
cacheStorageMutex.RLock()
35-
defer cacheStorageMutex.RUnlock()
34+
storageMutex.RLock()
35+
defer storageMutex.RUnlock()
3636

3737
cToBreak := 0
3838
ait := time.Now().UTC().Unix() - func() int64 {
@@ -43,8 +43,8 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
4343
return seconds
4444
}
4545
}()
46-
for i := len(CacheStorage) - 1; i >= 0; i-- {
47-
est := gjson.Get(CacheStorage[i], "@timestamp").String()
46+
for i := len(storage) - 1; i >= 0; i-- {
47+
est := gjson.Get(storage[i], "@timestamp").String()
4848
eit, err := time.Parse(time.RFC3339Nano, est)
4949
if err != nil {
5050
log.Printf("Could not parse @timestamp: %v", err)
@@ -61,23 +61,23 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
6161
var allCatch bool
6262
var oneCatch bool
6363
for _, of := range oneOf {
64-
oneCatch = evalElement(CacheStorage[i], of.Field, of.Operator, of.Value)
64+
oneCatch = evalElement(storage[i], of.Field, of.Operator, of.Value)
6565
if oneCatch {
6666
break
6767
}
6868
}
6969
for _, af := range allOf {
70-
allCatch = evalElement(CacheStorage[i], af.Field, af.Operator, af.Value)
70+
allCatch = evalElement(storage[i], af.Field, af.Operator, af.Value)
7171
if !allCatch {
7272
break
7373
}
7474
}
7575
if (len(allOf) == 0 || allCatch) && (len(oneOf) == 0 || oneCatch) {
76-
elements = append(elements, CacheStorage[i])
76+
elements = append(elements, storage[i])
7777
}
7878
}
7979
}
80-
80+
8181
return elements
8282
}
8383

@@ -97,9 +97,9 @@ func ProcessQueue() {
9797
go func() {
9898
for {
9999
l := <-logs
100-
cacheStorageMutex.Lock()
101-
CacheStorage = append(CacheStorage, l)
102-
cacheStorageMutex.Unlock()
100+
storageMutex.Lock()
101+
storage = append(storage, l)
102+
storageMutex.Unlock()
103103
}
104104
}()
105105
}
@@ -109,11 +109,11 @@ func Clean() {
109109
for {
110110
var clean bool
111111

112-
if len(CacheStorage) > 1 {
112+
if len(storage) > 1 {
113113
if utils.AssignedMemory >= 80 {
114114
clean = true
115115
} else {
116-
old := gjson.Get(CacheStorage[0], "@timestamp").String()
116+
old := gjson.Get(storage[0], "@timestamp").String()
117117
oldTime, err := time.Parse(time.RFC3339Nano, old)
118118
if err != nil {
119119
log.Printf("Could not parse old log timestamp. Cleaning up")
@@ -129,9 +129,9 @@ func Clean() {
129129
}
130130

131131
if clean {
132-
cacheStorageMutex.Lock()
133-
CacheStorage = CacheStorage[1:]
134-
cacheStorageMutex.Unlock()
132+
storageMutex.Lock()
133+
storage = storage[1:]
134+
storageMutex.Unlock()
135135
} else {
136136
time.Sleep(5 * time.Second)
137137
}

correlation/cache/cache_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
package cache_test
1+
package cache
22

33
import (
4-
"testing"
5-
6-
7-
"github.com/utmstack/UTMStack/correlation/cache"
84
"github.com/utmstack/UTMStack/correlation/rules"
5+
"testing"
96
)
107

118
func TestSearch(t *testing.T) {
@@ -16,7 +13,7 @@ func TestSearch(t *testing.T) {
1613
`{"@timestamp":"2022-01-01T00:00:03.000Z","field1":"value1","field2":"value2"}`,
1714
`{"@timestamp":"2022-01-01T00:00:04.000Z","field1":"value1","field2":"value2"}`,
1815
}
19-
cache.CacheStorage = cacheStorage
16+
storage = cacheStorage
2017
allOf := []rules.AllOf{
2118
{Field: "field1", Operator: "==", Value: "value1"},
2219
}
@@ -31,7 +28,7 @@ func TestSearch(t *testing.T) {
3128
`{"@timestamp":"2022-01-01T00:00:01.000Z","field1":"value1","field2":"value2"}`,
3229
`{"@timestamp":"2022-01-01T00:00:00.000Z","field1":"value1","field2":"value2"}`,
3330
}
34-
result := cache.Search(allOf, oneOf, int64(seconds))
31+
result := Search(allOf, oneOf, int64(seconds))
3532
if len(result) != len(expected) {
3633
t.Errorf("Expected %d elements, but got %d", len(expected), len(result))
3734
}
@@ -40,4 +37,4 @@ func TestSearch(t *testing.T) {
4037
t.Errorf("Expected %s, but got %s", expected[i], r)
4138
}
4239
}
43-
}
40+
}

correlation/cache/operators.go

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@ import (
1212

1313
func inCIDR(addr, network string) (bool, error) {
1414
_, subnet, err := net.ParseCIDR(network)
15-
if err == nil {
16-
ip := net.ParseIP(addr)
17-
if ip != nil {
18-
if subnet.Contains(ip) {
19-
return true, nil
20-
}
21-
}
15+
if err != nil {
16+
return false, fmt.Errorf("invalid CIDR")
17+
}
18+
ip := net.ParseIP(addr)
19+
if ip == nil {
2220
return false, fmt.Errorf("invalid IP address")
2321
}
24-
return false, err
22+
return subnet.Contains(ip), nil
2523
}
2624

2725
func equal(val1, val2 string) bool {
@@ -54,25 +52,25 @@ func endWith(str, suff string) bool {
5452
return strings.HasSuffix(str, suff)
5553
}
5654

57-
func expresion(exp, str string) (bool, error) {
55+
func expression(exp, str string) (bool, error) {
5856
re, err := regexp.Compile(exp)
59-
if err == nil {
60-
if re.MatchString(str) {
61-
return true, nil
62-
}
57+
if err != nil {
58+
return false, err
6359
}
64-
return false, err
60+
return re.MatchString(str), nil
6561
}
6662

6763
func parseFloats(val1, val2 string) (float64, float64, error) {
68-
f1, err1 := strconv.ParseFloat(val1, 64)
69-
if err1 != nil {
70-
return 0, 0, err1
64+
f1, err := strconv.ParseFloat(val1, 64)
65+
if err != nil {
66+
return 0, 0, err
7167
}
72-
f2, err2 := strconv.ParseFloat(val2, 64)
73-
if err2 != nil {
74-
return 0, 0, err2
68+
69+
f2, err := strconv.ParseFloat(val2, 64)
70+
if err != nil {
71+
return 0, 0, err
7572
}
73+
7674
return f1, f2, nil
7775
}
7876

@@ -105,17 +103,17 @@ func compare(operator, val1, val2 string) bool {
105103
case "not end with":
106104
return !endWith(val1, val2)
107105
case "regexp":
108-
matched, err := expresion(val2, val1)
106+
matched, err := expression(val2, val1)
109107
if err != nil {
110108
return false
111109
}
112110
return matched
113111
case "not regexp":
114-
matched, err := expresion(val2, val1)
112+
matched, err := expression(val2, val1)
115113
if err != nil {
116114
return false
117115
}
118-
return matched
116+
return !matched
119117
case "<":
120118
f1, f2, err := parseFloats(val1, val2)
121119
if err != nil {
@@ -144,24 +142,24 @@ func compare(operator, val1, val2 string) bool {
144142
return true
145143
case "in cidr":
146144
matched, err := inCIDR(val1, val2)
147-
if err == nil {
148-
return matched
145+
if err != nil {
146+
return false
149147
}
150-
return false
148+
return matched
151149
case "not in cidr":
152150
matched, err := inCIDR(val1, val2)
153-
if err == nil {
154-
return !matched
151+
if err != nil {
152+
return false
155153
}
156-
return false
154+
return !matched
157155
default:
158156
return false
159157
}
160158
}
161159

162160
func evalElement(elem, field, operator, value string) bool {
163-
if gjson.Get(elem, field).Exists() {
164-
return compare(operator, gjson.Get(elem, field).String(), value)
161+
if elem := gjson.Get(elem, field); elem.Exists() {
162+
return compare(operator, elem.String(), value)
165163
} else if operator == "not exist" {
166164
return true
167165
}

correlation/config.yml.prod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
rulesFolder: /app/rulesets/
2-
geoipFolder: /app/geosets/
32
elasticsearch: "http://ELASTICSEARCH_HOST:ELASTICSEARCH_PORT"
43
postgresql:
54
server: POSTGRESQL_HOST

0 commit comments

Comments
 (0)