 import numpy as np
 import openai
 from pydantic import BaseModel
-from multiprocessing.pool import Pool
+from concurrent.futures import ProcessPoolExecutor
 from action.utils import avion_video_loader
+import torch
 import cv2
+from pathlib import Path

 client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))  # requires OPENAI_API_KEY in the environment

@@ -25,11 +27,21 @@ class MultiChoiceResponse(BaseModel):
2527 """
2628
2729 explanation : str
28-
29-
3030def split_indices (indices , num_chunks ):
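+    """Split `indices` into `num_chunks` contiguous, near-equal chunks.

+    The first len(indices) % num_chunks chunks get one extra element, e.g.
+    split_indices(list(range(7)), 3) -> [[0, 1, 2], [3, 4], [5, 6]].
+    """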
+    # Calculate the size of each chunk and the remainder
     chunk_size = len(indices) // num_chunks
-    return [indices[i:i + chunk_size] for i in range(0, len(indices), chunk_size)]
+    remainder = len(indices) % num_chunks
+
+    # Create chunks, distributing the remainder across the first few chunks
+    chunks = []
+    start = 0
+    for i in range(num_chunks):
+        # Each of the first 'remainder' chunks will have one extra element
+        end = start + chunk_size + (1 if i < remainder else 0)
+        chunks.append(indices[start:end])
+        start = end
+
+    return chunks

 class GPTAnnotator:
     def __init__(self, ann_file, data_root, clip_length=32):
@@ -51,8 +63,10 @@ def prepare_multiple_images(self, images):

         """
         encoded_image_list = []
-
         for image in images:
+
+            # torch tensors must be moved to CPU and detached before numpy conversion
+            if isinstance(image, torch.Tensor):
+                image = image.cpu().detach().numpy()
             # images from matplotlib etc.
             if isinstance(image, io.BytesIO):
                 image_bytes = image
@@ -82,6 +96,7 @@ def extract_frames(self, data_root, vid_path, start_second, end_second):
             'MP4',
             start_second,
             end_second,
+            chunk_len=15,  # assumes the source videos are stored as 15-second chunks
             clip_length=self.clip_length,
             threads=1,
             fast_rrc=False,
@@ -98,52 +113,69 @@ def parse_conversation(self, item):
         human_dict = conversations[0]

         # the offset is to remove the quote '
-        option_start = human_dict['value'].index['['] + 2
-        option_end = human_dict['value'].index[']'] - 1
+        option_start = human_dict['value'].index('[') + 2
+        option_end = human_dict['value'].index(']') - 1

         option_text = human_dict['value'][option_start:option_end]
         gpt_dict = conversations[1]
         gt_answer = gpt_dict['value']
+        # keep the answer text from the '.' onward, dropping the option letter
+        gt_answer = gt_answer[gt_answer.index('.'):].strip()

         assert human_dict['from'] == 'human' and gpt_dict['from'] == 'gpt'

         ret = {'options': option_text,
                'gt_answer': gt_answer,
                'start_second': item['start_timestamp'],
-               'end_second': item['end_timestemp']}
+               'end_second': item['end_timestamp']}

         return ret

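+    # Illustrative shape of one annotation item consumed above (values are
+    # hypothetical, inferred from the field accesses in this class):
+    # {"video": "P01-P01_01", "start_timestamp": 10.0, "end_timestamp": 12.5,
+    #  "conversations": [
+    #      {"from": "human", "value": "... ['A. cut onion', 'B. wash pan'] ..."},
+    #      {"from": "gpt", "value": "A. cut onion"}]}
+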
     def annotate(self, indices):

         data_batch = [self.data[i] for i in range(len(self.data)) if i in indices]

-        for item in data_batch:
+        ret = {}
+        for index in indices:
+            item = self.data[index]
             start_timestamp = item['start_timestamp']
             end_timestamp = item['end_timestamp']
             vid_path = '{}/{}'.format(item['video'].split('-')[0], item['video'].split('-')[1])
             frames, time_meta = self.extract_frames(self.data_root, vid_path, start_timestamp, end_timestamp)
-            data_item = self.parse_conversation(item)
-            anno = self.annotate_images(frames, data_item)
-            print(anno)
+            parsed_item = self.parse_conversation(item)
+            gpt_answer = self.annotate_images(frames, parsed_item).explanation
+            item['conversations'][1]['value'] = gpt_answer
+            ret[index] = item
+            # NOTE: only the first index is processed because of this break
             break

+        return ret
+
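+    # Usage sketch (hypothetical paths):
+    #   annotator = GPTAnnotator('train_convs_narration.jsonl', '/data/EK100')
+    #   annotated = annotator.annotate([0, 1, 2])  # -> {index: updated item}
+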
     def annotate_images(self, images, data_item):
         """
         Annotate the given frames with the parsed multiple-choice data
         (the dict returned by parse_conversation).
         """
         gt_answer = data_item['gt_answer']
-        option_text = data_item['option_text']
+        option_text = data_item['options']
         start_second = data_item['start_second']
         end_second = data_item['end_second']
         temperature = 0
         system_prompt_prefix = f"""
-You are seeing video frames from an egocentric view. You are determining what action the person is performing.
-The video's start time is {start_second} and the end time is {end_second}.
-In a multi-choice video question answering, you were given following options {option_text} and the correct answer is {gt_answer}.
-Please describe what you see and why wrong answers are wrong and why right answer is right.
+You are seeing video frames from the egocentric view of a person.
+Please speak as if you are the person in the video and describe the action you are performing.
+To help you describe the action, the video's start time is {start_second}, its end time is {end_second}, and its duration is {end_second - start_second} seconds.
+To further assist you, note that in a multiple-choice video question-answering task you were given the options {option_text} and the correct answer is {gt_answer}.
+In addition to describing what you see, explain why the wrong answers are wrong and why the right answer is right, using the following flow of reasoning:
+
+1. What objects need to be visible to support the answer?
+2. What sequence of actions before and after the current action needs to be seen to support the answer?
+3. Does the duration support the answer?
+
+Based on your answers to the questions above, explain why the right answer is right and why the wrong answers are wrong.
+
 """
         system_prompt_suffix = """"""

@@ -165,21 +197,54 @@ def annotate_images(self, images, data_item):
         return response.choices[0].message.parsed


-def annotate_using_chatgpt():
-    """
-    Multi processing to speed up
-    """
-    with Pool() as pool:
-        pass
-        #pool.starmap(annotate, task_args)
+def process_subset(indices_subset, train_file_path, root):
+    # Initialize a new annotator instance within each process
+    annotator = GPTAnnotator(train_file_path, root)
+    return annotator.annotate(indices_subset)
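+
+# NOTE: each worker builds its own GPTAnnotator (and thus its own OpenAI client);
+# this avoids assuming the client object can be pickled across processes.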

-    pass

+if __name__ == '__main__':
+    #train_file_path = '/storage-rcp-pure/upmwmathis_scratch/shaokai/EK100_inst_train/avion_mc_top10/train_convs_narration.jsonl'
+    #root = '/storage-rcp-pure/upmwmathis_scratch/shaokai/EK100'
+    train_file_path = '/data/EK100_inst_train/avion_mc_top10/train_convs_narration.jsonl'
+    root = '/data/EK100/EK100_320p_15sec_30fps_libx264'
+
+    num_cores = 2  # os.cpu_count()

+    print(f'Using {num_cores} cores, splitting the data into {num_cores} chunks')

-if __name__ == '__main__':
-    train_file_path = '/storage-rcp-pure/upmwmathis_scratch/shaokai/EK100_inst_train/avion_mc_top10/train_convs_narration.jsonl'
-    root = '/storage-rcp-pure/upmwmathis_scratch/shaokai/EK100'
+    with open(train_file_path, 'r') as f:
+        num_lines = sum(1 for _ in f)

+    print(f'Total number of lines in the file: {num_lines}')
+    indices = list(range(num_lines))
+    print('indices', len(indices))

-    GPTAnnotator(train_file_path, root)
+    indices_groups = split_indices(indices, num_cores)
+
+    print('number of groups', len(indices_groups))
+
+    with ProcessPoolExecutor(max_workers=num_cores) as executor:
+        # Pass the shared file path and data root to each worker
+        futures = [executor.submit(process_subset, group, train_file_path, root) for group in indices_groups]
+
+        # Wait for all futures to complete and merge their {index: item} dicts
+        combined_results = {}
+        for future in futures:
+            combined_results.update(future.result())
+
+    keys = sorted(combined_results.keys())
+
+    print('resulting number of keys', len(keys))
+
+    result = [combined_results[key] for key in keys]
+
+    anno_root = Path(train_file_path).parent
+
+    with open(anno_root / 'gpt_annotated.jsonl', 'w') as f:
+        for item in result:
+            f.write(json.dumps(item) + '\n')
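+
+    # Sanity check (illustrative): reload the annotations that were just written
+    # with open(anno_root / 'gpt_annotated.jsonl') as f:
+    #     annotated = [json.loads(line) for line in f]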