|
60 | 60 | import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; |
61 | 61 | import org.elasticsearch.xpack.esql.action.EsqlQueryAction; |
62 | 62 | import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction; |
| 63 | +import org.elasticsearch.xpack.esql.core.expression.Attribute; |
63 | 64 | import org.elasticsearch.xpack.esql.enrich.EnrichLookupService; |
64 | 65 | import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec; |
65 | 66 | import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec; |
@@ -206,13 +207,19 @@ public void execute( |
206 | 207 | ); |
207 | 208 | long start = configuration.getQueryStartTimeNanos(); |
208 | 209 | String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; |
| 210 | + /* |
| 211 | + * Grab the output attributes here, so we can pass them to |
| 212 | + * the listener without holding on to a reference to the |
| 213 | + * entire plan. |
| 214 | + */ |
| 215 | + List<Attribute> outputAttributes = physicalPlan.output(); |
209 | 216 | try ( |
210 | 217 | Releasable ignored = exchangeSource.addEmptySink(); |
211 | 218 | // this is the top level ComputeListener called once at the end (e.g., once all clusters have finished for a CCS) |
212 | 219 | var computeListener = ComputeListener.create(local, transportService, rootTask, execInfo, start, listener.map(r -> { |
213 | 220 | long tookTimeNanos = System.nanoTime() - configuration.getQueryStartTimeNanos(); |
214 | 221 | execInfo.overallTook(new TimeValue(tookTimeNanos, TimeUnit.NANOSECONDS)); |
215 | | - return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo); |
| 222 | + return new Result(outputAttributes, collectedPages, r.getProfiles(), execInfo); |
216 | 223 | })) |
217 | 224 | ) { |
218 | 225 | // run compute on the coordinator |
|
0 commit comments