@@ -24,9 +24,15 @@ class SalesforceBulkQueryToS3Operator(BaseOperator):
2424 """
2525 template_fields = ('soql' , 's3_key' )
2626
27- def __init__ (self , sf_conn_id , soql , object_type , #sf config
28- s3_conn_id , s3_bucket , s3_key , #s3 config
29- * args , ** kwargs ):
27+ def __init__ (self ,
28+ sf_conn_id ,
29+ soql ,
30+ object_type ,
31+ s3_conn_id ,
32+ s3_bucket ,
33+ s3_key ,
34+ * args ,
35+ ** kwargs ):
3036
3137 super ().__init__ (* args , ** kwargs )
3238
@@ -52,36 +58,17 @@ def execute(self, context):
5258
5359
5460class SalesforceToS3Operator (BaseOperator ):
55- """
56- Make a query against Salesforce and write the resulting data to a file.
57- """
58- template_fields = ("s3_key" ,
59- "query" )
60-
61- @apply_defaults
62- def __init__ (
63- self ,
64- sf_conn_id ,
65- sf_obj ,
66- s3_conn_id ,
67- s3_bucket ,
68- s3_key ,
69- sf_fields = None ,
70- fmt = "csv" ,
71- query = None ,
72- relationship_object = None ,
73- record_time_added = False ,
74- coerce_to_timestamp = False ,
75- * args ,
76- ** kwargs
77- ):
7861 """
79- Initialize the operator
62+ Salesforce to S3 Operator
63+
64+ Makes a query against Salesforce and write the resulting data to a file.
65+
8066 :param sf_conn_id: Name of the Airflow connection that has
8167 the following information:
8268 - username
8369 - password
8470 - security_token
71+ :type sf_conn_id: string
8572 :param sf_obj: Name of the relevant Salesforce object
8673 :param s3_conn_id: The destination s3 connection id.
8774 :type s3_conn_id: string
@@ -93,46 +80,70 @@ def __init__(
9380 to get from the object.
9481 If *None*, then this will get all fields
9582 for the object
83+ :type sf_fields: list
9684 :param fmt: *(optional)* format that the s3_key of the
9785 data should be in. Possible values include:
9886 - csv
9987 - json
10088 - ndjson
10189 *Default: csv*
90+ :type fmt: list
10291 :param query: *(optional)* A specific query to run for
10392 the given object. This will override
10493 default query creation.
10594 *Default: None*
95+ :type query: string
10696 :param relationship_object: *(optional)* Some queries require
10797 relationship objects to work, and
10898 these are not the same names as
10999 the SF object. Specify that
110100 relationship object here.
111101 *Default: None*
102+ :type relationship_object: string
112103 :param record_time_added: *(optional)* True if you want to add a
113104 Unix timestamp field to the resulting data
114105 that marks when the data was
115106 fetched from Salesforce.
116107 *Default: False*.
108+ :type record_time_added: string
117109 :param coerce_to_timestamp: *(optional)* True if you want to convert
118110 all fields with dates and datetimes
119111 into Unix timestamp (UTC).
120112 *Default: False*.
113+ :type coerce_to_timestamp: string
121114 """
122-
123- super (SalesforceToS3Operator , self ).__init__ (* args , ** kwargs )
124-
125- self .sf_conn_id = sf_conn_id
126- self .object = sf_obj
127- self .fields = sf_fields
128- self .s3_conn_id = s3_conn_id
129- self .s3_bucket = s3_bucket
130- self .s3_key = s3_key
131- self .fmt = fmt .lower ()
132- self .query = query
133- self .relationship_object = relationship_object
134- self .record_time_added = record_time_added
135- self .coerce_to_timestamp = coerce_to_timestamp
115+ template_fields = ("s3_key" ,
116+ "query" )
117+
118+ @apply_defaults
119+ def __init__ (self ,
120+ sf_conn_id ,
121+ sf_obj ,
122+ s3_conn_id ,
123+ s3_bucket ,
124+ s3_key ,
125+ sf_fields = None ,
126+ fmt = "csv" ,
127+ query = None ,
128+ relationship_object = None ,
129+ record_time_added = False ,
130+ coerce_to_timestamp = False ,
131+ * args ,
132+ ** kwargs ):
133+
134+ super (SalesforceToS3Operator , self ).__init__ (* args , ** kwargs )
135+
136+ self .sf_conn_id = sf_conn_id
137+ self .object = sf_obj
138+ self .fields = sf_fields
139+ self .s3_conn_id = s3_conn_id
140+ self .s3_bucket = s3_bucket
141+ self .s3_key = s3_key
142+ self .fmt = fmt .lower ()
143+ self .query = query
144+ self .relationship_object = relationship_object
145+ self .record_time_added = record_time_added
146+ self .coerce_to_timestamp = coerce_to_timestamp
136147
137148 def special_query (self , query , sf_hook , relationship_object = None ):
138149 if not query :
0 commit comments