1
- from bs4 import BeautifulSoup
2
- from selenium .webdriver .common .by import By
3
- from selenium .webdriver .support import expected_conditions as EC
4
- from selenium .webdriver .support .wait import WebDriverWait
1
+ import json
2
+ from dataclasses import asdict , dataclass
3
+ from typing import Literal
5
4
6
- from uk_bin_collection .uk_bin_collection .common import *
5
+ import requests
6
+ from cryptography .hazmat .backends import default_backend
7
+ from cryptography .hazmat .primitives import padding
8
+ from cryptography .hazmat .primitives .ciphers import Cipher , algorithms , modes
9
+
10
+ from uk_bin_collection .uk_bin_collection .common import check_uprn
7
11
from uk_bin_collection .uk_bin_collection .get_bin_data import AbstractGetBinDataClass
8
12
13
+ key_hex = "F57E76482EE3DC3336495DEDEEF3962671B054FE353E815145E29C5689F72FEC"
14
+ iv_hex = "2CBF4FC35C69B82362D393A4F0B9971A"
15
+
16
+
17
+ @dataclass
18
+ class BucksInput :
19
+ P_CLIENT_ID : Literal [152 ]
20
+ P_COUNCIL_ID : Literal [34505 ]
21
+ P_LANG_CODE : Literal ["EN" ]
22
+ P_UPRN : str
23
+
9
24
10
25
class CouncilClass (AbstractGetBinDataClass ):
11
26
"""
@@ -14,113 +29,73 @@ class CouncilClass(AbstractGetBinDataClass):
14
29
implementation.
15
30
"""
16
31
17
- def parse_data (self , page : str , ** kwargs ) -> dict :
18
- driver = None
32
+ def encode_body (self , bucks_input : BucksInput ):
33
+ key = bytes .fromhex (key_hex )
34
+ iv = bytes .fromhex (iv_hex )
35
+
36
+ json_data = json .dumps (asdict (bucks_input ))
37
+ data_bytes = json_data .encode ("utf-8" )
38
+
39
+ padder = padding .PKCS7 (128 ).padder ()
40
+ padded_data = padder .update (data_bytes ) + padder .finalize ()
41
+
42
+ backend = default_backend ()
43
+ cipher = Cipher (algorithms .AES (key ), modes .CBC (iv ), backend = backend )
44
+ encryptor = cipher .encryptor ()
45
+ ciphertext = encryptor .update (padded_data ) + encryptor .finalize ()
46
+
47
+ return ciphertext .hex ()
48
+
49
+ def decode_response (self , hex_input : str ):
50
+
51
+ key = bytes .fromhex (key_hex )
52
+ iv = bytes .fromhex (iv_hex )
53
+ ciphertext = bytes .fromhex (hex_input )
54
+
55
+ backend = default_backend ()
56
+ cipher = Cipher (algorithms .AES (key ), modes .CBC (iv ), backend = backend )
57
+ decryptor = cipher .decryptor ()
58
+ decrypted_padded = decryptor .update (ciphertext ) + decryptor .finalize ()
59
+
60
+ unpadder = padding .PKCS7 (128 ).unpadder ()
61
+ plaintext_bytes = unpadder .update (decrypted_padded ) + unpadder .finalize ()
62
+ plaintext = plaintext_bytes .decode ("utf-8" )
63
+
64
+ return json .loads (plaintext )
65
+
66
+ def parse_data (self , _ : str , ** kwargs ) -> dict :
19
67
try :
20
- data = {"bins" : []}
21
- user_paon = kwargs .get ("paon" )
22
- user_postcode = kwargs .get ("postcode" )
23
- user_uprn = kwargs .get ("uprn" )
24
- web_driver = kwargs .get ("web_driver" )
25
- headless = kwargs .get ("headless" )
26
- check_paon (user_paon )
27
- check_postcode (user_postcode )
28
-
29
- # Create Selenium webdriver
30
- driver = create_webdriver (web_driver , headless , None , __name__ )
31
- driver .get (kwargs .get ("url" ))
32
-
33
- # Click "Check now" button
34
- check_now_button = WebDriverWait (driver , 10 ).until (
35
- EC .element_to_be_clickable ((By .XPATH , "//a[contains(text(), 'Check now')]" ))
68
+ user_uprn : str = kwargs .get ("uprn" ) or ""
69
+ check_uprn (user_uprn )
70
+ bucks_input = BucksInput (
71
+ P_CLIENT_ID = 152 , P_COUNCIL_ID = 34505 , P_LANG_CODE = "EN" , P_UPRN = user_uprn
36
72
)
37
- check_now_button .click ()
38
73
39
- # Wait for the postcode field to appear then populate it
40
- inputElement_postcode = WebDriverWait (driver , 10 ).until (
41
- EC .presence_of_element_located ((By .ID , "postcodeSearch" ))
42
- )
43
- inputElement_postcode .send_keys (user_postcode )
74
+ encoded_input = self .encode_body (bucks_input )
44
75
45
- # Click Find button
46
- find_button = WebDriverWait (driver , 10 ).until (
47
- EC .element_to_be_clickable ((By .XPATH , "//button[contains(text(), 'Find')]" ))
76
+ session = requests .Session ()
77
+ response = session .post (
78
+ "https://itouchvision.app/portal/itouchvision/kmbd/collectionDay" ,
79
+ data = encoded_input ,
48
80
)
49
- find_button .click ()
50
-
51
- # Wait for the address dropdown and select by UPRN
52
- if user_uprn :
53
- address_option = WebDriverWait (driver , 10 ).until (
54
- EC .element_to_be_clickable ((By .XPATH , f"//option[@value='{ user_uprn } ']" ))
55
- )
56
- address_option .click ()
57
- else :
58
- # Fallback to selecting by address text
59
- address_option = WebDriverWait (driver , 10 ).until (
60
- EC .element_to_be_clickable (
61
- (By .XPATH , f"//select[@id='addressSelect']//option[contains(., '{ user_paon } ')]" )
62
- )
63
- )
64
- address_option .click ()
65
81
66
- # Wait a moment for the page to update after address selection
67
- import time
68
- time .sleep (2 )
82
+ output = response .text
69
83
70
- # Wait for collection information to appear - try multiple possible selectors
71
- try :
72
- WebDriverWait ( driver , 15 ). until (
73
- EC . presence_of_element_located (( By . XPATH , "//h2[contains(text(), 'Your next collections')]" ))
74
- )
75
- except :
76
- # Alternative wait for collection data structure
77
- WebDriverWait ( driver , 10 ). until (
78
- EC . presence_of_element_located (( By . XPATH , "//div[contains(@class, 'ant-row') and contains(@class, 'd-flex')]//h3[@class='text-white']" ))
84
+ decoded_bins = self . decode_response ( output )
85
+ data : dict [ str , list [ dict [ str , str ]]] = {}
86
+ data [ "bins" ] = list (
87
+ map (
88
+ lambda a : {
89
+ "type" : a [ "binType" ],
90
+ "collectionDate" : a [ "collectionDay" ]. replace ( "-" , "/" ),
91
+ },
92
+ decoded_bins [ "collectionDay" ],
79
93
)
80
-
81
- soup = BeautifulSoup (driver .page_source , features = "html.parser" )
82
-
83
- # Find all collection items with the specific structure - try multiple class patterns
84
- collection_items = soup .find_all ("div" , class_ = lambda x : x and "ant-col" in x and "ant-col-xs-12" in x )
85
- if not collection_items :
86
- # Fallback to finding items by structure
87
- collection_items = soup .find_all ("div" , class_ = lambda x : x and "p-2" in x and "d-flex" in x and "flex-column" in x )
88
-
89
- current_year = datetime .now ().year
90
- current_month = datetime .now ().month
91
-
92
- for item in collection_items :
93
- # Extract bin type from h3 element
94
- bin_type_elem = item .find ("h3" , class_ = "text-white" )
95
- # Extract date from div with specific classes
96
- date_elem = item .find ("div" , class_ = "text-white fw-bold" )
97
-
98
- if bin_type_elem and date_elem :
99
- bin_type = bin_type_elem .get_text ().strip ()
100
- date_text = date_elem .get_text ().strip ()
101
-
102
- try :
103
- collection_date = datetime .strptime (date_text , "%A %d %B" )
104
- if (current_month > 10 ) and (collection_date .month < 3 ):
105
- collection_date = collection_date .replace (year = (current_year + 1 ))
106
- else :
107
- collection_date = collection_date .replace (year = current_year )
108
-
109
- dict_data = {
110
- "type" : bin_type ,
111
- "collectionDate" : collection_date .strftime ("%d/%m/%Y" ),
112
- }
113
- data ["bins" ].append (dict_data )
114
- except ValueError :
115
- continue
94
+ )
116
95
117
96
except Exception as e :
118
97
# Here you can log the exception if needed
119
98
print (f"An error occurred: { e } " )
120
99
# Optionally, re-raise the exception if you want it to propagate
121
100
raise
122
- finally :
123
- # This block ensures that the driver is closed regardless of an exception
124
- if driver :
125
- driver .quit ()
126
101
return data
0 commit comments