diff --git a/flytekit/core/promise.py b/flytekit/core/promise.py index e92094ca04..712f0d25ca 100644 --- a/flytekit/core/promise.py +++ b/flytekit/core/promise.py @@ -885,16 +885,22 @@ async def binding_data_from_python_std( # akin to what the Type Engine does when it finds a Union type (see the UnionTransformer), but we can't rely on # that in this case, because of the mix and match of realized values, and Promises. for i in range(len(expected_literal_type.union_type.variants)): + lt_type = expected_literal_type.union_type.variants[i] + python_type = get_args(t_value_type)[i] if t_value_type else None try: - lt_type = expected_literal_type.union_type.variants[i] - python_type = get_args(t_value_type)[i] if t_value_type else None return await binding_data_from_python_std(ctx, lt_type, t_value, python_type, nodes) - except Exception: + except Exception as e: logger.debug( - f"failed to bind data {t_value} with literal type {expected_literal_type.union_type.variants[i]}." + f"Failed to bind data {t_value} " + f"using variant[{i}] literal type={repr(lt_type)} (expected overall {expected_literal_type}) " + f"and python type={python_type} (expected overall {t_value_type}). " + f"Error: {e}" ) raise AssertionError( - f"Failed to bind data {t_value} with literal type {expected_literal_type.union_type.variants}." + f"Failed to bind data {t_value} " + f"to any of the expected union variants.\n" + f"Value python type: {type(t_value).__name__}, declared python types: {t_value_type}\n" + f"Expected literal type: {repr(expected_literal_type)}" ) elif ( diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index 6b3760056e..24a78f184b 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -1630,21 +1630,24 @@ async def _literal_map_to_kwargs( f" than allowed by the input spec {len(python_interface_inputs)}" ) kwargs = {} - try: - for i, k in enumerate(lm.literals): - kwargs[k] = asyncio.create_task( - TypeEngine.async_to_python_value(ctx, lm.literals[k], python_interface_inputs[k]) - ) - await asyncio.gather(*kwargs.values()) - except Exception as e: - raise TypeTransformerFailedError( - f"Error converting input '{k}' at position {i}:\n" - f"Literal value: {lm.literals[k]}\n" - f"Expected Python type: {python_interface_inputs[k]}\n" - f"Exception: {e}" + for i, k in enumerate(lm.literals): + kwargs[k] = asyncio.create_task( + TypeEngine.async_to_python_value(ctx, lm.literals[k], python_interface_inputs[k]) ) + if kwargs: + await asyncio.wait(kwargs.values()) + + for k, t in kwargs.items(): + try: + kwargs[k] = t.result() + except Exception as e: + raise TypeTransformerFailedError( + f"Error converting input '{k}':\n" + f"Literal value: {lm.literals[k]!r}\n" + f"Expected Python type: {python_interface_inputs[k]!r}\n" + f"Exception: {e}" + ) - kwargs = {k: v.result() for k, v in kwargs.items() if v is not None} return kwargs @classmethod @@ -2096,7 +2099,7 @@ async def async_to_python_value( res_tag = trans.name found_res = True except Exception as e: - logger.debug(f"Failed to convert from {lv} to {v} with error: {e}") + logger.debug(f"Failed to convert from {repr(lv)} to {v} with error: {e}") if is_ambiguous: raise TypeError( @@ -2107,7 +2110,7 @@ async def async_to_python_value( if found_res: return res - raise TypeError(f"Cannot convert from {lv} to {expected_python_type} (using tag {union_tag})") + raise TypeError(f"Cannot convert from {repr(lv)} to {expected_python_type} (using tag {union_tag})") def guess_python_type(self, literal_type: LiteralType) -> type: if literal_type.union_type is not None: diff --git a/flytekit/models/common.py b/flytekit/models/common.py index 94a7bb66b7..26df175a1b 100644 --- a/flytekit/models/common.py +++ b/flytekit/models/common.py @@ -74,10 +74,10 @@ def __ne__(self, other): return not (self == other) def __repr__(self): - return self.short_string() + return str(self.to_flyte_idl()) def __str__(self): - return self.verbose_string() + return self.short_string() def __hash__(self): return hash(self.to_flyte_idl().SerializeToString(deterministic=True)) @@ -90,12 +90,6 @@ def short_string(self): type_str = type(self).__name__ return f"Flyte Serialized object ({type_str}):" + os.linesep + str_repr - def verbose_string(self): - """ - :rtype: Text - """ - return self.short_string() - def serialize_to_string(self) -> str: return self.to_flyte_idl().SerializeToString() diff --git a/tests/flytekit/unit/core/test_type_conversion_errors.py b/tests/flytekit/unit/core/test_type_conversion_errors.py index e8aca2570a..dd04c1420e 100644 --- a/tests/flytekit/unit/core/test_type_conversion_errors.py +++ b/tests/flytekit/unit/core/test_type_conversion_errors.py @@ -1,5 +1,6 @@ """Unit tests for type conversion errors.""" +import re from datetime import timedelta from string import ascii_lowercase from typing import Tuple @@ -9,6 +10,7 @@ from hypothesis import strategies as st from flytekit import task, workflow +from flytekit.core.type_engine import TypeTransformerFailedError @task @@ -67,11 +69,12 @@ def test_task_input_error(incorrect_input): @settings(deadline=timedelta(seconds=2)) def test_task_output_error(correct_input): with pytest.raises( - TypeError, + TypeTransformerFailedError, match=( - r"Failed to convert outputs of task '{}' at position 0:\n" - r" Expected value of type \ but got .+ of type .+" - ).format(task_incorrect_output.name), + r"Failed to convert outputs of task '{}' at position 0\.\n" + r"Failed to convert type .+ to type .+\.\n" + r"Error Message: Expected value of type \ but got .+ of type .+" + ).format(re.escape(task_incorrect_output.name)), ): task_incorrect_output(a=correct_input) @@ -80,12 +83,12 @@ def test_task_output_error(correct_input): @settings(deadline=timedelta(seconds=2)) def test_workflow_with_task_error(correct_input): with pytest.raises( - TypeError, + TypeTransformerFailedError, match=( - r"Error encountered while executing 'wf_with_task_error':\n" - r" Failed to convert outputs of task '.+' at position 0:\n" - r" Expected value of type \ but got .+ of type .+" - ).format(), + r"Failed to convert outputs of task '.+' at position 0\.\n" + r"Failed to convert type .+ to type .+\.\n" + r"Error Message: Expected value of type \ but got .+ of type .+" + ), ): wf_with_task_error(a=correct_input) @@ -105,7 +108,7 @@ def test_workflow_with_input_error(incorrect_input): def test_workflow_with_output_error(correct_input): with pytest.raises( TypeError, - match=(r"Failed to convert output in position 0 of value .+, expected type \"), + match=r"Failed to convert output in position 0 of value [\s\S]+, expected type \", ): wf_with_output_error(a=correct_input) @@ -122,6 +125,6 @@ def test_workflow_with_output_error(correct_input): def test_workflow_with_multioutput_error(workflow, position, correct_input): with pytest.raises( TypeError, - match=(r"Failed to convert output in position {} of value .+, expected type \").format(position), + match=(r"Failed to convert output in position {} of value [\s\S]+, expected type \").format(position), ): workflow(a=correct_input, b=correct_input) diff --git a/tests/flytekit/unit/core/test_type_engine.py b/tests/flytekit/unit/core/test_type_engine.py index ef139b6701..8945ea46dd 100644 --- a/tests/flytekit/unit/core/test_type_engine.py +++ b/tests/flytekit/unit/core/test_type_engine.py @@ -4034,3 +4034,242 @@ def test_literal_transformer_mixed_base_types(): t = typing.Literal["a", 1] with pytest.raises(TypeTransformerFailedError): TypeEngine.get_transformer(t).get_literal_type(t) + + +def test_literal_map_to_kwargs_empty_inputs(): + """ + Test that literal_map_to_kwargs handles empty input correctly without + raising ValueError about empty coroutines/futures set. + + This test specifically addresses the bug where asyncio.wait() was called + with an empty collection when there are no inputs to convert. + """ + from flytekit.models import literals as _literal_models + + ctx = FlyteContextManager.current_context() + + # Test with completely empty literal map and empty python types + empty_literal_map = _literal_models.LiteralMap(literals={}) + empty_python_types = {} + + # This should not raise a ValueError about empty coroutines/futures + result = TypeEngine.literal_map_to_kwargs(ctx, empty_literal_map, empty_python_types) + assert result == {} + + # Test with empty literal map but non-empty python types (task with defaults) + python_types_with_defaults = {"a": int, "b": str} + result = TypeEngine.literal_map_to_kwargs(ctx, empty_literal_map, python_types_with_defaults) + assert result == {} + + # Test with some inputs to ensure normal functionality still works + literal_map_with_data = _literal_models.LiteralMap(literals={ + "x": _literal_models.Literal( + scalar=_literal_models.Scalar( + primitive=_literal_models.Primitive(integer=42) + ) + ) + }) + python_types_with_data = {"x": int} + result = TypeEngine.literal_map_to_kwargs(ctx, literal_map_with_data, python_types_with_data) + assert result == {"x": 42} + + +def test_task_with_no_inputs_execution(): + """ + Test that tasks with no inputs (using default arguments) execute correctly. + + This is an integration test that ensures the fix for empty kwargs in + literal_map_to_kwargs works end-to-end with actual task execution. + """ + @task + def task_with_defaults(a: int = 10, b: str = "hello") -> str: + return f"{a}_{b}" + + @task + def task_no_params() -> int: + return 42 + + # These should not raise ValueError about empty coroutines/futures + result1 = task_with_defaults() + assert result1 == "10_hello" + + result2 = task_no_params() + assert result2 == 42 + + # Test with partial parameters + result3 = task_with_defaults(a=20) + assert result3 == "20_hello" + + result4 = task_with_defaults(b="world") + assert result4 == "10_world" + + +def test_asyncio_wait_empty_kwargs_regression(): + """ + Regression test for the specific bug where asyncio.wait() was called with empty kwargs. + + This test simulates the exact scenario that was causing the + "Set of Tasks/Futures is empty" ValueError. + """ + import asyncio + from flytekit.models import literals as _literal_models + from flytekit.core.type_engine import TypeEngine + + async def simulate_original_bug(): + """ + Simulate what would happen with the original buggy code. + This test documents the exact fix we applied. + """ + ctx = FlyteContextManager.current_context() + empty_literal_map = _literal_models.LiteralMap(literals={}) + python_interface_inputs = {} + + # Simulate the original buggy code path + kwargs = {} + for i, k in enumerate(empty_literal_map.literals): + # This loop wouldn't execute for empty literals + kwargs[k] = asyncio.create_task( + TypeEngine.async_to_python_value(ctx, empty_literal_map.literals[k], python_interface_inputs[k]) + ) + + # Before our fix, this would raise ValueError: Set of Tasks/Futures is empty + # After our fix, we check if kwargs is not empty before calling asyncio.wait + if kwargs: # This is the fix we added + await asyncio.wait(kwargs.values()) + + # The loop to get results should also handle empty kwargs + for k, t in kwargs.items(): + kwargs[k] = t.result() + + return kwargs + + # This should complete without error + result = asyncio.run(simulate_original_bug()) + assert result == {} + + +def test_error_message_improvements_literal_map_to_kwargs(): + """ + Test that error messages in literal_map_to_kwargs use proper repr formatting + for better debugging experience. + """ + from flytekit.models import literals as _literal_models + from flytekit.core.type_engine import TypeTransformerFailedError + + ctx = FlyteContextManager.current_context() + + # Create a literal map with a value that will cause a conversion error + # Using a string literal but expecting an int type + literal_map = _literal_models.LiteralMap(literals={ + "bad_input": _literal_models.Literal( + scalar=_literal_models.Scalar( + primitive=_literal_models.Primitive(string_value="not_a_number") + ) + ) + }) + + python_types = {"bad_input": int} + + # This should raise a TypeTransformerFailedError with improved formatting + with pytest.raises(TypeTransformerFailedError) as exc_info: + TypeEngine.literal_map_to_kwargs(ctx, literal_map, python_types) + + error_message = str(exc_info.value) + + # Verify the error message contains proper repr formatting + assert "Error converting input 'bad_input'" in error_message + assert "Literal value:" in error_message + assert "Expected Python type:" in error_message + assert "" in error_message # repr of int type + + # Verify the literal value is properly formatted with repr + # The exact format may vary, but it should be more readable than a raw object string + + +def test_error_message_improvements_union_transformer(): + """ + Test that UnionTransformer error messages use proper repr formatting + for better debugging when conversion fails. + """ + from typing import Union + from flytekit.models import literals as _literal_models + + ctx = FlyteContextManager.current_context() + + # Create a union type that will fail conversion + union_type = Union[int, str] + + # Create a literal that cannot be converted to either int or str + # Using a complex nested structure that should fail + complex_literal = _literal_models.Literal( + map=_literal_models.LiteralMap(literals={ + "nested": _literal_models.Literal( + scalar=_literal_models.Scalar( + primitive=_literal_models.Primitive(integer=42) + ) + ) + }) + ) + + # This should raise a TypeError with improved formatting + with pytest.raises(TypeError) as exc_info: + TypeEngine.to_python_value(ctx, complex_literal, union_type) + + error_message = str(exc_info.value) + + # Verify the error message uses repr formatting and is more readable + assert "Cannot convert from" in error_message + assert "using tag" in error_message + # The error should show a repr of the literal value, making it more readable + + +def test_debug_logging_union_transformer(caplog): + """ + Test that UnionTransformer debug logging uses proper repr formatting. + """ + import logging + from typing import Union + from flytekit.models import literals as _literal_models + + # Set logging level to capture debug messages + caplog.set_level(logging.DEBUG) + + ctx = FlyteContextManager.current_context() + + # Create a union type and a literal that will cause conversion to fail for all types + # This will trigger debug logging for each failed conversion attempt + union_type = Union[int, str] # Both will fail for a map literal + + # Create a literal that cannot be converted to int or str + literal_value = _literal_models.Literal( + map=_literal_models.LiteralMap(literals={ + "key": _literal_models.Literal( + scalar=_literal_models.Scalar( + primitive=_literal_models.Primitive(integer=42) + ) + ) + }) + ) + + # This should fail and log debug messages for each failed conversion attempt + with pytest.raises(TypeError) as exc_info: + TypeEngine.to_python_value(ctx, literal_value, union_type) + + # Verify the final error message uses repr formatting + error_message = str(exc_info.value) + assert "Cannot convert from" in error_message + assert "using tag" in error_message + + # Check that debug logging includes proper repr formatting + debug_logs = [record.message for record in caplog.records if record.levelname == "DEBUG"] + + # Look for debug messages that show conversion failures with improved formatting + conversion_failure_logs = [log for log in debug_logs if "Failed to convert from" in log] + + # There should be debug logs for the failed conversion attempts + if conversion_failure_logs: + # Verify that the log message uses proper formatting + log_message = conversion_failure_logs[0] + assert "Failed to convert from" in log_message + # The log should be more readable due to repr formatting - it should contain + # the literal representation which is now formatted with repr() diff --git a/tests/flytekit/unit/core/test_type_hints.py b/tests/flytekit/unit/core/test_type_hints.py index 596dcf3666..26d8615eb2 100644 --- a/tests/flytekit/unit/core/test_type_hints.py +++ b/tests/flytekit/unit/core/test_type_hints.py @@ -1738,16 +1738,16 @@ def foo4(input: DC1 = DC1(1, "a")) -> DC2: with pytest.raises( TypeTransformerFailedError, match=( - f"Failed to convert inputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo':\n" + f"Failed to convert inputs of task '.*tests.flytekit.unit.core.test_type_hints.foo':\n" " Failed argument 'a': Expected value of type but got 'hello' of type " ), ): foo(a="hello", b=10) # type: ignore with pytest.raises( - ValueError, + TypeTransformerFailedError, match=( - f"Failed to convert outputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo2' at position 0.\n" + f"Failed to convert outputs of task '.*tests.flytekit.unit.core.test_type_hints.foo2' at position 0.\n" f"Failed to convert type to type .\n" "Error Message: Expected value of type but got 'hello' of type ." ), @@ -1756,7 +1756,7 @@ def foo4(input: DC1 = DC1(1, "a")) -> DC2: with pytest.raises( TypeTransformerFailedError, - match=f"Failed to convert inputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo3':\n " + match=f"Failed to convert inputs of task '.*tests.flytekit.unit.core.test_type_hints.foo3':\n " f"Failed argument 'a': Expected a dict", ): foo3(a=[{"hello": 2}]) @@ -1764,7 +1764,7 @@ def foo4(input: DC1 = DC1(1, "a")) -> DC2: with pytest.raises( AttributeError, match=( - f"Failed to convert outputs of task '{exec_prefix}tests.flytekit.unit.core.test_type_hints.foo4' at position 0.\n" + f"Failed to convert outputs of task '.*tests.flytekit.unit.core.test_type_hints.foo4' at position 0.\n" f"Failed to convert type .DC1'> to type .DC2'>.\n" "Error Message: 'DC1' object has no attribute 'c'." ), @@ -1895,9 +1895,9 @@ def wf2(a: typing.Union[int, str]) -> typing.Union[int, str]: return t2(a=a) with pytest.raises( - TypeError, + TypeTransformerFailedError, match=( - rf"Error encountered while converting inputs of '{exec_prefix}tests\.flytekit\.unit\.core\.test_type_hints\.t2':\n\s+Error converting input 'a' at position 0:" + rf"Error encountered while converting inputs of '.*tests\.flytekit\.unit\.core\.test_type_hints\.t2':\n\s+Error converting input 'a':" ), ): wf2(a="2") # Removed assert as it was not necessary for the exception to be raised diff --git a/tests/flytekit/unit/core/test_worker_queue.py b/tests/flytekit/unit/core/test_worker_queue.py index fee1dca78a..0eb8e02475 100644 --- a/tests/flytekit/unit/core/test_worker_queue.py +++ b/tests/flytekit/unit/core/test_worker_queue.py @@ -21,8 +21,9 @@ def _mock_reconcile(update: Update): update.wf_exec.outputs.as_python_native.return_value = "hello" +@pytest.mark.asyncio @mock.patch("flytekit.core.worker_queue.Controller.reconcile_one", side_effect=_mock_reconcile) -def test_controller(mock_reconcile): +async def test_controller(mock_reconcile): print(f"ID mock_reconcile {id(mock_reconcile)}") mock_reconcile.return_value = 123 @@ -41,11 +42,12 @@ async def fake_eager(): res = await f assert res == "hello" - loop_manager.run_sync(fake_eager) + await fake_eager() +@pytest.mark.asyncio @mock.patch("flytekit.core.worker_queue.Controller._execute") -def test_controller_launch(mock_thread_target): +async def test_controller_launch(mock_thread_target): @task def t2() -> str: return "hello" @@ -263,13 +265,14 @@ def t1() -> str: ) +@pytest.mark.asyncio @pytest.mark.parametrize("phase,expected_update_status", [ (execution.WorkflowExecutionPhase.SUCCEEDED, ItemStatus.SUCCESS), (execution.WorkflowExecutionPhase.FAILED, ItemStatus.FAILED), (execution.WorkflowExecutionPhase.ABORTED, ItemStatus.FAILED), (execution.WorkflowExecutionPhase.TIMED_OUT, ItemStatus.FAILED), ]) -def test_reconcile(phase, expected_update_status): +async def test_reconcile(phase, expected_update_status): mock_remote = mock.MagicMock() wf_exec = FlyteWorkflowExecution( id=identifier.WorkflowExecutionIdentifier("project", "domain", "exec-name"), diff --git a/tests/flytekit/unit/models/test_common.py b/tests/flytekit/unit/models/test_common.py index ccb7042092..105e50a897 100644 --- a/tests/flytekit/unit/models/test_common.py +++ b/tests/flytekit/unit/models/test_common.py @@ -120,7 +120,7 @@ def test_auth_role_empty(): def test_short_string_raw_output_data_config(): obj = _common.RawOutputDataConfig("s3://bucket") assert "Flyte Serialized object (RawOutputDataConfig):" in obj.short_string() - assert "Flyte Serialized object (RawOutputDataConfig):" in repr(obj) + assert 'output_location_prefix: "s3://bucket"' in repr(obj) def test_html_repr_data_config(): @@ -148,7 +148,7 @@ def test_short_string_entities_ExecutionClosure(): created_at=None, updated_at=test_datetime, ) - expected_result = textwrap.dedent("""\ + short_string_expected = textwrap.dedent("""\ Flyte Serialized object (ExecutionClosure): outputs: uri: http://foo/ @@ -160,18 +160,36 @@ def test_short_string_entities_ExecutionClosure(): updated_at: seconds: 1640995200""") - assert repr(obj) == expected_result - assert obj.short_string() == expected_result + repr_expected = textwrap.dedent("""\ + outputs { + uri: "http://foo/" + } + phase: SUCCEEDED + started_at { + seconds: 1640995200 + } + duration { + seconds: 10 + } + updated_at { + seconds: 1640995200 + } + """) + + assert repr(obj) == repr_expected + assert obj.short_string() == short_string_expected def test_short_string_entities_Primitive(): obj = Primitive(integer=1) - expected_result = textwrap.dedent("""\ + short_string_expected = textwrap.dedent("""\ Flyte Serialized object (Primitive): integer: 1""") - assert repr(obj) == expected_result - assert obj.short_string() == expected_result + repr_expected = "integer: 1\n" + + assert repr(obj) == repr_expected + assert obj.short_string() == short_string_expected def test_short_string_entities_TaskMetadata(): @@ -188,7 +206,7 @@ def test_short_string_entities_TaskMetadata(): (), ) - expected_result = textwrap.dedent("""\ + short_string_expected = textwrap.dedent("""\ Flyte Serialized object (TaskMetadata): discoverable: True runtime: @@ -204,17 +222,44 @@ def test_short_string_entities_TaskMetadata(): interruptible: True cache_serializable: True pod_template_name: A""") - assert repr(obj) == expected_result - assert obj.short_string() == expected_result + + repr_expected = ('discoverable: true\n' + 'runtime {\n' + ' type: FLYTE_SDK\n' + ' version: "1.0.0"\n' + ' flavor: "python"\n' + '}\n' + 'timeout {\n' + ' seconds: 86400\n' + '}\n' + 'retries {\n' + ' retries: 3\n' + '}\n' + 'discovery_version: "0.1.1b0"\n' + 'deprecated_error_message: "This is deprecated!"\n' + 'interruptible: true\n' + 'cache_serializable: true\n' + 'pod_template_name: "A"\n' + 'generates_deck {\n' + '}\n') + + assert repr(obj) == repr_expected + assert obj.short_string() == short_string_expected def test_short_string_entities_Project(): obj = Project("project_id", "project_name", "project_description") - expected_result = textwrap.dedent("""\ + short_string_expected = textwrap.dedent("""\ Flyte Serialized object (Project): id: project_id name: project_name description: project_description""") - assert repr(obj) == expected_result - assert obj.short_string() == expected_result + repr_expected = textwrap.dedent("""\ + id: "project_id" + name: "project_name" + description: "project_description" + """) + + assert repr(obj) == repr_expected + assert obj.short_string() == short_string_expected