Skip to content

Commit 3848c82

Browse files
committed
Update how events are retrieved using the Sophos-Central API.
1 parent 32aae65 commit 3848c82

File tree

3 files changed

+153
-32
lines changed

3 files changed

+153
-32
lines changed

sophos/configuration/const.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import "github.com/utmstack/UTMStack/sophos/utils"
44

55
const (
66
CORRELATIONURL = "http://correlation:8080/v1/newlog"
7+
AUTHURL = "https://id.sophos.com/api/v2/oauth2/token"
8+
WHOAMIURL = "https://api.central.sophos.com/whoami/v1"
79
)
810

911
func GetInternalKey() string {

sophos/processor/processor.go

Lines changed: 146 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,65 +4,181 @@ import (
44
"fmt"
55
"net/http"
66
"net/url"
7+
"time"
78

89
"github.com/threatwinds/logger"
10+
"github.com/utmstack/UTMStack/sophos/configuration"
911
"github.com/utmstack/UTMStack/sophos/utils"
1012
"github.com/utmstack/config-client-go/types"
1113
)
1214

1315
type SophosCentralProcessor struct {
14-
XApiKey string
15-
Authorization string
16-
ApiUrl string
16+
ClientID string
17+
ClientSecret string
18+
TenantID string
19+
DataRegion string
20+
AccessToken string
21+
ExpiresAt time.Time
1722
}
1823

19-
func GetSophosCentralProcessor(group types.ModuleGroup) SophosCentralProcessor {
24+
func getSophosCentralProcessor(group types.ModuleGroup) SophosCentralProcessor {
2025
sophosProcessor := SophosCentralProcessor{}
2126

2227
for _, cnf := range group.Configurations {
2328
switch cnf.ConfName {
24-
case "X-API-KEY":
25-
sophosProcessor.XApiKey = cnf.ConfValue
26-
case "Authorization":
27-
sophosProcessor.Authorization = cnf.ConfValue
28-
case "API Url":
29-
sophosProcessor.ApiUrl = cnf.ConfValue
29+
case "ClientID":
30+
sophosProcessor.ClientID = cnf.ConfValue
31+
case "ClientSecret":
32+
sophosProcessor.ClientSecret = cnf.ConfValue
3033
}
3134
}
3235
return sophosProcessor
3336
}
3437

35-
type EventAggregate struct {
36-
HasMore bool `json:"has_more"`
37-
Items []map[string]interface{} `json:"items"`
38-
NextCursor string `json:"next_cursor"`
39-
}
38+
func (p *SophosCentralProcessor) getAccessToken() (string, *logger.Error) {
39+
data := url.Values{}
40+
data.Set("grant_type", "client_credentials")
41+
data.Set("client_id", p.ClientID)
42+
data.Set("client_secret", p.ClientSecret)
43+
data.Set("scope", "token")
4044

41-
func (p *SophosCentralProcessor) GetLogs(group types.ModuleGroup, fromTime int) ([]TransformedLog, *logger.Error) {
42-
baseURL := p.ApiUrl + "/siem/v1/events"
45+
headers := map[string]string{
46+
"Content-Type": "application/x-www-form-urlencoded",
47+
}
4348

44-
u, parseerr := url.Parse(baseURL)
45-
if parseerr != nil {
46-
return nil, utils.Logger.ErrorF("error parsing URL params: %v", parseerr)
49+
response, _, err := utils.DoReq[map[string]any](configuration.AUTHURL, []byte(data.Encode()), http.MethodPost, headers)
50+
if err != nil {
51+
return "", utils.Logger.ErrorF("error making auth request: %v", err)
4752
}
4853

49-
params := url.Values{}
50-
params.Add("limit", "1000")
51-
params.Add("from_date", fmt.Sprintf("%d", fromTime))
54+
accessToken, ok := response["access_token"].(string)
55+
if !ok || accessToken == "" {
56+
return "", utils.Logger.ErrorF("access_token not found in response")
57+
}
5258

53-
u.RawQuery = params.Encode()
59+
expiresIn, ok := response["expires_in"].(float64)
60+
if !ok {
61+
return "", utils.Logger.ErrorF("expires_in not found in response")
62+
}
63+
64+
p.AccessToken = accessToken
65+
p.ExpiresAt = time.Now().Add(time.Duration(expiresIn) * time.Second)
66+
67+
return accessToken, nil
68+
}
69+
70+
type WhoamiResponse struct {
71+
ID string `json:"id"`
72+
ApiHosts ApiHosts `json:"apiHosts"`
73+
}
74+
type ApiHosts struct {
75+
Global string `json:"global"`
76+
DataRegion string `json:"dataRegion"`
77+
}
5478

79+
func (p *SophosCentralProcessor) getTenantInfo(accessToken string) *logger.Error {
5580
headers := map[string]string{
5681
"accept": "application/json",
57-
"Authorization": p.Authorization,
58-
"x-api-key": p.XApiKey,
82+
"Authorization": "Bearer " + accessToken,
5983
}
6084

61-
response, _, err := utils.DoReq[EventAggregate](u.String(), nil, http.MethodGet, headers)
85+
response, _, err := utils.DoReq[WhoamiResponse](configuration.WHOAMIURL, nil, http.MethodGet, headers)
6286
if err != nil {
63-
return nil, err
87+
return utils.Logger.ErrorF("error making whoami request: %v", err)
88+
}
89+
90+
if response.ID == "" {
91+
return utils.Logger.ErrorF("tenant ID not found in whoami response")
92+
}
93+
p.TenantID = response.ID
94+
95+
if response.ApiHosts.DataRegion == "" {
96+
return utils.Logger.ErrorF("dataRegion not found in whoami response")
97+
}
98+
p.DataRegion = response.ApiHosts.DataRegion
99+
100+
return nil
101+
}
102+
103+
func (p *SophosCentralProcessor) getValidAccessToken() (string, *logger.Error) {
104+
if p.AccessToken != "" && time.Now().Before(p.ExpiresAt) {
105+
return p.AccessToken, nil
64106
}
107+
return p.getAccessToken()
108+
}
65109

66-
logs := ETLProcess(response, group)
67-
return logs, nil
110+
type EventAggregate struct {
111+
Pages Pages `json:"pages"`
112+
Items []map[string]any `json:"items"`
113+
}
114+
115+
type Pages struct {
116+
FromKey string `json:"fromKey"`
117+
NextKey string `json:"nextKey"`
118+
Size int64 `json:"size"`
119+
MaxSize int64 `json:"maxSize"`
120+
}
121+
122+
func (p *SophosCentralProcessor) getLogs(fromTime int, nextKey string, group types.ModuleGroup) ([]TransformedLog, string, *logger.Error) {
123+
accessToken, err := p.getValidAccessToken()
124+
if err != nil {
125+
return nil, "", utils.Logger.ErrorF("error getting access token: %v", err)
126+
}
127+
128+
if p.TenantID == "" || p.DataRegion == "" {
129+
if err := p.getTenantInfo(accessToken); err != nil {
130+
return nil, "", utils.Logger.ErrorF("error getting tenant information: %v", err)
131+
}
132+
}
133+
134+
var aggregatedEvents EventAggregate
135+
aggregatedEvents.Items = make([]map[string]any, 0)
136+
currentNextKey := nextKey
137+
138+
for {
139+
u, err := p.buildURL(fromTime, currentNextKey)
140+
if err != nil {
141+
return nil, "", utils.Logger.ErrorF("error building URL: %v", err)
142+
}
143+
144+
headers := map[string]string{
145+
"Content-Type": "application/json",
146+
"Authorization": "Bearer " + accessToken,
147+
"X-Tenant-ID": p.TenantID,
148+
}
149+
150+
response, _, err := utils.DoReq[EventAggregate](u.String(), nil, http.MethodGet, headers)
151+
if err != nil {
152+
return nil, "", err
153+
}
154+
155+
aggregatedEvents.Items = append(aggregatedEvents.Items, response.Items...)
156+
157+
if response.Pages.NextKey == "" {
158+
break
159+
}
160+
currentNextKey = response.Pages.NextKey
161+
}
162+
163+
transformedLogs := ETLProcess(aggregatedEvents, group)
164+
165+
return transformedLogs, currentNextKey, nil
166+
}
167+
168+
func (p *SophosCentralProcessor) buildURL(fromTime int, nextKey string) (*url.URL, *logger.Error) {
169+
baseURL := p.DataRegion + "/siem/v1/events"
170+
u, parseErr := url.Parse(baseURL)
171+
if parseErr != nil {
172+
return nil, utils.Logger.ErrorF("error parsing url: %v", parseErr)
173+
}
174+
175+
params := url.Values{}
176+
if nextKey != "" {
177+
params.Set("pageFromKey", nextKey)
178+
} else {
179+
params.Set("from_date", fmt.Sprintf("%d", fromTime))
180+
}
181+
182+
u.RawQuery = params.Encode()
183+
return u, nil
68184
}

sophos/processor/pull.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
const delayCheck = 300
1212

1313
var timeGroups = make(map[int]int)
14+
var nextKeys = make(map[int]string)
1415

1516
func PullLogs(group types.ModuleGroup) *logger.Error {
1617
utils.Logger.Info("starting log sync for : %s", group.GroupName)
@@ -26,13 +27,15 @@ func PullLogs(group types.ModuleGroup) *logger.Error {
2627
timeGroups[group.ModuleID] = epoch + 1
2728
}()
2829

29-
agent := GetSophosCentralProcessor(group)
30+
agent := getSophosCentralProcessor(group)
3031

31-
logs, err := agent.GetLogs(group, timeGroups[group.ModuleID])
32+
logs, newNextKey, err := agent.getLogs(timeGroups[group.ModuleID], nextKeys[group.ModuleID], group)
3233
if err != nil {
3334
return err
3435
}
3536

37+
nextKeys[group.ModuleID] = newNextKey
38+
3639
err = SendToCorrelation(logs)
3740
if err != nil {
3841
return err

0 commit comments

Comments
 (0)