Skip to content

Commit 5c8ad24

Browse files
authored
Bugfix: In flight flow calculation was stuck sometimes (#4668)
If flows crashed before sending any status then the server would not update them. This resulted in repeatedly sending flow status request without making progress.
1 parent cdc4edb commit 5c8ad24

File tree

10 files changed

+229
-33
lines changed

10 files changed

+229
-33
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,4 @@ config/ab0x.go
2626

2727
__debug*
2828
debug.test*
29-
enriched.json*
29+
artifacts/testdata/server/hunts/H.*

actions/proto/vql.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ message ClientInfo {
228228
map<string, int64> in_flight_flows = 28;
229229

230230

231-
// A list of indexed metadata fields. There are not all metadata
231+
// A list of indexed metadata fields. These are not all metadata
232232
// fields, only the ones that are important enough to be indexed.
233233
map<string, string> metadata = 29;
234234
}

flows/client_flow_runner.go

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -513,31 +513,38 @@ func (self *ClientFlowRunner) handleUnknwonFlow(
513513
ctx context.Context, client_id, flow_id string,
514514
msg *crypto_proto.FlowStats) error {
515515

516-
flow_path_manager := paths.NewFlowPathManager(client_id, flow_id)
517-
db, err := datastore.GetDB(self.config_obj)
516+
if len(msg.QueryStatus) == 0 {
517+
return nil
518+
}
519+
520+
launcher_service, err := services.GetLauncher(self.config_obj)
518521
if err != nil {
519522
return err
520523
}
521524

522-
// Just a blind write will eventually hit the disk.
523-
stats := &flows_proto.ArtifactCollectorContext{}
524-
err = db.GetSubject(self.config_obj, flow_path_manager.Stats(), stats)
525+
// If we dont know anything about the flow, ignore it.
526+
collection_context, err := launcher_service.Storage().LoadCollectionContext(
527+
ctx, self.config_obj, client_id, flow_id)
525528
if err != nil {
526529
return nil
527530
}
528531

529532
// Mark all the stats as terminated if they are still running.
530-
for _, s := range stats.QueryStats {
531-
if s.Status == crypto_proto.VeloStatus_PROGRESS {
532-
s.Status = crypto_proto.VeloStatus_GENERIC_ERROR
533-
s.ErrorMessage = msg.QueryStatus[0].ErrorMessage
533+
if len(collection_context.QueryStats) == 0 {
534+
collection_context.QueryStats = append(
535+
collection_context.QueryStats, msg.QueryStatus...)
536+
} else {
537+
for _, s := range collection_context.QueryStats {
538+
if s.Status == crypto_proto.VeloStatus_PROGRESS {
539+
s.Status = msg.QueryStatus[0].Status
540+
s.ErrorMessage = msg.QueryStatus[0].ErrorMessage
541+
}
534542
}
535543
}
536544

537-
launcher.UpdateFlowStats(stats)
538-
539-
return db.SetSubjectWithCompletion(self.config_obj,
540-
flow_path_manager.Stats(), stats, nil)
545+
// Update the flow.
546+
return launcher_service.Storage().WriteFlow(ctx, self.config_obj,
547+
collection_context, utils.BackgroundWriter)
541548
}
542549

543550
func (self *ClientFlowRunner) FlowStats(

flows/client_flow_runner_test.go

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package flows_test
33
import (
44
"context"
55
"fmt"
6-
"os"
76
"regexp"
87
"strings"
98
"sync"
@@ -1025,42 +1024,62 @@ func (self *ServerTestSuite) TestCancellation() {
10251024

10261025
// Test an unknown flow. What happens when the server receives a
10271026
// message to an unknown flow.
1027+
1028+
// This message is received in response to the in flight keep
1029+
// alive. If the client crashes, the server will send a status request
1030+
// for the in flight flow. But the client does not know about this
1031+
// flow id. The server will terminate the flow.
10281032
func (self *ServerTestSuite) TestUnknownFlow() {
10291033
t := self.T()
10301034

1031-
db, err := datastore.GetDB(self.ConfigObj)
1032-
require.NoError(t, err)
1035+
closer := utils.SetFlowIdForTests("F.SomeFlow")
1036+
defer closer()
1037+
1038+
// Create a new collection flow
1039+
flow_id, err := self.createArtifactCollection()
1040+
require.NoError(self.T(), err)
1041+
assert.Equal(self.T(), "F.SomeFlow", flow_id)
1042+
1043+
launcher, err := services.GetLauncher(self.ConfigObj)
1044+
assert.NoError(t, err)
1045+
1046+
collection_context, err := launcher.GetFlowDetails(self.Ctx, self.ConfigObj,
1047+
services.GetFlowOptions{}, self.client_id, flow_id)
1048+
assert.NoError(t, err)
1049+
1050+
assert.Equal(self.T(), collection_context.Context.State,
1051+
flows_proto.ArtifactCollectorContext_RUNNING)
10331052

10341053
runner := flows.NewFlowRunner(self.Ctx, self.ConfigObj)
10351054
defer runner.Close(self.Ctx)
10361055

10371056
// Send a message to a random non-existant flow from client.
1038-
flow_id := "F.NONEXISTENT"
10391057
runner.ProcessSingleMessage(
10401058
self.Ctx,
10411059
&crypto_proto.VeloMessage{
10421060
Source: self.client_id,
10431061
SessionId: flow_id,
10441062
FlowStats: &crypto_proto.FlowStats{
10451063
QueryStatus: []*crypto_proto.VeloStatus{
1046-
{Status: crypto_proto.VeloStatus_OK, QueryId: 1},
1064+
{
1065+
Status: crypto_proto.VeloStatus_UNKNOWN_FLOW,
1066+
ErrorMessage: "Unknown flow",
1067+
},
10471068
},
10481069
},
10491070
})
10501071

1051-
// We used to send cancellation message to the client, but this
1052-
// too expensive for the server to keep track of. Now we just
1053-
// write data in the flow as if it exists anyway.
1072+
collection_context, err = launcher.GetFlowDetails(self.Ctx, self.ConfigObj,
1073+
services.GetFlowOptions{}, self.client_id, flow_id)
1074+
assert.NoError(t, err)
10541075

1055-
// The flow does not exist - make sure it still does not.
1056-
collection_context := &flows_proto.ArtifactCollectorContext{}
1057-
path_manager := paths.NewFlowPathManager(self.client_id, flow_id)
1058-
err = db.GetSubject(self.ConfigObj, path_manager.Path(), collection_context)
1059-
require.Error(t, err, os.ErrNotExist)
1076+
assert.Equal(self.T(), collection_context.Context.State,
1077+
flows_proto.ArtifactCollectorContext_ERROR)
10601078

1061-
// The flow stats are written as normal.
1062-
err = db.GetSubject(self.ConfigObj, path_manager.Stats(), collection_context)
1063-
assert.NoError(t, err)
1079+
assert.Equal(self.T(), collection_context.Context.Status,
1080+
"Unknown flow")
1081+
1082+
assert.Equal(self.T(), len(collection_context.Context.QueryStats), 1)
10641083
}
10651084

10661085
// Test an unknown flow. What happens when the server receives a

services/client_info/client_info_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ type: INTERNAL
4848
`, `
4949
name: Server.Audit.Logs
5050
type: INTERNAL
51+
`, `
52+
name: Client.Test
53+
type: CLIENT
54+
sources:
55+
- query: SELECT * FROM info()
5156
`})
5257

5358
// Create a client in the datastore so we can test initializing
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
{
2+
"InFlightFlows": {
3+
"client_id": "C.1234",
4+
"hostname": "Hostname",
5+
"has_tasks": true,
6+
"in_flight_flows": {
7+
"F.0": 10,
8+
"F.1": 10,
9+
"F.2": 10,
10+
"F.3": 10
11+
}
12+
},
13+
"StatusChecks": [
14+
{
15+
"session_id": "F.Status",
16+
"flow_stats_request": {
17+
"flow_id": [
18+
"F.0",
19+
"F.1",
20+
"F.2",
21+
"F.3"
22+
]
23+
}
24+
}
25+
],
26+
"AfterCompletion": {
27+
"client_id": "C.1234",
28+
"hostname": "Hostname",
29+
"has_tasks": true
30+
},
31+
"SecondSetOfTasks": {
32+
"client_id": "C.1234",
33+
"hostname": "Hostname",
34+
"has_tasks": true,
35+
"in_flight_flows": {
36+
"F.4": 100,
37+
"F.5": 100,
38+
"F.6": 100,
39+
"F.7": 100
40+
}
41+
}
42+
}

services/client_info/tasks_test.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,19 @@ import (
66
"sort"
77
"time"
88

9+
"github.com/Velocidex/ordereddict"
910
"google.golang.org/protobuf/proto"
1011
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
12+
"www.velocidex.com/golang/velociraptor/flows"
13+
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
14+
"www.velocidex.com/golang/velociraptor/json"
1115
"www.velocidex.com/golang/velociraptor/services"
1216
"www.velocidex.com/golang/velociraptor/services/client_info"
1317
"www.velocidex.com/golang/velociraptor/utils"
18+
"www.velocidex.com/golang/velociraptor/vql/acl_managers"
1419
"www.velocidex.com/golang/velociraptor/vtesting"
1520
"www.velocidex.com/golang/velociraptor/vtesting/assert"
21+
"www.velocidex.com/golang/velociraptor/vtesting/goldie"
1622
)
1723

1824
func (self *ClientInfoTestSuite) TestQueueMessages() {
@@ -85,3 +91,111 @@ func (self *ClientInfoTestSuite) TestFastQueueMessages() {
8591
assert.True(self.T(), proto.Equal(tasks[i], written[i]))
8692
}
8793
}
94+
95+
func (self *ClientInfoTestSuite) TestInFlightMessages() {
96+
closer := utils.MockTime(utils.NewMockClock(time.Unix(10, 0)))
97+
defer closer()
98+
99+
launcher, err := services.GetLauncher(self.ConfigObj)
100+
assert.NoError(self.T(), err)
101+
102+
var flow_ids []string
103+
acl_manager := acl_managers.NullACLManager{}
104+
manager, _ := services.GetRepositoryManager(self.ConfigObj)
105+
repository, _ := manager.GetGlobalRepository(self.ConfigObj)
106+
107+
for i := 0; i < 10; i++ {
108+
closer := utils.SetFlowIdForTests(fmt.Sprintf("F.%d", i))
109+
110+
flow_id, err := launcher.ScheduleArtifactCollection(self.Ctx,
111+
self.ConfigObj, acl_manager,
112+
repository, &flows_proto.ArtifactCollectorArgs{
113+
Creator: "admin",
114+
ClientId: self.client_id,
115+
Artifacts: []string{"Client.Test"},
116+
}, utils.SyncCompleter)
117+
assert.NoError(self.T(), err)
118+
119+
flow_ids = append(flow_ids, flow_id)
120+
121+
closer()
122+
}
123+
124+
client_info_manager, err := services.GetClientInfoManager(self.ConfigObj)
125+
assert.NoError(self.T(), err)
126+
127+
tasks, err := client_info_manager.GetClientTasks(self.Ctx, self.client_id)
128+
assert.NoError(self.T(), err)
129+
130+
// 4 tasks are queued
131+
assert.Equal(self.T(), len(tasks), 4)
132+
133+
tasks, err = client_info_manager.GetClientTasks(self.Ctx, self.client_id)
134+
assert.NoError(self.T(), err)
135+
136+
// Tasks are still in flight, so we can not get any new tasks yet.
137+
assert.Equal(self.T(), len(tasks), 0)
138+
139+
client_info, err := client_info_manager.Get(self.Ctx, self.client_id)
140+
assert.NoError(self.T(), err)
141+
142+
// Should contain only 4 flow ids in the in_flight_flows set.
143+
golden := ordereddict.NewDict().
144+
Set("InFlightFlows", client_info)
145+
146+
// Pass some time
147+
closer = utils.MockTime(utils.NewMockClock(time.Unix(100, 0)))
148+
defer closer()
149+
150+
// Tasks are still in flight, so we do not send any flows, instead
151+
// we send a task status request to see how those other tasks are
152+
// going.
153+
tasks, err = client_info_manager.GetClientTasks(self.Ctx, self.client_id)
154+
assert.NoError(self.T(), err)
155+
156+
sort.Strings(tasks[0].FlowStatsRequest.FlowId)
157+
158+
assert.Equal(self.T(), len(tasks), 1)
159+
160+
// Should contains a status check request for all inflight flows.
161+
golden.Set("StatusChecks", tasks)
162+
163+
// Now complete the flows
164+
runner := flows.NewFlowRunner(self.Ctx, self.ConfigObj)
165+
for flow_id := range client_info.InFlightFlows {
166+
runner.ProcessSingleMessage(self.Ctx,
167+
&crypto_proto.VeloMessage{
168+
Source: self.client_id,
169+
SessionId: flow_id,
170+
FlowStats: &crypto_proto.FlowStats{
171+
FlowComplete: true,
172+
QueryStatus: []*crypto_proto.VeloStatus{{
173+
Status: crypto_proto.VeloStatus_OK,
174+
}},
175+
}})
176+
}
177+
runner.Close(self.Ctx)
178+
179+
client_info, err = client_info_manager.Get(self.Ctx, self.client_id)
180+
assert.NoError(self.T(), err)
181+
182+
// Completing the flows removes the flows from the in flight set.
183+
assert.Equal(self.T(), len(client_info.InFlightFlows), 0)
184+
185+
// Should contain no in flight flows but still contain the
186+
// has_tasks flag.
187+
golden.Set("AfterCompletion", client_info)
188+
189+
// Now read some more tasks
190+
tasks, err = client_info_manager.GetClientTasks(self.Ctx, self.client_id)
191+
assert.NoError(self.T(), err)
192+
193+
// Should conatin
194+
assert.Equal(self.T(), len(tasks), 4)
195+
196+
client_info, err = client_info_manager.Get(self.Ctx, self.client_id)
197+
assert.NoError(self.T(), err)
198+
golden.Set("SecondSetOfTasks", client_info)
199+
200+
goldie.Assert(self.T(), "TestInFlightMessages", json.MustMarshalIndent(golden))
201+
}

services/hunt_dispatcher/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ func (self *HuntStorageManagerImpl) loadHuntsFromDatastore(
561561
self.dirty = true
562562

563563
// The old hunt record is newer than the one on disk, ignore it.
564-
} else if old_hunt_record.Version >= hunt_obj.Version {
564+
} else if old_hunt_record.Version > hunt_obj.Version {
565565
continue
566566
}
567567

services/launcher/flows.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,15 @@ func UpdateFlowStats(collection_context *flows_proto.ArtifactCollectorContext) {
276276
collection_context.StartTime = s.FirstActive
277277
}
278278

279+
// If the Query stats represents an unknown flow, we mark the
280+
// flow as errored.
281+
if s.Status == crypto_proto.VeloStatus_UNKNOWN_FLOW {
282+
collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
283+
collection_context.Status = s.ErrorMessage
284+
collection_context.Backtrace = s.Backtrace
285+
break
286+
}
287+
279288
// Get the first errored query and mark the entire collection_context with it.
280289
if collection_context.State == flows_proto.ArtifactCollectorContext_RUNNING &&
281290
s.Status == crypto_proto.VeloStatus_GENERIC_ERROR {

vql/functions/alerts.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (self *AlertFunction) Call(ctx context.Context,
4646

4747
alert_name, pres := args.GetString("name")
4848
if !pres {
49-
scope.Log("alert: Alert name must be specified!")
49+
scope.Log("ERROR:alert: Alert name must be specified!")
5050
return &vfilter.Null{}
5151
}
5252

0 commit comments

Comments
 (0)