Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions api/handler/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3065,3 +3065,96 @@ func (h *RepoHandler) GetRepos(ctx *gin.Context) {
slog.String("search", search))
httpbase.OK(ctx, repos)
}

// GetInferenceLogsByVersion godoc
// @Security ApiKey
// @Summary get serverless logs by version (commitid)
// @Tags Repository
// @Accept json
// @Produce json
// @Param repo_type path string true "models,spaces" Enums(models,spaces)
// @Param namespace path string true "namespace"
// @Param name path string true "name"
// @Param id path string true "id"
// @Param commit_id path string true "commit_id"
// @Param current_user query string true "current_user"
// @Param since query string false "since time. Optional values: 10mins, 30mins, 1hour, 6hours, 1day, 2days, 1week"
// @Failure 400 {object} types.APIBadRequest "Bad request"
// @Failure 401 {object} types.APIUnauthorized "Permission denied"
// @Failure 500 {object} types.APIInternalServerError "Internal server error"
// @Router /{repo_type}/{namespace}/{name}/serverless/{id}/versions/{commit_id} [get]
func (h *RepoHandler) ServerlessVersionLogs(ctx *gin.Context) {
namespace, name, err := common.GetNamespaceAndNameFromContext(ctx)
if err != nil {
slog.ErrorContext(ctx.Request.Context(), "failed to get namespace and name from context", "error", err)
httpbase.NotFoundError(ctx, err)
return
}

currentUser := httpbase.GetCurrentUser(ctx)
repoType := common.RepoTypeFromContext(ctx)
deployID, err := strconv.ParseInt(ctx.Param("id"), 10, 64)
if err != nil {
slog.ErrorContext(ctx.Request.Context(), "Bad request format", "error", err)
httpbase.BadRequest(ctx, "Invalid deploy ID format")
return
}
commitID := ctx.Param("commit_id")
instance := ctx.Query("instance_name")
logReq := types.DeployActReq{
RepoType: repoType,
Namespace: namespace,
Name: name,
CurrentUser: currentUser,
DeployID: deployID,
DeployType: types.ServerlessType,
InstanceName: instance,
Since: ctx.Query("since"),
CommitID: commitID,
}

logReader, err := h.c.DeployInstanceLogs(ctx.Request.Context(), logReq)
if err != nil {
if errors.Is(err, errorx.ErrForbidden) {
slog.ErrorContext(ctx.Request.Context(), "user not allowed to get serverless deploy logs", slog.Any("logReq", logReq), slog.Any("error", err))
httpbase.ForbiddenError(ctx, err)
return
}

slog.ErrorContext(ctx.Request.Context(), "Failed to get serverless deploy logs", slog.Any("logReq", logReq), slog.Any("error", err))
httpbase.ServerError(ctx, err)
return
}

if logReader.RunLog() == nil {
httpbase.ServerError(ctx, errors.New("don't find any deploy instance log"))
return
}

ctx.Writer.Header().Set("Content-Type", "text/event-stream")
ctx.Writer.Header().Set("Cache-Control", "no-cache")
ctx.Writer.Header().Set("Connection", "keep-alive")
ctx.Writer.Header().Set("Transfer-Encoding", "chunked")
ctx.Writer.WriteHeader(http.StatusOK)
ctx.Writer.Flush()

heartbeatTicker := time.NewTicker(30 * time.Second)
defer heartbeatTicker.Stop()
for {
select {
case <-ctx.Request.Context().Done():
slog.Debug("repo handler logs request context done", slog.Any("error", ctx.Request.Context().Err()))
return
case data, ok := <-logReader.RunLog():
if ok {
ctx.SSEvent("Container", string(data))
ctx.Writer.Flush()
}
case <-heartbeatTicker.C:
ctx.SSEvent("Heartbeat", "keep-alive")
ctx.Writer.Flush()
default:
time.Sleep(time.Second * 1)
}
}
}
195 changes: 195 additions & 0 deletions api/handler/repo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,201 @@ func TestRepoHandler_CreateFile(t *testing.T) {

}

func TestRepoHandler_ServerlessVersionLogs(t *testing.T) {
// Test case: Invalid deploy ID format
t.Run("invalid_deploy_id", func(t *testing.T) {
tester := NewRepoTester(t).WithHandleFunc(func(rp *RepoHandler) gin.HandlerFunc {
return rp.ServerlessVersionLogs
})
tester.WithUser()

// Set invalid deploy ID
tester.WithKV("repo_type", types.ModelRepo)
tester.WithParam("id", "invalid")
tester.WithParam("commit_id", "test-commit-id")

// Execute request
tester.Execute()

// Verify response
require.Equal(t, http.StatusBadRequest, tester.Response().Code)
require.Contains(t, tester.Response().Body.String(), "Invalid deploy ID format")
})

// Test case: DeployInstanceLogs returns error
t.Run("deploy_instance_logs_error", func(t *testing.T) {
tester := NewRepoTester(t).WithHandleFunc(func(rp *RepoHandler) gin.HandlerFunc {
return rp.ServerlessVersionLogs
})
tester.WithUser()

// Set valid parameters
tester.WithKV("repo_type", types.ModelRepo)
tester.WithParam("id", "123")
tester.WithParam("commit_id", "test-commit-id")

// Mock error response
expectedDeployID := int64(123)
tester.mocks.repo.EXPECT().DeployInstanceLogs(mock.Anything, types.DeployActReq{
RepoType: types.ModelRepo,
Namespace: "u",
Name: "r",
CurrentUser: "u",
DeployID: expectedDeployID,
DeployType: types.ServerlessType,
InstanceName: "",
Since: "",
CommitID: "test-commit-id",
}).Return(nil, errors.New("internal server error"))

// Execute request
tester.Execute()

// Verify response
require.Equal(t, http.StatusInternalServerError, tester.Response().Code)
})

// Test case: DeployInstanceLogs returns forbidden error
t.Run("deploy_instance_logs_forbidden", func(t *testing.T) {
tester := NewRepoTester(t).WithHandleFunc(func(rp *RepoHandler) gin.HandlerFunc {
return rp.ServerlessVersionLogs
})
tester.WithUser()

// Set valid parameters
tester.WithKV("repo_type", types.ModelRepo)
tester.WithParam("id", "123")
tester.WithParam("commit_id", "test-commit-id")

// Mock forbidden error response
expectedDeployID := int64(123)
tester.mocks.repo.EXPECT().DeployInstanceLogs(mock.Anything, types.DeployActReq{
RepoType: types.ModelRepo,
Namespace: "u",
Name: "r",
CurrentUser: "u",
DeployID: expectedDeployID,
DeployType: types.ServerlessType,
InstanceName: "",
Since: "",
CommitID: "test-commit-id",
}).Return(nil, errorx.ErrForbidden)

// Execute request
tester.Execute()

// Verify response
require.Equal(t, http.StatusForbidden, tester.Response().Code)
})

// Test case: No logs found
t.Run("no_logs_found", func(t *testing.T) {
tester := NewRepoTester(t).WithHandleFunc(func(rp *RepoHandler) gin.HandlerFunc {
return rp.ServerlessVersionLogs
})
tester.WithUser()

// Set valid parameters
tester.WithKV("repo_type", types.ModelRepo)
tester.WithParam("id", "123")
tester.WithParam("commit_id", "test-commit-id")

// Mock empty logs response
expectedDeployID := int64(123)
buildLogChan := make(chan string)
close(buildLogChan)
// Create a MultiLogReader with nil runLogs
// Note: We're passing a nil channel directly
var nilRunLogChan <-chan string
multiLogReader := deploy.NewMultiLogReader(buildLogChan, nilRunLogChan)

tester.mocks.repo.EXPECT().DeployInstanceLogs(mock.Anything, types.DeployActReq{
RepoType: types.ModelRepo,
Namespace: "u",
Name: "r",
CurrentUser: "u",
DeployID: expectedDeployID,
DeployType: types.ServerlessType,
InstanceName: "",
Since: "",
CommitID: "test-commit-id",
}).Return(multiLogReader, nil)

// Execute request
tester.Execute()

// Verify response
require.Equal(t, http.StatusInternalServerError, tester.Response().Code)
require.Contains(t, tester.Response().Body.String(), "don't find any deploy instance log")
})

// Test case: Normal case with all parameters
t.Run("normal_case_all_params", func(t *testing.T) {
tester := NewRepoTester(t).WithHandleFunc(func(rp *RepoHandler) gin.HandlerFunc {
return rp.ServerlessVersionLogs
})
tester.WithUser()

// Set valid parameters
tester.WithKV("repo_type", types.ModelRepo)
tester.WithParam("id", "123")
tester.WithParam("commit_id", "test-commit-id")
tester.WithQuery("instance_name", "instance-1")
tester.WithQuery("since", "1h")

// Mock response with a timeout
expectedDeployID := int64(123)
buildLogChan := make(chan string)
runLogChan := make(chan string)

// Create multi log reader
multiLogReader := deploy.NewMultiLogReader(buildLogChan, runLogChan)

// Mock the expected call
tester.mocks.repo.EXPECT().DeployInstanceLogs(mock.Anything, types.DeployActReq{
RepoType: types.ModelRepo,
Namespace: "u",
Name: "r",
CurrentUser: "u",
DeployID: expectedDeployID,
DeployType: types.ServerlessType,
InstanceName: "instance-1",
Since: "1h",
CommitID: "test-commit-id",
}).Return(multiLogReader, nil)

// Set a timeout for the request
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

// Execute request in a goroutine with timeout
done := make(chan bool)
go func() {
tester.Execute()
done <- true
}()

// Wait for either the request to complete or the timeout
select {
case <-done:
// Request completed successfully
case <-ctx.Done():
// Timeout occurred, which is expected for SSE streaming
}

// Verify that the mock was called

// Close channels to clean up
close(buildLogChan)
close(runLogChan)
})
}

func TestRepoHandler_ServerlessVersionLogs_SSE(t *testing.T) {
// This is a more advanced test that would require capturing SSE events
// For simplicity, we'll test the basic functionality in the main test function
}

func TestRepoHandler_UpdateFile(t *testing.T) {
tester := NewRepoTester(t).WithHandleFunc(func(rp *RepoHandler) gin.HandlerFunc {
return rp.UpdateFile
Expand Down
1 change: 1 addition & 0 deletions api/router/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ func createModelRoutes(config *config.Config,
modelsServerlessGroup.GET("/:namespace/:name/serverless/:id", middlewareCollection.Auth.NeedAdmin, repoCommonHandler.ServerlessDetail)
modelsServerlessGroup.GET("/:namespace/:name/serverless/:id/status", middlewareCollection.Auth.NeedAdmin, repoCommonHandler.ServerlessStatus)
modelsServerlessGroup.GET("/:namespace/:name/serverless/:id/logs/:instance", middlewareCollection.Auth.NeedAdmin, repoCommonHandler.ServerlessLogs)
modelsServerlessGroup.GET("/:namespace/:name/serverless/:id/versions/:commit_id", middlewareCollection.Auth.NeedAdmin, repoCommonHandler.ServerlessVersionLogs)
modelsServerlessGroup.PUT("/:namespace/:name/serverless/:id", middlewareCollection.Auth.NeedAdmin, repoCommonHandler.ServerlessUpdate)
}
{
Expand Down
4 changes: 4 additions & 0 deletions builder/deploy/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,10 @@ func (d *deployer) InstanceLogs(ctx context.Context, dr types.DeployRepo) (*Mult
types.LogLabelTypeKey: types.LogLabelDeploy,
types.StreamKeyDeployID: fmt.Sprintf("%d", deploy.ID),
}
if dr.CommitID != "" {
labels[types.StreamKeyDeployCommitID] = dr.CommitID
}

if dr.InstanceName != "" {
labels[types.StreamKeyInstanceName] = dr.InstanceName
}
Expand Down
3 changes: 2 additions & 1 deletion common/types/logcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ const (
StreamKeyDeployTypeID = "csghub_deploy_type_id"
StreamKeyDeployTaskID = "csghub_deploy_task_id"

StreamKeyInstanceName = "pod_name"
StreamKeyInstanceName = "pod_name"
StreamKeyDeployCommitID = "csghub_deploy_commit_id"
)

type ReportMsg string
Expand Down
1 change: 1 addition & 0 deletions common/types/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ type DeployActReq struct {
DeployType int `json:"deploy_type"`
InstanceName string `json:"instance_name"`
Since string `json:"since,omitempty"`
CommitID string `json:"commit_id"`
}

type DeployUpdateReq struct {
Expand Down
3 changes: 2 additions & 1 deletion common/types/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ type DeployRepo struct {
Message string `json:"message,omitempty"`
SupportFunctionCall bool `json:"support_function_call,omitempty"`

Since string `json:"since,omitempty"`
Since string `json:"since,omitempty"`
CommitID string `json:"commit_id,omitempty"`
}

type RuntimeFrameworkReq struct {
Expand Down
1 change: 1 addition & 0 deletions component/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,7 @@ func (c *repoComponentImpl) DeployInstanceLogs(ctx context.Context, logReq types
SvcName: deploy.SvcName,
InstanceName: logReq.InstanceName,
Since: logReq.Since,
CommitID: logReq.CommitID,
})
}

Expand Down
5 changes: 3 additions & 2 deletions runner/component/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ var (
KeyServiceLabel string = "serving.knative.dev/service"
KeyRunModeLabel string = "run-mode"
ValueMultiHost string = "multi-host"
CommitId string = "serving.knative.dev/commit_id"
RevisionName string = "serving.knative.dev/revision"
// CommitId string = "serving.knative.dev/commit_id"
CommitId = types.StreamKeyDeployCommitID
RevisionName string = "serving.knative.dev/revision"
)

type serviceComponentImpl struct {
Expand Down
Loading