12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
+ from concurrent import futures
15
16
import time
17
+ from typing import Generator
18
+ import uuid
16
19
20
+ from google .cloud import bigtable , pubsub # type: ignore
21
+ from google .cloud .bigtable import column_family , instance , table
17
22
import pytest
18
23
19
24
import bigframes
20
- import bigframes .streaming
25
+
26
+
27
def resource_name_full(project_id: str, resource_type: str, resource_id: str) -> str:
    """Return a fully-qualified GCP resource name.

    Example: resource_name_full("my-proj", "topics", "t1")
    -> "projects/my-proj/topics/t1"
    """
    # The scraped source had spaces injected into the f-string
    # ("{ project_id } /..."), which would emit "projects/p /topics /t ".
    return f"projects/{project_id}/{resource_type}/{resource_id}"
29
+
30
+
31
@pytest.fixture(scope="session")
def bigtable_instance(session_load: bigframes.Session) -> instance.Instance:
    """Session-scoped Bigtable instance shared by the streaming tests.

    Reuses the instance when it already exists; otherwise provisions a
    single-node cluster in us-west1-a and waits for creation to finish.
    """
    client = bigtable.Client(project=session_load._project, admin=True)
    bt_instance = instance.Instance("streaming-testing-instance", client)

    if not bt_instance.exists():
        cluster = bt_instance.cluster(
            "streaming-testing-instance-c1",
            location_id="us-west1-a",
            serve_nodes=1,
        )
        # Instance provisioning can be slow; allow up to 8 minutes.
        bt_instance.create(clusters=[cluster]).result(timeout=480)

    return bt_instance
53
+
54
+
55
@pytest.fixture(scope="function")
def bigtable_table(
    bigtable_instance: instance.Instance,
) -> Generator[table.Table, None, None]:
    """Create a uniquely named Bigtable table, yield it, then drop it."""
    bt_table = table.Table(
        "bigframes_test_" + uuid.uuid4().hex,
        bigtable_instance,
    )
    # One column family, keeping only the most recent cell version.
    bt_table.create(
        column_families={"body_mass_g": column_family.MaxVersionsGCRule(1)}
    )
    yield bt_table
    bt_table.delete()
70
+
71
+
72
@pytest.fixture(scope="function")
def pubsub_topic_id(session_load: bigframes.Session) -> Generator[str, None, None]:
    """Create a uniquely named Pub/Sub topic, yield its id, then delete it."""
    publisher = pubsub.PublisherClient()
    topic_id = "bigframes_test_topic_" + uuid.uuid4().hex
    topic_name = resource_name_full(session_load._project, "topics", topic_id)

    publisher.create_topic(name=topic_name)
    yield topic_id
    publisher.delete_topic(topic=topic_name)
82
+
83
+
84
@pytest.fixture(scope="function")
def pubsub_topic_subscription_ids(
    session_load: bigframes.Session, pubsub_topic_id: str
) -> Generator[tuple[str, str], None, None]:
    """Create a subscription on the topic fixture; yield (topic_id, subscription_id)."""
    subscriber = pubsub.SubscriberClient()
    subscription_id = "bigframes_test_subscription_" + uuid.uuid4().hex

    project = session_load._project
    subscription_name = resource_name_full(project, "subscriptions", subscription_id)

    subscriber.create_subscription(
        name=subscription_name,
        topic=resource_name_full(project, "topics", pubsub_topic_id),
    )
    yield (pubsub_topic_id, subscription_id)
    subscriber.delete_subscription(subscription=subscription_name)
21
99
22
100
23
101
@pytest.mark.flaky(retries=3, delay=10)
def test_streaming_df_to_bigtable(
    session_load: bigframes.Session, bigtable_table: table.Table
):
    """Start a continuous query streaming into Bigtable and verify it stays running
    and actually writes rows into the test table."""
    # launch a continuous query
    job_id_prefix = "test_streaming_"
    sdf = session_load.read_gbq_table_streaming("birds.penguins_bigtable_streaming")
    # NOTE(review): the source diff hunk hides ~2 lines here (likely a column
    # selection on sdf) — verify against the upstream file.
    sdf = sdf[sdf["body_mass_g"] < 4000]
    sdf = sdf.rename(columns={"island": "rowkey"})

    # Guard so the finally clause can't raise NameError (masking the real
    # error) when to_bigtable() itself fails.
    query_job = None
    try:
        query_job = sdf.to_bigtable(
            instance="streaming-testing-instance",
            table=bigtable_table.table_id,
            service_account_email="streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com",
            app_profile=None,
            truncate=True,
            overwrite=True,
            auto_create_column_families=True,
            bigtable_options={},
            job_id=None,
            job_id_prefix=job_id_prefix,
        )

        # wait 100 seconds in order to ensure the query doesn't stop
        # (i.e. it is continuous)
        time.sleep(100)
        assert query_job.running()
        assert query_job.error_result is None
        assert str(query_job.job_id).startswith(job_id_prefix)
        # Rows should have been streamed into the table by now.
        assert len(list(bigtable_table.read_rows())) > 0
    finally:
        if query_job is not None:
            query_job.cancel()
55
136
56
137
57
138
@pytest.mark.flaky(retries=3, delay=10)
def test_streaming_df_to_pubsub(
    session_load: bigframes.Session, pubsub_topic_subscription_ids: tuple[str, str]
):
    """Start a continuous query publishing to Pub/Sub and verify it stays running
    and that messages are actually delivered to the subscription."""
    topic_id, subscription_id = pubsub_topic_subscription_ids

    subscriber = pubsub.SubscriberClient()
    subscription_name = "projects/{project_id}/subscriptions/{sub}".format(
        project_id=session_load._project,
        sub=subscription_id,
    )

    # launch a continuous query
    job_id_prefix = "test_streaming_pubsub_"
    sdf = session_load.read_gbq_table_streaming("birds.penguins_bigtable_streaming")

    sdf = sdf[sdf["body_mass_g"] < 4000]
    sdf = sdf[["island"]]

    # Guards so the finally clause can't raise NameError (masking the real
    # error) when subscribe()/to_pubsub() fail, and so the streaming-pull
    # future is always shut down even when an assertion fails.
    query_job = None
    future = None
    try:

        def counter(func):
            # Count invocations so we can assert messages actually arrived.
            def wrapper(*args, **kwargs):
                wrapper.count += 1  # type: ignore
                return func(*args, **kwargs)

            wrapper.count = 0  # type: ignore
            return wrapper

        @counter
        def callback(message):
            message.ack()

        future = subscriber.subscribe(subscription_name, callback)

        query_job = sdf.to_pubsub(
            topic=topic_id,
            # NOTE(review): the scraped source redacted this address; restored
            # to match the service account used by the bigtable test — confirm.
            service_account_email="streaming-testing-admin@bigframes-load-testing.iam.gserviceaccount.com",
            job_id=None,
            job_id_prefix=job_id_prefix,
        )
        try:
            # wait 100 seconds in order to ensure the query doesn't stop
            # (i.e. it is continuous); a streaming pull only ends by raising.
            future.result(timeout=100)
        except futures.TimeoutError:
            future.cancel()
        assert query_job.running()
        assert query_job.error_result is None
        assert str(query_job.job_id).startswith(job_id_prefix)
        assert callback.count > 0  # type: ignore
    finally:
        if future is not None:
            future.cancel()
        if query_job is not None:
            query_job.cancel()
0 commit comments