-
Notifications
You must be signed in to change notification settings - Fork 301
Expand file tree
/
Copy pathingest.go
More file actions
397 lines (342 loc) · 14.1 KB
/
ingest.go
File metadata and controls
397 lines (342 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
// Copyright 2025 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0
//
//go:generate go run go.uber.org/mock/mockgen -copyright_file=../../../../../LICENSE.header -destination=./mocks/ingest.go -package=mocks -source=ingest.go
package graphify
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"time"
"github.com/specterops/bloodhound/cmd/api/src/daemons/changelog"
"github.com/specterops/bloodhound/cmd/api/src/model"
"github.com/specterops/bloodhound/cmd/api/src/model/ingest"
"github.com/specterops/bloodhound/cmd/api/src/services/graphify/endpoint"
"github.com/specterops/bloodhound/cmd/api/src/services/upload"
"github.com/specterops/bloodhound/packages/go/bhlog/measure"
"github.com/specterops/bloodhound/packages/go/ein"
"github.com/specterops/bloodhound/packages/go/errorlist"
"github.com/specterops/bloodhound/packages/go/graphschema/ad"
"github.com/specterops/bloodhound/packages/go/graphschema/azure"
"github.com/specterops/dawgs/graph"
)
// Package-level constants for the ingest code. Neither constant is referenced
// in the portion of the file shown alongside these declarations.
const (
// IngestCountThreshold is the fixed value 500.
// NOTE(review): presumably a count of ingested entities used as a
// batching/logging threshold — confirm against its callers.
IngestCountThreshold = 500
// ReconcileProperty is the literal string "reconcile".
// NOTE(review): looks like a property key; verify where it is read.
ReconcileProperty = "reconcile"
)
// registrationFn persists a kind encountered in the ingest payload, if it hasn't already been registered in the source_kinds table.
// (e.g., "Base", "AZBase", "GithubBase")
//
// It receives the kind to register and returns a non-nil error when
// registration fails; the OpenGraph handler in this file aborts ingest of the
// payload in that case.
type registrationFn func(kind graph.Kind) error
// ReadOptions carries the per-file settings consumed by ReadFileForIngest and
// IngestWrapper.
type ReadOptions struct {
// FileType is the kind of uploaded file. ReadFileForIngest enables graph
// validation only when this equals model.FileTypeZip.
FileType model.FileType // JSON or ZIP
// IngestSchema is forwarded to upload.ParseAndValidatePayload.
IngestSchema upload.IngestSchema
// RegisterSourceKind is required for data types served by a source-kind-aware
// handler; IngestWrapper returns an error when it is nil for such a type.
RegisterSourceKind registrationFn
}
// IngestContext is a container for dependencies needed by ingest.
// Build one with NewIngestContext so that Stats and IngestTime are always
// populated.
type IngestContext struct {
// Ctx is the context supplied to NewIngestContext.
Ctx context.Context
// Batch is the buffering/flushing mechanism that writes entities to the graph database
Batch BatchUpdater
// IngestTime is a single timestamp assigned to the lastseen property of every entity ingested per ingest run
IngestTime time.Time
// Manager is the caching layer that deduplicates ingest payloads across ingest runs.
// It may be nil; see HasChangelog.
Manager ChangeManager
// Stats tracks the number of nodes and relationships processed during ingestion
Stats *IngestStats
// EndpointResolver is the endpoint matching strategy to be used when looking up
// entities for relationship creation
EndpointResolver *endpoint.Resolver
// RetainIngestedFiles determines if the service should clean up working files after ingest
RetainIngestedFiles bool
}
// NewIngestContext builds an IngestContext bound to ctx.
//
// Stats is always allocated up front. The supplied options are then applied in
// the order given, so a later option overrides an earlier one that sets the
// same field. If no option produced a non-zero IngestTime, the current time is
// used, because a zero IngestTime breaks lastseen semantics.
func NewIngestContext(ctx context.Context, opts ...IngestOption) *IngestContext {
	ingestCtx := new(IngestContext)
	ingestCtx.Ctx = ctx
	ingestCtx.Stats = new(IngestStats)

	for i := range opts {
		opts[i](ingestCtx)
	}

	if ingestCtx.IngestTime.IsZero() {
		ingestCtx.IngestTime = time.Now()
	}

	return ingestCtx
}
// option helpers

// IngestOption mutates an IngestContext while it is being constructed.
// NewIngestContext invokes each option, in order, on the context it is building.
type IngestOption func(*IngestContext)
// WithIngestTime returns an option that sets IngestContext.IngestTime to
// ingestTime. NewIngestContext replaces a zero value with the current time.
func WithIngestTime(ingestTime time.Time) IngestOption {
	apply := func(ic *IngestContext) {
		ic.IngestTime = ingestTime
	}
	return apply
}
// WithIngestRetentionConfig returns an option that sets
// IngestContext.RetainIngestedFiles to shouldRetainIngestedFiles.
func WithIngestRetentionConfig(shouldRetainIngestedFiles bool) IngestOption {
	apply := func(ic *IngestContext) {
		ic.RetainIngestedFiles = shouldRetainIngestedFiles
	}
	return apply
}
// WithChangeManager returns an option that sets IngestContext.Manager to
// manager. Passing a nil manager leaves HasChangelog reporting false.
func WithChangeManager(manager ChangeManager) IngestOption {
	apply := func(ic *IngestContext) {
		ic.Manager = manager
	}
	return apply
}
// WithEndpointResolver returns an option that sets
// IngestContext.EndpointResolver to resolver.
func WithEndpointResolver(resolver *endpoint.Resolver) IngestOption {
	apply := func(ic *IngestContext) {
		ic.EndpointResolver = resolver
	}
	return apply
}
// WithBatchUpdater returns an option that stores batchUpdater in
// IngestContext.Batch exactly as given. Unlike BindBatchUpdater, it does not
// wrap the updater with stats counting.
func WithBatchUpdater(batchUpdater BatchUpdater) IngestOption {
	apply := func(ic *IngestContext) {
		ic.Batch = batchUpdater
	}
	return apply
}
// BindBatchUpdater installs batch as the context's Batch. The updater is always
// wrapped via NewCountingBatchUpdater together with s.Stats so that stats are
// tracked for work that goes through it.
func (s *IngestContext) BindBatchUpdater(batch BatchUpdater) {
	counted := NewCountingBatchUpdater(batch, s.Stats)
	s.Batch = counted
}
// HasChangelog reports whether a ChangeManager has been set on the context,
// i.e. whether s.Manager is non-nil.
func (s *IngestContext) HasChangelog() bool {
	managerSet := s.Manager != nil
	return managerSet
}
// ChangeManager represents the ingestion-facing API for the changelog daemon.
//
// It provides the following responsibilities:
// - Deduplication: ResolveChange determines whether a proposed change is new or modified
// and therefore requires persistence, or whether it has already been seen.
// - Submission: Submit enqueues a change for asynchronous processing by the changelog loop.
// - Metrics: FlushStats logs and resets internal cache hit/miss statistics,
// allowing callers to observe deduplication efficiency over time.
// - Cache reset: ClearCache takes a context and returns nothing.
// NOTE(review): presumably it drops the deduplication cache — confirm
// against the changelog daemon implementation.
//
// To generate mocks for this interface for unit testing seams in the application
// please use:
//
// mockgen -source=ingest.go -destination=mocks/ingest.go -package=mocks
//
// The go:generate directive at the top of this file runs the same generator and
// additionally passes -copyright_file.
type ChangeManager interface {
// ResolveChange returns a bool and an error for the proposed change.
// NOTE(review): the bool presumably means "needs persistence" — verify.
ResolveChange(change changelog.Change) (bool, error)
// Submit hands change to the manager and returns a bool.
// NOTE(review): the bool presumably reports whether the change was accepted — verify.
Submit(ctx context.Context, change changelog.Change) bool
FlushStats()
ClearCache(ctx context.Context)
}
// BatchUpdater represents the ingestion-facing API for a dawgs BatchOperation.
// IngestContext.Batch holds a value of this type.
type BatchUpdater interface {
// UpdateNodeBy applies a node update and returns any error from doing so.
UpdateNodeBy(update graph.NodeUpdate) error
// UpdateRelationshipBy applies a relationship update and returns any error from doing so.
UpdateRelationshipBy(update graph.RelationshipUpdate) error
// Nodes returns a query over nodes.
Nodes() graph.NodeQuery
// Relationships returns a query over relationships.
Relationships() graph.RelationshipQuery
}
// ReadFileForIngest orchestrates the ingestion of a single file: it parses and
// validates the payload metadata, rewinds the reader, and then hands off to
// IngestWrapper for the actual ingest.
//
// ZIP uploads get additional JSON Schema validation here. They are validated at
// this point rather than at upload time because decompressing the entire
// archive into memory during upload would be expensive; JSON uploads were
// already validated when they were uploaded. A file that fails validation is
// not processed further.
//
// Returns the validation error, a wrapped "rewind failed" error if the reader
// cannot be seeked back to its start, or whatever IngestWrapper returns.
func ReadFileForIngest(batch *IngestContext, reader io.ReadSeeker, options ReadOptions) error {
	// TODO: Should this be moved into the upload service. The comment here is helpful, but more
	// discovery required.

	// Only archives need graph validation at this stage: they bypassed the
	// validation controls applied to plain JSON files at upload time.
	validateGraph := options.FileType == model.FileTypeZip

	meta, err := upload.ParseAndValidatePayload(reader, options.IngestSchema, validateGraph, validateGraph)
	if err != nil {
		return err
	}

	// ParseAndValidatePayload may have consumed the reader, so start the real
	// ingest from the beginning again. Be kind, Rewind.
	if _, seekErr := reader.Seek(0, io.SeekStart); seekErr != nil {
		return fmt.Errorf("rewind failed: %w", seekErr)
	}

	return IngestWrapper(batch, reader, meta, options)
}
// IngestWrapper dispatches the ingest process based on the metadata's type.
//
// Source-kind-aware handlers are consulted first and require
// readOpts.RegisterSourceKind to be set; a nil registration function is
// reported as an error. Otherwise the basic handler table is used. A type with
// no handler in either table yields an error.
func IngestWrapper(batch *IngestContext, reader io.ReadSeeker, meta ingest.OriginalMetadata, readOpts ReadOptions) error {
	if withSourceKind, found := sourceKindHandlers[meta.Type]; found {
		register := readOpts.RegisterSourceKind
		if register == nil {
			return fmt.Errorf("missing source kind registration function for data type: %v", meta.Type)
		}
		return withSourceKind(batch, reader, meta, register)
	}

	basic, found := basicHandlers[meta.Type]
	if !found {
		return fmt.Errorf("no handler for ingest data type: %v", meta.Type)
	}

	return basic(batch, reader, meta)
}
// IngestBasicData ingests the nodes and then the relationships held in
// converted, both under the ad.Entity kind.
//
// A failure in the node step does not prevent the relationship step from
// running; every error is collected and the combined result of the error-list
// builder is returned.
func IngestBasicData(batch *IngestContext, converted ConvertedData) error {
	collected := errorlist.NewBuilder()

	nodeErr := IngestNodes(batch, ad.Entity, converted.NodeProps)
	if nodeErr != nil {
		collected.Add(nodeErr)
	}

	relErr := IngestRelationships(batch, ad.Entity, converted.RelProps)
	if relErr != nil {
		collected.Add(relErr)
	}

	return collected.Build()
}
// IngestGenericData writes generic graph data into the database using the provided batch.
// It attempts to ingest all nodes and then all relationships from the ConvertedData object.
//
// Generic entities do not have a predefined base kind (unlike AZ or AD), so the
// caller supplies sourceKind, which is forwarded unchanged to both the node and
// the relationship ingestion functions. The OpenGraph handler in this file
// passes graph.EmptyKind when the payload carries no metadata tag.
//
// A failure in the node step does not prevent the relationship step from
// running; every error is collected and the combined result of the error-list
// builder is returned.
func IngestGenericData(batch *IngestContext, sourceKind graph.Kind, converted ConvertedData) error {
	collected := errorlist.NewBuilder()

	nodeErr := IngestNodes(batch, sourceKind, converted.NodeProps)
	if nodeErr != nil {
		collected.Add(nodeErr)
	}

	relErr := IngestRelationships(batch, sourceKind, converted.RelProps)
	if relErr != nil {
		collected.Add(relErr)
	}

	return collected.Build()
}
// IngestGroupData ingests converted group data in three steps, in this order:
// nodes (ad.Entity), relationships (ad.Entity), and finally the
// distinguished-name relationships.
//
// Every step runs regardless of earlier failures; all errors are collected and
// the combined result of the error-list builder is returned.
func IngestGroupData(batch *IngestContext, converted ConvertedGroupData) error {
	collected := errorlist.NewBuilder()

	nodeErr := IngestNodes(batch, ad.Entity, converted.NodeProps)
	if nodeErr != nil {
		collected.Add(nodeErr)
	}

	relErr := IngestRelationships(batch, ad.Entity, converted.RelProps)
	if relErr != nil {
		collected.Add(relErr)
	}

	dnErr := IngestDNRelationships(batch, converted.DistinguishedNameProps)
	if dnErr != nil {
		collected.Add(dnErr)
	}

	return collected.Build()
}
// IngestAzureData ingests the nodes and then the relationships held in
// converted, both under the azure.Entity kind, while timing the whole call.
//
// A failure in the node step does not prevent the relationship step from
// running; every error is collected and the combined result of the error-list
// builder is returned.
func IngestAzureData(batch *IngestContext, converted ConvertedAzureData) error {
	// NOTE(review): the measurement uses context.TODO() rather than batch.Ctx;
	// confirm whether that is intentional.
	finishMeasure := measure.ContextLogAndMeasureWithThreshold(context.TODO(), slog.LevelDebug, "ingest azure data")
	defer finishMeasure()

	collected := errorlist.NewBuilder()

	nodeErr := IngestNodes(batch, azure.Entity, converted.NodeProps)
	if nodeErr != nil {
		collected.Add(nodeErr)
	}

	relErr := IngestRelationships(batch, azure.Entity, converted.RelProps)
	if relErr != nil {
		collected.Add(relErr)
	}

	return collected.Build()
}
// basicIngestHandler defines the function signature for all ingest paths except for the OpenGraph.
// A handler receives the ingest context, the (already rewound) payload reader
// and the parsed metadata, and returns any error from decoding or ingesting.
type basicIngestHandler func(batch *IngestContext, reader io.ReadSeeker, meta ingest.OriginalMetadata) error
// sourceKindIngestHandler defines the function signature for ingest handlers that require
// additional logic — specifically, registration of a sourceKind before decoding data.
// This is only used for ingest payloads within OpenGraph, which may specify new source kinds that we want to track (e.g. Base, AZBase, GithubBase).
// IngestWrapper guarantees that register is non-nil when it invokes such a handler.
type sourceKindIngestHandler func(batch *IngestContext, reader io.ReadSeeker, meta ingest.OriginalMetadata, register registrationFn) error
// defaultBasicHandler builds a basicIngestHandler for data types that need no
// special treatment: the returned handler obtains the default decoder for the
// reader and feeds it, together with conversionFunc, to decodeBasicData.
// The payload metadata is not consulted.
func defaultBasicHandler[T any](conversionFunc ConversionFuncWithTime[T]) basicIngestHandler {
	return func(batch *IngestContext, reader io.ReadSeeker, _ ingest.OriginalMetadata) error {
		if decoder, err := getDefaultDecoder(reader); err != nil {
			return err
		} else {
			return decodeBasicData(batch, decoder, conversionFunc)
		}
	}
}
// basicHandlers maps every non-OpenGraph ingest data type to the handler that
// decodes and ingests it. IngestWrapper consults this table after
// sourceKindHandlers. Types without special needs use defaultBasicHandler.
var basicHandlers = map[ingest.DataType]basicIngestHandler{
	// Computers are only ingested for payload versions 5 and above; older
	// payloads are accepted but skipped without error.
	ingest.DataTypeComputer: func(batch *IngestContext, reader io.ReadSeeker, meta ingest.OriginalMetadata) error {
		decoder, err := getDefaultDecoder(reader)
		if err != nil {
			return err
		}
		if meta.Version < 5 {
			return nil
		}
		return decodeBasicData(batch, decoder, convertComputerData)
	},
	// Groups use a dedicated decode path.
	ingest.DataTypeGroup: func(batch *IngestContext, reader io.ReadSeeker, _ ingest.OriginalMetadata) error {
		decoder, err := getDefaultDecoder(reader)
		if err != nil {
			return err
		}
		return decodeGroupData(batch, decoder)
	},
	// Sessions use a dedicated decode path.
	ingest.DataTypeSession: func(batch *IngestContext, reader io.ReadSeeker, _ ingest.OriginalMetadata) error {
		decoder, err := getDefaultDecoder(reader)
		if err != nil {
			return err
		}
		return decodeSessionData(batch, decoder)
	},
	// Azure payloads use a dedicated decode path.
	ingest.DataTypeAzure: func(batch *IngestContext, reader io.ReadSeeker, _ ingest.OriginalMetadata) error {
		decoder, err := getDefaultDecoder(reader)
		if err != nil {
			return err
		}
		return decodeAzureData(batch, decoder)
	},
	ingest.DataTypeUser:           defaultBasicHandler(convertUserData),
	ingest.DataTypeDomain:         defaultBasicHandler(convertDomainData),
	ingest.DataTypeGPO:            defaultBasicHandler(convertGPOData),
	ingest.DataTypeOU:             defaultBasicHandler(convertOUData),
	ingest.DataTypeContainer:      defaultBasicHandler(convertContainerData),
	ingest.DataTypeAIACA:          defaultBasicHandler(convertAIACAData),
	ingest.DataTypeRootCA:         defaultBasicHandler(convertRootCAData),
	ingest.DataTypeEnterpriseCA:   defaultBasicHandler(convertEnterpriseCAData),
	ingest.DataTypeNTAuthStore:    defaultBasicHandler(convertNTAuthStoreData),
	ingest.DataTypeCertTemplate:   defaultBasicHandler(convertCertTemplateData),
	ingest.DataTypeIssuancePolicy: defaultBasicHandler(convertIssuancePolicy),
}
// sourceKindHandlers maps ingest data types to handlers that must register a
// source kind before decoding. IngestWrapper consults this table first.
//
// The OpenGraph handler processes up to three sections of the payload in
// order — "metadata", "nodes", "edges". Each section is optional: a missing tag
// (ingest.ErrDataTagNotFound) is logged at debug level and skipped, while any
// other decoder error aborts the ingest. When metadata is present, its source
// kind is registered and used for the nodes and edges that follow; otherwise
// graph.EmptyKind is used.
var sourceKindHandlers = map[ingest.DataType]sourceKindIngestHandler{
	ingest.DataTypeOpenGraph: func(batch *IngestContext, reader io.ReadSeeker, meta ingest.OriginalMetadata, registerSourceKind registrationFn) error {
		sourceKind := graph.EmptyKind

		// decode metadata, if present
		// NOTE(review): the numeric argument to CreateIngestDecoder is presumably
		// the nesting depth of the tag — confirm against its definition.
		metaDecoder, metaErr := CreateIngestDecoder(reader, "metadata", 1)
		switch {
		case metaErr == nil:
			var genericMeta ein.GenericMetadata
			if err := metaDecoder.Decode(&genericMeta); err != nil {
				return fmt.Errorf("failed to parse opengraph metadata tag: %w", err)
			}

			sourceKind = graph.StringKind(genericMeta.SourceKind)
			if err := registerSourceKind(sourceKind); err != nil {
				return fmt.Errorf("failed to register sourceKind: %w", err)
			}
		case errors.Is(metaErr, ingest.ErrDataTagNotFound):
			slog.Debug("No metadata found in opengraph payload; continuing to nodes")
		default:
			return metaErr
		}

		// decode nodes, if present
		nodeDecoder, nodeErr := CreateIngestDecoder(reader, "nodes", 2)
		switch {
		case nodeErr == nil:
			if err := DecodeGenericData(batch, nodeDecoder, sourceKind, ConvertGenericNode); err != nil {
				return err
			}
		case errors.Is(nodeErr, ingest.ErrDataTagNotFound):
			slog.Debug("No nodes found in opengraph payload; continuing to edges")
		default:
			return nodeErr
		}

		// decode edges, if present
		edgeDecoder, edgeErr := CreateIngestDecoder(reader, "edges", 2)
		switch {
		case edgeErr == nil:
			return DecodeGenericData(batch, edgeDecoder, sourceKind, ConvertGenericEdge)
		case errors.Is(edgeErr, ingest.ErrDataTagNotFound):
			slog.Debug("No edges found in opengraph payload")
			return nil
		default:
			return edgeErr
		}
	},
}
// getDefaultDecoder returns the decoder used by all basic handlers: the result
// of CreateIngestDecoder for the "data" tag with an argument of 1.
// NOTE(review): the 1 is presumably the tag's nesting depth — confirm against
// CreateIngestDecoder.
func getDefaultDecoder(reader io.ReadSeeker) (*json.Decoder, error) {
	decoder, err := CreateIngestDecoder(reader, "data", 1)
	return decoder, err
}