diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d406eb09..b578fa6bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,8 @@ NOTE: all releases may include dependency updates, not specifically mentioned ## UNRELEASED - feat: integrate try-as library [#912](https://github.com/hypermodeinc/modus/pull/912) -- feat: improve sentry usage [#931](https://github.com/hypermodeinc/modus/pull/931) +- feat: improve sentry usage [#931](https://github.com/hypermodeinc/modus/pull/931) [#937](https://github.com/hypermodeinc/modus/pull/937) +- fix: write inference history directly [#938](https://github.com/hypermodeinc/modus/pull/938) ## 2025-07-03 - Runtime v0.18.3 diff --git a/runtime/db/agentstate.go b/runtime/db/agentstate.go index 9eaaf9601..0bd7e4918 100644 --- a/runtime/db/agentstate.go +++ b/runtime/db/agentstate.go @@ -66,7 +66,7 @@ func writeAgentStateToModusDB(ctx context.Context, state AgentState) error { span, ctx := sentryutils.NewSpanForCurrentFunc(ctx) defer span.Finish() - gid, _, _, err := modusgraph.Upsert(ctx, GlobalModusDbEngine, state) + gid, _, _, err := modusgraph.Upsert(ctx, globalModusDbEngine, state) state.Gid = gid return err @@ -93,7 +93,7 @@ func getAgentStateFromModusDB(ctx context.Context, id string) (*AgentState, erro span, ctx := sentryutils.NewSpanForCurrentFunc(ctx) defer span.Finish() - _, result, err := modusgraph.Get[AgentState](ctx, GlobalModusDbEngine, modusgraph.ConstrainedField{ + _, result, err := modusgraph.Get[AgentState](ctx, globalModusDbEngine, modusgraph.ConstrainedField{ Key: "id", Value: id, }) @@ -108,7 +108,7 @@ func queryActiveAgentsFromModusDB(ctx context.Context) ([]AgentState, error) { span, ctx := sentryutils.NewSpanForCurrentFunc(ctx) defer span.Finish() - _, results, err := modusgraph.Query[AgentState](ctx, GlobalModusDbEngine, modusgraph.QueryParams{ + _, results, err := modusgraph.Query[AgentState](ctx, globalModusDbEngine, modusgraph.QueryParams{ Filter: &modusgraph.Filter{ Not: &modusgraph.Filter{ Field: "status", diff --git a/runtime/db/db.go b/runtime/db/db.go index b150a4ecd..1f3e05d89 100644 --- a/runtime/db/db.go +++ b/runtime/db/db.go @@ -15,7 +15,6 @@ import ( "os" "strconv" "sync" - "time" "github.com/hypermodeinc/modus/runtime/app" "github.com/hypermodeinc/modus/runtime/logger" @@ -28,24 +27,16 @@ import ( var errDbNotConfigured = errors.New("database not configured") -const inferenceRefresherInterval = 5 * time.Second - type runtimePostgresWriter struct { dbpool *pgxpool.Pool - buffer chan inferenceHistory - quit chan struct{} - done chan struct{} once sync.Once } func Stop(ctx context.Context) { - pool, _ := globalRuntimePostgresWriter.GetPool(ctx) + pool, _ := globalRuntimePostgresWriter.getPool(ctx) if pool == nil { return } - - close(globalRuntimePostgresWriter.quit) - <-globalRuntimePostgresWriter.done pool.Close() } @@ -67,16 +58,14 @@ func Initialize(ctx context.Context) { return } - // this will initialize the pool and start the worker - _, err := globalRuntimePostgresWriter.GetPool(ctx) + _, err := globalRuntimePostgresWriter.getPool(ctx) if err != nil { logger.Warn(ctx, err).Msg("Metadata database is not available.") } - go globalRuntimePostgresWriter.worker(ctx) } func GetTx(ctx context.Context) (pgx.Tx, error) { - pool, err := globalRuntimePostgresWriter.GetPool(ctx) + pool, err := globalRuntimePostgresWriter.getPool(ctx) if err != nil { return nil, err } diff --git a/runtime/db/inferencehistory.go b/runtime/db/inferencehistory.go index b5b62b4cf..59452410f 100644 --- a/runtime/db/inferencehistory.go +++ b/runtime/db/inferencehistory.go @@ -15,7 +15,6 @@ import ( "time" "github.com/hypermodeinc/modus/lib/manifest" - "github.com/hypermodeinc/modus/runtime/metrics" "github.com/hypermodeinc/modus/runtime/plugins" "github.com/hypermodeinc/modus/runtime/secrets" "github.com/hypermodeinc/modus/runtime/utils" @@ -26,12 +25,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) -var globalRuntimePostgresWriter *runtimePostgresWriter = &runtimePostgresWriter{ - dbpool: nil, - buffer: make(chan inferenceHistory, chanSize), - quit: make(chan struct{}), - done: make(chan struct{}), -} +var globalRuntimePostgresWriter *runtimePostgresWriter = &runtimePostgresWriter{} type Plugin struct { Gid uint64 `json:"gid,omitempty"` @@ -58,9 +52,6 @@ type Inference struct { Plugin Plugin `json:"plugin"` } -const batchSize = 100 -const chanSize = 10000 - const pluginsTable = "plugins" const inferencesTable = "inferences" @@ -74,7 +65,7 @@ type inferenceHistory struct { function *string } -func (w *runtimePostgresWriter) GetPool(ctx context.Context) (*pgxpool.Pool, error) { +func (w *runtimePostgresWriter) getPool(ctx context.Context) (*pgxpool.Pool, error) { var initErr error w.once.Do(func() { if !secrets.HasSecret(ctx, "MODUS_DB") { @@ -103,47 +94,6 @@ func (w *runtimePostgresWriter) GetPool(ctx context.Context) (*pgxpool.Pool, err } } -func (w *runtimePostgresWriter) Write(data inferenceHistory) { - select { - case w.buffer <- data: - default: - metrics.DroppedInferencesNum.Inc() - } -} - -func (w *runtimePostgresWriter) worker(ctx context.Context) { - var batchIndex int - var batch [batchSize]inferenceHistory - timer := time.NewTimer(inferenceRefresherInterval) - defer timer.Stop() - - for { - select { - case data := <-w.buffer: - batch[batchIndex] = data - batchIndex++ - if batchIndex == batchSize { - WriteInferenceHistoryToDB(ctx, batch[:batchSize]) - batchIndex = 0 - - // we need to drain the timer channel to prevent the timer from firing - if !timer.Stop() { - <-timer.C - } - timer.Reset(inferenceRefresherInterval) - } - case <-timer.C: - WriteInferenceHistoryToDB(ctx, batch[:batchIndex]) - batchIndex = 0 - timer.Reset(inferenceRefresherInterval) - case <-w.quit: - WriteInferenceHistoryToDB(ctx, batch[:batchIndex]) - close(w.done) - return - } - } -} - func (h *inferenceHistory) getJson() (input []byte, output []byte, err error) { input, err = getInferenceDataJson(h.input) if err != nil { @@ -186,16 +136,38 @@ func getInferenceDataJson(val any) ([]byte, error) { } func WritePluginInfo(ctx context.Context, plugin *plugins.Plugin) { - + var err error if useModusDB() { - err := writePluginInfoToModusdb(ctx, plugin) - if err != nil { - logDbError(ctx, err, "Plugin info not written to modusgraph.") - } - return + err = writePluginInfoToModusDB(ctx, plugin) + } else { + err = writePluginInfoToPostgresDB(ctx, plugin) } - err := WithTx(ctx, func(tx pgx.Tx) error { + if err != nil { + logDbError(ctx, err, "Plugin info not written to database.") + } +} + +func writePluginInfoToModusDB(ctx context.Context, plugin *plugins.Plugin) error { + if globalModusDbEngine == nil { + return nil + } + _, _, err := modusgraph.Create(ctx, globalModusDbEngine, Plugin{ + Id: plugin.Id, + Name: plugin.Metadata.Name(), + Version: plugin.Metadata.Version(), + Language: plugin.Language.Name(), + SdkVersion: plugin.Metadata.SdkVersion(), + BuildId: plugin.Metadata.BuildId, + BuildTime: plugin.Metadata.BuildTime, + GitRepo: plugin.Metadata.GitRepo, + GitCommit: plugin.Metadata.GitCommit, + }) + return err +} + +func writePluginInfoToPostgresDB(ctx context.Context, plugin *plugins.Plugin) error { + return WithTx(ctx, func(tx pgx.Tx) error { // Check if the plugin is already in the database // If so, update the ID to match @@ -243,28 +215,6 @@ ON CONFLICT (build_id) DO NOTHING`, return nil }) - - if err != nil { - logDbError(ctx, err, "Plugin info not written to database.") - } -} - -func writePluginInfoToModusdb(ctx context.Context, plugin *plugins.Plugin) error { - if GlobalModusDbEngine == nil { - return nil - } - _, _, err := modusgraph.Create[Plugin](ctx, GlobalModusDbEngine, Plugin{ - Id: plugin.Id, - Name: plugin.Metadata.Name(), - Version: plugin.Metadata.Version(), - Language: plugin.Language.Name(), - SdkVersion: plugin.Metadata.SdkVersion(), - BuildId: plugin.Metadata.BuildId, - BuildTime: plugin.Metadata.BuildTime, - GitRepo: plugin.Metadata.GitRepo, - GitCommit: plugin.Metadata.GitCommit, - }) - return err } func getPluginId(ctx context.Context, tx pgx.Tx, buildId string) (string, error) { @@ -293,7 +243,7 @@ func WriteInferenceHistory(ctx context.Context, model *manifest.ModelInfo, input function = &functionName } - globalRuntimePostgresWriter.Write(inferenceHistory{ + data := inferenceHistory{ model: model, input: input, output: output, @@ -301,116 +251,99 @@ func WriteInferenceHistory(ctx context.Context, model *manifest.ModelInfo, input end: end, pluginId: pluginId, function: function, - }) -} - -func WriteInferenceHistoryToDB(ctx context.Context, batch []inferenceHistory) { - if len(batch) == 0 { - return } + var err error if useModusDB() { - err := writeInferenceHistoryToModusDb(ctx, batch) - if err != nil { - logDbError(ctx, err, "Inference history not written to modusgraph.") - } - return + err = writeInferenceHistoryToModusDB(ctx, data) + } else { + err = writeInferenceHistoryToPostgresDB(ctx, data) } - err := WithTx(ctx, func(tx pgx.Tx) error { - b := &pgx.Batch{} - for _, data := range batch { - input, output, err := data.getJson() - if err != nil { - return err - } - query := fmt.Sprintf(`INSERT INTO %s -(id, model_hash, input, output, started_at, duration_ms, plugin_id, function) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8) -`, inferencesTable) - args := []any{ - utils.GenerateUUIDv7(), - data.model.Hash(), - input, - output, - data.start, - data.end.Sub(data.start).Milliseconds(), - data.pluginId, - data.function, - } - b.Queue(query, args...) - } + if err != nil { + logDbError(ctx, err, "Inference history not written to database.") + } +} - br := tx.SendBatch(ctx, b) - defer br.Close() +func writeInferenceHistoryToModusDB(ctx context.Context, data inferenceHistory) error { + if globalModusDbEngine == nil { + return nil + } - for range batch { - _, err := br.Exec() - if err != nil { - return err - } - } + input, output, err := data.getJson() + if err != nil { + return err + } - return nil - }) + var funcStr string + var pluginId string + if data.function != nil { + funcStr = *data.function + } + if data.pluginId != nil { + pluginId = *data.pluginId + } + _, _, err = modusgraph.Create(ctx, globalModusDbEngine, Inference{ + Id: utils.GenerateUUIDv7(), + ModelHash: data.model.Hash(), + Input: string(input), + Output: string(output), + StartedAt: data.start.Format(utils.TimeFormat), + DurationMs: data.end.Sub(data.start).Milliseconds(), + Function: funcStr, + Plugin: Plugin{ + Id: pluginId, + }, + }) if err != nil { - logDbError(ctx, err, "Inference history not written to database.") + return err } + + return nil } -func writeInferenceHistoryToModusDb(ctx context.Context, batch []inferenceHistory) error { - if GlobalModusDbEngine == nil { - return nil - } - for _, data := range batch { +func writeInferenceHistoryToPostgresDB(ctx context.Context, data inferenceHistory) error { + return WithTx(ctx, func(tx pgx.Tx) error { input, output, err := data.getJson() if err != nil { return err } - var funcStr string - var pluginId string - if data.function == nil { - funcStr = "" - } else { - funcStr = *data.function - } - if data.pluginId == nil { - pluginId = "" - } else { - pluginId = *data.pluginId + query := fmt.Sprintf(`INSERT INTO %s +(id, model_hash, input, output, started_at, duration_ms, plugin_id, function) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +`, inferencesTable) + args := []any{ + utils.GenerateUUIDv7(), + data.model.Hash(), + input, + output, + data.start, + data.end.Sub(data.start).Milliseconds(), + data.pluginId, + data.function, } - _, _, err = modusgraph.Create[Inference](ctx, GlobalModusDbEngine, Inference{ - Id: utils.GenerateUUIDv7(), - ModelHash: data.model.Hash(), - Input: string(input), - Output: string(output), - StartedAt: data.start.Format(utils.TimeFormat), - DurationMs: data.end.Sub(data.start).Milliseconds(), - Function: funcStr, - Plugin: Plugin{ - Id: pluginId, - }, - }) - if err != nil { + + if _, err := tx.Exec(ctx, query, args...); err != nil { return err } - } - return nil + + return nil + }) } func QueryPlugins(ctx context.Context) ([]Plugin, error) { - if GlobalModusDbEngine == nil { + if globalModusDbEngine == nil { return nil, nil } - _, plugins, err := modusgraph.Query[Plugin](ctx, GlobalModusDbEngine, modusgraph.QueryParams{}) + _, plugins, err := modusgraph.Query[Plugin](ctx, globalModusDbEngine, modusgraph.QueryParams{}) return plugins, err } func QueryInferences(ctx context.Context) ([]Inference, error) { - if GlobalModusDbEngine == nil { + if globalModusDbEngine == nil { return nil, nil } - _, inferences, err := modusgraph.Query[Inference](ctx, GlobalModusDbEngine, modusgraph.QueryParams{}) + _, inferences, err := modusgraph.Query[Inference](ctx, globalModusDbEngine, modusgraph.QueryParams{}) return inferences, err } diff --git a/runtime/db/modusdb.go b/runtime/db/modusdb.go index b8857fe2e..1f4a38567 100644 --- a/runtime/db/modusdb.go +++ b/runtime/db/modusdb.go @@ -24,7 +24,7 @@ import ( "github.com/hypermodeinc/modusgraph" ) -var GlobalModusDbEngine *modusgraph.Engine +var globalModusDbEngine *modusgraph.Engine func InitModusDb(ctx context.Context) { if !useModusDB() { @@ -50,13 +50,13 @@ func InitModusDb(ctx context.Context) { sentryutils.CaptureError(ctx, err, msg) logger.Fatal(ctx, err).Msg(msg) } else { - GlobalModusDbEngine = eng + globalModusDbEngine = eng } } func CloseModusDb(ctx context.Context) { - if GlobalModusDbEngine != nil { - GlobalModusDbEngine.Close() + if globalModusDbEngine != nil { + globalModusDbEngine.Close() } } diff --git a/runtime/metrics/metrics.go b/runtime/metrics/metrics.go index d111c82ab..57ab49b8a 100644 --- a/runtime/metrics/metrics.go +++ b/runtime/metrics/metrics.go @@ -98,13 +98,6 @@ var ( }, []string{"function_name"}, ) - - DroppedInferencesNum = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "runtime_dropped_inferences_num", - Help: "Number of dropped inference requests", - }, - ) ) func init() { @@ -116,7 +109,6 @@ func init() { FunctionExecutionsNum, FunctionExecutionDurationMilliseconds, FunctionExecutionDurationMillisecondsSummary, - DroppedInferencesNum, ) }