Skip to content

Commit fd9b8dc

Browse files
committed
similfy code from previous PR into these code changes
1 parent 5d9d7a5 commit fd9b8dc

File tree

2 files changed

+42
-55
lines changed

2 files changed

+42
-55
lines changed

sdks/python/apache_beam/yaml/examples/testing/examples_test.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,10 @@ def _kafka_test_preprocessor(
446446
'test_spanner_read_yaml',
447447
'test_spanner_write_yaml',
448448
'test_enrich_spanner_with_bigquery_yaml',
449+
'test_pubsub_topic_to_bigquery_yaml',
450+
'test_pubsub_subscription_to_bigquery_yaml',
451+
'test_jdbc_to_bigquery_yaml',
452+
'test_spanner_to_avro_yaml',
449453
'test_gcs_text_to_bigquery_yaml',
450454
'test_sqlserver_to_bigquery_yaml',
451455
'test_postgres_to_bigquery_yaml',
@@ -568,6 +572,7 @@ def _iceberg_io_read_test_preprocessor(
568572
@YamlExamplesTestSuite.register_test_preprocessor([
569573
'test_spanner_read_yaml',
570574
'test_enrich_spanner_with_bigquery_yaml',
575+
'test_spanner_to_avro_yaml',
571576
'test_spanner_to_bigquery_yaml'
572577
])
573578
def _spanner_io_read_test_preprocessor(
@@ -648,6 +653,41 @@ def _enrichment_test_preprocessor(
648653
return test_spec
649654

650655

656+
@YamlExamplesTestSuite.register_test_preprocessor([
657+
'test_pubsub_topic_to_bigquery_yaml',
658+
'test_pubsub_subscription_to_bigquery_yaml',
659+
'test_pubsub_to_iceberg_yaml'
660+
])
661+
def _pubsub_io_read_test_preprocessor(
662+
test_spec: dict, expected: List[str], env: TestEnvironment):
663+
"""
664+
Preprocessor for tests that involve reading from Pub/Sub.
665+
This preprocessor replaces any ReadFromPubSub transform with a Create
666+
transform that reads from a predefined in-memory list of messages.
667+
This allows the test to verify the pipeline's correctness without relying
668+
on an active Pub/Sub subscription or topic.
669+
"""
670+
if pipeline := test_spec.get('pipeline', None):
671+
for transform in pipeline.get('transforms', []):
672+
if transform.get('type', '') == 'ReadFromPubSub':
673+
transform['type'] = 'TestReadFromPubSub'
674+
675+
return test_spec
676+
677+
678+
@YamlExamplesTestSuite.register_test_preprocessor([
679+
'test_jdbc_to_bigquery_yaml',
680+
])
681+
def _jdbc_io_read_test_preprocessor(
682+
test_spec: dict, expected: List[str], env: TestEnvironment):
683+
"""
684+
Preprocessor for tests that involve reading from SqlServer.
685+
url syntax: 'jdbc:mysql://<host>:<port>/<database>'
686+
"""
687+
return _db_io_read_test_processor(
688+
test_spec, lambda url: url.split('/')[-1], 'Jdbc')
689+
690+
651691
@YamlExamplesTestSuite.register_test_preprocessor([
652692
'test_sqlserver_to_bigquery_yaml',
653693
])
@@ -748,14 +788,14 @@ def _db_io_read_test_processor(
748788
}
749789

750790
INPUT_TABLES = {
751-
('shipment-test', 'shipment', 'shipments'): input_data.
752-
spanner_shipments_data(),
791+
('shipment-test', 'shipment', 'shipments'): input_data.shipments_data(),
753792
('orders-test', 'order-database', 'orders'): input_data.
754793
spanner_orders_data(),
755794
('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
756795
('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
757796
bigtable_data(),
758797
('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data(),
798+
('Jdbc', 'shipment', 'shipments'): input_data.shipments_data(),
759799
('SqlServer', 'shipment', 'shipments'): input_data.shipments_data(),
760800
('Postgres', 'shipment', 'shipments'): input_data.shipments_data(),
761801
('Oracle', 'shipment', 'shipments'): input_data.shipments_data(),
@@ -784,5 +824,3 @@ def _db_io_read_test_processor(
784824
if __name__ == '__main__':
785825
logging.getLogger().setLevel(logging.INFO)
786826
unittest.main()
787-
788-
# cloud_spanner_to_bigquery_flex

sdks/python/apache_beam/yaml/examples/testing/input_data.py

Lines changed: 0 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -78,57 +78,6 @@ def spanner_orders_data():
7878
}]
7979

8080

81-
def spanner_shipments_data():
82-
return [{
83-
'shipment_id': 'S1',
84-
'customer_id': 'C1',
85-
'shipment_date': '2023-05-01',
86-
'shipment_cost': 150.0,
87-
'customer_name': 'Alice',
88-
'customer_email': 'alice@example.com'
89-
},
90-
{
91-
'shipment_id': 'S2',
92-
'customer_id': 'C2',
93-
'shipment_date': '2023-06-12',
94-
'shipment_cost': 300.0,
95-
'customer_name': 'Bob',
96-
'customer_email': 'bob@example.com'
97-
},
98-
{
99-
'shipment_id': 'S3',
100-
'customer_id': 'C1',
101-
'shipment_date': '2023-05-10',
102-
'shipment_cost': 20.0,
103-
'customer_name': 'Alice',
104-
'customer_email': 'alice@example.com'
105-
},
106-
{
107-
'shipment_id': 'S4',
108-
'customer_id': 'C4',
109-
'shipment_date': '2024-07-01',
110-
'shipment_cost': 150.0,
111-
'customer_name': 'Derek',
112-
'customer_email': 'derek@example.com'
113-
},
114-
{
115-
'shipment_id': 'S5',
116-
'customer_id': 'C5',
117-
'shipment_date': '2023-05-09',
118-
'shipment_cost': 300.0,
119-
'customer_name': 'Erin',
120-
'customer_email': 'erin@example.com'
121-
},
122-
{
123-
'shipment_id': 'S6',
124-
'customer_id': 'C4',
125-
'shipment_date': '2024-07-02',
126-
'shipment_cost': 150.0,
127-
'customer_name': 'Derek',
128-
'customer_email': 'derek@example.com'
129-
}]
130-
131-
13281
def shipments_data():
13382
return [{
13483
'shipment_id': 'S1',

0 commit comments

Comments
 (0)