1- from airflow .hooks .base_hook import BaseHook
2- import json
31import stripe
42
3+ from airflow .hooks .base_hook import BaseHook
4+
55
66class StripeHook (BaseHook ):
7- def __init__ (
8- self ,
9- conn_id ,
10- * args ,
11- ** kwargs ):
7+ def __init__ (self ,
8+ conn_id ,
9+ * args ,
10+ ** kwargs ):
1211 self .conn_id = conn_id
1312 self ._args = args
1413 self ._kwargs = kwargs
@@ -27,37 +26,32 @@ def get_conn(self):
2726 self .connection = self .get_connection (self .conn_id )
2827 self .extras = self .connection .extra_dejson
2928
30- stripe .api_key = self .extras ['api_key' ]
29+ stripe .api_key = self .extras .get ('api_key' , None )
30+
3131 self .stripe = stripe
3232
3333 return stripe
3434
35- def run_query (self , model , replication_key_value = None , ** kwargs ):
35+ def run_query (self ,
36+ stripe_object ,
37+ replication_key_value = None ,
38+ ** kwargs ):
3639 """
3740 Run a query against stripe
38- :param model : name of the Stripe model
41+ :param stripe_object : name of the Stripe object
3942 :param replication_key_value: Stripe replicaton key value
4043 """
4144 stripe_instance = self .get_conn ()
42- stripe_model = getattr (stripe_instance , model )
45+ stripe_endpoint = getattr (stripe_instance , stripe_object )
4346
4447 method_to_call = 'list'
45- if model is 'BalanceHistory' :
48+ if stripe_object is 'BalanceHistory' :
4649 method_to_call = 'all'
4750 if replication_key_value :
48- stripe_response = getattr (stripe_model , method_to_call )(
51+ stripe_response = getattr (stripe_endpoint , method_to_call )(
4952 ending_before = replication_key_value , ** kwargs )
5053 else :
51- stripe_response = getattr (stripe_model , method_to_call )(** kwargs )
54+ stripe_response = getattr (stripe_endpoint , method_to_call )(** kwargs )
5255
5356 for res in stripe_response .auto_paging_iter ():
5457 yield res
55-
56- def get_records (self , sql ):
57- pass
58-
59- def get_pandas_df (self , sql ):
60- pass
61-
62- def run (self , sql ):
63- pass
0 commit comments