diff --git a/.gitignore b/.gitignore index 0c635a4..7a97632 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,7 @@ code/ input/ live/ -.DS_Store \ No newline at end of file +.DS_Store + +# log files +*.log \ No newline at end of file diff --git a/controller/ea.go b/controller/ea.go index 250fed1..07c760e 100644 --- a/controller/ea.go +++ b/controller/ea.go @@ -11,8 +11,8 @@ import ( ) func CreateEA(res http.ResponseWriter, req *http.Request) { - var logger = util.NewLogger() - logger.Info("CreateEA API called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "CreateEA API called.") // Comment this out to test the API without authentication. user, err := modules.Auth(req) @@ -22,7 +22,7 @@ func CreateEA(res http.ResponseWriter, req *http.Request) { } // User has id, role, userName, email & fullName. - logger.Info(fmt.Sprintf("User: %s", user)) + logger.InfoCtx(req, fmt.Sprintf("User: %s", user)) data, err := util.Body(req) if err != nil { @@ -44,7 +44,7 @@ func CreateEA(res http.ResponseWriter, req *http.Request) { db, err := connection.PoolConn(req.Context()) if err != nil { - logger.Error(fmt.Sprintf("CreateEA: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateEA: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -66,12 +66,12 @@ func CreateEA(res http.ResponseWriter, req *http.Request) { err = row.Scan(&runID) if err != nil { - logger.Error(fmt.Sprintf("CreateEA.row.Scan: %s", err.Error())) + logger.Error(fmt.Sprintf("CreateEA.row.Scan: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } - logger.Info(fmt.Sprintf("RunID: %s", runID)) + logger.InfoCtx(req, fmt.Sprintf("RunID: %s", runID)) _, err = db.Exec(req.Context(), ` INSERT INTO access (runID, userID, mode) @@ -79,14 +79,14 @@ func CreateEA(res http.ResponseWriter, req *http.Request) { `, runID, user["id"], "write") if err != nil { - logger.Error(fmt.Sprintf("CreateEA.db.Exec: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateEA.db.Exec: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } inputParams, err := json.Marshal(data) if err != nil { - logger.Error(fmt.Sprintf("CreateEA.json.Marshal: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateEA.json.Marshal: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -94,7 +94,7 @@ func CreateEA(res http.ResponseWriter, req *http.Request) { // Save code and upload to minIO. os.Mkdir("code", 0755) if err := os.WriteFile(fmt.Sprintf("code/%v.py", runID), []byte(code), 0644); err != nil { - logger.Error(fmt.Sprintf("CreateEA.os.WriteFile: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateEA.os.WriteFile: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -106,7 +106,7 @@ func CreateEA(res http.ResponseWriter, req *http.Request) { // Save input and upload to minIO. os.Mkdir("input", 0755) if err := os.WriteFile(fmt.Sprintf("input/%v.json", runID), inputParams, 0644); err != nil { - logger.Error(fmt.Sprintf("CreateEA.os.WriteFile: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateEA.os.WriteFile: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -117,12 +117,12 @@ func CreateEA(res http.ResponseWriter, req *http.Request) { // Remove code and input files from local. if err := os.Remove(fmt.Sprintf("code/%v.py", runID)); err != nil { - logger.Error(fmt.Sprintf("CreateEA.os.Remove: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateEA.os.Remove: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } if err := os.Remove(fmt.Sprintf("input/%v.json", runID)); err != nil { - logger.Error(fmt.Sprintf("CreateEA.os.Remove: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateEA.os.Remove: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } diff --git a/controller/gp.go b/controller/gp.go index 5d8e5a1..793ef27 100644 --- a/controller/gp.go +++ b/controller/gp.go @@ -11,8 +11,8 @@ import ( ) func CreateGP(res http.ResponseWriter, req *http.Request) { - var logger = util.NewLogger() - logger.Info("CreateGP API called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "CreateGP API called.") // Comment this out to test the API without authentication. user, err := modules.Auth(req) @@ -22,7 +22,7 @@ func CreateGP(res http.ResponseWriter, req *http.Request) { } // User has id, role, userName, email & fullName. - logger.Info(fmt.Sprintf("User: %s", user)) + logger.InfoCtx(req, fmt.Sprintf("User: %s", user)) data, err := util.Body(req) if err != nil { @@ -44,7 +44,7 @@ func CreateGP(res http.ResponseWriter, req *http.Request) { db, err := connection.PoolConn(req.Context()) if err != nil { - logger.Error(fmt.Sprintf("CreateGP: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateGP: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -59,12 +59,12 @@ func CreateGP(res http.ResponseWriter, req *http.Request) { err = row.Scan(&runID) if err != nil { - logger.Error(fmt.Sprintf("CreateGP.row.Scan: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateGP.row.Scan: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } - logger.Info(fmt.Sprintf("RunID: %s", runID)) + logger.InfoCtx(req, fmt.Sprintf("RunID: %s", runID)) _, err = db.Exec(req.Context(), ` INSERT INTO access (runID, userID, mode) @@ -72,14 +72,14 @@ func CreateGP(res http.ResponseWriter, req *http.Request) { `, runID, user["id"], "write") if err != nil { - logger.Error(fmt.Sprintf("CreateGP.db.Exec: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateGP.db.Exec: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } inputParams, err := json.Marshal(data) if err != nil { - logger.Error(fmt.Sprintf("CreateGP.json.Marshal: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateGP.json.Marshal: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -87,7 +87,7 @@ func CreateGP(res http.ResponseWriter, req *http.Request) { // Save code and upload to minIO. os.Mkdir("code", 0755) if err := os.WriteFile(fmt.Sprintf("code/%v.py", runID), []byte(code), 0644); err != nil { - logger.Error(fmt.Sprintf("CreateGP.os.WriteFile: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateGP.os.WriteFile: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -99,7 +99,7 @@ func CreateGP(res http.ResponseWriter, req *http.Request) { // Save input and upload to minIO. os.Mkdir("input", 0755) if err := os.WriteFile(fmt.Sprintf("input/%v.json", runID), inputParams, 0644); err != nil { - logger.Error(fmt.Sprintf("CreateGP.os.WriteFile: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateGP.os.WriteFile: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -110,12 +110,12 @@ func CreateGP(res http.ResponseWriter, req *http.Request) { // Remove code and input files from local. if err := os.Remove(fmt.Sprintf("code/%v.py", runID)); err != nil { - logger.Error(fmt.Sprintf("CreateGP.os.Remove: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateGP.os.Remove: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } if err := os.Remove(fmt.Sprintf("input/%v.json", runID)); err != nil { - logger.Error(fmt.Sprintf("CreateGP.os.Remove: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateGP.os.Remove: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } diff --git a/controller/ml.go b/controller/ml.go index 1cc7b54..20c12e1 100644 --- a/controller/ml.go +++ b/controller/ml.go @@ -11,8 +11,8 @@ import ( ) func CreateML(res http.ResponseWriter, req *http.Request) { - var logger = util.NewLogger() - logger.Info("CreateML API called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "CreateML API called.") // Comment this out to test the API without authentication. user, err := modules.Auth(req) @@ -22,7 +22,7 @@ func CreateML(res http.ResponseWriter, req *http.Request) { } // User has id, role, userName, email & fullName. - logger.Info(fmt.Sprintf("User: %s", user)) + logger.InfoCtx(req, fmt.Sprintf("User: %s", user)) data, err := util.Body(req) if err != nil { @@ -44,7 +44,7 @@ func CreateML(res http.ResponseWriter, req *http.Request) { db, err := connection.PoolConn(req.Context()) if err != nil { - logger.Error(fmt.Sprintf("CreateML: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateML: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -59,12 +59,12 @@ func CreateML(res http.ResponseWriter, req *http.Request) { err = row.Scan(&runID) if err != nil { - logger.Error(fmt.Sprintf("CreateML.row.Scan: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateML.row.Scan: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } - logger.Info(fmt.Sprintf("RunID: %s", runID)) + logger.InfoCtx(req, fmt.Sprintf("RunID: %s", runID)) _, err = db.Exec(req.Context(), ` INSERT INTO access (runID, userID, mode) @@ -72,14 +72,14 @@ func CreateML(res http.ResponseWriter, req *http.Request) { `, runID, user["id"], "write") if err != nil { - logger.Error(fmt.Sprintf("CreateML.db.Exec: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateML.db.Exec: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } inputParams, err := json.Marshal(data) if err != nil { - logger.Error(fmt.Sprintf("CreateML.json.Marshal: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateML.json.Marshal: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -87,7 +87,7 @@ func CreateML(res http.ResponseWriter, req *http.Request) { // Save code and upload to minIO. os.Mkdir("code", 0755) if err := os.WriteFile(fmt.Sprintf("code/%v.py", runID), []byte(code), 0644); err != nil { - logger.Error(fmt.Sprintf("CreateML.os.WriteFile: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateML.os.WriteFile: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -99,7 +99,7 @@ func CreateML(res http.ResponseWriter, req *http.Request) { // Save input and upload to minIO. os.Mkdir("input", 0755) if err := os.WriteFile(fmt.Sprintf("input/%v.json", runID), inputParams, 0644); err != nil { - logger.Error(fmt.Sprintf("CreateML.os.WriteFile: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateML.os.WriteFile: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -110,12 +110,12 @@ func CreateML(res http.ResponseWriter, req *http.Request) { // Remove code and input files from local. if err := os.Remove(fmt.Sprintf("code/%v.py", runID)); err != nil { - logger.Error(fmt.Sprintf("CreateML.os.Remove: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateML.os.Remove: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } if err := os.Remove(fmt.Sprintf("input/%v.json", runID)); err != nil { - logger.Error(fmt.Sprintf("CreateML.os.Remove: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreateML.os.Remove: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } diff --git a/controller/pso.go b/controller/pso.go index 03465f1..b75bf64 100644 --- a/controller/pso.go +++ b/controller/pso.go @@ -11,8 +11,8 @@ import ( ) func CreatePSO(res http.ResponseWriter, req *http.Request) { - var logger = util.NewLogger() - logger.Info("CreatePSO API called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "CreatePSO API called.") // Comment this out to test the API without authentication. user, err := modules.Auth(req) @@ -22,7 +22,7 @@ func CreatePSO(res http.ResponseWriter, req *http.Request) { } // User has id, role, userName, email & fullName. - logger.Info(fmt.Sprintf("User: %s", user)) + logger.InfoCtx(req, fmt.Sprintf("User: %s", user)) data, err := util.Body(req) if err != nil { @@ -44,7 +44,7 @@ func CreatePSO(res http.ResponseWriter, req *http.Request) { db, err := connection.PoolConn(req.Context()) if err != nil { - logger.Error(fmt.Sprintf("CreatePSO: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreatePSO: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -59,12 +59,12 @@ func CreatePSO(res http.ResponseWriter, req *http.Request) { err = row.Scan(&runID) if err != nil { - logger.Error(fmt.Sprintf("CreatePSO.row.Scan: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreatePSO.row.Scan: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } - logger.Info(fmt.Sprintf("RunID: %s", runID)) + logger.InfoCtx(req, fmt.Sprintf("RunID: %s", runID)) _, err = db.Exec(req.Context(), ` INSERT INTO access (runID, userID, mode) @@ -72,14 +72,14 @@ func CreatePSO(res http.ResponseWriter, req *http.Request) { `, runID, user["id"], "write") if err != nil { - logger.Error(fmt.Sprintf("CreatePSO.db.Exec: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreatePSO.db.Exec: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } inputParams, err := json.Marshal(data) if err != nil { - logger.Error(fmt.Sprintf("CreatePSO.json.Marshal: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreatePSO.json.Marshal: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -87,7 +87,7 @@ func CreatePSO(res http.ResponseWriter, req *http.Request) { // Save code and upload to minIO. os.Mkdir("code", 0755) if err := os.WriteFile(fmt.Sprintf("code/%v.py", runID), []byte(code), 0644); err != nil { - logger.Error(fmt.Sprintf("CreatePSO.os.WriteFile: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreatePSO.os.WriteFile: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -99,7 +99,7 @@ func CreatePSO(res http.ResponseWriter, req *http.Request) { // Save input and upload to minIO. os.Mkdir("input", 0755) if err := os.WriteFile(fmt.Sprintf("input/%v.json", runID), inputParams, 0644); err != nil { - logger.Error(fmt.Sprintf("CreatePSO.os.WriteFile: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreatePSO.os.WriteFile: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } @@ -110,12 +110,12 @@ func CreatePSO(res http.ResponseWriter, req *http.Request) { // Remove code and input files from local. if err := os.Remove(fmt.Sprintf("code/%v.py", runID)); err != nil { - logger.Error(fmt.Sprintf("CreatePSO.os.Remove: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreatePSO.os.Remove: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } if err := os.Remove(fmt.Sprintf("input/%v.json", runID)); err != nil { - logger.Error(fmt.Sprintf("CreatePSO.os.Remove: %s", err.Error())) + logger.ErrorCtx(req, fmt.Sprintf("CreatePSO.os.Remove: %s", err.Error()), err) util.JSONResponse(res, http.StatusInternalServerError, "something went wrong", nil) return } diff --git a/controller/run.go b/controller/run.go index 9587b6a..a84ece5 100644 --- a/controller/run.go +++ b/controller/run.go @@ -8,8 +8,8 @@ import ( ) func UserRun(res http.ResponseWriter, req *http.Request) { - var logger = util.NewLogger() - logger.Info("UserRuns API called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "UserRuns API called.") user, err := modules.Auth(req) if err != nil { @@ -18,7 +18,7 @@ func UserRun(res http.ResponseWriter, req *http.Request) { } // User has id, role, userName, email & fullName. - logger.Info(fmt.Sprintf("User: %s", user)) + logger.InfoCtx(req, fmt.Sprintf("User: %s", user)) data, err := util.Body(req) if err != nil { @@ -32,7 +32,7 @@ func UserRun(res http.ResponseWriter, req *http.Request) { return } - logger.Info(fmt.Sprintf("Run: %s", run.RunID)) + logger.InfoCtx(req, fmt.Sprintf("Run: %s", run.RunID)) runData, err := run.UserRun(req.Context(), user["id"], logger) if err != nil { @@ -44,8 +44,8 @@ func UserRun(res http.ResponseWriter, req *http.Request) { } func UserRuns(res http.ResponseWriter, req *http.Request) { - var logger = util.NewLogger() - logger.Info("UserRuns API called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "UserRuns API called.") user, err := modules.Auth(req) if err != nil { @@ -54,7 +54,7 @@ func UserRuns(res http.ResponseWriter, req *http.Request) { } // User has id, role, userName, email & fullName. - logger.Info(fmt.Sprintf("User: %s", user)) + logger.InfoCtx(req, fmt.Sprintf("User: %s", user)) runs, err := modules.UserRuns(req.Context(), user["id"], logger) if err != nil { @@ -66,8 +66,8 @@ func UserRuns(res http.ResponseWriter, req *http.Request) { } func ShareRun(res http.ResponseWriter, req *http.Request) { - var logger = util.NewLogger() - logger.Info("ShareRun API called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "ShareRun API called.") user, err := modules.Auth(req) if err != nil { @@ -76,7 +76,7 @@ func ShareRun(res http.ResponseWriter, req *http.Request) { } // User has id, role, userName, email & fullName. - logger.Info(fmt.Sprintf("User: %s", user)) + logger.InfoCtx(req, fmt.Sprintf("User: %s", user)) data, err := util.Body(req) if err != nil { diff --git a/controller/test.go b/controller/test.go index 9e71fd0..ef3082c 100644 --- a/controller/test.go +++ b/controller/test.go @@ -7,8 +7,8 @@ import ( // Test is a test API for checking the server status. func Test(res http.ResponseWriter, req *http.Request) { - var logger = util.NewLogger() - logger.Info("Test API called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "Test API called.") switch req.Method { case "GET": diff --git a/go.mod b/go.mod index 69071e6..c3a9a52 100644 --- a/go.mod +++ b/go.mod @@ -25,9 +25,12 @@ require ( github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect github.com/minio/crc64nvme v1.0.1 // indirect github.com/minio/md5-simd v1.1.2 // indirect github.com/rs/xid v1.6.0 // indirect + github.com/rs/zerolog v1.34.0 // indirect golang.org/x/crypto v0.37.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/go.sum b/go.sum index 1dd885a..5003932 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,7 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -19,6 +20,7 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -38,12 +40,18 @@ github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYW github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/minio/crc64nvme v1.0.1 h1:DHQPrYPdqK7jQG/Ls5CTBZWeex/2FMS3G5XGkycuFrY= github.com/minio/crc64nvme v1.0.1/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.91 h1:tWLZnEfo3OZl5PoXQwcwTAPNNrjyWwOh6cbZitW5JQc= github.com/minio/minio-go/v7 v7.0.91/go.mod h1:uvMUcGrpgeSAAI6+sD3818508nUyMULw94j2Nxku/Go= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= @@ -52,6 +60,8 @@ github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -75,6 +85,9 @@ golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= diff --git a/main.go b/main.go index 0739ae6..1621547 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,9 @@ var ( ) func main() { + + logger, err := util.InitLogger(os.Getenv("ENV")) + util.SharedLogger = logger PORT = fmt.Sprintf(":%s", os.Getenv("HTTP_PORT")) if PORT == ":" { PORT = ":5002" @@ -33,11 +36,12 @@ func main() { FRONTEND_URL = "http://localhost:3000" } - var logger = util.NewLogger() + redisClient, err := sse.GetRedisClient(*logger) err := util.InitRedisClient(*logger) + if err != nil { - logger.Error(fmt.Sprintf("Failed to initialize Redis client: %v. Exiting.", err)) + logger.Error(fmt.Sprintf("Failed to initialize Redis client: %v. Exiting.", err), err) os.Exit(1) } logger.Info("Redis client initialized successfully.") @@ -88,7 +92,7 @@ func main() { go func() { logger.Info(fmt.Sprintf("HTTP server starting on %s (Allowed Frontend Origin: %s)", server.Addr, FRONTEND_URL)) if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - logger.Error(fmt.Sprintf("HTTP server ListenAndServe error: %v", err)) + logger.Error(fmt.Sprintf("HTTP server ListenAndServe error: %v", err), err) stop() } }() @@ -104,12 +108,18 @@ func main() { // Attempt to gracefully shut down the HTTP server. if err := server.Shutdown(shutdownCtx); err != nil { - logger.Error(fmt.Sprintf("HTTP server graceful shutdown failed: %v", err)) + logger.Error(fmt.Sprintf("HTTP server graceful shutdown failed: %v", err), err) } else { logger.Info("HTTP server shutdown complete.") } // Close Redis Client. + logger.Info("Shutting down Redis client...") + if redisErr := redisClient.Close(); redisErr != nil { + logger.Error(fmt.Sprintf("Redis client shutdown error: %v", redisErr), redisErr) + } else { + logger.Info("Redis client shutdown complete.") + } util.ShutDownRedisClient(*logger) logger.Info("Server exiting.") diff --git a/modules/auth.go b/modules/auth.go index de55a7c..9a5d99a 100644 --- a/modules/auth.go +++ b/modules/auth.go @@ -16,12 +16,12 @@ import ( ) func Auth(req *http.Request) (map[string]string, error) { - var logger = util.NewLogger() - logger.Info("Auth called.") + var logger = util.SharedLogger + logger.InfoCtx(req, "Auth called.") token, err := req.Cookie("t") if err != nil { - logger.Error(fmt.Sprintf("No token found in request: %v", err)) + logger.ErrorCtx(req, fmt.Sprintf("No token found in request: %v", err), err) return nil, fmt.Errorf("unauthorized") } @@ -31,7 +31,7 @@ func Auth(req *http.Request) (map[string]string, error) { // to the auth micro-service. authConn, err := grpc.NewClient(os.Getenv("AUTH_GRPC_ADDRESS"), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - logger.Error(fmt.Sprintf("Failed to create gRPC client: %v", err)) + logger.ErrorCtx(req, fmt.Sprintf("Failed to create gRPC client: %v", err), err) return nil, fmt.Errorf("something went wrong") } defer authConn.Close() @@ -42,7 +42,7 @@ func Auth(req *http.Request) (map[string]string, error) { r, err := authClient.Auth(ctx, &pb.TokenValidateRequest{Token: token.Value}) if err != nil { - logger.Error(fmt.Sprintf("gRPC call failed: %v", err)) + logger.ErrorCtx(req, fmt.Sprintf("gRPC call failed: %v", err), err) return nil, fmt.Errorf("something went wrong") } diff --git a/modules/run.go b/modules/run.go index d5ae48b..3e83839 100644 --- a/modules/run.go +++ b/modules/run.go @@ -20,17 +20,17 @@ type ( } ) -func UserRuns(ctx context.Context, userID string, logger *util.Logger) ([]map[string]string, error) { +func UserRuns(ctx context.Context, userID string, logger *util.LoggerService) ([]map[string]string, error) { db, err := connection.PoolConn(ctx) if err != nil { - logger.Error(fmt.Sprintf("UserRuns: %s", err.Error())) + logger.Error(fmt.Sprintf("UserRuns: %s", err.Error()), err) return nil, fmt.Errorf("something went wrong") } var runIDs []string rows, err := db.Query(ctx, "SELECT runID FROM access WHERE userID = $1", userID) if err != nil { - logger.Error(fmt.Sprintf("UserRuns.db.Query: %s", err.Error())) + logger.Error(fmt.Sprintf("UserRuns.db.Query: %s", err.Error()), err) return nil, fmt.Errorf("something went wrong") } @@ -38,7 +38,7 @@ func UserRuns(ctx context.Context, userID string, logger *util.Logger) ([]map[st var runID string err = rows.Scan(&runID) if err != nil { - logger.Error(fmt.Sprintf("UserRuns.rows.Scan: %s", err.Error())) + logger.Error(fmt.Sprintf("UserRuns.rows.Scan: %s", err.Error()), err) return nil, fmt.Errorf("something went wrong") } runIDs = append(runIDs, runID) @@ -52,7 +52,7 @@ func UserRuns(ctx context.Context, userID string, logger *util.Logger) ([]map[st rows, err = db.Query(ctx, "SELECT * FROM run WHERE id = ANY($1)", runIDs) if err != nil { - logger.Error(fmt.Sprintf("UserRuns.db.Query: %s", err.Error())) + logger.Error(fmt.Sprintf("UserRuns.db.Query: %s", err.Error()), err) return nil, fmt.Errorf("something went wrong") } @@ -70,7 +70,7 @@ func UserRuns(ctx context.Context, userID string, logger *util.Logger) ([]map[st err := rows.Scan(&id, &name, &description, &status, &runType, &command, &createdBy, &createdAt, &updatedAt) if err != nil { - logger.Error(fmt.Sprintf("UserRuns.rows.Scan: %s", err.Error())) + logger.Error(fmt.Sprintf("UserRuns.rows.Scan: %s", err.Error()), err) return nil, fmt.Errorf("something went wrong") } @@ -114,24 +114,24 @@ func ShareRunReqFromJSON(jsonData map[string]any) (*ShareRunReq, error) { return s, nil } -func (s *ShareRunReq) ShareRun(ctx context.Context, logger *util.Logger) error { +func (s *ShareRunReq) ShareRun(ctx context.Context, logger *util.LoggerService) error { db, err := connection.PoolConn(ctx) if err != nil { - logger.Error(fmt.Sprintf("ShareRun: %s", err.Error())) + logger.Error(fmt.Sprintf("ShareRun: %s", err.Error()), err) return fmt.Errorf("something went wrong") } // Check if run exists. var runID string if err := db.QueryRow(ctx, "SELECT id FROM run WHERE id = $1", s.RunID).Scan(&runID); err != nil { - logger.Error(fmt.Sprintf("ShareRun.db.QueryRow: %s", err.Error())) + logger.Error(fmt.Sprintf("ShareRun.db.QueryRow: %s", err.Error()), err) return fmt.Errorf("run does not exist") } // Check if provided emails exist. rows, err := db.Query(ctx, "SELECT id FROM users WHERE email = ANY($1)", s.UserEmailList) if err != nil { - logger.Error(fmt.Sprintf("ShareRun.db.Query: %s", err.Error())) + logger.Error(fmt.Sprintf("ShareRun.db.Query: %s", err.Error()), err) return fmt.Errorf("something went wrong") } @@ -140,7 +140,7 @@ func (s *ShareRunReq) ShareRun(ctx context.Context, logger *util.Logger) error { var id string err = rows.Scan(&id) if err != nil { - logger.Error(fmt.Sprintf("ShareRun.rows.Scan: %s", err.Error())) + logger.Error(fmt.Sprintf("ShareRun.rows.Scan: %s", err.Error()), err) return fmt.Errorf("something went wrong") } userIDs = append(userIDs, id) @@ -154,7 +154,7 @@ func (s *ShareRunReq) ShareRun(ctx context.Context, logger *util.Logger) error { for _, userID := range userIDs { _, err = db.Exec(ctx, "INSERT INTO access (runID, userID, mode) VALUES ($1, $2, $3)", s.RunID, userID, "read") if err != nil { - logger.Error(fmt.Sprintf("ShareRun.db.Exec: %s", err.Error())) + logger.Error(fmt.Sprintf("ShareRun.db.Exec: %s", err.Error()), err) return fmt.Errorf("make sure the run is not already shared with the user") } } @@ -175,17 +175,17 @@ func RunDataReqFromJSON(jsonData map[string]any) (*RunDataReq, error) { return r, nil } -func (r *RunDataReq) UserRun(ctx context.Context, userID string, logger *util.Logger) (map[string]string, error) { +func (r *RunDataReq) UserRun(ctx context.Context, userID string, logger *util.LoggerService) (map[string]string, error) { db, err := connection.PoolConn(ctx) if err != nil { - logger.Error(fmt.Sprintf("RunData: %s", err.Error())) + logger.Error(fmt.Sprintf("RunData: %s", err.Error()), err) return nil, fmt.Errorf("something went wrong") } // Check if user has access to the run. var runID string if err := db.QueryRow(ctx, "SELECT runID FROM access WHERE userID = $1 AND runID = $2", userID, r.RunID).Scan(&runID); err != nil { - logger.Error(fmt.Sprintf("RunData.db.QueryRow: %s", err.Error())) + logger.Error(fmt.Sprintf("RunData.db.QueryRow: %s", err.Error()), err) return nil, fmt.Errorf("run does not exist") } @@ -194,7 +194,7 @@ func (r *RunDataReq) UserRun(ctx context.Context, userID string, logger *util.Lo // Get the run details like name, description, status, type, command, createdBy, createdAt, updatedAt. err = db.QueryRow(ctx, "SELECT id, name, description, status, type, command, createdBy, createdAt, updatedAt FROM run WHERE id = $1", r.RunID).Scan(&id, &name, &description, &status, &runType, &command, &createdBy, &createdAt, &updatedAt) if err != nil { - logger.Error(fmt.Sprintf("RunData.db.QueryRow: %s", err.Error())) + logger.Error(fmt.Sprintf("RunData.db.QueryRow: %s", err.Error()), err) return nil, fmt.Errorf("something went wrong") } diff --git a/modules/sse/server.go b/modules/sse/server.go index e746d42..27a2285 100644 --- a/modules/sse/server.go +++ b/modules/sse/server.go @@ -32,9 +32,35 @@ type redisLogPayload struct { RunID string `json:"runId"` // From EOF message. } +// GetRedisClient initializes a Redis client. +func GetRedisClient(logger util.LoggerService) (*redis.Client, error) { + + redisURL := os.Getenv("REDIS_URL") + if redisURL == "" { + redisURL = "redis://localhost:6379/0" + logger.Warn(fmt.Sprintf("REDIS_URL not set, using default: %s", redisURL)) + } + opts, err := redis.ParseURL(redisURL) + if err != nil { + logger.Error(fmt.Sprintf("Failed to parse REDIS_URL '%s': %v", redisURL, err), err) + return nil, err + } + rdb := redis.NewClient(opts) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = rdb.Ping(ctx).Result() + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to Redis at %s: %v", opts.Addr, err), err) + return nil, err + } + logger.Info(fmt.Sprintf("Successfully connected to Redis at %s", opts.Addr)) + return rdb, nil +} + // GetSSEHandler returns an HTTP handler // for Server-Sent Events (SSE) using Redis Streams. -func GetSSEHandler(logger util.Logger) http.HandlerFunc { + +func GetSSEHandler(logger util.LoggerService, redisClient *redis.Client) http.HandlerFunc { if util.RedisClient == nil { logger.Error("GetSSEHandler requires a non-nil Redis client") return func(w http.ResponseWriter, r *http.Request) { @@ -46,7 +72,7 @@ func GetSSEHandler(logger util.Logger) http.HandlerFunc { } } -func sendSSEData(w http.ResponseWriter, rc *http.ResponseController, payload string, runId string, logger *util.Logger) bool { +func sendSSEData(w http.ResponseWriter, rc *http.ResponseController, payload string, runId string, logger *util.LoggerService) bool { // logger.Info(fmt.Sprintf("[SSE SENDING DATA] runId=%s | data=%s", runId, payload)) // Debug log _, writeErr := fmt.Fprintf(w, "data: %s\n\n", payload) // Payload should already be JSON string if writeErr != nil { @@ -58,7 +84,7 @@ func sendSSEData(w http.ResponseWriter, rc *http.ResponseController, payload str } if flushErr := rc.Flush(); flushErr != nil { if !errors.Is(flushErr, context.Canceled) && !strings.Contains(flushErr.Error(), "client disconnected") { - logger.Error(fmt.Sprintf("[SSE FLUSH ERROR] runId=%s | error=%v", runId, flushErr)) + logger.Error(fmt.Sprintf("[SSE FLUSH ERROR] runId=%s | error=%v", runId, flushErr), flushErr) } return false // Indicate failure } @@ -68,32 +94,32 @@ func sendSSEData(w http.ResponseWriter, rc *http.ResponseController, payload str } // serveSSEWithStream handles the SSE stream for a given run ID. -func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Request) { +func serveSSEWithStream(logger util.LoggerService, redisClient *redis.Client, w http.ResponseWriter, r *http.Request) { ctx := r.Context() - logger.Info("[SSE Stream Handler] Entered serveSSEWithStream") + logger.InfoCtx(r, "[SSE Stream Handler] Entered serveSSEWithStream") runId := r.URL.Query().Get("runId") if runId == "" { runId = r.Header.Get(runIdHeader) if runId == "" { - logger.Warn(fmt.Sprintf("[SSE Stream Handler] Missing runId query parameter AND %s header", runIdHeader)) + logger.WarnCtx(r, fmt.Sprintf("[SSE Stream Handler] Missing runId query parameter AND %s header", runIdHeader)) http.Error(w, fmt.Sprintf("Missing runId query parameter or %s header", runIdHeader), http.StatusBadRequest) return } - logger.Info(fmt.Sprintf("[SSE Stream Handler] Using runId from header: %s", runId)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Using runId from header: %s", runId)) } else { - logger.Info(fmt.Sprintf("[SSE Stream Handler] Using runId from query parameter: %s", runId)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Using runId from query parameter: %s", runId)) } // Basic Sanitize Run ID. if strings.ContainsAny(runId, "\n\r*?") { - logger.Warn(fmt.Sprintf("[SSE Stream Handler] Invalid characters suspected in runId: %s", runId)) + logger.WarnCtx(r, fmt.Sprintf("[SSE Stream Handler] Invalid characters suspected in runId: %s", runId)) http.Error(w, "Invalid Run ID format", http.StatusBadRequest) return } redisStreamName := runId - logger.Info(fmt.Sprintf("[SSE Stream Handler] Determined runId: '%s', Stream Name: '%s'", runId, redisStreamName)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Determined runId: '%s', Stream Name: '%s'", runId, redisStreamName)) // Set SSE Headers. w.Header().Set("Content-Type", "text/event-stream") @@ -103,20 +129,20 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque _, err := fmt.Fprintf(w, "retry: %ds\n\n", retrySeconds) if err != nil { - logger.Error(fmt.Sprintf("[SSE Stream Handler] Error writing retry header for runId %s: %v", runId, err)) + logger.ErrorCtx(r, fmt.Sprintf("[SSE Stream Handler] Error writing retry header for runId %s: %v", runId, err), err) return } rc := http.NewResponseController(w) if rc == nil { - logger.Error(fmt.Sprintf("[SSE Stream Handler] Failed to get ResponseController for runId: %s", runId)) + logger.ErrorCtx(r, fmt.Sprintf("[SSE Stream Handler] Failed to get ResponseController for runId: %s", runId), nil) return } if err := rc.Flush(); err != nil { - logger.Error(fmt.Sprintf("[SSE Stream Handler] Error flushing headers for runId %s: %v", runId, err)) + logger.ErrorCtx(r, fmt.Sprintf("[SSE Stream Handler] Error flushing headers for runId %s: %v", runId, err), err) return } - logger.Info(fmt.Sprintf("[SSE Stream Handler] Flushed SSE headers for runId: %s", runId)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Flushed SSE headers for runId: %s", runId)) // sendSSEData := func(payload string) bool { // // logger.Info(fmt.Sprintf("[SSE SENDING DATA] runId=%s | data=%s", runId, payload)) // Debug log @@ -143,7 +169,7 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque // Read History from beginning. lastProcessedID := "0-0" - logger.Info(fmt.Sprintf("[SSE Stream Handler] Reading historical logs for stream: '%s' from ID: %s", redisStreamName, lastProcessedID)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Reading historical logs for stream: '%s' from ID: %s", redisStreamName, lastProcessedID)) historyProcessed := 0 for { @@ -160,14 +186,14 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque // Check error message for "NOGROUP" or similar if needed, but often just means empty. // For now, assume empty stream is not an error, just means no history. if errors.Is(err, redis.Nil) || strings.Contains(err.Error(), "NOGROUP") { - logger.Info(fmt.Sprintf("[SSE Stream Handler] No historical logs found or reached end for stream: '%s'", redisStreamName)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] No historical logs found or reached end for stream: '%s'", redisStreamName)) break } else if errors.Is(err, context.Canceled) { // Client disconnected. - logger.Info(fmt.Sprintf("[SSE Stream Handler] Context cancelled during history read for runId: %s", runId)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Context cancelled during history read for runId: %s", runId)) return } - logger.Error(fmt.Sprintf("[SSE Stream Handler] Error reading history from stream '%s': %v", redisStreamName, err)) + logger.ErrorCtx(r, fmt.Sprintf("[SSE Stream Handler] Error reading history from stream '%s': %v", redisStreamName, err), err) return } @@ -177,12 +203,12 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque } streamMessages := results[0].Messages - logger.Info(fmt.Sprintf("[SSE Stream Handler] Processing %d historical messages for stream: '%s'", len(streamMessages), redisStreamName)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Processing %d historical messages for stream: '%s'", len(streamMessages), redisStreamName)) for _, msg := range streamMessages { logPayloadStr, ok := msg.Values[logDataField].(string) if !ok { - logger.Warn(fmt.Sprintf("[SSE Stream Handler] Invalid data format in stream '%s', ID '%s': Missing or non-string field '%s'", redisStreamName, msg.ID, logDataField)) + logger.WarnCtx(r, fmt.Sprintf("[SSE Stream Handler] Invalid data format in stream '%s', ID '%s': Missing or non-string field '%s'", redisStreamName, msg.ID, logDataField)) continue } @@ -195,7 +221,7 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque // Check if this message is the EOF marker. var logData redisLogPayload if json.Unmarshal([]byte(logPayloadStr), &logData) == nil && logData.Status == eofStatus { - logger.Info(fmt.Sprintf("[SSE Stream Handler] EOF marker found in history (ID: %s) for runId: %s", msg.ID, runId)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] EOF marker found in history (ID: %s) for runId: %s", msg.ID, runId)) // Send the done event *now* and finish. doneData := `{"message": "Stream ended (found in history)."}` _, _ = fmt.Fprintf(w, "event: %s\ndata: %s\n\n", sseDoneEvent, doneData) @@ -212,14 +238,14 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque } } - logger.Info(fmt.Sprintf("[SSE Stream Handler] Finished reading history (%d entries) for stream: '%s'. Last ID: %s. Starting live block.", historyProcessed, redisStreamName, lastProcessedID)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Finished reading history (%d entries) for stream: '%s'. Last ID: %s. Starting live block.", historyProcessed, redisStreamName, lastProcessedID)) // Read Live Updates (Blocking). for { // Check context before blocking read. select { case <-ctx.Done(): - logger.Info(fmt.Sprintf("[SSE Stream Handler] Context done before blocking read for runId %s: %v", runId, ctx.Err())) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Context done before blocking read for runId %s: %v", runId, ctx.Err())) return default: // Continue to blocking read. @@ -240,11 +266,11 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque continue } else if errors.Is(err, context.Canceled) { // Client disconnected. - logger.Info(fmt.Sprintf("[SSE Stream Handler] Context cancelled during blocking read for runId %s: %v", runId, ctx.Err())) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Context cancelled during blocking read for runId %s: %v", runId, ctx.Err())) return } - logger.Error(fmt.Sprintf("[SSE Stream Handler] Error during blocking read for stream '%s': %v", redisStreamName, err)) + logger.ErrorCtx(r, fmt.Sprintf("[SSE Stream Handler] Error during blocking read for stream '%s': %v", redisStreamName, err), err) time.Sleep(1 * time.Second) continue } @@ -257,7 +283,7 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque for _, msg := range streamMessages { logPayloadStr, ok := msg.Values[logDataField].(string) if !ok { - logger.Warn(fmt.Sprintf("[SSE Stream Handler] Invalid live data format in stream '%s', ID '%s': Missing or non-string field '%s'", redisStreamName, msg.ID, logDataField)) + logger.WarnCtx(r, fmt.Sprintf("[SSE Stream Handler] Invalid live data format in stream '%s', ID '%s': Missing or non-string field '%s'", redisStreamName, msg.ID, logDataField)) continue } @@ -269,7 +295,7 @@ func serveSSEWithStream(logger util.Logger, w http.ResponseWriter, r *http.Reque // Check if this message is the EOF marker. var logData redisLogPayload if json.Unmarshal([]byte(logPayloadStr), &logData) == nil && logData.Status == eofStatus { - logger.Info(fmt.Sprintf("[SSE Stream Handler] Live EOF marker found (ID: %s) for runId: %s. Sending done event.", msg.ID, runId)) + logger.InfoCtx(r, fmt.Sprintf("[SSE Stream Handler] Live EOF marker found (ID: %s) for runId: %s. Sending done event.", msg.ID, runId)) doneData := `{"message": "Stream ended."}` _, _ = fmt.Fprintf(w, "event: %s\ndata: %s\n\n", sseDoneEvent, doneData) _ = rc.Flush() diff --git a/util/enqueue.go b/util/enqueue.go index 9716e5e..cb872ab 100644 --- a/util/enqueue.go +++ b/util/enqueue.go @@ -9,7 +9,7 @@ import ( ) func EnqueueRunRequest(ctx context.Context, runID string, fileName string, extension string) error { - var logger = NewLogger() + var logger = SharedLogger // Message represents the structure of our message type Message struct { @@ -19,6 +19,40 @@ func EnqueueRunRequest(ctx context.Context, runID string, fileName string, exten Timestamp time.Time `json:"timestamp"` } + // Get RabbitMQ connection string from environment variable or use default + rabbitMQURL := os.Getenv("RABBITMQ_URL") + if rabbitMQURL == "" { + rabbitMQURL = "amqp://guest:guest@localhost:5672/" + } + + // Connect to RabbitMQ server + conn, err := amqp.Dial(rabbitMQURL) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to RabbitMQ: %v", err), err) + } + defer conn.Close() + + // Create a channel + ch, err := conn.Channel() + if err != nil { + logger.Error(fmt.Sprintf("Failed to open a channel: %v", err), err) + } + defer ch.Close() + + // Declare a queue + queueName := "task_queue" + q, err := ch.QueueDeclare( + queueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + logger.Error(fmt.Sprintf("Failed to declare a queue: %v", err), err) + } // Declare a queue queueName := os.Getenv("REDIS_QUEUE_NAME") if queueName == "" { @@ -37,15 +71,15 @@ func EnqueueRunRequest(ctx context.Context, runID string, fileName string, exten // Convert message to JSON body, err := json.Marshal(msg) if err != nil { - logger.Error(fmt.Sprintf("Error marshaling message: %v", err)) - return err + logger.Error(fmt.Sprintf("Error marshaling message: %v", err), err) + return err } // Push message to Redis List (LPUSH = enqueue at head) err = RedisClient.LPush(ctx, queueName, string(body)).Err(); if err != nil { - logger.Error(fmt.Sprintf("Failed to publish message: %v", err)) + logger.Error(fmt.Sprintf("Failed to publish message: %v", err), err) return err } diff --git a/util/logger.go b/util/logger.go index 9ecc7d8..fbe6190 100644 --- a/util/logger.go +++ b/util/logger.go @@ -1,55 +1,210 @@ package util import ( - "log" + "fmt" + "net" + "net/http" "os" + "strings" "time" + + "github.com/rs/zerolog" ) -type LogLevel int +var SharedLogger *LoggerService + +type LoggerService struct { + Logger zerolog.Logger + Env string +} +// environments const ( - INFO LogLevel = iota - WARN - ERROR + EnvDevelopment = "DEVELOPMENT" + EnvProduction = "PRODUCTION" ) -type Logger struct { - logger *log.Logger - level LogLevel +type ILoggerService interface { + // Function to enrich each log with data + enrich(req *http.Request, e *zerolog.Event) *zerolog.Event + + // This set of functions is to be used in the context of the web-server + // where there is a server context involved + DebugCtx(req *http.Request, msg string) + InfoCtx(req *http.Request, msg string) + WarnCtx(req *http.Request, msg string) + ErrorCtx(req *http.Request, msg string, err error) + FatalCtx(req *http.Request, msg string, err error) + PanicCtx(req *http.Request, msg string, r any, trace string) // r = recover() + SuccessCtx(req *http.Request) + + // This set of functions can be used in scenarios where there is no + // server context involved + Debug(msg string) + Info(msg string) + Warn(msg string) + Error(msg string, err error) + Fatal(msg string, err error) + + // Logging middleware to be used only as a global middleware during router + // initialization + LogMiddleware(next http.Handler) http.Handler } -func NewLogger() *Logger { - return &Logger{ - logger: log.New(os.Stdout, "", 0), - level: INFO, +func InitLogger(env string) (*LoggerService, error) { + if env == "" { + env = EnvDevelopment } + if env != EnvDevelopment && env != EnvProduction { + return nil, fmt.Errorf("invalid environment for logger setup: %s", env) + } + + config := zerolog.ConsoleWriter{ + Out: os.Stdout, + TimeFormat: time.RFC3339, + } + logger := zerolog.New(config).With().Timestamp().Logger() + return &LoggerService{ + Logger: logger, + Env: env, + }, nil } -func (l *Logger) Log(level LogLevel, msg string) { - if level >= l.level { - timestamp := time.Now().Format(time.DateTime) - levelStr := "" - switch level { - case INFO: - levelStr = "INFO" - case WARN: - levelStr = "WARN" - case ERROR: - levelStr = "ERROR" - } - l.logger.Printf("%s [%s]: %s", timestamp, levelStr, msg) +func (l *LoggerService) enrich(req *http.Request, e *zerolog.Event) *zerolog.Event { + queryParams := req.URL.Query() + clientIP := req.Header.Get("X-Forwarded-For") + if clientIP == "" { + clientIP = req.Header.Get("X-Real-IP") + } + if clientIP == "" { + clientIP, _, _ = net.SplitHostPort(req.RemoteAddr) + } + // In case of multiple IPs in X-Forwarded-For, take the first one + if strings.Contains(clientIP, ",") { + clientIP = strings.TrimSpace(strings.Split(clientIP, ",")[0]) + } + + return e. + Str("route", req.URL.Path). + Str("method", req.Method). + Interface("query-params", queryParams). + Str("ip", clientIP). + Str("user-agent", req.Header.Get("User-Agent")) +} + +func (l *LoggerService) DebugCtx(req *http.Request, msg string) { + if l.Env == EnvProduction { + return + } + event := l.Logger.WithLevel(zerolog.DebugLevel) + l.enrich(req, event).Msg(msg) +} + +func (l *LoggerService) InfoCtx(req *http.Request, msg string) { + event := l.Logger.WithLevel(zerolog.InfoLevel) + l.enrich(req, event).Msg(msg) +} + +func (l *LoggerService) WarnCtx(req *http.Request, msg string) { + event := l.Logger.WithLevel(zerolog.WarnLevel) + l.enrich(req, event).Msg(msg) +} + +func (l *LoggerService) ErrorCtx(req *http.Request, msg string, err error) { + event := l.Logger.WithLevel(zerolog.ErrorLevel).Err(err) + l.enrich(req, event).Msg(msg) +} + +func (l *LoggerService) FatalCtx(req *http.Request, msg string, err error) { + event := l.Logger.WithLevel(zerolog.FatalLevel).Err(err) + l.enrich(req, event).Msg(msg) +} + +func (l *LoggerService) PanicCtx(req *http.Request, msg string, r any, trace string) { + event := l.Logger.WithLevel(zerolog.InfoLevel). + Str("panic_value", fmt.Sprintf("%v", r)). + Str("trace", trace) + l.enrich(req, event).Msg(msg) +} + +func (l *LoggerService) SuccessCtx(req *http.Request) { + event := l.Logger.WithLevel(zerolog.InfoLevel) + l.enrich(req, event).Msg("request successful") +} + +func (l *LoggerService) Debug(msg string) { + if l.Env == EnvProduction { + return } + l.Logger.WithLevel(zerolog.DebugLevel).Msg(msg) +} + +func (l *LoggerService) Info(msg string) { + l.Logger.WithLevel(zerolog.InfoLevel).Msg(msg) +} + +func (l *LoggerService) Warn(msg string) { + l.Logger.WithLevel(zerolog.InfoLevel).Msg(msg) +} + +func (l *LoggerService) Error(msg string, err error) { + l.Logger.WithLevel(zerolog.InfoLevel).Err(err).Msg(msg) +} + +func (l *LoggerService) Fatal(msg string, err error) { + l.Logger.WithLevel(zerolog.FatalLevel).Err(err).Msg(msg) } -func (l *Logger) Info(msg string) { - l.Log(INFO, msg) +// loggingResponseWriter is a wrapper around http.ResponseWriter to capture status code and response size. +type loggingResponseWriter struct { + http.ResponseWriter + statusCode int + size int } -func (l *Logger) Warn(msg string) { - l.Log(WARN, msg) +func (lrw *loggingResponseWriter) WriteHeader(code int) { + lrw.statusCode = code + lrw.ResponseWriter.WriteHeader(code) +} + +func (lrw *loggingResponseWriter) Write(b []byte) (int, error) { + if lrw.statusCode == 0 { + lrw.statusCode = http.StatusOK + } + size, err := lrw.ResponseWriter.Write(b) + lrw.size += size + return size, err } -func (l *Logger) Error(msg string) { - l.Log(ERROR, msg) +func (l *LoggerService) LogMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: 0, size: 0} + next.ServeHTTP(lrw, r) + + // Get client IP from headers or remote address + clientIP := r.Header.Get("X-Forwarded-For") + if clientIP == "" { + clientIP = r.Header.Get("X-Real-IP") + } + if clientIP == "" { + clientIP, _, _ = net.SplitHostPort(r.RemoteAddr) + } + // In case of multiple IPs in X-Forwarded-For, take the first one + if strings.Contains(clientIP, ",") { + clientIP = strings.TrimSpace(strings.Split(clientIP, ",")[0]) + } + + event := l.Logger.WithLevel(zerolog.InfoLevel). + Str("route", r.URL.Path). + Str("method", r.Method). + Int("status", lrw.statusCode). + Int("response-size", lrw.size). + Dur("duration", time.Since(start)). + Interface("query-params", r.URL.Query()). + Str("ip", clientIP). + Str("user-agent", r.Header.Get("User-Agent")) + + event.Send() + }) } diff --git a/util/minio.go b/util/minio.go index a15628e..5fdb1c5 100644 --- a/util/minio.go +++ b/util/minio.go @@ -9,7 +9,7 @@ import ( ) func UploadFile(ctx context.Context, runID string, fileName string, extension string) error { - var logger = NewLogger() + var logger = SharedLogger endpoint := os.Getenv("MINIO_ENDPOINT") accessKeyID := os.Getenv("MINIO_ACCESS_KEY_ID") @@ -22,7 +22,7 @@ func UploadFile(ctx context.Context, runID string, fileName string, extension st Secure: false, }) if err != nil { - logger.Error(fmt.Sprintf("Failed to create minio client: %v", err)) + logger.Error(fmt.Sprintf("Failed to create minio client: %v", err), err) return err } @@ -34,7 +34,7 @@ func UploadFile(ctx context.Context, runID string, fileName string, extension st if errBucketExists == nil && exists { logger.Info(fmt.Sprintf("We already own bucket: %s\n", bucketName)) } else { - logger.Error(fmt.Sprintf("Failed to create bucket %s: %v", bucketName, err)) + logger.Error(fmt.Sprintf("Failed to create bucket %s: %v", bucketName, err), err) return err } } else { @@ -45,7 +45,7 @@ func UploadFile(ctx context.Context, runID string, fileName string, extension st policy := `{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetObject"],"Resource":["arn:aws:s3:::` + bucketName + `/*"]},{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetBucketLocation"],"Resource":["arn:aws:s3:::` + bucketName + `"]}]}` err = minioClient.SetBucketPolicy(ctx, bucketName, policy) if err != nil { - logger.Error(fmt.Sprintf("Failed to set bucket policy: %v", err)) + logger.Error(fmt.Sprintf("Failed to set bucket policy: %v", err), err) return err } logger.Info(fmt.Sprintf("Successfully set bucket policy for %s\n", bucketName)) @@ -55,7 +55,7 @@ func UploadFile(ctx context.Context, runID string, fileName string, extension st filePath := fmt.Sprintf("%s/%s.%s", fileName, runID, extension) info, err := minioClient.FPutObject(ctx, bucketName, objectName, filePath, minio.PutObjectOptions{}) if err != nil { - logger.Error(fmt.Sprintf("Failed to upload %s: %v", filePath, err)) + logger.Error(fmt.Sprintf("Failed to upload %s: %v", filePath, err), err) return err } diff --git a/util/validate.go b/util/validate.go index a4f91a8..85dff10 100644 --- a/util/validate.go +++ b/util/validate.go @@ -11,5 +11,3 @@ func ValidateAlgorithmName(algo string) error { } return fmt.Errorf("invalid algorithm name: %s", algo) } - -// TODO: Implement remaining validation functions.