Skip to content

Commit a44db36

Browse files
committed
BED-5717 chore: moved the bheClient variable to a structure of its own
1 parent fc3ed0f commit a44db36

File tree

7 files changed

+586
-512
lines changed

7 files changed

+586
-512
lines changed

client/bloodhound/client.go

Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
package bloodhound
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"context"
7+
"encoding/json"
8+
"errors"
9+
"fmt"
10+
"io"
11+
"net/http"
12+
"net/url"
13+
"os"
14+
15+
"github.com/bloodhoundad/azurehound/v2/client/rest"
16+
"github.com/bloodhoundad/azurehound/v2/config"
17+
"github.com/bloodhoundad/azurehound/v2/constants"
18+
"github.com/bloodhoundad/azurehound/v2/models"
19+
"github.com/bloodhoundad/azurehound/v2/pipeline"
20+
"github.com/go-logr/logr"
21+
)
22+
23+
const (
24+
BHEAuthSignature string = "bhesignature"
25+
)
26+
27+
var ErrExceededRetryLimit = errors.New("exceeded max retry limit for ingest batch, proceeding with next batch...")
28+
29+
// BloodHoundClient represents the methods for interacting with an instance of BloodHound
30+
type BloodHoundClient interface {
31+
SendRequest(req *http.Request) (*http.Response, error)
32+
CloseIdleConnections()
33+
Ingest(ctx context.Context, in <-chan []interface{}) bool
34+
GetAvailableJobs(ctx context.Context) ([]models.ClientJob, error)
35+
Checkin(ctx context.Context) error
36+
StartJob(ctx context.Context, jobId int) error
37+
EndJob(ctx context.Context, status models.JobStatus, message string) error
38+
UpdateClient(ctx context.Context) (*models.UpdateClientResponse, error)
39+
EndOrphanedJob(ctx context.Context, updatedClient *models.UpdateClientResponse) error
40+
}
41+
42+
// BHEClient implements the BloodHoundClient interface to communicate with a BloodHound Enterprise instance
43+
type BHEClient struct {
44+
httpClient *http.Client
45+
bheUrl url.URL
46+
log logr.Logger
47+
}
48+
49+
// NewBHEClient creates a new BloodHoundClient using the values from the application's config
50+
// TODO: the values from the global config should be provided via arguments in the constructor
51+
func NewBHEClient(bheUrl url.URL) (BloodHoundClient, error) {
52+
client, err := rest.NewHTTPClient(config.Proxy.Value().(string))
53+
if err != nil {
54+
return nil, err
55+
}
56+
57+
client.Transport = signingTransport{
58+
base: client.Transport,
59+
tokenId: config.BHETokenId.Value().(string),
60+
token: config.BHEToken.Value().(string),
61+
signature: BHEAuthSignature,
62+
}
63+
64+
return &BHEClient{
65+
httpClient: client,
66+
bheUrl: bheUrl,
67+
}, nil
68+
}
69+
70+
// SendRequest sends a given request to the BHE instance. In the event of an error, 3 retries will be attempted
71+
func (s *BHEClient) SendRequest(req *http.Request) (*http.Response, error) {
72+
var (
73+
res *http.Response
74+
maxRetries = 3
75+
)
76+
77+
// copy the bytes in case we need to retry the request
78+
if body, err := rest.CopyBody(req); err != nil {
79+
return nil, err
80+
} else {
81+
for retry := 0; retry < maxRetries; retry++ {
82+
// Reusing http.Request requires rewinding the request body
83+
// back to a working state
84+
if body != nil && retry > 0 {
85+
req.Body = io.NopCloser(bytes.NewBuffer(body))
86+
}
87+
88+
if res, err = s.httpClient.Do(req); err != nil {
89+
if rest.IsClosedConnectionErr(err) {
90+
// try again on force closed connections
91+
s.log.Error(err, fmt.Sprintf("remote host force closed connection while requesting %s; attempt %d/%d; trying again", req.URL, retry+1, maxRetries))
92+
rest.ExponentialBackoff(retry)
93+
continue
94+
}
95+
// normal client error, dont attempt again
96+
return nil, err
97+
} else if res.StatusCode < http.StatusOK || res.StatusCode >= http.StatusBadRequest {
98+
if res.StatusCode >= http.StatusInternalServerError {
99+
// Internal server error, backoff and try again.
100+
serverError := fmt.Errorf("received server error %d while requesting %v", res.StatusCode, req.URL)
101+
s.log.Error(serverError, fmt.Sprintf("attempt %d/%d; trying again", retry+1, maxRetries))
102+
103+
rest.ExponentialBackoff(retry)
104+
continue
105+
}
106+
// bad request we do not need to retry
107+
var body json.RawMessage
108+
defer res.Body.Close()
109+
if err := json.NewDecoder(res.Body).Decode(&body); err != nil {
110+
return nil, fmt.Errorf("received unexpected response code from %v: %s; failure reading response body", req.URL, res.Status)
111+
} else {
112+
return nil, fmt.Errorf("received unexpected response code from %v: %s %s", req.URL, res.Status, body)
113+
}
114+
} else {
115+
return res, nil
116+
}
117+
}
118+
}
119+
120+
return nil, fmt.Errorf("unable to complete request to url=%s; attempts=%d;", req.URL, maxRetries)
121+
}
122+
123+
func (s *BHEClient) Ingest(ctx context.Context, in <-chan []interface{}) bool {
124+
endpoint := s.bheUrl.ResolveReference(&url.URL{Path: "/api/v2/ingest"})
125+
126+
var (
127+
hasErrors = false
128+
maxRetries = 3
129+
unrecoverableErrMsg = fmt.Sprintf("ending current ingest job due to unrecoverable error while requesting %v", endpoint)
130+
)
131+
132+
for data := range pipeline.OrDone(ctx.Done(), in) {
133+
var (
134+
body bytes.Buffer
135+
gw = gzip.NewWriter(&body)
136+
)
137+
138+
ingestData := models.IngestRequest{
139+
Meta: models.Meta{
140+
Type: "azure",
141+
},
142+
Data: data,
143+
}
144+
145+
err := json.NewEncoder(gw).Encode(ingestData)
146+
if err != nil {
147+
s.log.Error(err, unrecoverableErrMsg)
148+
}
149+
gw.Close()
150+
151+
if req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint.String(), &body); err != nil {
152+
s.log.Error(err, unrecoverableErrMsg)
153+
return true
154+
} else {
155+
req.Header.Set("User-Agent", constants.UserAgent())
156+
req.Header.Set("Accept", "application/json")
157+
req.Header.Set("Content-Encoding", "gzip")
158+
for retry := 0; retry < maxRetries; retry++ {
159+
// No retries on regular err cases, only on HTTP 504 Gateway Timeout and HTTP 503 Service Unavailable
160+
if response, err := s.httpClient.Do(req); err != nil {
161+
if rest.IsClosedConnectionErr(err) {
162+
// try again on force closed connection
163+
s.log.Error(err, fmt.Sprintf("remote host force closed connection while requesting %s; attempt %d/%d; trying again", req.URL, retry+1, maxRetries))
164+
rest.ExponentialBackoff(retry)
165+
166+
if retry == maxRetries-1 {
167+
s.log.Error(ErrExceededRetryLimit, "")
168+
hasErrors = true
169+
}
170+
171+
continue
172+
}
173+
s.log.Error(err, unrecoverableErrMsg)
174+
return true
175+
} else if response.StatusCode == http.StatusGatewayTimeout || response.StatusCode == http.StatusServiceUnavailable || response.StatusCode == http.StatusBadGateway {
176+
serverError := fmt.Errorf("received server error %d while requesting %v; attempt %d/%d; trying again", response.StatusCode, endpoint, retry+1, maxRetries)
177+
s.log.Error(serverError, "")
178+
179+
rest.ExponentialBackoff(retry)
180+
181+
if retry == maxRetries-1 {
182+
s.log.Error(ErrExceededRetryLimit, "")
183+
hasErrors = true
184+
}
185+
if err := response.Body.Close(); err != nil {
186+
s.log.Error(fmt.Errorf("failed to close ingest body: %w", err), unrecoverableErrMsg)
187+
}
188+
continue
189+
} else if response.StatusCode != http.StatusAccepted {
190+
if bodyBytes, err := io.ReadAll(response.Body); err != nil {
191+
s.log.Error(fmt.Errorf("received unexpected response code from %v: %s; failure reading response body", endpoint, response.Status), unrecoverableErrMsg)
192+
} else {
193+
s.log.Error(fmt.Errorf("received unexpected response code from %v: %s %s", req.URL, response.Status, bodyBytes), unrecoverableErrMsg)
194+
}
195+
if err := response.Body.Close(); err != nil {
196+
s.log.Error(fmt.Errorf("failed to close ingest body: %w", err), unrecoverableErrMsg)
197+
}
198+
return true
199+
} else {
200+
if err := response.Body.Close(); err != nil {
201+
s.log.Error(fmt.Errorf("failed to close ingest body: %w", err), unrecoverableErrMsg)
202+
}
203+
}
204+
}
205+
}
206+
}
207+
return hasErrors
208+
}
209+
210+
// GetAvailableJobs sends a request to BHE to get the list of available jobs
211+
func (s *BHEClient) GetAvailableJobs(ctx context.Context) ([]models.ClientJob, error) {
212+
var (
213+
endpoint = s.bheUrl.ResolveReference(&url.URL{Path: "/api/v2/jobs/available"})
214+
response bloodhoundResponse[[]models.ClientJob]
215+
)
216+
217+
if req, err := rest.NewRequest(ctx, "GET", endpoint, nil, nil, nil); err != nil {
218+
return nil, err
219+
} else if res, err := s.SendRequest(req); err != nil {
220+
return nil, err
221+
} else {
222+
defer res.Body.Close()
223+
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
224+
return nil, err
225+
} else {
226+
return response.Data, nil
227+
}
228+
}
229+
}
230+
231+
// Checkin sends a request to BHE indicating that the client is running
232+
func (s *BHEClient) Checkin(ctx context.Context) error {
233+
endpoint := s.bheUrl.ResolveReference(&url.URL{Path: "/api/v2/jobs/current"})
234+
235+
if req, err := rest.NewRequest(ctx, "GET", endpoint, nil, nil, nil); err != nil {
236+
return err
237+
} else if res, err := s.SendRequest(req); err != nil {
238+
return err
239+
} else {
240+
res.Body.Close()
241+
return nil
242+
}
243+
}
244+
245+
// StartJob sends a request to BHE instructing it to start a job
246+
func (s *BHEClient) StartJob(ctx context.Context, jobId int) error {
247+
s.log.Info("beginning collection job", "id", jobId)
248+
var (
249+
endpoint = s.bheUrl.ResolveReference(&url.URL{Path: "/api/v2/jobs/start"})
250+
body = map[string]int{
251+
"id": jobId,
252+
}
253+
)
254+
255+
if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil {
256+
return err
257+
} else if res, err := s.SendRequest(req); err != nil {
258+
return err
259+
} else {
260+
res.Body.Close()
261+
return nil
262+
}
263+
}
264+
265+
// EndJob sends a request to BHE instructing it to end a job
266+
func (s *BHEClient) EndJob(ctx context.Context, status models.JobStatus, message string) error {
267+
endpoint := s.bheUrl.ResolveReference(&url.URL{Path: "/api/v2/jobs/end"})
268+
269+
body := models.CompleteJobRequest{
270+
Status: status.String(),
271+
Message: message,
272+
}
273+
274+
if req, err := rest.NewRequest(ctx, "POST", endpoint, body, nil, nil); err != nil {
275+
return err
276+
} else if res, err := s.SendRequest(req); err != nil {
277+
return err
278+
} else {
279+
res.Body.Close()
280+
return nil
281+
}
282+
}
283+
284+
// UpdateClient sends a request to BHE and updates the AzureHound client info
285+
func (s *BHEClient) UpdateClient(ctx context.Context) (*models.UpdateClientResponse, error) {
286+
var (
287+
endpoint = s.bheUrl.ResolveReference(&url.URL{Path: "/api/v2/clients/update"})
288+
response = bloodhoundResponse[models.UpdateClientResponse]{}
289+
)
290+
if addr, err := rest.Dial(s.log, s.bheUrl.String()); err != nil {
291+
return nil, err
292+
} else {
293+
// hostname is nice to have, but we don't really need it
294+
hostname, _ := os.Hostname()
295+
296+
body := models.UpdateClientRequest{
297+
Address: addr,
298+
Hostname: hostname,
299+
Version: constants.Version,
300+
}
301+
302+
s.log.V(2).Info("updating client info", "info", body)
303+
304+
if req, err := rest.NewRequest(ctx, "PUT", endpoint, body, nil, nil); err != nil {
305+
return nil, err
306+
} else if res, err := s.SendRequest(req); err != nil {
307+
return nil, err
308+
} else {
309+
defer res.Body.Close()
310+
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
311+
return nil, err
312+
} else {
313+
return &response.Data, nil
314+
}
315+
}
316+
}
317+
}
318+
319+
// EndOrphanedJob if a job is running, sends a request to BHE to end the current job with a failed status
320+
func (s *BHEClient) EndOrphanedJob(ctx context.Context, updatedClient *models.UpdateClientResponse) error {
321+
if updatedClient.CurrentJob.Status == models.JobStatusRunning {
322+
s.log.Info("the service started with an orphaned job in progress, sending job completion notice...", "jobId", updatedClient.CurrentJobID)
323+
return s.EndJob(ctx, models.JobStatusFailed, "This job has been orphaned. Re-run collection for complete data.")
324+
} else {
325+
return nil
326+
}
327+
}
328+
329+
// CloseIdleConnections closes all idle connections on the internal http.Client
330+
func (s *BHEClient) CloseIdleConnections() {
331+
s.httpClient.CloseIdleConnections()
332+
}

client/bloodhound/reader.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package bloodhound
2+
3+
import (
4+
"bytes"
5+
"io"
6+
)
7+
8+
type rewindableByteReader struct {
9+
data *bytes.Reader
10+
}
11+
12+
func (s *rewindableByteReader) Read(p []byte) (int, error) {
13+
return s.data.Read(p)
14+
}
15+
16+
func (s *rewindableByteReader) Close() error {
17+
return nil
18+
}
19+
20+
func (s *rewindableByteReader) Rewind() (int64, error) {
21+
return s.data.Seek(0, io.SeekStart)
22+
}

client/bloodhound/response.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package bloodhound
2+
3+
type bloodhoundResponse[T any] struct {
4+
Data T `json:"data"`
5+
}

0 commit comments

Comments
 (0)