Skip to content

Commit 07d35d5

Browse files
committed
misc: query forward tool fix
1 parent 7c56990 commit 07d35d5

File tree

1 file changed

+10
-10
lines changed

1 file changed

+10
-10
lines changed

cmd/forward/main.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ var (
3636
runningTasks sync.WaitGroup
3737
failedToForward atomic.Uint32
3838
forwarded atomic.Uint32
39-
//This is the cache map to cache the running queries
40-
//the key is the queryId in source cluster. The values are the queries running on target clusters,
41-
//it includes, nextUri and the pointer to the target client.
42-
runningQueriesCacheMap = make(map[string]*[]QueryCacheEntry)
39+
// runningQueriesCacheMap caches mapping for original queries to forwarded queries.
40+
// The key is the queryId in source cluster. The values are the queries running on target clusters,
41+
// it includes nextUri and the pointer to the target cluster client.
42+
runningQueriesCacheMap = make(map[string][]*QueryCacheEntry)
4343
)
4444

4545
const (
@@ -156,7 +156,7 @@ func Run(_ *cobra.Command, _ []string) {
156156
// to process get filtered out.
157157
newCutoffTime := lastQueryStateCheckCutoffTime
158158
for _, state := range states {
159-
//check if there is query in cancel status
159+
// Check if there is query in cancel status
160160
if state.QueryState == queryStateFailed && state.ErrorCode.Name == queryStateErrorCancelled {
161161
go checkAndCancelQuery(ctx, &state)
162162
}
@@ -191,7 +191,7 @@ func Run(_ *cobra.Command, _ []string) {
191191

192192
func checkAndCancelQuery(ctx context.Context, queryState *presto.QueryStateInfo) {
193193
if queryCacheEntries, ok := runningQueriesCacheMap[queryState.QueryId]; ok {
194-
for _, q := range *queryCacheEntries {
194+
for _, q := range queryCacheEntries {
195195
if q.NextUri != "" {
196196
_, _, cancelQueryErr := q.Client.CancelQuery(ctx, q.NextUri)
197197
if cancelQueryErr != nil {
@@ -257,7 +257,7 @@ func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, client
257257
}
258258
successful, failed := atomic.Uint32{}, atomic.Uint32{}
259259
forwardedQueries := sync.WaitGroup{}
260-
cachedQueries := make([]QueryCacheEntry, len(clients)-1)
260+
cachedQueries := make([]*QueryCacheEntry, len(clients)-1)
261261
for i := 1; i < len(clients); i++ {
262262
forwardedQueries.Add(1)
263263
go func(client *presto.Client) {
@@ -280,7 +280,7 @@ func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, client
280280
}
281281
//build cache for running query
282282
if clientResult.NextUri != nil {
283-
cachedQueries[i-1] = QueryCacheEntry{NextUri: *clientResult.NextUri, Client: client}
283+
cachedQueries[i-1] = &QueryCacheEntry{NextUri: *clientResult.NextUri, Client: client}
284284
}
285285
rowCount := 0
286286
drainErr := clientResult.Drain(ctx, func(qr *presto.QueryResults) error {
@@ -299,8 +299,8 @@ func forwardQuery(ctx context.Context, queryState *presto.QueryStateInfo, client
299299
}(clients[i])
300300
}
301301
//Add running query into to cache
302-
runningQueriesCacheMap[queryState.QueryId] = &cachedQueries
303-
log.Info().Msg("adding query to cache" + queryState.QueryId)
302+
runningQueriesCacheMap[queryState.QueryId] = cachedQueries
303+
log.Debug().Msg("adding query to cache" + queryState.QueryId)
304304
forwardedQueries.Wait()
305305
log.Info().Str("source_query_id", queryInfo.QueryId).Uint32("successful", successful.Load()).
306306
Uint32("failed", failed.Load()).Msg("query forwarding finished")

0 commit comments

Comments
 (0)