Skip to content

Commit cf991d8

Browse files
authored
Add task instance integration tests for rtif, heartbeat, state transitions (apache#57198)
1 parent a92319a commit cf991d8

File tree

1 file changed

+78
-9
lines changed

1 file changed

+78
-9
lines changed

task-sdk-tests/tests/task_sdk_tests/test_task_instance_operations.py

Lines changed: 78 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
import requests
20+
1921
from airflow.sdk.api.datamodels._generated import (
2022
InactiveAssetsResponse,
2123
PrevSuccessfulDagRunResponse,
@@ -98,24 +100,91 @@ def test_ti_get_task_states(sdk_client, dag_info):
98100
console.print("[green]✅ Task states test passed!")
99101

100102

101-
def test_ti_finish_failed(sdk_client, task_instance_id):
103+
def test_ti_set_rtif(sdk_client, task_instance_id):
104+
"""
105+
Test setting Rendered Task Instance Fields (RTIF).
106+
"""
107+
console.print("[yellow]Setting Rendered Task Instance Fields...")
108+
109+
rtif_data = {
110+
"rendered_field_1": "test_value_1",
111+
"rendered_field_2": "1234",
112+
}
113+
114+
response = sdk_client.task_instances.set_rtif(task_instance_id, rtif_data)
115+
116+
console.print(" RTIF Response ".center(72, "="))
117+
console.print(f"[bright_blue]Response Type:[/] {type(response).__name__}")
118+
console.print(f"[bright_blue]Status:[/] {response.ok}")
119+
console.print(f"[bright_blue]Task Instance ID:[/] {task_instance_id}")
120+
console.print(f"[bright_blue]Fields Set:[/] {list(rtif_data.keys())}")
121+
console.print("=" * 72)
122+
123+
assert response.ok is True
124+
console.print("[green]✅ RTIF test passed!")
125+
126+
127+
def test_ti_heartbeat(sdk_client, task_instance_id, core_api_headers, dag_info, monkeypatch):
102128
"""
103-
Test finishing a task instance with failed state.
129+
Test sending heartbeat for a running task instance.
104130
105-
This is the LAST test and will terminate the long-running task.
106-
It must run after all other tests that need the task to be running.
131+
This test fetches the actual worker's PID and hostname from core API,
132+
then patches get_hostname() to return the worker's hostname, allowing
133+
the heartbeat to be accepted by the server.
107134
"""
108-
console.print("[yellow]Finishing task instance as FAILED...")
135+
console.print("[yellow]Getting task instance details for heartbeat...")
109136

110-
# Finish the task with failed state
137+
ti_url = (
138+
f"http://localhost:8080/api/v2/dags/{dag_info['dag_id']}/"
139+
f"dagRuns/{dag_info['dag_run_id']}/taskInstances/long_running_task/tries/1"
140+
)
141+
ti_response = requests.get(ti_url, headers=core_api_headers, timeout=10)
142+
ti_response.raise_for_status()
143+
144+
ti_data = ti_response.json()
145+
worker_hostname = ti_data.get("hostname")
146+
worker_pid = ti_data.get("pid")
147+
148+
console.print(" Worker Information ".center(72, "="))
149+
console.print(f"[bright_blue]Worker Hostname:[/] {worker_hostname}")
150+
console.print(f"[bright_blue]Worker PID:[/] {worker_pid}")
151+
console.print("=" * 72)
152+
153+
assert worker_hostname is not None
154+
assert worker_pid is not None
155+
156+
# Patch get_hostname to return the worker's hostname
157+
from airflow.sdk.api import client as sdk_client_module
158+
159+
monkeypatch.setattr(sdk_client_module, "get_hostname", lambda: worker_hostname)
160+
161+
console.print("[yellow]Sending heartbeat with worker's PID/hostname...")
162+
163+
sdk_client.task_instances.heartbeat(task_instance_id, pid=worker_pid)
164+
165+
console.print(" Heartbeat Response ".center(72, "="))
166+
console.print("[bright_blue]Status:[/] Success (204 No Content)")
167+
console.print(f"[bright_blue]Task Instance ID:[/] {task_instance_id}")
168+
console.print(f"[bright_blue]Used PID:[/] {worker_pid}")
169+
console.print(f"[bright_blue]Used Hostname:[/] {worker_hostname}")
170+
console.print("=" * 72)
171+
172+
console.print("[green]✅ Heartbeat test passed!")
173+
174+
175+
def test_ti_state_transitions(sdk_client, task_instance_id):
176+
"""
177+
Test task instance state transition to terminal state.
178+
"""
179+
console.print("[yellow]Testing state transition: RUNNING → FAILED...")
111180
sdk_client.task_instances.finish(
112181
id=task_instance_id, state=TerminalStateNonSuccess.FAILED, when=utcnow(), rendered_map_index="-1"
113182
)
114183

115-
console.print(" Task Finish Response ".center(72, "="))
184+
console.print(" State: FAILED (Terminal) ".center(72, "="))
185+
console.print("[bright_blue]Transition:[/] RUNNING → FAILED")
116186
console.print("[bright_blue]Status:[/] Success (204 No Content)")
117187
console.print("[bright_blue]Final State:[/] FAILED")
118188
console.print(f"[bright_blue]Task Instance ID:[/] {task_instance_id}")
119189
console.print("=" * 72)
120-
121-
console.print("[green]✅ Task instance finished successfully!")
190+
console.print("[green]✅ Successfully transitioned to FAILED terminal state!")

0 commit comments

Comments
 (0)