-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathGetCharges.py
More file actions
373 lines (310 loc) · 14.4 KB
/
GetCharges.py
File metadata and controls
373 lines (310 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
"""
Vipps GetCharges
Extract all charges from your Vipps agreement.
Developed with love by Lily Brennhagen @ Fluido 2025
lily.brennhagen@fluidogroup.com
Licensed under the MIT License: free to use, modify, and share.
"""
import os
import requests
import json
import csv
import time
from datetime import datetime, timezone, timedelta
from dotenv import load_dotenv
load_dotenv()
# Identification values sent with every request as the Vipps-System-* headers
# (see VIPPS_SYSTEM_HEADERS below).
VIPPS_SYSTEM_NAME = "ChargeAnalysisScript"
VIPPS_SYSTEM_VERSION = "2.0"
VIPPS_PLUGIN_NAME = "PythonRequests"
VIPPS_PLUGIN_VERSION = "1.3"
# Page size used when listing agreements. 1000 is the maximum; leaving it at
# 1000 is recommended.
PAGE_SIZE = 1000
# Start date filter (YYYY-MM-DD), interpreted as UTC by main().
# None = no start date filter (gets all historical charges).
START_DATE = "2025-01-01"
# End date filter (YYYY-MM-DD), inclusive: main() extends it to the end of
# that day in UTC. None = no end date filter (gets up to the current date).
END_DATE = "2025-12-31"
# Skip agreements without matching charges. If False, such an agreement is
# still written to the .csv as a single placeholder row with status NO_CHARGES.
SKIP_EMPTY = True
# Agreement status filter: "ACTIVE", "STOPPED", "EXPIRED", "PENDING", or
# "ALL" (fetches each of the four statuses in turn).
AGREEMENT_STATUS = "ALL"
# Read the API credentials from the process environment (load_dotenv() above
# has already run, so values from a .env file are included). The names are
# looked up exactly as spelled here.
CLIENT_ID = os.getenv('client_id')
CLIENT_SECRET = os.getenv('client_secret')
OCP_APIM_SUBSCRIPTION_KEY = os.getenv('Ocp_apim_subscription_key')
MERCHANT_SERIAL_NUMBER = os.getenv('merchant_serial_number')

# Collect the names of all unset/empty variables so the user can fix them in
# one go instead of guessing which one is missing.
_missing_env_vars = [
    env_name
    for env_name, env_value in (
        ('client_id', CLIENT_ID),
        ('client_secret', CLIENT_SECRET),
        ('Ocp_apim_subscription_key', OCP_APIM_SUBSCRIPTION_KEY),
        ('merchant_serial_number', MERCHANT_SERIAL_NUMBER),
    )
    if not env_value
]
if _missing_env_vars:
    # SystemExit with a message prints it to stderr and exits with status 1,
    # so shells and schedulers see the run as failed. The bare exit() used
    # previously is a site-module convenience and reported success (status 0).
    raise SystemExit(
        "Error loading environment variables: missing "
        + ", ".join(_missing_env_vars)
    )
# Endpoint URLs, all derived from the API host below.
BASE_URL = 'https://api.vipps.no'
AUTH_URL = BASE_URL + '/accesstoken/get'
AGREEMENTS_URL = BASE_URL + '/recurring/v3/agreements'
# The '{}' placeholder is filled with an agreement ID via str.format().
CHARGES_URL_TEMPLATE = AGREEMENTS_URL + '/{}/charges'

# Identification headers merged into the headers of every request this
# script sends (token request and API calls alike).
VIPPS_SYSTEM_HEADERS = dict(
    [
        ('Vipps-System-Name', VIPPS_SYSTEM_NAME),
        ('Vipps-System-Version', VIPPS_SYSTEM_VERSION),
        ('Vipps-System-Plugin-Name', VIPPS_PLUGIN_NAME),
        ('Vipps-System-Plugin-Version', VIPPS_PLUGIN_VERSION),
    ]
)
def get_access_token():
    """Request an access token from the token endpoint (AUTH_URL).

    The client credentials and subscription key are sent as request headers,
    together with the Vipps-System-* identification headers. HTTP 429
    responses are retried with exponential backoff (1, 2, 4, ... seconds) for
    at most five attempts; any other HTTP or network error aborts at once.

    Returns:
        The 'access_token' value from the JSON response, or None if the
        request failed, the retries were exhausted, or the response body was
        not a JSON object.
    """
    headers = {
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'Ocp-Apim-Subscription-Key': OCP_APIM_SUBSCRIPTION_KEY,
    }
    headers.update(VIPPS_SYSTEM_HEADERS)
    payload = {'grant_type': 'client_credentials'}
    max_retries = 5
    # requests applies no timeout unless asked to; without one an
    # unresponsive server would block the script forever.
    request_timeout = 30

    for attempt in range(max_retries):
        try:
            response = requests.post(
                AUTH_URL, headers=headers, data=payload, timeout=request_timeout
            )
            response.raise_for_status()
            token_data = response.json()
        except requests.exceptions.HTTPError as http_err:
            error_response = http_err.response
            # Resolve the status code defensively: the response attribute may
            # be None, and the original code dereferenced it before checking.
            status_code = error_response.status_code if error_response is not None else None
            if status_code == 429:
                wait_time = 2 ** attempt
                print(f"Rate limit exceeded during token request. Waiting {wait_time} seconds before retrying...")
                time.sleep(wait_time)
                continue
            print(f"HTTP error during authorization: {http_err}")
            if error_response is not None:
                print(f"Response Body: {error_response.text}")
            time.sleep(1)
            return None
        except requests.exceptions.RequestException as req_err:
            print(f"Request error during authorization: {req_err}")
            time.sleep(1)
            return None
        except ValueError as json_err:
            # Older requests versions raise a plain ValueError subclass for a
            # non-JSON body, which the handler above does not cover.
            print(f"Request error during authorization: {json_err}")
            time.sleep(1)
            return None

        if not isinstance(token_data, dict):
            print("Request error during authorization: unexpected token response format.")
            return None
        return token_data.get('access_token')

    print(f"Failed to get access token after {max_retries} retries.")
    return None
def get_vipps_headers(access_token):
    """Build the request headers for an authenticated API call.

    Combines the bearer token, subscription key, merchant serial number and
    JSON Accept header with the shared VIPPS_SYSTEM_HEADERS. On a key clash
    the value from VIPPS_SYSTEM_HEADERS wins.

    Args:
        access_token: Token placed in the Authorization header.

    Returns:
        A new dict of header names to values; safe for the caller to mutate.
    """
    return {
        'Authorization': f'Bearer {access_token}',
        'Ocp-Apim-Subscription-Key': OCP_APIM_SUBSCRIPTION_KEY,
        'Merchant-Serial-Number': MERCHANT_SERIAL_NUMBER,
        'Accept': 'application/json',
        **VIPPS_SYSTEM_HEADERS,
    }
def get_agreements_by_status(access_token, status):
    """Fetch all agreements with a specific status.

    Pages through AGREEMENTS_URL using the pageNumber/pageSize query
    parameters until a page shorter than PAGE_SIZE is returned. HTTP 429
    responses are retried with exponential backoff (up to five attempts per
    page), matching the other request functions in this file.

    Args:
        access_token: Bearer token for the Authorization header.
        status: Agreement status sent as the 'status' query parameter.

    Returns:
        A list of agreement objects as decoded from JSON, or None if a
        request failed, retries were exhausted, or a page was not a JSON
        list. A page whose body is empty or not JSON ends the listing and
        returns what was collected so far.
    """
    agreements = []
    page_number = 1
    headers = get_vipps_headers(access_token)
    max_retries = 5
    # requests applies no timeout unless asked to; without one an
    # unresponsive server would block the script forever.
    request_timeout = 30

    while True:
        params = {
            'pageNumber': page_number,
            'pageSize': PAGE_SIZE,
            'status': status
        }

        response = None
        for attempt in range(max_retries):
            try:
                candidate = requests.get(
                    AGREEMENTS_URL, headers=headers, params=params, timeout=request_timeout
                )
                candidate.raise_for_status()
            except requests.exceptions.RequestException as e:
                error_response = getattr(e, 'response', None)
                if error_response is not None and error_response.status_code == 429:
                    wait_time = 2 ** attempt
                    print(f"Rate limit exceeded fetching {status} agreements on page {page_number}. Waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                print(f"Error fetching {status} agreements on page {page_number}: {e}")
                if error_response is not None:
                    print(f"Status Code: {error_response.status_code}")
                    print(f"Response Body: {error_response.text}")
                time.sleep(1)
                return None
            else:
                # Only a response that passed raise_for_status() counts.
                response = candidate
                break

        if response is None:
            print(f"Failed to fetch {status} agreements on page {page_number} after {max_retries} retries.")
            return None

        try:
            current_page_agreements = response.json()
        except ValueError:
            # ValueError is the common base of every JSON decode error the
            # requests library can raise, whichever JSON backend it uses.
            print(f"Warning: Received empty or non-JSON response for {status} page {page_number}. Assuming end of list.")
            break

        if not isinstance(current_page_agreements, list):
            print(f"Error: Expected a list of agreements but received unexpected format on {status} page {page_number}.")
            print(f"Response content: {response.text}")
            return None

        agreements.extend(current_page_agreements)
        # A short page means there is nothing further to fetch.
        if len(current_page_agreements) < PAGE_SIZE:
            break
        page_number += 1

    return agreements
def get_all_agreements(access_token):
    """Fetch agreements based on the AGREEMENT_STATUS setting.

    For a single status this simply delegates to get_agreements_by_status().
    For "ALL", each of ACTIVE, STOPPED, EXPIRED and PENDING is fetched in
    turn and the results are concatenated in that order; a status whose
    fetch fails is skipped with a warning.

    Args:
        access_token: Bearer token passed through to the fetch function.

    Returns:
        A list of agreements, or None on failure. In "ALL" mode None is
        returned only when every status fetch failed, so a total outage is
        reported as an error rather than as "no agreements".
    """
    if AGREEMENT_STATUS != "ALL":
        # Single status fetch
        return get_agreements_by_status(access_token, AGREEMENT_STATUS)

    # API doesn't support fetching all statuses at once, so we need to make separate calls
    all_statuses = ['ACTIVE', 'STOPPED', 'EXPIRED', 'PENDING']
    all_agreements = []
    failed_statuses = []
    for status in all_statuses:
        print(f" Fetching {status} agreements...")
        agreements = get_agreements_by_status(access_token, status)
        if agreements is None:
            failed_statuses.append(status)
            print(f" Warning: Failed to fetch {status} agreements, continuing with others...")
            continue
        print(f" Found {len(agreements)} {status} agreements")
        all_agreements.extend(agreements)

    if len(failed_statuses) == len(all_statuses):
        # Nothing could be fetched at all: signal failure like the
        # single-status path does instead of returning an empty list.
        return None
    return all_agreements
def get_charges_for_agreement(access_token, agreement_id):
    """Fetch every charge belonging to one agreement.

    Requests the agreement's charges URL repeatedly, sending back the
    'Continuation-Token' response header as a request header until the
    server stops returning one. HTTP 429 responses are retried with
    exponential backoff (1, 2, 4, ... seconds), up to five attempts per page.

    Args:
        access_token: Bearer token for the Authorization header.
        agreement_id: ID substituted into CHARGES_URL_TEMPLATE.

    Returns:
        A list of charge objects as decoded from JSON; an empty list if the
        server answers 404; None if a request failed, retries were
        exhausted, or a page was not a JSON list. A page whose body is empty
        or not JSON ends the listing and returns what was collected so far.
    """
    url = CHARGES_URL_TEMPLATE.format(agreement_id)
    headers = get_vipps_headers(access_token)
    all_charges = []
    continuation_token = None
    max_retries = 5
    # requests applies no timeout unless asked to; without one an
    # unresponsive server would block the script forever.
    request_timeout = 30

    while True:
        if continuation_token:
            headers['Continuation-Token'] = continuation_token

        # 'response' is only assigned after raise_for_status() succeeded.
        # The original assigned it before the check, so after five 429s it
        # held an error response and the exhaustion check below never fired.
        response = None
        for attempt in range(max_retries):
            try:
                candidate = requests.get(url, headers=headers, timeout=request_timeout)
                candidate.raise_for_status()
            except requests.exceptions.HTTPError as http_err:
                error_response = http_err.response
                status_code = error_response.status_code if error_response is not None else None
                if status_code == 429:
                    wait_time = 2 ** attempt
                    print(f"Rate limit exceeded fetching charges for {agreement_id}. Waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                if status_code == 404:
                    print(f"Warning: Charges not found (404) for agreement {agreement_id}.")
                    return []
                print(f"HTTP error fetching charges for agreement {agreement_id}: {http_err}")
                if error_response is not None:
                    print(f"Response Body: {error_response.text}")
                time.sleep(1)
                return None
            except requests.exceptions.RequestException as req_err:
                print(f"Request error fetching charges for agreement {agreement_id}: {req_err}")
                time.sleep(1)
                return None
            else:
                response = candidate
                break

        if response is None:
            print(f"Failed to fetch charges for agreement {agreement_id} after {max_retries} retries.")
            return None

        try:
            current_charges = response.json()
        except ValueError:
            # Same convention as the agreement listing: an undecodable body
            # ends the paging instead of crashing the whole export.
            print(f"Warning: Received empty or non-JSON response for charges of agreement {agreement_id}. Assuming end of list.")
            break

        if not isinstance(current_charges, list):
            print(f"Error: Expected a list of charges but received unexpected format for agreement {agreement_id}.")
            print(f"Response content: {response.text}")
            return None

        all_charges.extend(current_charges)
        continuation_token = response.headers.get('Continuation-Token')
        if not continuation_token:
            break

    return all_charges
def parse_charge_date(date_str):
    """Parse an ISO 8601 date or timestamp string into an aware datetime.

    Accepts date-only values ("2025-03-01"), timestamps with or without a
    UTC offset, and a trailing "Z". Values that carry no offset are taken to
    be UTC, so the result can always be compared with the timezone-aware
    date filters built in main(); comparing a naive datetime with an aware
    one raises TypeError.

    Args:
        date_str: The text to parse; None or an empty string is allowed.

    Returns:
        A timezone-aware datetime, or None if the input is empty or cannot
        be parsed (a warning is printed in the latter case).
    """
    if not date_str:
        return None
    try:
        text = date_str.strip()
        # datetime.fromisoformat() accepts a trailing "Z" only from Python
        # 3.11 onwards; rewrite it as an explicit UTC offset.
        if text[-1:] in ('Z', 'z'):
            text = text[:-1] + '+00:00'
        parsed = datetime.fromisoformat(text)
    except (AttributeError, TypeError, ValueError):
        print(f"Warning: Could not parse date string: {date_str}")
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def _charge_in_date_range(charge, start_date_filter, end_date_filter):
    """Return True if the charge passes the optional start/end date filters."""
    if start_date_filter is None and end_date_filter is None:
        return True
    # 'due' may be present but null; fall back to 'transactionDate' then too.
    charge_date = parse_charge_date(charge.get('due') or charge.get('transactionDate'))
    if charge_date is None:
        # Charges without a usable date are kept rather than silently dropped.
        return True
    if charge_date.tzinfo is None:
        # The filters are timezone-aware; comparing them with a naive
        # datetime raises TypeError, so treat offset-less values as UTC.
        charge_date = charge_date.replace(tzinfo=timezone.utc)
    if start_date_filter and charge_date < start_date_filter:
        return False
    if end_date_filter and charge_date > end_date_filter:
        return False
    return True


def _write_agreement_rows(writer, agreement, charges, start_date_filter, end_date_filter):
    """Write the CSV rows for one agreement and return how many were written."""
    agreement_id = agreement.get('id')
    agreement_url = agreement.get('merchantAgreementUrl')
    rows_written = 0
    for charge in charges:
        if not _charge_in_date_range(charge, start_date_filter, end_date_filter):
            continue
        writer.writerow({
            'agreementId': agreement_id,
            'agreementUrl': agreement_url,
            'chargeStatus': charge.get('status'),
            'chargeId': charge.get('id'),
            'amount': charge.get('amount'),
            'due': charge.get('due'),
            'transactionDate': charge.get('transactionDate')
        })
        rows_written += 1

    # Handle empty agreements if SKIP_EMPTY is False
    if rows_written == 0 and not SKIP_EMPTY:
        writer.writerow({
            'agreementId': agreement_id,
            'agreementUrl': agreement_url,
            'chargeStatus': 'NO_CHARGES',
            'chargeId': '',
            'amount': '',
            'due': '',
            'transactionDate': ''
        })
        rows_written += 1
    return rows_written


def main():
    """Export the charges of all selected agreements to a timestamped CSV file.

    Steps: build the optional UTC date filters from START_DATE / END_DATE,
    obtain an access token, list the agreements selected by
    AGREEMENT_STATUS, then fetch each agreement's charges and write the ones
    inside the date range to 'VippsCharges_<date>_<time>.csv' in the current
    directory. Rows are written as they are fetched and flushed every 50
    agreements. Progress and errors are printed; nothing is returned.
    """
    # Parse date filters
    start_date_filter = None
    end_date_filter = None
    if START_DATE:
        try:
            start_date_filter = datetime.fromisoformat(START_DATE).replace(tzinfo=timezone.utc)
            print(f"Filtering charges from {START_DATE} onwards...")
        except ValueError:
            print(f"Warning: Invalid START_DATE format '{START_DATE}'. Expected ISO format (YYYY-MM-DD). Ignoring filter.")
    if END_DATE:
        try:
            # Set to end of day (23:59:59) to make END_DATE fully inclusive
            end_date_filter = datetime.fromisoformat(END_DATE).replace(
                hour=23, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc
            )
            print(f"Filtering charges until {END_DATE} (inclusive)...")
        except ValueError:
            print(f"Warning: Invalid END_DATE format '{END_DATE}'. Expected ISO format (YYYY-MM-DD). Ignoring filter.")

    print("Getting access token...")
    access_token = get_access_token()
    if not access_token:
        print("Failed to get access token. Exiting.")
        return

    print(f"Fetching agreements (status: {AGREEMENT_STATUS})...")
    agreements_data = get_all_agreements(access_token)
    if agreements_data is None:
        print("Failed to fetch agreements due to an error. Exiting.")
        return
    if not agreements_data:
        print("No agreements found. Exiting.")
        return

    # Setup incremental CSV writing - saves RAM and preserves data on crash.
    # A single clock reading keeps the date and time parts of the name
    # consistent even when the script starts right at midnight.
    started_at = datetime.now()
    output_file = f"VippsCharges_{started_at:%Y-%m-%d}_{started_at:%H%M%S}.csv"
    fieldnames = ['agreementId', 'agreementUrl', 'chargeStatus', 'chargeId', 'amount', 'due', 'transactionDate']
    charges_written = 0
    agreements_processed = 0
    agreements_failed = 0
    total_agreements = len(agreements_data)
    print(f"Found {total_agreements} agreements. Fetching charges for each...")
    print(f"Writing incrementally to: {output_file}")

    try:
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for position, agreement in enumerate(agreements_data, start=1):
                if position % 100 == 0:
                    print(f"Processing agreement {position}/{total_agreements}... ({charges_written} charges written so far)")

                agreement_id = agreement.get('id')
                if not agreement_id:
                    print("Skipping an agreement entry with no ID.")
                else:
                    charges = get_charges_for_agreement(access_token, agreement_id)
                    if charges is None:
                        # The fetch function already printed the reason.
                        agreements_failed += 1
                    else:
                        charges_written += _write_agreement_rows(
                            writer, agreement, charges, start_date_filter, end_date_filter
                        )
                        agreements_processed += 1

                # Flush to disk periodically to ensure data is saved
                if position % 50 == 0:
                    csvfile.flush()

        if charges_written == 0:
            print("No charges matched the filters. File created with headers only.")
        else:
            print(f"Successfully wrote {charges_written} charges from {agreements_processed} agreements to: {output_file}")
        if agreements_failed:
            print(f"Warning: charges could not be fetched for {agreements_failed} agreements; the file is incomplete.")
    except IOError as e:
        print(f"Error writing to CSV file: {e}")
        if charges_written > 0:
            print(f"Note: {charges_written} charges may have been partially saved before the error.")
# Run the export only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()