-
Notifications
You must be signed in to change notification settings - Fork 18
Functionality for noRandomDuplicates parameter and custom Stream options with seeding #62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d1a5c83
de56042
2f057f8
ae8c95d
9b38245
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -108,6 +108,11 @@ func ParseStage(stage *Stage, stages Map) (*Stage, error) { | |
| } | ||
| } | ||
| stages[stage.Id] = stage | ||
| err := processStreams(stage, stages) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to process streams for stage %s: %w", stage.Id, err) | ||
| } | ||
|
|
||
| for _, nextStagePath := range stage.NextStagePaths { | ||
| if nextStage, err := ParseStageFromFile(nextStagePath, stages); err != nil { | ||
| return nil, err | ||
|
|
@@ -150,3 +155,57 @@ func checkStageLinks(stage *Stage) error { | |
| } | ||
| return nil | ||
| } | ||
|
|
||
| func processStreams(stage *Stage, stages Map) error { | ||
| if len(stage.Streams) == 0 { | ||
| stage.seed = stage.States.RandSeed | ||
| return nil | ||
| } | ||
|
|
||
| for _, spec := range stage.Streams { | ||
| if spec.StreamCount <= 0 { | ||
| return fmt.Errorf("stream_count must be positive, got %d for stream %s", spec.StreamCount, spec.StreamPath) | ||
| } | ||
|
|
||
| if len(spec.Seeds) > 0 { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why not use your Validate() method? The code seems duplicated. |
||
| if len(spec.Seeds) != 1 && len(spec.Seeds) != spec.StreamCount { | ||
| return fmt.Errorf("seeds array length (%d) must be either 1 or equal to stream_count (%d) for stream %s", | ||
| len(spec.Seeds), spec.StreamCount, spec.StreamPath) | ||
| } | ||
| stage.States.RandSeed = 0 // Disable random seed generation when custom seeds are provided | ||
| } | ||
|
|
||
| streamPath, err := spec.GetValidatedPath(stage.BaseDir) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| for i := 0; i < spec.StreamCount; i++ { | ||
| streamStage, err := ReadStageFromFile(streamPath) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to read stream file %s: %w", streamPath, err) | ||
| } | ||
|
|
||
| // Set unique ID for this stream instance | ||
| baseId := fileNameWithoutPathAndExt(streamPath) | ||
| streamStage.Id = fmt.Sprintf("%s_stream_%d", baseId, i+1) | ||
|
|
||
| // Set custom seed if configured | ||
| if seed, hasCustomSeed := spec.GetSeedForInstance(i); hasCustomSeed { | ||
| streamStage.seed = seed | ||
| log.Info().Str("stream_stage", streamStage.Id).Int64("custom_seed", seed).Int("instance", i+1).Msg("stream assigned custom seed") | ||
| } else { | ||
| // No seed configured, use stage's RandSeed + instance offset | ||
| streamStage.seed = stage.States.RandSeed + int64(i-1) | ||
| log.Info().Str("stream_stage", streamStage.Id).Int64("generated_seed", streamStage.seed).Int64("base_seed", stage.States.RandSeed).Int("instance", i+1).Msg("stream assigned generated seed") | ||
| } | ||
|
|
||
| stages[streamStage.Id] = streamStage | ||
| stage.NextStages = append(stage.NextStages, streamStage) | ||
| streamStage.wgPrerequisites.Add(1) | ||
| } | ||
| } | ||
|
|
||
| stage.Streams = nil | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,9 +4,10 @@ import ( | |
| "context" | ||
| "database/sql" | ||
| _ "embed" | ||
| _ "github.com/go-sql-driver/mysql" | ||
| "pbench/log" | ||
| "pbench/utils" | ||
|
|
||
| _ "github.com/go-sql-driver/mysql" | ||
| ) | ||
|
|
||
| var ( | ||
|
|
@@ -65,7 +66,7 @@ VALUES (?, ?, ?, 0, 0, 0, ?)` | |
|
|
||
| func (m *MySQLRunRecorder) RecordQuery(_ context.Context, s *Stage, result *QueryResult) { | ||
| recordNewQuery := `INSERT INTO pbench_queries (run_id, stage_id, query_file, query_index, query_id, sequence_no, | ||
| cold_run, succeeded, start_time, end_time, row_count, expected_row_count, duration_ms, info_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` | ||
| cold_run, succeeded, start_time, end_time, row_count, expected_row_count, duration_ms, info_url, seed) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please update pbench_queries_ddl.sql with the new column.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. And why do we need a seed in this table?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are two tables, pbench_runs and pbench_queries. Currently pbench_runs has a seed column, but now that each stream can be seeded individually, a single run can use multiple seeds, so there had to be some way to report all of them. The simplest way I found was to add 'seed' as a column to pbench_queries and group by stage_id to present it in Grafana.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I was looking for other ways to report the seed per stream; please let me know if you have any suggestions. |
||
| var queryFile string | ||
| if result.Query.File != nil { | ||
| queryFile = *result.Query.File | ||
|
|
@@ -83,11 +84,13 @@ cold_run, succeeded, start_time, end_time, row_count, expected_row_count, durati | |
| result.RowCount, sql.NullInt32{ | ||
| Int32: int32(result.Query.ExpectedRowCount), | ||
| Valid: result.Query.ExpectedRowCount >= 0, | ||
| }, result.Duration.Milliseconds(), result.InfoUrl) | ||
| }, result.Duration.Milliseconds(), result.InfoUrl, result.Seed) | ||
| log.Info().Str("stage_id", result.StageId).Stringer("start_time", result.StartTime).Stringer("end_time", result.EndTime). | ||
| Str("info_url", result.InfoUrl).Int64("seed", result.Seed).Msg("recorded query result to MySQL") | ||
| if err != nil { | ||
| log.Error().EmbedObject(result).Err(err).Msg("failed to send query summary to MySQL") | ||
| } | ||
| updateRunInfo := `UPDATE pbench_runs SET start_time = ?, queries_ran = queries_ran + 1, failed = ?, mismatch = ? WHERE run_id = ?` | ||
| updateRunInfo := `UPDATE pbench_runs SET start_time = ?, queries_ran = queries_ran + 1, failed = ? , mismatch = ? WHERE run_id = ?` | ||
| res, err := m.db.Exec(updateRunInfo, s.States.RunStartTime, m.failed, m.mismatch, m.runId) | ||
| if err != nil { | ||
| log.Error().Err(err).Str("run_name", s.States.RunName).Int64("run_id", m.runId). | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.