@@ -203,42 +203,13 @@ def max_record_count(self) -> int | None:
203203 """Return the maximum number of records to fetch in a single query."""
204204 return self.config.get("max_record_count")
205205
206- def build_query(self, context: Context | None) -> sa.sql.Select:
206+ def build_query(self, table: sa.Table) -> sa.sql.Select:
207207 """Build a SQLAlchemy query for the stream."""
208- selected_column_names = self.get_selected_schema()["properties"].keys()
209- table = self.connector.get_table(
210- full_table_name=self.fully_qualified_name,
211- column_names=selected_column_names,
212- )
213- query = table.select()
214-
215- if self.replication_key:
216- replication_key_col = table.columns[self.replication_key]
217- order_by = (
218- sa.nulls_first(replication_key_col.asc())
219- if self.supports_nulls_first
220- else replication_key_col.asc()
221- )
222- query = query.order_by(order_by)
223-
224- start_val = self.get_starting_replication_key_value(context)
225- if start_val:
226- query = query.where(replication_key_col >= start_val)
227-
208+ query = super().build_query(table)
228209 stream_options = self.config.get("stream_options", {}).get(self.name, {})
229210 if clauses := stream_options.get("custom_where_clauses"):
230211 query = query.where(*(sa.text(clause.strip()) for clause in clauses))
231212
232- if self.ABORT_AT_RECORD_COUNT is not None:
233- # Limit record count to one greater than the abort threshold. This ensures
234- # `MaxRecordsLimitException` exception is properly raised by caller
235- # `Stream._sync_records()` if more records are available than can be
236- # processed.
237- query = query.limit(self.ABORT_AT_RECORD_COUNT + 1)
238-
239- if self.max_record_count():
240- query = query.limit(self.max_record_count())
241-
242213 return query
243214
244215 # Get records from stream
@@ -264,8 +235,18 @@ def get_records(self, context: Context | None) -> t.Iterable[dict[str, t.Any]]:
264235 msg = f"Stream '{self.name}' does not support partitioning."
265236 raise NotImplementedError(msg)
266237
238+ selected_column_names = self.get_selected_schema()["properties"].keys()
239+ table = self.connector.get_table(
240+ full_table_name=self.fully_qualified_name,
241+ column_names=selected_column_names,
242+ )
243+
244+ query = self.build_query(table)
245+ query = self.apply_replication_filter(query, table, context=context)
246+ query = self.apply_abort_query_limit(query)
247+
267248 with self.connector._connect() as conn:
268- for record in conn.execute(self.build_query(context)).mappings():
249+ for record in conn.execute(query).mappings():
269250 # TODO: Standardize record mapping type
270251 # https://github.com/meltano/sdk/issues/2096
271252 transformed_record = self.post_process(dict(record))