31
31
# The years in the dataset.
32
32
YEARS = [19 , 20 , 21 , 22 , 23 , 24 ]
33
33
34
+ _MAX_EXAMPLES_PER_DAY = 24 * 12 # 288 per day
35
+
34
36
_REWARD_RESPONSES = [
35
37
'agentRewardValue' ,
36
38
'productivityReward' ,
@@ -196,18 +198,20 @@ def _generate_examples(self, path: epath.Path, year: int, pipeline):
196
198
197
199
return (
198
200
pipeline
199
- | f'CreateDates_{ year } ' >> beam .Create (all_dates )
201
+ | f'CreateDates_{ year } ' >> beam .Create (enumerate ( all_dates ) )
200
202
| f'ProcessDate_{ year } ' >> beam .FlatMap (process_date , path = path )
201
203
| f'Reshuffle_{ year } ' >> beam .Reshuffle ()
202
204
)
203
205
204
206
205
207
def process_date (
206
- start_time : pd .Timestamp ,
208
+ day_index_and_start_time : tuple [ int , pd .Timestamp ] ,
207
209
path : epath .Path ,
208
210
) -> Iterable [tuple [int , dict [str , Any ]]]:
209
211
"""Process a single date."""
212
+ day_index , start_time = day_index_and_start_time
210
213
end_time = start_time + pd .Timedelta (hours = 23 )
214
+ key_offset = day_index * _MAX_EXAMPLES_PER_DAY
211
215
212
216
reader = controller_reader .ProtoReader (path )
213
217
observation_responses = reader .read_observation_responses (
@@ -230,6 +234,12 @@ def process_date(
230
234
key = lambda o : to_ns_timestamp (o .start_timestamp ),
231
235
)
232
236
237
+ if len (observation_responses ) > _MAX_EXAMPLES_PER_DAY :
238
+ raise ValueError (
239
+ f'Too many observation responses for date { start_time } : '
240
+ f'{ len (observation_responses )} > { _MAX_EXAMPLES_PER_DAY } '
241
+ )
242
+
233
243
for i in range (len (observation_responses )):
234
244
observation_response = json_format .MessageToDict (observation_responses [i ])
235
245
action_response = json_format .MessageToDict (action_responses [i ])
@@ -256,7 +266,7 @@ def process_date(
256
266
reward_response [val ] = - 1 # sentinal value
257
267
258
268
beam .metrics .Metrics .counter (f'date_{ start_time } ' , 'example_count' ).inc ()
259
- key = int ( f' { start_time . toordinal () } { i :05d } ' )
269
+ key = key_offset + i
260
270
yield key , {
261
271
'observation' : observation_response ,
262
272
'action' : action_response ,
0 commit comments