diff --git a/common/log/context_handler.go b/common/log/context_handler.go index c1983b01a..8154feca8 100644 --- a/common/log/context_handler.go +++ b/common/log/context_handler.go @@ -2,7 +2,10 @@ package log import ( "context" + "fmt" "log/slog" + "path/filepath" + "runtime" "opencsg.com/csghub-server/common/utils/trace" ) @@ -20,5 +23,15 @@ func (h *ContextHandler) Handle(ctx context.Context, r slog.Record) error { if sessionID := trace.GetSessionIDFromContext(ctx); sessionID != "" { r.AddAttrs(slog.String("xnet_session_id", sessionID)) } + + if r.Level == slog.LevelError || r.Level == slog.LevelDebug { + if r.PC != 0 { + fs := runtime.CallersFrames([]uintptr{r.PC}) + f, _ := fs.Next() + shortFile := filepath.Base(f.File) + r.AddAttrs(slog.String("source", fmt.Sprintf("%s:%d", shortFile, f.Line))) + } + } + return h.Handler.Handle(ctx, r) } diff --git a/common/log/context_handler_test.go b/common/log/context_handler_test.go new file mode 100644 index 000000000..3763146e3 --- /dev/null +++ b/common/log/context_handler_test.go @@ -0,0 +1,81 @@ +package log + +import ( + "bytes" + "context" + "encoding/json" + "log/slog" + "os" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/trace" + csgtrace "opencsg.com/csghub-server/common/utils/trace" +) + +func TestContextHandler_Handle(t *testing.T) { + // Setup buffer to capture logs + var buf bytes.Buffer + // Use JSONHandler to easily parse output + jsonHandler := slog.NewJSONHandler(&buf, &slog.HandlerOptions{ + // Remove time to make testing easier or just ignore it in assertion + ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { + if a.Key == slog.TimeKey { + return slog.Attr{} + } + return a + }, + }) + + h := &ContextHandler{Handler: jsonHandler} + logger := slog.New(h) + + // Case 1: Context with TraceID (via OTEL) and SessionID + ctx := context.Background() + + // Generate a valid TraceID and SpanID for OTEL + traceIDStr := "4bf92f3577b34da6a3ce929d0e0e4736" + traceID, err := trace.TraceIDFromHex(traceIDStr) + require.NoError(t, err) + + spanIDStr := "00f067aa0ba902b7" + spanID, err := trace.SpanIDFromHex(spanIDStr) + require.NoError(t, err) + + spanCtx := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: traceID, + SpanID: spanID, + TraceFlags: trace.FlagsSampled, + }) + // Inject SpanContext into context + ctx = trace.ContextWithSpanContext(ctx, spanCtx) + + // Inject SessionID + sessionID := "sess-12345" + ctx = csgtrace.SetSessionIDInContext(ctx, sessionID) + + // Log something + logger.ErrorContext(ctx, "test message") + + // Parse result + var result map[string]interface{} + err = json.Unmarshal(buf.Bytes(), &result) + require.NoError(t, err) + + // Verify fields + require.Equal(t, "test message", result["msg"]) + require.Equal(t, traceIDStr, result["trace_id"]) + require.Equal(t, sessionID, result["xnet_session_id"]) + + // Verify source + source, ok := result["source"].(string) + require.True(t, ok, "source field should be present and string") + + // Check that source contains the filename + // Since we are running the test, the source file should be this file. + require.Contains(t, source, "context_handler_test.go") + + wd, _ := os.Getwd() + t.Logf("Working Dir: %s", wd) + t.Logf("Source Logged: %s", source) +} diff --git a/component/git_http.go b/component/git_http.go index fbbcfd41b..100a0916d 100644 --- a/component/git_http.go +++ b/component/git_http.go @@ -219,12 +219,23 @@ func (c *gitHTTPComponentImpl) lfsBatchDownloadInfo(ctx context.Context, req typ Pointer: obj, } reqParams := make(url.Values) - url, err := c.s3Client.PresignedGetObject(ctx, c.config.S3.Bucket, objectKey, types.OssFileExpire, reqParams) - if err != nil { - objs = append(objs, &types.ObjectResponse{ - Error: &types.ObjectError{}, - }) - continue + var url *url.URL + if repo.XnetEnabled { + url, err = c.xnetClient.PresignedGetObject(ctx, objectKey, types.OssFileExpire, reqParams) + if err != nil { + objs = append(objs, &types.ObjectResponse{ + Error: &types.ObjectError{}, + }) + continue + } + } else { + url, err = c.s3Client.PresignedGetObject(ctx, c.config.S3.Bucket, objectKey, types.OssFileExpire, reqParams) + if err != nil { + objs = append(objs, &types.ObjectResponse{ + Error: &types.ObjectError{}, + }) + continue + } } resp.Actions["download"] = &types.Link{Href: url.String(), Header: map[string]any{}} objs = append(objs, resp) @@ -250,11 +261,22 @@ func (c *gitHTTPComponentImpl) lfsBatchUploadInfo(ctx context.Context, req types } useMultipart := slices.Contains(req.Transfers, "multipart") - - if useMultipart { + useXnet := false + transfer = "basic" + if repo.XnetEnabled && slices.Contains(req.Transfers, "xet") { + transfer = "xet" + useXnet = true + } else if useMultipart { transfer = "multipart" } + if useXnet { + header["X-Xet-Cas-Url"] = c.config.Xnet.Endpoint + header["X-Xet-Access-Token"] = "nosniff" + header["X-Xet-Token-Expiration"] = "" + header["X-Xet-Session-Id"] = "1" + } + for _, obj := range req.Objects { // for existing lfs files, return pointer only and no action, // this is the expeted format when file exists and doesn't diff --git a/component/git_http_test.go b/component/git_http_test.go index 625623c73..e9b616d47 100644 --- a/component/git_http_test.go +++ b/component/git_http_test.go @@ -213,6 +213,7 @@ func TestGitHTTPComponent_Batch(t *testing.T) { operation: types.LFSBatchUpload, hasWriteAccess: true, resp: &types.BatchResponse{ + Transfer: "basic", Objects: []*types.ObjectResponse{ { Pointer: types.Pointer{Oid: notExistOID, Size: 100},