Skip to content

Commit 0d5abdd

Browse files
committed
forward: add --dry-run
1 parent d6f4485 commit 0d5abdd

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

cmd/forward.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func init() {
4646
forward.PrestoFlagsArray.Install(forwardCmd)
4747
_ = forwardCmd.Flags().MarkHidden("trino")
4848
wd, _ := os.Getwd()
49+
forwardCmd.Flags().BoolVarP(&forward.DryRun, "dry-run", "", false, "Turning on dry run will only show the queries but not sending them to the target server.")
4950
forwardCmd.Flags().StringVarP(&forward.OutputPath, "output-path", "o", wd, "Output directory path")
5051
forwardCmd.Flags().StringVarP(&forward.RunName, "name", "n", fmt.Sprintf("forward_%s", time.Now().Format(utils.DirectoryNameTimeFormat)), `Assign a name to this run. (default: "forward_<current time>")`)
5152
forwardCmd.Flags().DurationVarP(&forward.PollInterval, "poll-interval", "i", time.Second*5, "Interval between polls to the source cluster")

cmd/forward/main.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
)
1919

2020
var (
21+
DryRun bool
2122
PrestoFlagsArray utils.PrestoFlagsArray
2223
OutputPath string
2324
RunName string
@@ -172,7 +173,22 @@ func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, client
172173
queryInfo.Query = replacedQuery
173174
}
174175
}
176+
if schema := queryInfo.Session.Schema; schema != nil {
177+
if mappedSchema, exists := schemaMappings[*schema]; exists {
178+
queryInfo.Session.Schema = &mappedSchema
179+
log.Info().Str("source_query_id", queryInfo.QueryId).
180+
Msgf("schema replaced %s -> %s", *schema, mappedSchema)
181+
}
182+
}
175183
SessionPropertyHeader := clients[0].GenerateSessionParamsHeaderValue(queryInfo.Session.CollectSessionProperties())
184+
if DryRun {
185+
logEntry := log.Info().Str("query", queryInfo.Query)
186+
if queryInfo.Session.Schema != nil {
187+
logEntry = logEntry.Str("schema", *queryInfo.Session.Schema)
188+
}
189+
logEntry.Msg("query not sent in dry-run mode")
190+
return
191+
}
176192
successful, failed := atomic.Uint32{}, atomic.Uint32{}
177193
forwardedQueries := sync.WaitGroup{}
178194
for i := 1; i < len(clients); i++ {
@@ -184,13 +200,7 @@ func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, client
184200
req.Header.Set(presto.CatalogHeader, *queryInfo.Session.Catalog)
185201
}
186202
if queryInfo.Session.Schema != nil {
187-
if mappedSchema, exists := schemaMappings[*queryInfo.Session.Schema]; exists {
188-
req.Header.Set(presto.SchemaHeader, mappedSchema)
189-
log.Info().Str("source_query_id", queryInfo.QueryId).
190-
Msgf("schema replaced %s -> %s", *queryInfo.Session.Schema, mappedSchema)
191-
} else {
192-
req.Header.Set(presto.SchemaHeader, *queryInfo.Session.Schema)
193-
}
203+
req.Header.Set(presto.SchemaHeader, *queryInfo.Session.Schema)
194204
}
195205
req.Header.Set(presto.SessionHeader, SessionPropertyHeader)
196206
req.Header.Set(presto.SourceHeader, queryInfo.QueryId)

0 commit comments

Comments
 (0)