1+ import re
2+ import logging
3+ from datetime import datetime
4+ from datetime import timedelta
5+ from concurrent .futures import ThreadPoolExecutor
6+
17import pandas as pd
28from pandas .errors import ParserError
3- import re
9+
10+ logger = logging .getLogger ("django" )
411
512
613class BaseDf :
@@ -13,6 +20,7 @@ def prepare(self) -> "BaseDf":
1320 self .prepare_value_column ()
1421 self .prepare_identifier_columns ()
1522 self .drop_other_columns ()
23+ self .drop_duplicates ()
1624 return self
1725
1826 def verify_column_count (self ):
@@ -22,10 +30,9 @@ def verify_column_count(self):
2230 def prepare_date_column (self ):
2331 original_name = self .df .columns [- 1 ]
2432 try :
25- # TODO: allow other units of time
2633 self .df [original_name ] = pd .to_datetime (
2734 self .df [original_name ], format = "ISO8601"
28- ). dt . year
35+ )
2936 except (ValueError , ParserError ):
3037 raise BaseDfException ("last column must be a date column" )
3138 self .df .rename (columns = {original_name : "date" }, inplace = True )
@@ -58,6 +65,9 @@ def drop_other_columns(self):
5865 drop = [col for col in self .df .columns if col not in keep ]
5966 self .df .drop (columns = drop , inplace = True )
6067
def drop_duplicates(self):
    """Remove repeated (name, date) rows from ``self.df`` in place.

    Rows are compared on the "name" and "date" columns only; pandas keeps
    the first occurrence of each pair (its default) and discards the rest.
    """
    key_columns = ["name", "date"]
    self.df.drop_duplicates(subset=key_columns, inplace=True)
70+
6171
6272class BaseDfException (Exception ):
6373 def __init__ (self , message ):
@@ -66,20 +76,40 @@ def __init__(self, message):
6676
6777
6878class DfProcessor :
69- def __init__ (self , bdf : BaseDf ):
70- self .df = bdf .df
def __init__(self, bdf: BaseDf, time_unit: str = "year"):
    """Copy the prepared frame and snap its dates to the chosen time unit.

    Args:
        bdf: Source object whose ``df`` attribute is copied, so the
            formatting below never touches the caller's frame.
        time_unit: "year" or "month"; any other value is treated as
            day resolution.

    The "date" column is read through the ``.dt`` accessor and replaced by
    strings: "%Y-01-01" for years, "%Y-%m-01" for months, "%Y-%m-%d"
    otherwise.
    """
    self.df = bdf.df.copy()
    self.time_unit = time_unit
    # Coarser units collapse every date onto the first day of its period.
    period_formats = {"year": "%Y-01-01", "month": "%Y-%m-01"}
    date_format = period_formats.get(self.time_unit, "%Y-%m-%d")
    self.df["date"] = self.df["date"].dt.strftime(date_format)
7188
def elements(self):
    """Return one record per distinct name, with identifier columns if present.

    Returns:
        A list of dicts. Every dict has a "name" key. When the frame has
        "url" and/or "category" columns, each dict also carries those keys,
        filled by pandas' "first" aggregation over the rows of that name.
    """
    identifiers = [col for col in ("url", "category") if col in self.df.columns]
    if not identifiers:
        # One dict per name. A dict comprehension inside a list literal
        # would instead collapse all names into a single
        # {"name": <last name>} entry.
        return [{"name": name} for name in self.df["name"].unique()]
    agg = {col: "first" for col in identifiers}
    return (
        self.df[["name", *identifiers]]
        .groupby("name")
        .agg(agg)
        .reset_index()
        .to_dict("records")
    )
101+
def year_count(self):
    """Return how many calendar years the "date" column spans, inclusive.

    The year is taken from the first four characters of the smallest and
    largest values in the column, so the column is expected to hold
    strings that start with a four-digit year.
    """
    dates = self.df["date"]
    first_year, last_year = (
        int(bound[:4]) for bound in (dates.min(), dates.max())
    )
    return last_year - first_year + 1
78106
79107 def interpolated_df (self ):
80108 df = self .df
109+ names = df ["name" ].unique ()
110+ time_units = self .all_time_units ()
81111 mux = pd .MultiIndex .from_product (
82- [df [ "name" ]. unique (), range ( df [ "date" ]. min (), df [ "date" ]. max () + 1 ) ],
112+ [names , time_units ],
83113 names = ["name" , "date" ],
84114 )
85115 df = (
@@ -96,18 +126,58 @@ def interpolated_df(self):
96126 df ["rank" ] = df .groupby ("date" )["value" ].rank (method = "dense" , ascending = False )
97127 return df
98128
def all_time_units(self):
    """List every time step between the earliest and latest date, in order.

    The "date" column must hold "YYYY-MM-DD" strings: the year is read
    from characters 0-3 and the month from characters 5-6.

    Returns:
        A chronologically ordered list of date strings, inclusive at both
        ends:
        - time_unit "year": one "YYYY-01-01" entry per year.
        - time_unit "month": one "YYYY-MM-01" entry per month, from the
          month of the earliest date through the month of the latest date.
        - any other time_unit: one "YYYY-MM-DD" entry per day.
    """
    first = self.df["date"].min()
    last = self.df["date"].max()
    min_year = int(first[:4])
    max_year = int(last[:4])

    if self.time_unit == "year":
        return [f"{year}-01-01" for year in range(min_year, max_year + 1)]

    if self.time_unit == "month":
        # Count months on a single linear axis (year * 12 + zero-based
        # month). This keeps the result chronological and stops at the
        # real end month even when both dates fall in the same year.
        start = min_year * 12 + int(first[5:7]) - 1
        stop = max_year * 12 + int(last[5:7]) - 1
        return [
            f"{index // 12}-{index % 12 + 1:02d}-01"
            for index in range(start, stop + 1)
        ]

    start_date = datetime.strptime(first, "%Y-%m-%d").date()
    end_date = datetime.strptime(last, "%Y-%m-%d").date()
    day_count = (end_date - start_date).days + 1
    return [
        (start_date + timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in range(day_count)
    ]
174+
def values_by_date(self):
    """Group the interpolated rows by date and order each group by rank.

    Returns:
        A list with one ``{"date": ..., "values": [...]}`` dict per
        distinct date, in the order the pandas groupby yields them.
        "values" holds that date's rows as record dicts sorted by "rank".
        The date itself is not repeated inside the records because it is
        moved into the index before grouping.
    """
    by_date = self.interpolated_df().set_index("date")
    return [
        {
            "date": day,
            "values": rows.sort_values("rank").to_dict(orient="records"),
        }
        for day, rows in by_date.groupby(level=0)
    ]
112182
113183
@@ -119,5 +189,17 @@ def process_bar_chart_race(df):
119189 return {"failed" : e .message }
120190 proc = DfProcessor (bdf )
121191 elements = proc .elements ()
122- values_by_date = proc .values_by_date ()
123- return {"elements" : elements , "values_by_date" : values_by_date }
192+ data = {"elements" : elements }
193+ run_daily = proc .year_count () <= 25
194+ proc_monthly = DfProcessor (bdf , time_unit = "month" )
195+ proc_daily = DfProcessor (bdf , time_unit = "day" )
196+ with ThreadPoolExecutor () as executor :
197+ t = executor .submit (proc .values_by_date )
198+ t_monthly = executor .submit (proc_monthly .values_by_date )
199+ if run_daily :
200+ t_daily = executor .submit (proc_daily .values_by_date )
201+ data ["values_by_date" ] = t .result ()
202+ data ["values_by_date_monthly" ] = t_monthly .result ()
203+ if run_daily :
204+ data ["values_by_date_daily" ] = t_daily .result ()
205+ return data
0 commit comments