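"""Load DVRPC Community Profiles content from GitHub into Postgres.

Walks the community-profiles-content repo's contents API for each geo level,
downloads the markdown and viz JSON files concurrently, and writes them to
the 'content' and 'visualizations' tables.
"""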
import asyncio
import logging
import os
from datetime import datetime

import aiohttp
import pandas as pd
import psycopg  # psycopg 3; the engine below selects it via the postgresql+psycopg dialect
import requests
from dotenv import load_dotenv
from sqlalchemy import create_engine

# from utils.consts import subcategory_map

log = logging.getLogger(__name__)

load_dotenv()
token = os.getenv("GITHUB_TOKEN")
headers = {"Authorization": f"Bearer {token}"}
base_contents_url = "https://api.github.com/repos/dvrpc/community-profiles-content/contents"

subcategory_map = {
    'demographics-housing': {'demographics': [], 'housing': []},
    'economy': {'employment': [], 'income-poverty': [], 'transportation': []},
    'active-transportation': {'cycling': [], 'pedestrian': [], 'commute': []},
    'safety-health': {'crash': [], 'health': []},
    'freight': {'freight': []},
    'environment': {'open-space': [], 'planning': []},
    'transit': {'transit': [], 'tip': []},
    'roadways': {'conditions': [], 'tip': []},
}
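
# The empty lists are only used by the commented-out content_map assembly at
# the bottom of this file; the loaders below read just the keys to build
# GitHub directory paths.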

host = os.getenv("DB_HOST")
dbname = os.getenv("DB_NAME")
user = os.getenv("DB_USER")
password = os.getenv("DB_PASS")
port = os.getenv("DB_PORT")

# postgresql+psycopg selects the psycopg 3 driver imported above (SQLAlchemy 2.0+)
uri = f"postgresql+psycopg://{user}:{password}@{host}:{port}/{dbname}"
# log the target without the credentials embedded in the URI
log.debug("Connecting to database %s at %s:%s", dbname, host, port)
# establish connection with the database
engine = create_engine(uri)

async def get_viz_download_url(geo_level, category, subcategory, topic):
    """Return the raw download URL for a single viz config, or None on error."""
    url = f"{base_contents_url}/{geo_level}/viz/{category}/{subcategory}/{topic}.json"
    try:
        # note: a blocking requests call; acceptable for a one-off lookup
        r = requests.get(url, headers=headers)
        r.raise_for_status()
    except requests.exceptions.HTTPError as e:
        log.error(f"Error fetching {geo_level} contents:\n{e}")
        return None

    data = r.json()
    return data['download_url']
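
# Example (hypothetical topic name):
#     url = await get_viz_download_url('county', 'economy', 'income-poverty', 'median-income')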


async def get_download_urls(geo_level, file_type):
    """Collect download URLs for every category/subcategory directory at one geo level.

    file_type is the content directory to read: 'md' for markdown, 'viz' for JSON.
    """
    urls = []

    for category, subcategories in subcategory_map.items():
        for subcategory in subcategories.keys():
            urls.append({
                'category': category,
                'subcategory': subcategory,
                'geo_level': geo_level,
                'url': f"{base_contents_url}/{geo_level}/{file_type}/{category}/{subcategory}",
            })

    # fetch all directory listings concurrently
    async with aiohttp.ClientSession() as session:
        tasks = [get_download_url(session, url, file_type) for url in urls]
        results = await asyncio.gather(*tasks)

    # each task returns a list of files; flatten into a single list
    flattened = [item for sublist in results for item in sublist]
    return flattened

async def get_download_url(session, url, file_type):
    """List a GitHub contents directory and return its files' download URLs."""
    try:
        async with session.get(url['url'], headers=headers) as response:
            if response.status == 200:
                data = await response.json()
                download_urls = []
                # strip the extension: '.json' for viz files, '.md' for markdown
                index = -5 if file_type == 'viz' else -3
                for file in data:
                    download_urls.append({
                        'category': url['category'],
                        'subcategory': url['subcategory'],
                        'geo_level': url['geo_level'],
                        'name': file['name'][:index],
                        'url': file['download_url'],
                    })
                return download_urls
            else:
                log.info(response.status)
    except aiohttp.ClientConnectionError as e:
        log.error(f'Connection error: {e}')
    # on any failure, return an empty list so the caller's flattening still works
    return []
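
# Note: the contents API lists at most 1,000 entries per directory, and the
# Authorization header above keeps these calls under the authenticated rate
# limit rather than the much lower anonymous one.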

async def get_files(urls):
    """Download every file concurrently, skipping any that failed."""
    async with aiohttp.ClientSession() as session:
        tasks = [get_file(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    # drop None entries from failed requests so they don't become NaN rows
    return [result for result in results if result is not None]


async def get_file(session, url):
    """Fetch one file's text and return it with its metadata, or None on failure."""
    try:
        async with session.get(url['url']) as response:
            if response.status == 200:
                text = await response.text()
                return {
                    'category': url['category'],
                    'subcategory': url['subcategory'],
                    'geo_level': url['geo_level'],
                    'name': url['name'],
                    'file': text,
                }
            else:
                log.info(response.status)
    except aiohttp.ClientConnectionError as e:
        log.error(f'Connection error: {e}')
    return None


# def get_file(url):
#     try:
#         r = requests.get(url)
#         r.raise_for_status()

#     except requests.exceptions.HTTPError as e:
#         log.error(
#             f"Error fetching {url}: \n{e}")

#     return r.json()

async def save_content():
    """Rebuild the 'content' table from the markdown files for every geo level."""
    all_urls = []
    for geo_level in ['region', 'county', 'municipality']:
        md_download_urls = await get_download_urls(geo_level, 'md')
        all_urls += md_download_urls

    files = await get_files(all_urls)
    df = pd.DataFrame(files)

    current_local_time = datetime.now()
    df['create_date'] = current_local_time
    # if_exists='replace' drops and recreates the table on every run
    df.to_sql('content', con=engine, if_exists='replace', index=False)

async def save_visualizations():
    """Rebuild the 'visualizations' table from the viz JSON configs for every geo level."""
    all_urls = []
    for geo_level in ['region', 'county', 'municipality']:
        viz_download_urls = await get_download_urls(geo_level, 'viz')
        all_urls += viz_download_urls

    files = await get_files(all_urls)
    df = pd.DataFrame(files)

    current_local_time = datetime.now()
    df['create_date'] = current_local_time
    df.to_sql('visualizations', con=engine, if_exists='replace', index=False)

if __name__ == '__main__':
    asyncio.run(save_content())
    asyncio.run(save_visualizations())


# content_map = copy.deepcopy(subcategory_map)
# for md in files:
#     content_map[md['category']][md['subcategory']].append({
#         'name': md['name'],
#         'content': content
#     })

# return content_map