@@ -116,42 +116,48 @@ def _instantiate_rpcs(self):
116116 cls = self ._get_rpc_obj (name , meta )
117117 setattr (parent , mname , cls ())
118118
119- def _samples_list (self , n : int = 1 , stream : str = "" , columns : list [str ] = [], average : bool = False , concatenate : bool = False ):
120- if n == 1 :
121- # single sample: dict of samples
122- sample = list (self ._samples (n , stream = stream , columns = columns ))[0 ]
123- # sample.pop("stream")
124- # sample.pop("time")
125- return sample
126- else :
127- samples = list (self ._samples (n , stream = stream , columns = columns ))
128- # bin into streams
129- streams = {}
130- for line in samples :
131- stream_id = line .pop ("stream" , None )
132- if stream_id not in streams :
133- streams [stream_id ] = { "stream" : stream_id }
134- for key , value in line .items ():
135- if key not in streams [stream_id ]:
136- streams [stream_id ][key ] = []
137- streams [stream_id ][key ].append (value )
138- if average :
139- # average down to dict of samples
140- for id in streams .keys ():
141- for key in streams [id ].keys ():
142- if key != "stream" :
143- streams [id ][key ] = sum (streams [id ][key ]) / len (streams [id ][key ])
144-
145- if concatenate :
146- raise NotImplementedError ("Stream concatenation not yet implemented" )
147-
148- return streams
149-
150- def _get_obj_samples (self , name : str , stream : str = "" , columns : list [str ] = [], * args , ** kwargs ):
119+ def _samples_dict (self , n : int = 1 , stream : str = "" , columns : list [str ] = []):
120+ samples = list (self ._samples (n , stream = stream , columns = columns ))
121+ # bin into streams
122+ streams = {}
123+ for line in samples :
124+ stream_id = line .pop ("stream" , None )
125+ if stream_id not in streams :
126+ streams [stream_id ] = { "stream" : stream_id }
127+ for key , value in line .items ():
128+ if key not in streams [stream_id ]:
129+ streams [stream_id ][key ] = []
130+ streams [stream_id ][key ].append (value )
131+ return streams
132+
133+ def _samples_list (self , n : int = 1 , stream : str = "" , columns : list [str ] = [], timeColumn = True , titleRow = True ):
134+ streams = self ._samples_dict (n , stream , columns )
135+ # Convert to list with rows of data. Not super happy about how inefficient this is.
136+ if len (streams .items ()) > 1 :
137+ raise NotImplementedError ("Stream concatenation not yet implemented for two different streams" )
138+ stream = list (streams .values ())[0 ]
139+ stream .pop ('stream' )
140+ if not timeColumn :
141+ stream .pop ('time' )
142+ dataColumns = [column for column in stream .values () ]
143+ dataRows = [list (row ) for row in zip (* dataColumns )]
144+ if titleRow :
145+ columnNames = list (stream .keys ());
146+ dataRows .insert (0 ,columnNames )
147+ return dataRows
148+
149+ def _get_obj_samples_dict (self , name : str , stream : str = "" , columns : list [str ] = [], * args , ** kwargs ):
150+ def samples_method (local_self , * args , ** kwargs ):
151+ # print(f"Sampling {name} from stream {stream} with columns {columns}")
152+ return self ._samples_dict (stream = stream , columns = columns , * args , ** kwargs )
153+ cls = type ('samplesDict' + name ,(), {'__name__' :name , '__call__' :samples_method })
154+ return cls
155+
156+ def _get_obj_samples_list (self , name : str , stream : str = "" , columns : list [str ] = [], * args , ** kwargs ):
151157 def samples_method (local_self , * args , ** kwargs ):
152158 # print(f"Sampling {name} from stream {stream} with columns {columns}")
153159 return self ._samples_list (stream = stream , columns = columns , * args , ** kwargs )
154- cls = type ('samples ' + name ,(), {'__name__' :name , '__call__' :samples_method })
160+ cls = type ('samplesList ' + name ,(), {'__name__' :name , '__call__' :samples_method })
155161 return cls
156162
157163 def _instantiate_samples (self , announce : bool = False ):
@@ -165,7 +171,7 @@ def _instantiate_samples(self, announce: bool = False):
165171 streams_flattened .append (stream + "." + column_name )
166172
167173 # All samples
168- cls = self ._get_obj_samples ("samples" , stream = "" , columns = [])
174+ cls = self ._get_obj_samples_dict ("samples" , stream = "" , columns = [])
169175 setattr (self , 'samples' , cls ())
170176
171177 for stream_column in streams_flattened :
@@ -174,7 +180,7 @@ def _instantiate_samples(self, announce: bool = False):
174180
175181 if not hasattr (parent , stream ):
176182 # All samples for this stream
177- cls = self ._get_obj_samples (stream , stream = stream , columns = [])
183+ cls = self ._get_obj_samples_list (stream , stream = stream , columns = [])
178184 setattr (parent , stream , cls ())
179185 parent = getattr (parent , stream )
180186
@@ -184,14 +190,14 @@ def _instantiate_samples(self, announce: bool = False):
184190 stream_prefix += "." + token
185191 if not hasattr (parent , token ):
186192 #wildcard columns
187- cls = self ._get_obj_samples (token , stream = stream , columns = [stream_prefix [1 :]+ ".*" ])
193+ cls = self ._get_obj_samples_list (token , stream = stream , columns = [stream_prefix [1 :]+ ".*" ])
188194 setattr (parent , token , cls ())
189195 parent = getattr (parent , token )
190196
191197 # specific stream samples
192198 stream , column_name = stream_column .split ("." ,1 )
193199
194- cls = self ._get_obj_samples (mname , stream = stream , columns = [column_name ])
200+ cls = self ._get_obj_samples_list (mname , stream = stream , columns = [column_name ])
195201 setattr (parent , mname , cls ())
196202
197203 def _interact (self ):
0 commit comments