33import asyncio
44import traceback
55import threading
6- import backoff
7- import certifi
86import random
9- import re
107
11- from tqdm import tqdm
12- from urllib .parse import urlencode
138from contextlib import suppress
9+ from urllib .parse import urlencode
10+ from tqdm import tqdm
11+ import certifi
12+ import backoff
1413from opencage .geocoder import OpenCageGeocode , OpenCageGeocodeError , _query_for_reverse_geocoding
1514
1615class OpenCageBatchGeocoder ():
16+
17+ """ Called from command_line.py
18+ init() receives the parsed command line parameters
19+ geocode() receive an input and output CSV reader/writer and loops over the data
20+ """
21+
1722 def __init__ (self , options ):
1823 self .options = options
1924 self .sslcontext = ssl .create_default_context (cafile = certifi .where ())
@@ -22,7 +27,7 @@ def __init__(self, options):
2227 def __call__ (self , * args , ** kwargs ):
2328 asyncio .run (self .geocode (* args , ** kwargs ))
2429
25- async def geocode (self , input , output ):
30+ async def geocode (self , csv_input , csv_output ):
2631 if not self .options .dry_run :
2732 test = await self .test_request ()
2833 if test ['error' ]:
@@ -33,28 +38,28 @@ async def geocode(self, input, output):
3338 self .options .workers = 1
3439
3540 if self .options .headers :
36- header_columns = next (input , None )
41+ header_columns = next (csv_input , None )
3742 if header_columns is None :
3843 return
3944
4045 queue = asyncio .Queue (maxsize = self .options .limit )
4146
42- read_warnings = await self .read_input (input , queue )
47+ read_warnings = await self .read_input (csv_input , queue )
4348
4449 if self .options .dry_run :
4550 if not read_warnings :
4651 print ('All good.' )
4752 return
4853
4954 if self .options .headers :
50- output .writerow (header_columns + self .options .add_columns )
55+ csv_output .writerow (header_columns + self .options .add_columns )
5156
5257 progress_bar = not (self .options .no_progress or self .options .quiet ) and \
5358 tqdm (total = queue .qsize (), position = 0 , desc = "Addresses geocoded" , dynamic_ncols = True )
5459
5560 tasks = []
5661 for _ in range (self .options .workers ):
57- task = asyncio .create_task (self .worker (output , queue , progress_bar ))
62+ task = asyncio .create_task (self .worker (csv_output , queue , progress_bar ))
5863 tasks .append (task )
5964
6065 # This starts the workers and waits until all are finished
@@ -80,9 +85,9 @@ async def test_request(self):
8085 except Exception as exc :
8186 return { 'error' : exc }
8287
83- async def read_input (self , input , queue ):
88+ async def read_input (self , csv_input , queue ):
8489 any_warnings = False
85- for index , row in enumerate (input ):
90+ for index , row in enumerate (csv_input ):
8691 line_number = index + 1
8792
8893 if len (row ) == 0 :
@@ -138,12 +143,12 @@ async def read_one_line(self, row, row_id):
138143
139144 return { 'row_id' : row_id , 'address' : ',' .join (address ), 'original_columns' : row , 'warnings' : warnings }
140145
141- async def worker (self , output , queue , progress ):
146+ async def worker (self , csv_output , queue , progress ):
142147 while True :
143148 item = await queue .get ()
144149
145150 try :
146- await self .geocode_one_address (output , item ['row_id' ], item ['address' ], item ['original_columns' ])
151+ await self .geocode_one_address (csv_output , item ['row_id' ], item ['address' ], item ['original_columns' ])
147152
148153 if progress :
149154 progress .update (1 )
@@ -152,7 +157,7 @@ async def worker(self, output, queue, progress):
152157 finally :
153158 queue .task_done ()
154159
155- async def geocode_one_address (self , output , row_id , address , original_columns ):
160+ async def geocode_one_address (self , csv_output , row_id , address , original_columns ):
156161 def on_backoff (details ):
157162 if not self .options .quiet :
158163 sys .stderr .write ("Backing off {wait:0.1f} seconds afters {tries} tries "
@@ -195,13 +200,13 @@ async def _geocode_one_address():
195200 'response' : geocoding_result
196201 })
197202
198- await self .write_one_geocoding_result (output , row_id , address , geocoding_result , original_columns )
203+ await self .write_one_geocoding_result (csv_output , row_id , geocoding_result , original_columns )
199204 except Exception as exc :
200205 traceback .print_exception (exc , file = sys .stderr )
201206
202207 await _geocode_one_address ()
203208
204- async def write_one_geocoding_result (self , output , row_id , address , geocoding_result , original_columns = [] ):
209+ async def write_one_geocoding_result (self , csv_output , row_id , geocoding_result , original_columns ):
205210 row = original_columns
206211
207212 for column in self .options .add_columns :
@@ -227,11 +232,10 @@ async def write_one_geocoding_result(self, output, row_id, address, geocoding_re
227232
228233 if self .options .verbose :
229234 self .log (f"Writing row { row_id } " )
230- output .writerow (row )
235+ csv_output .writerow (row )
231236 self .write_counter = self .write_counter + 1
232237
233238
234239 def log (self , message ):
235240 if not self .options .quiet :
236241 sys .stderr .write (f"{ message } \n " )
237-
0 commit comments