|
1 | 1 | #!/usr/bin/env python3 |
2 | 2 |
|
3 | | -# Background tutorial on async programming with Python |
4 | | -# https://realpython.com/async-io-python/ |
5 | | - |
6 | | -# Requires Python 3.8 or newer. Tested with 3.8/3.9/3.10/3.11. |
7 | | - |
8 | | -# Installation: |
9 | | -# pip3 install --upgrade opencage aiohttp backoff certifi tqdm |
10 | | - |
11 | | -# Example (forward) input file: |
12 | | -# 1,"Via Allende 8, Cascina, Toscana, Italia" |
13 | | -# 2,"Via Coppi, 17, Formigine, Emilia-Romagna, Italia" |
14 | | -# 3,"Via Dei Salici 20, Gallarate, Lombardia, Italia" |
15 | | -# 4,"Via Vittorio Veneto N7, San Giuliano Terme, Toscana, Italia" |
16 | | -# 5,"Via Tiro A Segno 8, Gallarate, Lombardia, Italia" |
17 | | - |
18 | | -# Example (reverse) input file: |
19 | | -# 1,"43.6783472,10.5533173" |
20 | | -# 2,"44.5655041,10.8412106" |
21 | | -# 3,"45.6823942,8.7919808" |
22 | | -# 4,"43.7804922,10.402925" |
23 | | -# 5,"45.6506236,8.8037173" |
24 | | -# or |
25 | | -# 1,43.6783472,10.5533173 |
26 | | -# 2,44.5655041,10.8412106 |
27 | | -# 3,45.6823942,8.7919808 |
28 | | -# 4,43.7804922,10.402925 |
29 | | -# 5,45.6506236,8.8037173 |
30 | | - |
31 | | -import os |
32 | | -import sys |
33 | | -import csv |
34 | | -import re |
35 | | -import ssl |
36 | | -import asyncio |
37 | | -import traceback |
38 | | -import aiohttp |
39 | | -import backoff |
40 | | -import certifi |
41 | | -import pkg_resources |
42 | | -from tqdm import tqdm |
43 | | -import opencage |
44 | | -from opencage.geocoder import OpenCageGeocode, OpenCageGeocodeError |
45 | | - |
# TLS configuration handed to every OpenCageGeocode instance created below.
# Use certificates from the certifi package instead of those of the operating system
# https://pypi.org/project/certifi/
# https://docs.aiohttp.org/en/stable/client_advanced.html#ssl-control-for-tcp-sockets
sslcontext = ssl.create_default_context(cafile=certifi.where())
# Alternatively set sslcontext=False to ignore certificate validation (not advised)
# or sslcontext=None to use those of the operating system



# User-editable settings. The start-up checks further down abort the script
# when API_KEY is shorter than 32 characters or when FILENAME_OUTPUT_CSV
# already exists.
API_KEY = ''
FILENAME_INPUT_CSV = 'file_to_geocode.csv'
FILENAME_OUTPUT_CSV = 'file_geocoded.csv'
FORWARD_OR_REVERSE = 'guess'  # 'forward' (address -> coordinates) or 'reverse' (coordinates -> address)
                              # With 'guess' the script checks if the address is two numbers and then
                              # assumes reverse
API_DOMAIN = 'api.opencagedata.com'
MAX_ITEMS = 100               # How many lines to read from the input file. Set to 0 for unlimited
                              # (also used as the maxsize of the asyncio work queue)
NUM_WORKERS = 3               # For 10 requests per second try 2-5
REQUEST_TIMEOUT_SECONDS = 5   # For individual HTTP requests. Default is 1
                              # NOTE(review): this constant is not referenced anywhere else in the
                              # visible script - confirm whether it is meant to be passed to the geocoder
RETRY_MAX_TRIES = 10          # How often to retry if a HTTP request times out
RETRY_MAX_TIME = 60           # Limit in seconds for retries
SHOW_PROGRESS = True          # Show progress bar
68 | | - |
69 | | - |
70 | | - |
71 | | - |
72 | | - |
# Check OpenCage geocoder is the latest version
#
# NOTE(review): pkg_resources is deprecated upstream; kept here because it is
# what the file already imports.
minimum_required_version = '2.3.1'
package_version = pkg_resources.get_distribution('opencage').version
if pkg_resources.parse_version(package_version) < pkg_resources.parse_version(minimum_required_version):
    sys.stderr.write(f"At least version {minimum_required_version} of opencage geocoder package required. ")
    sys.stderr.write("Try upgrading by running 'pip install --upgrade opencage'.\n")
    sys.exit(1)


# Check API key present
#
if len(API_KEY) < 32:
    sys.stderr.write(f"API_KEY '{API_KEY}' does not look valid.\n")
    sys.exit(1)


# Don't overwrite output file
#
# Mode 'x' (exclusive creation) fails if the file already exists, so the
# "does it exist?" check and the creation are one atomic step instead of a
# separate os.path.exists() call followed by a truncating open().
try:
    _output_file = open(FILENAME_OUTPUT_CSV, 'x', encoding='utf8', newline='')
except FileExistsError:
    sys.stderr.write(f"The output file '{FILENAME_OUTPUT_CSV}' already exists.\n")
    sys.exit(1)

# The writer is a module-level global used by the coroutines below, so the
# handle cannot live in a `with` block; close (and flush) it explicitly when
# the interpreter exits instead of relying on garbage collection.
import atexit
atexit.register(_output_file.close)

csv_writer = csv.writer(_output_file)

# None when the progress bar is disabled; every user checks SHOW_PROGRESS first.
PROGRESS_BAR = (
    tqdm(total=0, position=0, desc="Addresses geocoded", dynamic_ncols=True)
    if SHOW_PROGRESS else None
)
99 | | - |
# Two plain decimal numbers separated by a single comma, nothing else.
# Compiled once at import time because the check runs for every input row.
_COORDINATE_PAIR_PATTERN = re.compile(r'(-?\d+(\.\d+)?),(-?\d+(\.\d+)?)')


# '40.78,-73.97' => true
# '3rd Ave, New York' => false
def guess_text_is_coordinate_pair(text):
    """Return True if *text* looks like a 'number,number' coordinate pair.

    The whole string has to match: no spaces around the comma and no
    trailing newline. (A '$' anchor would also accept 'a,b\\n', which then
    leaks the newline into the second coordinate when the text is split.)

    Args:
        text: the candidate address string.

    Returns:
        bool: True when the text is two optionally negative decimal numbers
        joined by a comma, False otherwise.
    """
    return _COORDINATE_PAIR_PATTERN.fullmatch(text) is not None
107 | | - |
async def write_one_geocoding_result(geocoding_result, address, address_id):
    """Append one row for *address_id* to the output CSV.

    Args:
        geocoding_result: a single result dict from the geocoder (read via
            its 'geometry', 'components', 'confidence' and 'formatted'
            keys), or None when nothing was found.
        address: the original query text; only used in the "not found"
            message on stderr.
        address_id: identifier copied into the first output column.

    Row layout: id, lat, lng, _type, country, county, city, postcode, road,
    house_number, confidence, formatted.
    """
    if geocoding_result is None:
        sys.stderr.write(f"not found, writing empty result: {address}\n")
        # lat/lng 0,0 is a placeholder, not to be confused with
        # https://en.wikipedia.org/wiki/Null_Island
        # Confidence values are 1-10 (lowest to highest), use -1 for unknown.
        csv_writer.writerow([address_id, 0, 0, '', '', '', '', '', '', '', -1, ''])
        return

    # Any of the components might be empty, hence .get() with a '' default.
    component_keys = ('_type', 'country', 'county', 'city', 'postcode', 'road', 'house_number')

    geometry = geocoding_result['geometry']
    row = [address_id, geometry['lat'], geometry['lng']]
    components = geocoding_result['components']
    row.extend(components.get(key, '') for key in component_keys)
    row.append(geocoding_result['confidence'])
    row.append(geocoding_result['formatted'])
    csv_writer.writerow(row)
144 | | - |
145 | | - |
# Example of one emitted line:
# Backing off 0.4 seconds after 1 tries calling function <function geocode_one_address
# at 0x10dbf5e50> with args ('14464 3RD ST # 4, 91423, CA, USA', '1780245') and kwargs {}
def backoff_hdlr(details):
    """Write a one-line retry notice to stderr.

    Used as the `on_backoff` callback of the backoff decorator.

    Args:
        details: mapping that must contain 'wait', 'tries', 'target', 'args'
            and 'kwargs'; any additional keys are ignored.
    """
    sys.stderr.write("Backing off {wait:0.1f} seconds after {tries} tries "
                     "calling function {target} with args {args} and kwargs "
                     "{kwargs}\n".format(**details))
152 | | - |
# https://pypi.org/project/backoff/
#
# The retry decorator wraps only the network request. It must see the
# asyncio.TimeoutError itself: if the error is caught inside the decorated
# function (e.g. by a broad `except Exception`) no retry ever happens.
@backoff.on_exception(backoff.expo,
                      asyncio.TimeoutError,
                      max_time=RETRY_MAX_TIME,  # seconds
                      max_tries=RETRY_MAX_TRIES,
                      on_backoff=backoff_hdlr)
async def _fetch_geocoding_results(address):
    """Send one forward or reverse request and return the geocoder's results.

    Timeouts propagate to the backoff decorator, which retries the call.
    """
    is_reverse = FORWARD_OR_REVERSE == 'reverse' or (
        FORWARD_OR_REVERSE == 'guess' and guess_text_is_coordinate_pair(address))

    async with OpenCageGeocode(API_KEY, domain=API_DOMAIN, sslcontext=sslcontext) as geocoder:
        if is_reverse:
            # Reverse:
            # coordinates -> address, e.g. '40.78,-73.97' => '101, West 91st Street, New York'
            # The text is 'latitude,longitude', passed on in that order.
            lat, lng = address.split(',')[:2]
            return await geocoder.reverse_geocode_async(lat, lng, no_annotations=1)

        # Forward:
        # address -> coordinates
        # note: you may also want to set other optional parameters like
        # countrycode, language, etc
        # see the full list: https://opencagedata.com/api#forward-opt
        return await geocoder.geocode_async(address, no_annotations=1)


async def geocode_one_address(address, address_id):
    """Geocode one address and write exactly one CSV row for it.

    Errors are reported on stderr and never propagate (best effort); when
    the lookup fails or finds nothing, an "empty" row is written instead.

    Args:
        address: free-form address, or a 'lat,lng' pair for reverse geocoding.
        address_id: identifier copied into the output row.
    """
    geocoding_results = None
    try:
        geocoding_results = await _fetch_geocoding_results(address)
    except asyncio.TimeoutError:
        # Raised only after the backoff decorator has exhausted its retries.
        sys.stderr.write(f"Giving up after repeated timeouts: {address}\n")
    except OpenCageGeocodeError as exc:
        sys.stderr.write(str(exc) + "\n")
    except Exception as exc:
        # Three-argument form: the single-argument call needs Python 3.10+.
        traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)

    try:
        # Only the first (best ranked) result is written.
        geocoding_result = geocoding_results[0] if geocoding_results else None
        await write_one_geocoding_result(geocoding_result, address, address_id)
    except Exception as exc:
        traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)
193 | | - |
194 | | - |
195 | | - |
async def run_worker(worker_name, queue):
    """Consume work items from *queue* forever, geocoding each one.

    The coroutine never returns on its own; it is expected to be cancelled
    once the queue has been fully processed.

    Args:
        worker_name: label used in the start-up message on stderr.
        queue: asyncio.Queue of dicts with the keys 'id' and 'address'.
    """
    sys.stderr.write(f"Worker {worker_name} starts...\n")

    while True:
        work_item = await queue.get()
        try:
            await geocode_one_address(work_item['address'], work_item['id'])
        except Exception as exc:
            # One bad item must not kill the worker: report it and carry on.
            traceback.print_exception(type(exc), exc, exc.__traceback__, file=sys.stderr)
        finally:
            # Always acknowledge the item, otherwise queue.join() waits forever.
            queue.task_done()

        if SHOW_PROGRESS:
            PROGRESS_BAR.update(1)
210 | | - |
211 | | - |
212 | | - |
213 | | - |
async def main():
    """Read the input CSV, geocode every row with a pool of workers, then clean up.

    Raises:
        RuntimeError: when run on Python older than 3.8.
        ValueError: when an input line is empty or has no address column.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if sys.version_info < (3, 8):
        raise RuntimeError("Script requires Python 3.8 or newer")

    ## 1. Read CSV into a Queue
    ##    Each work_item is an address and id. The id will be part of the output,
    ##    easy to add more settings. Named 'work_item' to avoid the words
    ##    'address' or 'task' which are used elsewhere
    ##
    ##    https://docs.python.org/3/library/asyncio-queue.html
    ##
    queue = asyncio.Queue(maxsize=MAX_ITEMS)

    with open(FILENAME_INPUT_CSV, 'r', encoding='utf-8') as infile:
        csv_reader = csv.reader(infile, strict=True, skipinitialspace=True)

        for row in csv_reader:
            if not row:
                raise ValueError(f"Empty line in input file at line number {csv_reader.line_num}, aborting")
            if len(row) < 2:
                raise ValueError(f"Missing address in input file at line number {csv_reader.line_num}, aborting")

            # Coordinates may arrive as one quoted column ("lat,lng") or as two
            # separate columns (lat,lng). Only join when the third column
            # exists, so 'reverse' mode also accepts the quoted single-column form.
            joined = f"{row[1]},{row[2]}" if len(row) > 2 else None
            if joined is not None and (
                    FORWARD_OR_REVERSE == 'reverse'
                    or (FORWARD_OR_REVERSE == 'guess' and guess_text_is_coordinate_pair(joined))):
                address = joined
            else:
                address = row[1]

            await queue.put({'id': row[0], 'address': address})
            # With MAX_ITEMS == 0 the queue is unbounded and never reports full.
            if queue.full():
                break

    sys.stderr.write(f"{queue.qsize()} work_items in queue\n")

    if SHOW_PROGRESS:
        PROGRESS_BAR.total = queue.qsize()
        PROGRESS_BAR.refresh()

    ## 2. Create task workers. These are coroutines; each task takes work_items
    ##    from the queue until it's empty. Tasks run concurrently
    ##
    ##    https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    ##    https://docs.python.org/3/library/asyncio-task.html#coroutine
    ##
    sys.stderr.write(f"Creating {NUM_WORKERS} task workers...\n")
    tasks = [
        asyncio.create_task(run_worker(f'worker {i}', queue))
        for i in range(NUM_WORKERS)
    ]

    ## 3. Now workers do the geocoding
    ##
    sys.stderr.write("Now waiting for workers to finish processing queue...\n")
    await queue.join()

    ## 4. Cleanup
    ##    Workers loop forever, so cancel them and wait until the
    ##    cancellation has actually been processed.
    ##
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    if SHOW_PROGRESS:
        PROGRESS_BAR.close()

    sys.stderr.write("All done.\n")


if __name__ == '__main__':
    asyncio.run(main())
| 3 | +# Replaced by the CLI tool 'opencage', see README.md documentation |
0 commit comments