Skip to content

Commit 2ebc4d5

Browse files
sdks/python: test enrichment with tecton
1 parent a8bbeaa commit 2ebc4d5

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,50 @@ def enrichment_with_vertex_ai_legacy():
116116
| "Enrich W/ Vertex AI" >> Enrichment(vertex_ai_handler)
117117
| "Print" >> beam.Map(print))
118118
# [END enrichment_with_vertex_ai_legacy]
119+
120+
121+
def enrichment_with_tecton():
122+
# [START enrichment_with_tecton]
123+
import apache_beam as beam
124+
from apache_beam.transforms.enrichment import Enrichment
125+
from apache_beam.transforms.enrichment_handlers.tecton_feature_store import (
126+
TectonConnectionConfig, TectonFeaturesRetrievalConfig,
127+
TectonFeatureStoreEnrichmentHandler)
128+
129+
data = [
130+
beam.Row(user_id='user_1990251765'),
131+
beam.Row(user_id='user_1284832379'),
132+
beam.Row(user_id='user_9979340926'),
133+
]
134+
135+
# Create connection configuration
136+
# Your actual Tecton credentials
137+
connection_config = TectonConnectionConfig(
138+
url='https://explore.tecton.ai',
139+
api_key='101142fd7d775e0a1bd9e343cca2a44d'
140+
)
141+
142+
# Create features retrieval configuration
143+
# Using your actual fraud detection feature service
144+
features_config = TectonFeaturesRetrievalConfig(
145+
feature_service_name='fraud_detection_feature_service',
146+
entity_id='user_id',
147+
workspace_name='prod'
148+
)
149+
150+
# Create the handler with both configurations
151+
tecton_handler = TectonFeatureStoreEnrichmentHandler(
152+
connection_config=connection_config,
153+
features_retrieval_config=features_config
154+
)
155+
156+
with beam.Pipeline() as p:
157+
_ = (
158+
p
159+
| "Create" >> beam.Create(data)
160+
| "Enrich W/ Tecton" >> Enrichment(tecton_handler)
161+
| "Print" >> beam.Map(print))
162+
# [END enrichment_with_tecton]
163+
164+
165+
enrichment_with_tecton()

sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_bigtable, \
2929
enrichment_with_vertex_ai_legacy
3030
from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_vertex_ai
31+
from apache_beam.examples.snippets.transforms.elementwise.enrichment import enrichment_with_tecton
3132
from apache_beam.io.requestresponse import RequestResponseIO
3233
except ImportError:
3334
raise unittest.SkipTest('RequestResponseIO dependencies are not installed')
@@ -60,6 +61,15 @@ def validate_enrichment_with_vertex_ai_legacy():
6061
return expected
6162

6263

64+
def validate_enrichment_with_tecton():
65+
expected = '''[START enrichment_with_tecton]
66+
Row(user_id='user_9979340926', user_transaction_metrics.amount_count_1d_1d=1, user_transaction_metrics.amount_count_3d_1d=3, user_transaction_metrics.amount_count_7d_1d=7, user_transaction_metrics.amount_mean_1d_1d=65.05, user_transaction_metrics.amount_mean_3d_1d=42.72333333333333, user_transaction_metrics.amount_mean_7d_1d=32.955714285714286)
67+
Row(user_id='user_1990251765', user_transaction_metrics.amount_count_1d_1d=None, user_transaction_metrics.amount_count_3d_1d=2, user_transaction_metrics.amount_count_7d_1d=3, user_transaction_metrics.amount_mean_1d_1d=None, user_transaction_metrics.amount_mean_3d_1d=25.880000000000003, user_transaction_metrics.amount_mean_7d_1d=27.796666666666667)
68+
Row(user_id='user_1284832379', user_transaction_metrics.amount_count_1d_1d=2, user_transaction_metrics.amount_count_3d_1d=6, user_transaction_metrics.amount_count_7d_1d=12, user_transaction_metrics.amount_mean_1d_1d=111.465, user_transaction_metrics.amount_mean_3d_1d=61.961666666666666, user_transaction_metrics.amount_mean_7d_1d=171.5625)
69+
[END enrichment_with_tecton]'''.splitlines()[1:-1]
70+
return expected
71+
72+
6373
def std_out_to_dict(stdout_lines, row_key):
6474
output_dict = {}
6575
for stdout_line in stdout_lines:
@@ -107,6 +117,16 @@ def test_enrichment_with_vertex_ai_legacy(self, mock_stdout):
107117
std_out_to_dict(output, 'entity_id'),
108118
std_out_to_dict(expected, 'entity_id'))
109119

120+
def test_enrichment_with_tecton(self, mock_stdout):
121+
enrichment_with_tecton()
122+
output = mock_stdout.getvalue().splitlines()
123+
expected = validate_enrichment_with_tecton()
124+
125+
self.assertEqual(len(output), len(expected))
126+
self.assertEqual(
127+
std_out_to_dict(output, 'user_id'),
128+
std_out_to_dict(expected, 'user_id'))
129+
110130

111131
if __name__ == '__main__':
112132
unittest.main()

0 commit comments

Comments
 (0)