Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions api/handler/finetune_ce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,62 @@ func TestFinetuneHandler_ReadLogNonStream_Success(t *testing.T) {

tester.ResponseEq(t, 200, tester.OKText, logs)
}

func TestFinetuneHandler_GetLogs_NonStream(t *testing.T) {
tester := NewFinetuneTester(t).WithHandleFunc(func(h *FinetuneHandler) gin.HandlerFunc {
return h.GetLogs
})
tester.WithUser()
tester.WithParam("id", "1")
tester.WithQuery("since", "1hour")

tester.mocks.finetune.EXPECT().CheckUserPermission(tester.Ctx(), types.FinetuneLogReq{
CurrentUser: "u",
ID: 1,
Since: "1hour",
}).Return(true, nil)

logs := "mock logs content"
tester.mocks.finetune.EXPECT().ReadJobLogsNonStream(tester.Ctx(), types.FinetuneLogReq{
CurrentUser: "u",
ID: 1,
Since: "1hour",
}).Return(logs, nil)

tester.Execute()

tester.ResponseEq(t, 200, tester.OKText, logs)
}

func TestFinetuneHandler_GetLogs_InvalidID(t *testing.T) {
tester := NewFinetuneTester(t).WithHandleFunc(func(h *FinetuneHandler) gin.HandlerFunc {
return h.GetLogs
})
tester.WithUser()
tester.WithParam("id", "invalid")

tester.Execute()

// The actual error message contains the parsing error details
tester.ResponseEq(t, 400, "strconv.ParseInt: parsing \"invalid\": invalid syntax", nil)
}

func TestFinetuneHandler_GetLogs_PermissionDenied(t *testing.T) {
tester := NewFinetuneTester(t).WithHandleFunc(func(h *FinetuneHandler) gin.HandlerFunc {
return h.GetLogs
})
tester.WithUser()
tester.WithParam("id", "1")
tester.WithQuery("since", "1hour")

tester.mocks.finetune.EXPECT().CheckUserPermission(tester.Ctx(), types.FinetuneLogReq{
CurrentUser: "u",
ID: 1,
Since: "1hour",
}).Return(false, nil)

tester.Execute()

// The actual error message is "user not allowed to read finetune job logs"
tester.ResponseEq(t, 403, "user not allowed to read finetune job logs", nil)
}
103 changes: 103 additions & 0 deletions api/handler/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -1913,6 +1913,109 @@ func (h *RepoHandler) DeployInstanceLogs(ctx *gin.Context) {
}
}

// FinetuneInstanceLogs godoc
// @Security ApiKey
// @Summary get finetune instance logs
// @Tags Repository
// @Accept json
// @Produce json
// @Param namespace path string true "namespace"
// @Param name path string true "name"
// @Param id path string true "id"
// @Param instance path string true "instance"
// @Param current_user query string true "current_user"
// @Param since query string false "since time. Optional values: 10mins, 30mins, 1hour, 6hours, 1day, 2days, 1week"
// @Failure 400 {object} types.APIBadRequest "Bad request"
// @Failure 401 {object} types.APIUnauthorized "Permission denied"
// @Failure 500 {object} types.APIInternalServerError "Internal server error"
// @Router /models/{namespace}/{name}/finetune/{id}/logs/{instance} [get]
func (h *RepoHandler) FinetuneInstanceLogs(ctx *gin.Context) {
if ctx.Query("test") == "true" {
h.testLogs(ctx)
return
}
namespace, name, err := common.GetNamespaceAndNameFromContext(ctx)
if err != nil {
slog.ErrorContext(ctx.Request.Context(), "failed to get namespace and name from context", "error", err)
httpbase.BadRequest(ctx, err.Error())
return
}

currentUser := httpbase.GetCurrentUser(ctx)
repoType := common.RepoTypeFromContext(ctx)
deployID, err := strconv.ParseInt(ctx.Param("id"), 10, 64)
if err != nil {
slog.ErrorContext(ctx.Request.Context(), "Bad request format", "error", err)
httpbase.BadRequest(ctx, err.Error())
return
}
instance := ctx.Param("instance")
if len(instance) < 1 {
httpbase.UnauthorizedError(ctx, errors.New("fail to get deploy instance"))
return
}
ctx.Writer.Header().Set("Content-Type", "text/event-stream")
ctx.Writer.Header().Set("Cache-Control", "no-cache")
ctx.Writer.Header().Set("Connection", "keep-alive")
ctx.Writer.Header().Set("Transfer-Encoding", "chunked")

logReq := types.DeployActReq{
RepoType: repoType,
Namespace: namespace,
Name: name,
CurrentUser: currentUser,
DeployID: deployID,
DeployType: types.FinetuneType,
InstanceName: instance,
Since: ctx.Query("since"),
}

// user http request context instead of gin context, so that server knows the life cycle of the request
logReader, err := h.c.DeployInstanceLogs(ctx.Request.Context(), logReq)
if err != nil {
if errors.Is(err, errorx.ErrForbidden) {
slog.Warn("not allowed to get instance logs", slog.Any("error", err), slog.Any("req", logReq))
httpbase.ForbiddenError(ctx, err)
return
}

slog.ErrorContext(ctx.Request.Context(), "failed to get instance logs", slog.Any("error", err), slog.Any("req", logReq))
httpbase.ServerError(ctx, err)
return
}

if logReader.RunLog() == nil {
httpbase.ServerError(ctx, errors.New("don't find any deploy instance log"))
return
}

// to quickly respond the http request
ctx.Writer.WriteHeader(http.StatusOK)
ctx.Writer.Flush()

heartbeatTicker := time.NewTicker(30 * time.Second)
defer heartbeatTicker.Stop()
for {
select {
case <-ctx.Request.Context().Done():
slog.Debug("repo handler logs request context done", slog.Any("error", ctx.Request.Context().Err()))
return
case data, ok := <-logReader.RunLog():
if ok {
ctx.SSEvent("Container", string(data))
ctx.Writer.Flush()
}
case <-heartbeatTicker.C:
// Send a heartbeat message
ctx.SSEvent("Heartbeat", "keep-alive")
ctx.Writer.Flush()
default:
// Add a small sleep to prevent CPU spinning when no data is available
time.Sleep(time.Second * 1)
}
}
}

func (h *RepoHandler) testLogs(ctx *gin.Context) {
ctx.Writer.Header().Set("Content-Type", "text/event-stream")
ctx.Writer.Header().Set("Cache-Control", "no-cache")
Expand Down
2 changes: 2 additions & 0 deletions api/router/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,8 @@ func createModelRoutes(config *config.Config,
modelsDeployGroup.PUT("/:namespace/:name/finetune/:id/start", modelHandler.FinetuneStart)
// delete a finetune instance
modelsDeployGroup.DELETE("/:namespace/:name/finetune/:id", modelHandler.FinetuneDelete)
// get finetune instance logs
modelsDeployGroup.GET("/:namespace/:name/finetune/:id/logs/:instance", repoCommonHandler.FinetuneInstanceLogs)
}

modelsMonitorGroup := modelsGroup.Group("")
Expand Down
Loading