Skip to content

Commit a49a1ef

Browse files
committed
resurrect an older batch example, it is still useful
1 parent 85312bf commit a49a1ef

File tree

1 file changed

+137
-1
lines changed

1 file changed

+137
-1
lines changed

examples/batch.py

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,139 @@
11
#!/usr/bin/env python3
22

3-
# Replaced by the CLI tool 'opencage', see README.md documentation
3+
# Example script we used between 2021 and 2023. It's now being replaced by
4+
# the much more powerful CLI tool (see README.md file).
5+
#
6+
# Git version history will show how we kept adding features. Below is a
7+
# version with fewer features, on purpose, for better readability.
8+
#
9+
# Background tutorial on async programming with Python
10+
# https://realpython.com/async-io-python/
11+
#
12+
# Requires Python 3.7 or newer. Tested with 3.8 and 3.9.
13+
#
14+
# Installation:
15+
# pip3 install opencage
16+
#
17+
18+
import sys
19+
import csv
20+
import asyncio
21+
from opencage.geocoder import OpenCageGeocode
22+
23+
# --- Settings ---------------------------------------------------------------
# API key handed to OpenCageGeocode(); empty by default.
API_KEY = ''
# Input CSV: the first column is used as the id, the second as the address.
INFILE = 'file_to_geocode.csv'
# Output CSV: one row is written per geocoded address.
OUTFILE = 'file_geocoded.csv'
# Set to 0 for unlimited
MAX_ITEMS = 100
# For 10 requests per second try 2-5
NUM_WORKERS = 3

# Shared writer used for every result row. The output file is opened (and
# truncated) as soon as this module is loaded.
csv_writer = csv.writer(
    open(OUTFILE, mode='w', encoding='utf8', newline='')
)
30+
31+
async def write_one_geocoding_result(geocoding_result, address, address_id):
    """Write one output CSV row for a single geocoded address.

    Row layout: id, lat, lng, country, county, city, postcode, road,
    house_number, confidence, formatted address.

    Args:
        geocoding_result: Sequence of result dicts (only the first one is
            used), or None. Presumably the return value of
            ``geocode_async`` -- verify against the opencage SDK.
        address: The address that was geocoded; only used in the
            "not found" message.
        address_id: Identifier copied into the first output column.

    When ``geocoding_result`` is None or empty, a placeholder row is written
    and a note is printed to stderr.
    """
    # Truthiness covers both None and an empty result list; the previous
    # `is not None` check let an empty list through and crashed on [0].
    if geocoding_result:
        best_match = geocoding_result[0]
        geometry = best_match['geometry']
        components = best_match['components']
        row = [
            address_id,
            geometry['lat'],
            geometry['lng'],
            # Any of these components might be empty :
            components.get('country', ''),
            components.get('county', ''),
            components.get('city', ''),
            components.get('postcode', ''),
            components.get('road', ''),
            components.get('house_number', ''),
            best_match['confidence'],
            best_match['formatted'],
        ]
    else:
        row = [
            address_id,
            0,  # not to be confused with https://en.wikipedia.org/wiki/Null_Island
            0,
            '',
            '',
            '',
            '',
            '',
            '',
            -1,  # confidence values are 1-10 (lowest to highest), use -1 for unknown
            '',
        ]
        # Python f-strings use {name}; the former "${address}" printed a stray '$'.
        sys.stderr.write(f"not found, writing empty result: {address}\n")
    csv_writer.writerow(row)
65+
66+
67+
async def geocode_one_address(address, address_id):
    """Geocode a single address and write the result to the output CSV.

    Args:
        address: Address string sent to the geocoder.
        address_id: Identifier passed through to the output row.

    Errors raised while writing the row are reported on stderr and then
    swallowed so that one bad result does not stop the batch. Errors raised
    by the geocoding request itself are not caught here.
    """
    async with OpenCageGeocode(API_KEY) as geocoder:
        geocoding_result = await geocoder.geocode_async(address)
        try:
            await write_one_geocoding_result(geocoding_result, address, address_id)
        except Exception as e:
            # sys.stderr.write() only accepts str; passing the exception
            # object itself raised a TypeError inside the handler.
            sys.stderr.write(f"{e}\n")
74+
75+
76+
77+
async def run_worker(worker_name, queue):
    """Process work items from the queue until the task is cancelled.

    Args:
        worker_name: Label used in log messages.
        queue: asyncio.Queue of dicts with the keys 'id' and 'address'.

    The loop never returns by itself; it ends when the task is cancelled.
    """
    # Python f-strings use {name}; the former "${worker_name}" printed a stray '$'.
    sys.stderr.write(f"Worker {worker_name} starts...\n")
    while True:
        work_item = await queue.get()
        try:
            await geocode_one_address(work_item['address'], work_item['id'])
        except asyncio.CancelledError:
            # On Python 3.7 CancelledError derives from Exception, so let it
            # through explicitly to keep the task cancellable.
            raise
        except Exception as exc:
            # Keep the worker alive: a failed item is reported and skipped.
            sys.stderr.write(
                f"Worker {worker_name}: {work_item!r} failed: {exc!r}\n"
            )
        finally:
            # Always mark the item as done, otherwise queue.join() would
            # wait forever after a failed item.
            queue.task_done()
85+
86+
87+
88+
89+
async def main():
    """Geocode the addresses in INFILE using NUM_WORKERS concurrent workers.

    Reads (id, address) rows from INFILE into a queue, starts the workers,
    waits until every queued item has been processed and then cancels the
    workers. Progress messages go to stderr.
    """
    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."

    ## 1. Read CSV into a Queue
    ##    Each work_item is an address and id. The id will be part of the output,
    ##    easy to add more settings. Named 'work_item' to avoid the words
    ##    'address' or 'task' which are used elsewhere
    ##
    ##    https://docs.python.org/3/library/asyncio-queue.html
    ##
    queue = asyncio.Queue(maxsize=MAX_ITEMS)

    # `with` closes the input file once the queue is filled; newline='' is
    # what the csv module documentation asks for when opening files.
    with open(INFILE, 'r', encoding='utf8', newline='') as infile:
        for row in csv.reader(infile):
            if len(row) < 2:
                # Blank or incomplete line: there is no id/address pair.
                sys.stderr.write(f"skipping row without id and address: {row!r}\n")
                continue
            await queue.put({'id': row[0], 'address': row[1]})
            # With maxsize 0 (unlimited) the queue is never full.
            if queue.full():
                break

    # Python f-strings use {expr}; the former "${...}" printed a stray '$'.
    sys.stderr.write(f"{queue.qsize()} work_items in queue\n")

    ## 2. Create task workers. These are coroutines; each task takes
    ##    work_items from the queue until it's empty. Tasks run concurrently
    ##
    ##    https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    ##    https://docs.python.org/3/library/asyncio-task.html#coroutine
    ##
    sys.stderr.write(f"Creating {NUM_WORKERS} task workers...\n")
    tasks = [
        asyncio.create_task(run_worker(f'worker {i}', queue))
        for i in range(NUM_WORKERS)
    ]

    ## 3. Now workers do the geocoding
    ##
    sys.stderr.write("Now waiting for workers to finish processing queue...\n")
    await queue.join()

    ## 4. Cleanup
    ##    The workers loop forever, so cancel them and wait until the
    ##    cancellation has actually been delivered.
    ##
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    sys.stderr.write("All done.\n")
137+
138+
139+
# Script entry point: runs main() to completion. Note that this executes
# whenever the module is loaded, not only when it is run as a script.
asyncio.run(main())

0 commit comments

Comments
 (0)