Tdigestdistribution #37109
Tdigestdistribution #37109
7 fail, 7 skipped, 2 pass in 6m 30s
2 files 2 suites 6m 30s ⏱️
16 tests 2 ✅ 7 💤 7 ❌
23 runs 2 ✅ 14 💤 7 ❌
Results for commit 7280297.
Annotations
github-actions / Python 3.12 Test Results (ubuntu-latest)
test_empty_input_chunks (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) failed
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-ml.xml [took 5m 10s]
Raw output
RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst001 stage-001 failed:Traceback (most recent call last):
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
response = task()
^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
all_monitoring_infos = super().monitoring_infos(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
DistributionData(sum, count, min, max),
File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
self.tdigest = tdigest
AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
self = <apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment testMethod=test_empty_input_chunks>
def test_empty_input_chunks(self):
test_chunks = []
anns_field = "dense_embedding_cosine"
search_parameters = MilvusSearchParameters(
collection_name=MILVUS_IT_CONFIG["collection_name"],
search_strategy=VectorSearchParameters(anns_field=anns_field))
collection_load_parameters = MilvusCollectionLoadParameters()
handler = MilvusSearchEnrichmentHandler(
self._connection_params,
search_parameters,
collection_load_parameters=collection_load_parameters)
expected_chunks = []
> with TestPipeline() as p:
^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:589:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/pipeline.py:671: in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7eff9162ae40>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
last_error_text = None
def read_messages() -> None:
nonlocal last_error_text
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
mr = message.message_response
logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
last_error_text = mr.message_text
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst001 stage-001 failed:Traceback (most recent call last):
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
E response = task()
E ^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
E return getattr(self, request_type)(
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
E monitoring_infos = bundle_processor.monitoring_infos()
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
E op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
E all_monitoring_infos = super().monitoring_infos(
E ^^^^^^^^^^^^^^^^^^^^^^^^^
E File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E def monitoring_infos(self, transform_id, tag_to_pcollection_id):
E File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
E File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
E DistributionData(sum, count, min, max),
E File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
E self.tdigest = tdigest
E AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner.py:571: RuntimeError
github-actions / Python 3.12 Test Results (ubuntu-latest)
test_filtered_search_with_bm25_full_text_and_batching (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) failed
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-ml.xml [took 5m 9s]
Raw output
RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst002 stage-006 failed:Traceback (most recent call last):
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
response = task()
^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
all_monitoring_infos = super().monitoring_infos(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
DistributionData(sum, count, min, max),
File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
self.tdigest = tdigest
AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
self = <apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment testMethod=test_filtered_search_with_bm25_full_text_and_batching>
def test_filtered_search_with_bm25_full_text_and_batching(self):
test_chunks = [
Chunk(
id="query1",
embedding=Embedding(sparse_embedding=None),
content=Content(text="This is a test document")),
Chunk(
id="query2",
embedding=Embedding(sparse_embedding=None),
content=Content(text="Another test document")),
Chunk(
id="query3",
embedding=Embedding(sparse_embedding=None),
content=Content(text="وثيقة اختبار"))
]
filter_condition = 'ARRAY_CONTAINS_ANY(tags, ["healthcare", "banking"])'
anns_field = "sparse_embedding_bm25"
addition_search_params = {"metric_type": KeywordSearchMetrics.BM25.value}
keyword_search_parameters = KeywordSearchParameters(
anns_field=anns_field,
limit=10,
filter=filter_condition,
search_params=addition_search_params)
search_parameters = MilvusSearchParameters(
collection_name=MILVUS_IT_CONFIG["collection_name"],
search_strategy=keyword_search_parameters,
output_fields=["id", "content", "metadata"],
round_decimal=1)
collection_load_parameters = MilvusCollectionLoadParameters()
# Force batching.
min_batch_size, max_batch_size = 2, 2
handler = MilvusSearchEnrichmentHandler(
connection_parameters=self._connection_params,
search_parameters=search_parameters,
collection_load_parameters=collection_load_parameters,
min_batch_size=min_batch_size,
max_batch_size=max_batch_size)
expected_chunks = [
Chunk(
id='query1',
content=Content(text='This is a test document'),
metadata={
'enrichment_data': {
'id': [1],
'distance': [3.3],
'fields': [{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding()),
Chunk(
id='query2',
content=Content(text='Another test document'),
metadata={
'enrichment_data': {
'id': [1],
'distance': [0.8],
'fields': [{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding()),
Chunk(
id='query3',
content=Content(text='وثيقة اختبار'),
metadata={
'enrichment_data': {
'id': [3],
'distance': [2.3],
'fields': [{
'content': 'وثيقة اختبار',
'metadata': {
'language': 'ar'
},
'id': 3
}]
}
},
embedding=Embedding())
]
> with TestPipeline() as p:
^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:822:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/pipeline.py:671: in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f027f340920>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
last_error_text = None
def read_messages() -> None:
nonlocal last_error_text
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
mr = message.message_response
logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
last_error_text = mr.message_text
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst002 stage-006 failed:Traceback (most recent call last):
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
E response = task()
E ^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
E return getattr(self, request_type)(
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
E monitoring_infos = bundle_processor.monitoring_infos()
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
E op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
E all_monitoring_infos = super().monitoring_infos(
E ^^^^^^^^^^^^^^^^^^^^^^^^^
E File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E def monitoring_infos(self, transform_id, tag_to_pcollection_id):
E File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
E File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
E DistributionData(sum, count, min, max),
E File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
E self.tdigest = tdigest
E AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner.py:571: RuntimeError
github-actions / Python 3.12 Test Results (ubuntu-latest)
test_hybrid_search (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) failed
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-ml.xml [took 5m 10s]
Raw output
RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst001 stage-001 failed:Traceback (most recent call last):
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
response = task()
^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
all_monitoring_infos = super().monitoring_infos(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
DistributionData(sum, count, min, max),
File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
self.tdigest = tdigest
AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
self = <apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment testMethod=test_hybrid_search>
def test_hybrid_search(self):
test_chunks = [
Chunk(
id="query1",
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
content=Content(text="This is a test document"))
]
anns_vector_field = "dense_embedding_cosine"
addition_vector_search_params = {
"metric_type": VectorSearchMetrics.COSINE.value, "nprobe": 1
}
vector_search_parameters = VectorSearchParameters(
anns_field=anns_vector_field,
limit=10,
search_params=addition_vector_search_params)
anns_keyword_field = "sparse_embedding_bm25"
addition_keyword_search_params = {
"metric_type": KeywordSearchMetrics.BM25.value
}
keyword_search_parameters = KeywordSearchParameters(
anns_field=anns_keyword_field,
limit=10,
search_params=addition_keyword_search_params)
hybrid_search_parameters = HybridSearchParameters(
vector=vector_search_parameters,
keyword=keyword_search_parameters,
ranker=RRFRanker(1),
limit=1)
search_parameters = MilvusSearchParameters(
collection_name=MILVUS_IT_CONFIG["collection_name"],
search_strategy=hybrid_search_parameters,
output_fields=["id", "content", "metadata"],
round_decimal=1)
collection_load_parameters = MilvusCollectionLoadParameters()
handler = MilvusSearchEnrichmentHandler(
connection_parameters=self._connection_params,
search_parameters=search_parameters,
collection_load_parameters=collection_load_parameters)
expected_chunks = [
Chunk(
content=Content(text='This is a test document'),
id='query1',
metadata={
'enrichment_data': {
'id': [1],
'distance': [1.0],
'fields': [{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]))
]
> with TestPipeline() as p:
^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:1241:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/pipeline.py:671: in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7fa299bac950>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
last_error_text = None
def read_messages() -> None:
nonlocal last_error_text
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
mr = message.message_response
logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
last_error_text = mr.message_text
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst001 stage-001 failed:Traceback (most recent call last):
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
E response = task()
E ^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
E return getattr(self, request_type)(
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
E monitoring_infos = bundle_processor.monitoring_infos()
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
E op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
E all_monitoring_infos = super().monitoring_infos(
E ^^^^^^^^^^^^^^^^^^^^^^^^^
E File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E def monitoring_infos(self, transform_id, tag_to_pcollection_id):
E File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
E File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
E DistributionData(sum, count, min, max),
E File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
E self.tdigest = tdigest
E AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner.py:571: RuntimeError
github-actions / Python 3.12 Test Results (ubuntu-latest)
test_filtered_search_with_cosine_similarity_and_batching (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) failed
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-ml.xml [took 5m 9s]
Raw output
RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst002 stage-006 failed:Traceback (most recent call last):
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
response = task()
^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
all_monitoring_infos = super().monitoring_infos(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
DistributionData(sum, count, min, max),
File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
self.tdigest = tdigest
AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
self = <apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment testMethod=test_filtered_search_with_cosine_similarity_and_batching>
def test_filtered_search_with_cosine_similarity_and_batching(self):
test_chunks = [
Chunk(
id="query1",
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
content=Content()),
Chunk(
id="query2",
embedding=Embedding(dense_embedding=[0.2, 0.3, 0.4]),
content=Content()),
Chunk(
id="query3",
embedding=Embedding(dense_embedding=[0.3, 0.4, 0.5]),
content=Content())
]
filter_condition = 'metadata["language"] == "en"'
anns_field = "dense_embedding_cosine"
addition_search_params = {
"metric_type": VectorSearchMetrics.COSINE.value, "nprobe": 1
}
vector_search_parameters = VectorSearchParameters(
anns_field=anns_field,
limit=10,
filter=filter_condition,
search_params=addition_search_params)
search_parameters = MilvusSearchParameters(
collection_name=MILVUS_IT_CONFIG["collection_name"],
search_strategy=vector_search_parameters,
output_fields=["id", "content", "metadata"],
round_decimal=1)
collection_load_parameters = MilvusCollectionLoadParameters()
# Force batching.
min_batch_size, max_batch_size = 2, 2
handler = MilvusSearchEnrichmentHandler(
connection_parameters=self._connection_params,
search_parameters=search_parameters,
collection_load_parameters=collection_load_parameters,
min_batch_size=min_batch_size,
max_batch_size=max_batch_size)
expected_chunks = [
Chunk(
id='query1',
content=Content(),
metadata={
'enrichment_data': {
'id': [1, 2],
'distance': [1.0, 1.0],
'fields': [{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
},
{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
}]
}
},
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3])),
Chunk(
id='query2',
content=Content(),
metadata={
'enrichment_data': {
'id': [2, 1],
'distance': [1.0, 1.0],
'fields': [{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
},
{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding(dense_embedding=[0.2, 0.3, 0.4])),
Chunk(
id='query3',
content=Content(),
metadata={
'enrichment_data': {
'id': [2, 1],
'distance': [1.0, 1.0],
'fields': [{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
},
{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding(dense_embedding=[0.3, 0.4, 0.5]))
]
> with TestPipeline() as p:
^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:717:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/pipeline.py:671: in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7fda5f8fa690>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
last_error_text = None
def read_messages() -> None:
nonlocal last_error_text
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
mr = message.message_response
logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
last_error_text = mr.message_text
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst002 stage-006 failed:Traceback (most recent call last):
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
E response = task()
E ^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
E return getattr(self, request_type)(
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
E monitoring_infos = bundle_processor.monitoring_infos()
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
E op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
E all_monitoring_infos = super().monitoring_infos(
E ^^^^^^^^^^^^^^^^^^^^^^^^^
E File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E def monitoring_infos(self, transform_id, tag_to_pcollection_id):
E File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
E File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
E DistributionData(sum, count, min, max),
E File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
E self.tdigest = tdigest
E AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner.py:571: RuntimeError
github-actions / Python 3.12 Test Results (ubuntu-latest)
test_vector_search_with_inner_product_similarity (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) failed
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-ml.xml [took 4s]
Raw output
RuntimeError: Pipeline job-002 failed in state FAILED: bundle inst001 stage-006 failed:Traceback (most recent call last):
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
response = task()
^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
all_monitoring_infos = super().monitoring_infos(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
DistributionData(sum, count, min, max),
File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
self.tdigest = tdigest
AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
self = <apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment testMethod=test_vector_search_with_inner_product_similarity>
def test_vector_search_with_inner_product_similarity(self):
test_chunks = [
Chunk(
id="query1",
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
content=Content()),
Chunk(
id="query2",
embedding=Embedding(dense_embedding=[0.2, 0.3, 0.4]),
content=Content()),
Chunk(
id="query3",
embedding=Embedding(dense_embedding=[0.3, 0.4, 0.5]),
content=Content())
]
anns_field = "dense_embedding_inner_product"
addition_search_params = {
"metric_type": VectorSearchMetrics.INNER_PRODUCT.value, "nprobe": 1
}
vector_search_parameters = VectorSearchParameters(
anns_field=anns_field, limit=10, search_params=addition_search_params)
search_parameters = MilvusSearchParameters(
collection_name=MILVUS_IT_CONFIG["collection_name"],
search_strategy=vector_search_parameters,
output_fields=["id", "content", "metadata"],
round_decimal=1)
collection_load_parameters = MilvusCollectionLoadParameters()
handler = MilvusSearchEnrichmentHandler(
connection_parameters=self._connection_params,
search_parameters=search_parameters,
collection_load_parameters=collection_load_parameters)
expected_chunks = [
Chunk(
id='query1',
content=Content(),
metadata={
'enrichment_data': {
'id': [3, 2, 1],
'distance': [0.3, 0.2, 0.1],
'fields': [{
'content': 'وثيقة اختبار',
'metadata': {
'language': 'ar'
},
'id': 3
},
{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
},
{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3])),
Chunk(
id='query2',
content=Content(),
metadata={
'enrichment_data': {
'id': [3, 2, 1],
'distance': [0.4, 0.3, 0.2],
'fields': [{
'content': 'وثيقة اختبار',
'metadata': {
'language': 'ar'
},
'id': 3
},
{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
},
{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding(dense_embedding=[0.2, 0.3, 0.4])),
Chunk(
id='query3',
content=Content(),
metadata={
'enrichment_data': {
'id': [3, 2, 1],
'distance': [0.5, 0.4, 0.3],
'fields': [{
'content': 'وثيقة اختبار',
'metadata': {
'language': 'ar'
},
'id': 3
},
{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
},
{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding(dense_embedding=[0.3, 0.4, 0.5]))
]
> with TestPipeline() as p:
^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:1103:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/pipeline.py:671: in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7fda408012e0>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
last_error_text = None
def read_messages() -> None:
nonlocal last_error_text
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
mr = message.message_response
logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
last_error_text = mr.message_text
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-002 failed in state FAILED: bundle inst001 stage-006 failed:Traceback (most recent call last):
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
E response = task()
E ^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
E return getattr(self, request_type)(
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
E monitoring_infos = bundle_processor.monitoring_infos()
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
E op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
E all_monitoring_infos = super().monitoring_infos(
E ^^^^^^^^^^^^^^^^^^^^^^^^^
E File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E def monitoring_infos(self, transform_id, tag_to_pcollection_id):
E File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
E File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
E DistributionData(sum, count, min, max),
E File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
E self.tdigest = tdigest
E AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner.py:571: RuntimeError
github-actions / Python 3.12 Test Results (ubuntu-latest)
test_vector_search_with_euclidean_distance (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) failed
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-ml.xml [took 4s]
Raw output
RuntimeError: Pipeline job-002 failed in state FAILED: bundle inst002 stage-006 failed:Traceback (most recent call last):
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
response = task()
^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
all_monitoring_infos = super().monitoring_infos(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
DistributionData(sum, count, min, max),
File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
self.tdigest = tdigest
AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
self = <apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment testMethod=test_vector_search_with_euclidean_distance>
def test_vector_search_with_euclidean_distance(self):
test_chunks = [
Chunk(
id="query1",
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3]),
content=Content()),
Chunk(
id="query2",
embedding=Embedding(dense_embedding=[0.2, 0.3, 0.4]),
content=Content()),
Chunk(
id="query3",
embedding=Embedding(dense_embedding=[0.3, 0.4, 0.5]),
content=Content())
]
anns_field = "dense_embedding_euclidean"
addition_search_params = {
"metric_type": VectorSearchMetrics.EUCLIDEAN_DISTANCE.value,
"nprobe": 1
}
vector_search_parameters = VectorSearchParameters(
anns_field=anns_field, limit=10, search_params=addition_search_params)
search_parameters = MilvusSearchParameters(
collection_name=MILVUS_IT_CONFIG["collection_name"],
search_strategy=vector_search_parameters,
output_fields=["id", "content", "metadata"],
round_decimal=1)
collection_load_parameters = MilvusCollectionLoadParameters()
handler = MilvusSearchEnrichmentHandler(
connection_parameters=self._connection_params,
search_parameters=search_parameters,
collection_load_parameters=collection_load_parameters)
expected_chunks = [
Chunk(
id='query1',
content=Content(),
metadata={
'enrichment_data': {
'id': [1, 2, 3],
'distance': [0.0, 0.0, 0.1],
'fields': [{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
},
{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
},
{
'content': 'وثيقة اختبار',
'metadata': {
'language': 'ar'
},
'id': 3
}]
}
},
embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3])),
Chunk(
id='query2',
content=Content(),
metadata={
'enrichment_data': {
'id': [2, 3, 1],
'distance': [0.0, 0.0, 0.0],
'fields': [{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
},
{
'content': 'وثيقة اختبار',
'metadata': {
'language': 'ar'
},
'id': 3
},
{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding(dense_embedding=[0.2, 0.3, 0.4])),
Chunk(
id='query3',
content=Content(),
metadata={
'enrichment_data': {
'id': [3, 2, 1],
'distance': [0.0, 0.0, 0.1],
'fields': [{
'content': 'وثيقة اختبار',
'metadata': {
'language': 'ar'
},
'id': 3
},
{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
},
{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
}]
}
},
embedding=Embedding(dense_embedding=[0.3, 0.4, 0.5]))
]
> with TestPipeline() as p:
^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:963:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/pipeline.py:671: in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7f02809a4cb0>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
last_error_text = None
def read_messages() -> None:
nonlocal last_error_text
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
mr = message.message_response
logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
last_error_text = mr.message_text
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-002 failed in state FAILED: bundle inst002 stage-006 failed:Traceback (most recent call last):
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
E response = task()
E ^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
E return getattr(self, request_type)(
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
E monitoring_infos = bundle_processor.monitoring_infos()
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
E op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
E all_monitoring_infos = super().monitoring_infos(
E ^^^^^^^^^^^^^^^^^^^^^^^^^
E File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E def monitoring_infos(self, transform_id, tag_to_pcollection_id):
E File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
E File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
E DistributionData(sum, count, min, max),
E File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
E self.tdigest = tdigest
E AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner.py:571: RuntimeError
github-actions / Python 3.12 Test Results (ubuntu-latest)
test_keyword_search_with_inner_product_sparse_embedding (apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment) failed
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-ml.xml [took 5s]
Raw output
RuntimeError: Pipeline job-002 failed in state FAILED: bundle inst001 stage-001 failed:Traceback (most recent call last):
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
response = task()
^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
monitoring_infos = bundle_processor.monitoring_infos()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
all_monitoring_infos = super().monitoring_infos(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
def monitoring_infos(self, transform_id, tag_to_pcollection_id):
File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
DistributionData(sum, count, min, max),
File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
self.tdigest = tdigest
AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
self = <apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment testMethod=test_keyword_search_with_inner_product_sparse_embedding>
def test_keyword_search_with_inner_product_sparse_embedding(self):
test_chunks = [
Chunk(
id="query1",
embedding=Embedding(
sparse_embedding=([1, 2, 3, 4], [0.05, 0.41, 0.05, 0.41])),
content=Content())
]
anns_field = "sparse_embedding_inner_product"
addition_search_params = {
"metric_type": VectorSearchMetrics.INNER_PRODUCT.value,
}
keyword_search_parameters = KeywordSearchParameters(
anns_field=anns_field, limit=3, search_params=addition_search_params)
search_parameters = MilvusSearchParameters(
collection_name=MILVUS_IT_CONFIG["collection_name"],
search_strategy=keyword_search_parameters,
output_fields=["id", "content", "metadata"],
round_decimal=1)
collection_load_parameters = MilvusCollectionLoadParameters()
handler = MilvusSearchEnrichmentHandler(
connection_parameters=self._connection_params,
search_parameters=search_parameters,
collection_load_parameters=collection_load_parameters)
expected_chunks = [
Chunk(
id='query1',
content=Content(),
metadata={
'enrichment_data': {
'id': [1, 2],
'distance': [0.3, 0.2],
'fields': [{
'content': 'This is a test document',
'metadata': {
'language': 'en'
},
'id': 1
},
{
'content': 'Another test document',
'metadata': {
'language': 'en'
},
'id': 2
}]
}
},
embedding=Embedding(
sparse_embedding=([1, 2, 3, 4], [0.05, 0.41, 0.05, 0.41])))
]
> with TestPipeline() as p:
^^^^^^^^^^^^^^
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:1168:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/pipeline.py:671: in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7eff92841790>
duration = None
def wait_until_finish(self, duration=None):
"""
:param duration: The maximum time in milliseconds to wait for the result of
the execution. If None or zero, will wait until the pipeline finishes.
:return: The result of the pipeline, i.e. PipelineResult.
"""
last_error_text = None
def read_messages() -> None:
nonlocal last_error_text
previous_state = -1
for message in self._message_stream:
if message.HasField('message_response'):
mr = message.message_response
logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
last_error_text = mr.message_text
else:
current_state = message.state_response.state
if current_state != previous_state:
_LOGGER.info(
"Job state changed to %s",
self.runner_api_state_to_pipeline_state(current_state))
previous_state = current_state
self._messages.append(message)
message_thread = threading.Thread(
target=read_messages, name='wait_until_finish_read')
message_thread.daemon = True
message_thread.start()
if duration:
state_thread = threading.Thread(
target=functools.partial(self._observe_state, message_thread),
name='wait_until_finish_state_observer')
state_thread.daemon = True
state_thread.start()
start_time = time.time()
duration_secs = duration / 1000
while (time.time() - start_time < duration_secs and
state_thread.is_alive()):
time.sleep(1)
else:
self._observe_state(message_thread)
if self._runtime_exception:
> raise self._runtime_exception
E RuntimeError: Pipeline job-002 failed in state FAILED: bundle inst001 stage-001 failed:Traceback (most recent call last):
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
E response = task()
E ^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
E lambda: self.create_worker().do_instruction(request), request)
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
E return getattr(self, request_type)(
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/sdk_worker.py", line 708, in process_bundle
E monitoring_infos = bundle_processor.monitoring_infos()
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1431, in monitoring_infos
E op.monitoring_infos(transform_id, dict(tag_to_pcollection_id)))
E ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
E File "/home/runner/work/beam/beam/sdks/python/test-suites/tox/py312/build/srcs/sdks/python/target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/worker/bundle_processor.py", line 245, in monitoring_infos
E all_monitoring_infos = super().monitoring_infos(
E ^^^^^^^^^^^^^^^^^^^^^^^^^
E File "apache_beam/runners/worker/operations.py", line 578, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E def monitoring_infos(self, transform_id, tag_to_pcollection_id):
E File "apache_beam/runners/worker/operations.py", line 584, in apache_beam.runners.worker.operations.Operation.monitoring_infos
E self.pcollection_count_monitoring_infos(tag_to_pcollection_id))
E File "apache_beam/runners/worker/operations.py", line 613, in apache_beam.runners.worker.operations.Operation.pcollection_count_monitoring_infos
E DistributionData(sum, count, min, max),
E File "apache_beam/metrics/cells.py", line 584, in apache_beam.metrics.cells.DistributionData.__init__
E self.tdigest = tdigest
E AttributeError: 'apache_beam.metrics.cells.DistributionData' object has no attribute 'tdigest'
target/.tox-py312-ml/py312-ml/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner.py:571: RuntimeError
Check notice on line 0 in .github
github-actions / Python 3.12 Test Results (ubuntu-latest)
7 skipped tests found
There are 7 skipped tests, see "Raw output" for the full list of skipped tests.
Raw output
apache_beam.ml.inference.huggingface_inference_it_test
apache_beam.ml.inference.huggingface_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test
Check notice on line 0 in .github
github-actions / Python 3.12 Test Results (ubuntu-latest)
16 tests found
There are 16 tests, see "Raw output" for the full list of tests.
Raw output
apache_beam.ml.inference.huggingface_inference_it_test
apache_beam.ml.inference.huggingface_inference_test
apache_beam.ml.inference.onnx_inference_test
apache_beam.ml.inference.tensorrt_inference_test
apache_beam.ml.inference.xgboost_inference_test
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_empty_input_chunks
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_filtered_search_with_bm25_full_text_and_batching
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_filtered_search_with_cosine_similarity_and_batching
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_hybrid_search
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_invalid_query_on_non_existent_collection
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_invalid_query_on_non_existent_field
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_keyword_search_with_inner_product_sparse_embedding
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_vector_search_with_euclidean_distance
apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment ‑ test_vector_search_with_inner_product_similarity
apache_beam.ml.transforms.handlers_test
apache_beam.ml.transforms.tft_test