Skip to content

Commit 4540f18

Browse files
[history server][collector] Fix getJobID for job event collection (#4342)
* [historyserver] Fix getJobID for job event collection Signed-off-by: Future-Outlier <[email protected]> * add jia-wei as co-author, since he debug with me together Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: Jia-Wei Jiang <[email protected]> * remove unused code Signed-off-by: Future-Outlier <[email protected]> * update rueian's advice Signed-off-by: Future-Outlier <[email protected]> * add task profile event example Signed-off-by: Future-Outlier <[email protected]> * revert back oneof solution Signed-off-by: Future-Outlier <[email protected]> * add task profile event Signed-off-by: Future-Outlier <[email protected]> * update rueian's advice Signed-off-by: Future-Outlier <[email protected]> * a worked version in ray 2.52.0 Signed-off-by: Future-Outlier <[email protected]> --------- Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: Jia-Wei Jiang <[email protected]>
1 parent d26dbfa commit 4540f18

File tree

3 files changed

+73
-5
lines changed

3 files changed

+73
-5
lines changed

historyserver/config/raycluster.yaml

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,18 @@ spec:
2020
affinity:
2121
containers:
2222
- env:
23+
- name: RAY_enable_ray_event
24+
value: "true"
2325
- name: RAY_enable_core_worker_ray_event_to_aggregator
24-
value: "1"
26+
value: "true"
2527
- name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR
2628
value: "http://localhost:8084/v1/events"
29+
# in ray 2.52.0, we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES
30+
# in ray 2.53.0 (not yet done). we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES
31+
- name: RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES
32+
value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT,
33+
TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT,
34+
ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT"
2735
image: rayproject/ray:2.52.0
2836
imagePullPolicy: IfNotPresent
2937
command:
@@ -117,10 +125,18 @@ spec:
117125
imagePullSecrets:
118126
containers:
119127
- env:
128+
- name: RAY_enable_ray_event
129+
value: "true"
120130
- name: RAY_enable_core_worker_ray_event_to_aggregator
121-
value: "1"
131+
value: "true"
122132
- name: RAY_DASHBOARD_AGGREGATOR_AGENT_EVENTS_EXPORT_ADDR
123133
value: "http://localhost:8084/v1/events"
134+
# in ray 2.52.0, we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES
135+
# in ray 2.53.0 (not yet done). we need to set RAY_DASHBOARD_AGGREGATOR_AGENT_PUBLISHER_HTTP_ENDPOINT_EXPOSABLE_EVENT_TYPES
136+
- name: RAY_DASHBOARD_AGGREGATOR_AGENT_EXPOSABLE_EVENT_TYPES
137+
value: "TASK_DEFINITION_EVENT,TASK_LIFECYCLE_EVENT,ACTOR_TASK_DEFINITION_EVENT,
138+
TASK_PROFILE_EVENT,DRIVER_JOB_DEFINITION_EVENT,DRIVER_JOB_LIFECYCLE_EVENT,
139+
ACTOR_DEFINITION_EVENT,ACTOR_LIFECYCLE_EVENT,NODE_DEFINITION_EVENT,NODE_LIFECYCLE_EVENT"
124140
image: rayproject/ray:2.52.0
125141
command:
126142
- 'echo "=========================================="; [ -d "/tmp/ray/session_latest" ] && dest="/tmp/ray/prev-logs/$(basename $(readlink /tmp/ray/session_latest))/$(cat /tmp/ray/raylet_node_id)" && echo "dst is $dest" && mkdir -p "$dest" && mv /tmp/ray/session_latest/logs "$dest/logs"; echo "========================================="'

historyserver/config/rayjob.yaml

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,39 @@ kind: RayJob
33
metadata:
44
name: rayjob
55
spec:
6-
entrypoint: python -c "import ray; ray.init(); print(ray.cluster_resources())"
6+
entrypoint: |
7+
python -c "
8+
import ray
9+
ray.init()
10+
11+
@ray.remote
12+
def my_task(x):
13+
return x * 2
14+
15+
@ray.remote
16+
class Counter:
17+
def __init__(self):
18+
self.count = 0
19+
20+
def increment(self):
21+
self.count += 1
22+
return self.count
23+
24+
def get_count(self):
25+
return self.count
26+
27+
task_result = ray.get(my_task.remote(1))
28+
print(f'Task result: {task_result}')
29+
30+
counter = Counter.remote()
31+
for i in range(1):
32+
count = ray.get(counter.increment.remote())
33+
print(f'Counter: {count}')
34+
35+
final_count = ray.get(counter.get_count.remote())
36+
print(f'Final count: {final_count}')
37+
print(f'Cluster resources: {ray.cluster_resources()}')
38+
"
739
# Select the existing Ray cluster running the collector.
840
clusterSelector:
941
ray.io/cluster: raycluster-historyserver

historyserver/pkg/collector/eventserver/eventserver.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,21 @@ type EventServer struct {
4545
mutex sync.Mutex
4646
}
4747

48+
var eventTypesWithJobID = []string{
49+
// Job Events (Driver Job)
50+
"driverJobDefinitionEvent",
51+
"driverJobLifecycleEvent",
52+
53+
// Task Events (Normal Task)
54+
"taskDefinitionEvent",
55+
"taskLifecycleEvent",
56+
"taskProfileEvents",
57+
58+
// Actor Events (Actor Task + Actor Definition)
59+
"actorTaskDefinitionEvent",
60+
"actorDefinitionEvent",
61+
}
62+
4863
func NewEventServer(writer storage.StorageWriter, rootDir, sessionDir, nodeID, clusterName, clusterID, sessionName string) *EventServer {
4964
server := &EventServer{
5065
events: make([]Event, 0),
@@ -408,9 +423,14 @@ func (es *EventServer) isNodeEvent(eventData map[string]interface{}) bool {
408423

409424
// getJobID gets jobID associated with event
410425
func (es *EventServer) getJobID(eventData map[string]interface{}) string {
411-
if jobID, hasJob := eventData["jobId"]; hasJob && jobID != "" {
412-
return fmt.Sprintf("%v", jobID)
426+
for _, eventType := range eventTypesWithJobID {
427+
if nestedEvent, ok := eventData[eventType].(map[string]interface{}); ok {
428+
if jobID, hasJob := nestedEvent["jobId"]; hasJob && jobID != "" {
429+
return fmt.Sprintf("%v", jobID)
430+
}
431+
}
413432
}
433+
414434
return ""
415435
}
416436

0 commit comments

Comments
 (0)