Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions historyserver/pkg/eventserver/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,3 +915,32 @@ func (h *EventHandler) GetJobByJobID(clusterName, jobID string) (types.Job, bool
}
return job.DeepCopy(), true
}

func (h *EventHandler) GetJobLogByJobID(clusterName, jobID string) (io.Reader, error) {
h.ClusterJobMap.RLock()
h.ClusterJobMap.RLock()
defer h.ClusterJobMap.RUnlock()

jobMap, ok := h.ClusterJobMap.ClusterJobMap[clusterName]
if !ok {
return nil, fmt.Errorf("Cluster %s does not exist and could not be found", clusterName)
}

jobMap.Lock()
defer jobMap.Unlock()

job, ok := jobMap.JobMap[jobID]
if !ok {
return nil, fmt.Errorf("Job %s does not exist and could not be found", jobID)
}

// construct the job log filename
jobLogFileName := fmt.Sprintf("job-driver-%s.log", job.SubmissionID)

reader := h.reader.GetContent(clusterName, jobLogFileName)
if reader == nil {
return nil, fmt.Errorf("Failed to get content of job %s with filename %s", jobID, jobLogFileName)
}

return reader, nil
}
51 changes: 46 additions & 5 deletions historyserver/pkg/historyserver/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func routerAPI(s *ServerHandler) {
Param(ws.PathParameter("job_id", "job_id")).
Writes("")) // Placeholder for specific return type

ws.Route(ws.GET("/jobs/{job_id}/logs").To(s.getJobLog).Filter(s.CookieHandle).
Doc("get log for job").
Param(ws.PathParameter("job_id", "job_id")).
Writes("")) // Placeholder for specific return type

ws.Route(ws.GET("/data/datasets/{job_id}").To(s.getDatasets).Filter(s.CookieHandle).
Doc("get datasets").
Param(ws.PathParameter("job_id", "job_id")).
Expand Down Expand Up @@ -375,9 +380,7 @@ func (s *ServerHandler) getJobs(req *restful.Request, resp *restful.Response) {
formattedJobs = append(formattedJobs, formatJobForResponse(job))
}

response := formattedJobs

respData, err := json.Marshal(response)
respData, err := json.Marshal(formattedJobs)
if err != nil {
logrus.Errorf("Failed to marshal jobs response: %v", err)
resp.WriteErrorString(http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -446,10 +449,8 @@ func (s *ServerHandler) getJob(req *restful.Request, resp *restful.Response) {
}

jobID := req.PathParameter("job_id")

clusterSessionKey := utils.BuildClusterSessionKey(clusterName, clusterNamespace, sessionName)
job, found := s.eventHandler.GetJobByJobID(clusterSessionKey, jobID)

if !found {
responseString := fmt.Sprintf("Job %s does not exist", jobID)
resp.Write([]byte(responseString))
Expand All @@ -466,6 +467,46 @@ func (s *ServerHandler) getJob(req *restful.Request, resp *restful.Response) {

}

func (s *ServerHandler) getJobLog(req *restful.Request, resp *restful.Response) {
clusterName := req.Attribute(COOKIE_CLUSTER_NAME_KEY).(string)
clusterNamespace := req.Attribute(COOKIE_CLUSTER_NAMESPACE_KEY).(string)
sessionName := req.Attribute(COOKIE_SESSION_NAME_KEY).(string)
if sessionName == "live" {
s.redirectRequest(req, resp)
return
}

jobID := req.PathParameter("job_id")
clusterSessionKey := utils.BuildClusterSessionKey(clusterName, clusterNamespace, sessionName)

reader, err := s.eventHandler.GetJobLogByJobID(clusterSessionKey, jobID)
if err != nil {
responseString := fmt.Sprintf("Failed to get log for Job %s, error: %v", jobID, err)
resp.Write([]byte(responseString))
return
}

logContent, err := io.ReadAll(reader)
if err != nil {
responseString := fmt.Sprintf("Failed to read file content for Job %s, error: %q", jobID, err)
resp.Write([]byte(responseString))
return
}

response := map[string]interface{}{
"logs": logContent,
}

respData, err := json.Marshal(response)
if err != nil {
logrus.Errorf("Failed to marshal jobs response: %v", err)
resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}

resp.Write(respData)
}

func (s *ServerHandler) getDatasets(req *restful.Request, resp *restful.Response) {
sessionName := req.Attribute(COOKIE_SESSION_NAME_KEY).(string)
if sessionName == "live" {
Expand Down
Loading