[history server] Web Server + Event Processor #4329
base: master
Conversation
Future-Outlier left a comment:
cc @chiayi @KunWuLuan
to help review, thank you!
```go
if err != nil {
	logrus.Fatalf("Error starting server: %v", err)
	os.Exit(1)
}
```
Graceful shutdown incorrectly treated as fatal error
Medium Severity
When `server.Shutdown` is called for graceful shutdown, `ListenAndServe` returns `http.ErrServerClosed`. The `if err != nil` check treats this as a fatal error and calls `logrus.Fatalf`, causing the program to exit with code 1 even during a normal graceful shutdown. The check needs to exclude `http.ErrServerClosed` from fatal error handling.
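A minimal sketch of the suggested check, assuming the `server` and `logrus` names from the excerpt above rather than the PR's actual code (note that `logrus.Fatalf` already exits with a non-zero code, so the explicit `os.Exit(1)` is redundant):

```go
package main

import (
	"errors"
	"net/http"

	"github.com/sirupsen/logrus"
)

func main() {
	server := &http.Server{Addr: ":8080"}

	// http.ErrServerClosed is what ListenAndServe returns after a graceful
	// server.Shutdown call, so it must not be treated as a fatal error.
	if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		logrus.Fatalf("Error starting server: %v", err)
	}
	logrus.Info("server stopped gracefully")
}
```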
```go
const (
	NIL                                        TaskStatus = "NIL"
	PENDING_ARGS_AVAIL                         TaskStatus = "PENDING_ARGS_AVAIL"
	PENDING_NODE_ASSIGNMENT                    TaskStatus = "PENDING_NODE_ASSIGNMENT"
	PENDING_OBJ_STORE_MEM_AVAIL                TaskStatus = "PENDING_OBJ_STORE_MEM_AVAIL"
	PENDING_ARGS_FETCH                         TaskStatus = "PENDING_ARGS_FETCH"
	SUBMITTED_TO_WORKER                        TaskStatus = "SUBMITTED_TO_WORKER"
	PENDING_ACTOR_TASK_ARGS_FETCH              TaskStatus = "PENDING_ACTOR_TASK_ARGS_FETCH"
	PENDING_ACTOR_TASK_ORDERING_OR_CONCURRENCY TaskStatus = "PENDING_ACTOR_TASK_ORDERING_OR_CONCURRENCY"
	RUNNING                                    TaskStatus = "RUNNING"
	RUNNING_IN_RAY_GET                         TaskStatus = "RUNNING_IN_RAY_GET"
	RUNNING_IN_RAY_WAIT                        TaskStatus = "RUNNING_IN_RAY_WAIT"
	FINISHED                                   TaskStatus = "FINISHED"
	FAILED                                     TaskStatus = "FAILED"
)
```
```go
}
if err := h.storeEvent(currEventData); err != nil {
	return err
}
```
Event processor failure causes event processing to block
High Severity
When `storeEvent` returns an error (e.g., from a malformed event), `ProcessEvents` returns the error and the processor goroutine terminates. However, the main event reader loop (line 153) continues sending events to all channels, including the dead processor's channel. Once the channel buffer (size 20) fills up, the main loop blocks indefinitely on the send operation. A single corrupted event file in S3 can therefore stop the history server from processing any new events. As noted in the PR discussion, this can lead to a crash-loop scenario.
Additional Locations (1)
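One way to keep the processor alive, sketched with hypothetical `EventHandler`/`storeEvent` stand-ins named after the excerpt above: log and skip the bad event instead of returning the error and killing the goroutine.

```go
package main

import (
	"github.com/sirupsen/logrus"
)

// EventHandler and storeEvent are stand-ins for the PR's types; the point
// here is the control flow, not the exact signatures.
type EventHandler struct{}

func (h *EventHandler) storeEvent(data []byte) error {
	// Parse and index the event; this can fail on malformed input.
	return nil
}

// ProcessEvents keeps draining its channel even when a single event is bad,
// so the upstream reader loop can never block on a dead consumer.
func (h *EventHandler) ProcessEvents(eventCh <-chan []byte) {
	for currEventData := range eventCh {
		if err := h.storeEvent(currEventData); err != nil {
			// Log and skip the bad event instead of returning the error,
			// which would terminate this goroutine and eventually block
			// the sender once the buffered channel fills up.
			logrus.Errorf("skipping malformed event: %v", err)
			continue
		}
		// ...any follow-up bookkeeping for successfully stored events...
	}
}

func main() {
	ch := make(chan []byte, 20)
	go (&EventHandler{}).ProcessEvents(ch)
	ch <- []byte(`{"example": "event"}`) // reader-loop side
	close(ch)
}
```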
```go
if storedTask.AttemptNumber < currTask.AttemptNumber {
	storedTask.AttemptNumber = currTask.AttemptNumber
}
clusterTaskMapObject.TaskMap[taskId] = storedTask
```
Task update discards all fields except attempt number
Medium Severity
When a task event arrives with a higher `AttemptNumber` than the stored task, the code copies `currTask.AttemptNumber` onto `storedTask` but then saves `storedTask` back to the map instead of `currTask`. All other fields from the newer task attempt (`NodeID`, `WorkerID`, `State`, `ErrorType`, `ErrorMessage`, etc.) are therefore discarded: only the attempt number is taken from the newer event while the rest of the stale data remains, causing incorrect task information to be displayed for retried tasks.
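A sketch of the suggested behavior, using hypothetical `Task` and `ClusterTaskMap` types whose field names come from the comment above rather than the PR's actual definitions; the key change is that a newer attempt replaces the stored record wholesale:

```go
package main

// Task and ClusterTaskMap are illustrative stand-ins, not the PR's real types.
type Task struct {
	AttemptNumber int
	NodeID        string
	WorkerID      string
	State         string
	ErrorType     string
	ErrorMessage  string
}

type ClusterTaskMap struct {
	TaskMap map[string]Task
}

// upsertTask stores the newer attempt in full, so NodeID, WorkerID, State, and
// the error fields all come from the most recent event instead of only the
// attempt number being copied onto stale data.
func (m *ClusterTaskMap) upsertTask(taskId string, currTask Task) {
	storedTask, ok := m.TaskMap[taskId]
	if !ok || storedTask.AttemptNumber < currTask.AttemptNumber {
		m.TaskMap[taskId] = currTask
		return
	}
	// Older or same attempt: keep the already-stored record.
	m.TaskMap[taskId] = storedTask
}

func main() {
	m := &ClusterTaskMap{TaskMap: map[string]Task{}}
	m.upsertTask("task-1", Task{AttemptNumber: 0, State: "RUNNING"})
	m.upsertTask("task-1", Task{AttemptNumber: 1, State: "FAILED", ErrorType: "WORKER_DIED"})
	// m.TaskMap["task-1"] now reflects attempt 1 in full.
}
```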
```go
}

// Construct the full path to the static directory
fullPath := filepath.Join(s.dashboardDir, prefix, "static", path)
```
Path traversal vulnerability in static file handler
High Severity
The `staticFileHandler` constructs file paths from user-controlled input without path traversal validation. Both the `path` URL parameter and the version cookie value are used directly in `filepath.Join` to build `fullPath`. An attacker could supply path traversal sequences (e.g., `../../../etc/passwd` in the `path` parameter, or `../../etc` in the `dashboard_version` cookie) to read arbitrary files outside the intended dashboard directory. While `filepath.Join` cleans the path, it does not prevent escaping the base directory, allowing reads of sensitive files on the server.
Additional Locations (1)
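A minimal sketch of one common mitigation, using a hypothetical `safeJoin` helper rather than the PR's actual handler code: join the untrusted segments onto the trusted base, then verify the cleaned result still resolves inside that base.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

// safeJoin joins untrusted path segments onto a trusted base directory and
// rejects any result that escapes the base after cleaning.
func safeJoin(baseDir string, untrusted ...string) (string, error) {
	fullPath := filepath.Join(append([]string{baseDir}, untrusted...)...)
	rel, err := filepath.Rel(baseDir, fullPath)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return "", errors.New("path escapes base directory")
	}
	return fullPath, nil
}

func main() {
	// In the handler, baseDir would be s.dashboardDir and the untrusted parts
	// would be the version cookie, "static", and the path URL parameter.
	if _, err := safeJoin("/srv/dashboard", "../../etc", "static", "passwd"); err != nil {
		fmt.Println("rejected:", err) // respond with 400/404 instead of serving the file
	}
}
```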
LGTM! Just a question you mentioned:
How does the event processor handle data consistency when multiple replicas are deployed? Currently each pod runs its own historyserver with in-memory state. Won't this cause inconsistent responses depending on which pod handles the request?

todo:

yes it will, and this will be solved in the beta version.
Co-authored-by: @chiayi [email protected]
Co-authored-by: @KunWuLuan [email protected]
Why are these changes needed?
This web server serves the history server's frontend and fetches data from the event server (processor).
As a follow-up, we should enable autoscaling for the web server using Kubernetes HPA.
Note: most of the code is copied from the branches in #4187 and #4253.
diagram
How it works at a high level:
```mermaid
flowchart TB
    subgraph storage [MinIO S3 Storage]
        S3[(S3 Bucket)]
        JobEvents[job_events/]
        NodeEvents[node_events/]
    end
    subgraph historyserver [History Server - Modified]
        Main[main.go]
        SH[ServerHandler]
        EH[EventHandler]
        SR[StorageReader]
        Main --> SH
        Main -->|"go EH.Run()"| EH
        SH -->|"Reference"| EH
        EH --> SR
        SR --> S3
        SR --> JobEvents
        SR --> NodeEvents
        EH --> TaskMap[ClusterTaskMap]
        EH --> ActorMap[ClusterActorMap]
    end
    subgraph endpoints [New Endpoints]
        T1["/api/v0/tasks (from EventHandler)"]
        A1["/logical/actors (from EventHandler)"]
    end
    SH --> T1
    SH --> A1
    T1 -.->|"Query"| TaskMap
    A1 -.->|"Query"| ActorMap
```

Screenshot proof

Take http://localhost:8080/api/v0/tasks as an example.

Logs in the historyserver
How to test and develop in your local env
Related issue number
#3966
Checks