Skip to content

Commit aaeb10f

Browse files
authored
[ML] Disable child span for streaming tasks (#132945)
There is the potential for a memory leak, depending on which thread handles the onComplete message. Currently, the child span does not add anything to debugging, so we will disable it until we can figure out a clean way to propagate the child span's context throughout the stream. In any case, it would be better to replace it entirely with a child span capturing the outbound service call and response for both streaming and non-streaming requests, so this may remain disabled in the long run anyway in favor of that child span.
1 parent 15768f6 commit aaeb10f

File tree

3 files changed

+10
-5
lines changed

3 files changed

+10
-5
lines changed

docs/changelog/132945.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132945
2+
summary: Disable child span for streaming tasks
3+
area: Machine Learning
4+
type: bug
5+
issues: []

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/task/StreamingTaskManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
105105
flowTask.addListener(TaskBackedProcessor.this::cancelTask);
106106
return flowTask;
107107
}
108-
});
108+
}, false);
109109
}
110110
}
111111
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/task/StreamingTaskManagerTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void setUp() throws Exception {
5454
doAnswer(ans -> {
5555
TaskAwareRequest taskAwareRequest = ans.getArgument(2);
5656
return taskAwareRequest.createTask(1L, taskType, taskAction, TaskId.EMPTY_TASK_ID, Map.of());
57-
}).when(taskManager).register(any(), any(), any());
57+
}).when(taskManager).register(any(), any(), any(), eq(false));
5858
}
5959

6060
@After
@@ -67,7 +67,7 @@ public void testSubscribeRegistersTask() {
6767

6868
processor.subscribe(mock());
6969

70-
verify(taskManager, only()).register(eq(taskType), eq(taskAction), any());
70+
verify(taskManager, only()).register(eq(taskType), eq(taskAction), any(), eq(false));
7171
}
7272

7373
public void testCancelPropagatesUpstreamAndDownstream() {
@@ -77,7 +77,7 @@ public void testCancelPropagatesUpstreamAndDownstream() {
7777
var registeredTask = (CancellableTask) taskAwareRequest.createTask(1L, taskType, taskAction, TaskId.EMPTY_TASK_ID, Map.of());
7878
task.set(registeredTask);
7979
return registeredTask;
80-
}).when(taskManager).register(any(), any(), any());
80+
}).when(taskManager).register(any(), any(), any(), eq(false));
8181

8282
Flow.Subscriber<Object> downstream = mock();
8383
Flow.Subscription upstream = mock();
@@ -173,7 +173,7 @@ public void testOnNextAfterCancelDoesNotForwardItem() {
173173
var registeredTask = (CancellableTask) taskAwareRequest.createTask(1L, taskType, taskAction, TaskId.EMPTY_TASK_ID, Map.of());
174174
task.set(registeredTask);
175175
return registeredTask;
176-
}).when(taskManager).register(any(), any(), any());
176+
}).when(taskManager).register(any(), any(), any(), eq(false));
177177

178178
var processor = streamingTaskManager.create(taskType, taskAction);
179179
var downstream = establishFlow(processor);

0 commit comments

Comments
 (0)