1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ import itertools
1516import logging
1617import os
1718import platform
@@ -168,40 +169,29 @@ def upload_stats(self, view_data):
168169 and create time series for each value
169170 """
170171 view_data_set = utils .uniq (view_data )
171- requests = self .make_request (view_data_set , MAX_TIME_SERIES_PER_UPLOAD )
172- for request in requests :
173- self .client .create_time_series (request [CONS_NAME ],
174- request [CONS_TIME_SERIES ])
175-
176- def make_request (self , view_data , limit ):
172+ time_series_batches = self .create_batched_time_series (
173+ view_data_set , MAX_TIME_SERIES_PER_UPLOAD )
174+ for time_series_batch in time_series_batches :
175+ self .client .create_time_series (
176+ self .client .project_path (self .options .project_id ),
177+ time_series_batch )
178+
179+ def create_batched_time_series (self , view_data , batch_size ):
177180 """ Create the data structure that will be
178181 sent to Stackdriver Monitoring
179182 """
180- requests = []
181- time_series = []
182-
183- resource = self .options .resource
184- metric_prefix = self .options .metric_prefix
185- for v_data in view_data :
186- series = self .create_time_series_list (v_data , resource ,
187- metric_prefix )
188- time_series .extend (series )
189-
190- project_id = self .options .project_id
191- request = {}
192- request [CONS_NAME ] = self .client .project_path (project_id )
193- request [CONS_TIME_SERIES ] = time_series
194- requests .append (request )
195-
196- if len (time_series ) == int (limit ):
197- time_series = []
198- return requests
183+ time_series_list = itertools .chain .from_iterable (
184+ self .create_time_series_list (
185+ v_data , self .options .resource , self .options .metric_prefix )
186+ for v_data in view_data )
187+ return list (utils .window (time_series_list , batch_size ))
199188
200189 def create_time_series_list (self , v_data , option_resource_type ,
201190 metric_prefix ):
202191 """ Create the TimeSeries object based on the view data
203192 """
204193 time_series_list = []
194+ aggregation_type = v_data .view .aggregation .aggregation_type
205195 tag_agg = v_data .tag_value_aggregation_data_map
206196 for tag_value , agg in tag_agg .items ():
207197 series = monitoring_v3 .types .TimeSeries ()
@@ -211,15 +201,12 @@ def create_time_series_list(self, v_data, option_resource_type,
211201 set_monitored_resource (series , option_resource_type )
212202
213203 point = series .points .add ()
214- if isinstance (
215- agg ,
216- aggregation .aggregation_data .DistributionAggregationData ):
217- agg_data = tag_agg .get (tag_value )
204+ if aggregation_type is aggregation .Type .DISTRIBUTION :
218205 dist_value = point .value .distribution_value
219- dist_value .count = agg_data .count_data
220- dist_value .mean = agg_data .mean_data
206+ dist_value .count = agg .count_data
207+ dist_value .mean = agg .mean_data
221208
222- sum_of_sqd = agg_data .sum_of_sqd_deviations
209+ sum_of_sqd = agg .sum_of_sqd_deviations
223210 dist_value .sum_of_squared_deviation = sum_of_sqd
224211
225212 # Uncomment this when stackdriver supports Range
@@ -233,23 +220,25 @@ def create_time_series_list(self, v_data, option_resource_type,
233220 # [0, first_bound).
234221 bounds .extend ([0 ])
235222 buckets .extend ([0 ])
236- bounds .extend (list (map (float , agg_data .bounds )))
237- buckets .extend (list (map (int , agg_data .counts_per_bucket )))
238- elif isinstance (agg ,
239- aggregation .aggregation_data .CountAggregationData ):
223+ bounds .extend (list (map (float , agg .bounds )))
224+ buckets .extend (list (map (int , agg .counts_per_bucket )))
225+ elif aggregation_type is aggregation .Type .COUNT :
240226 point .value .int64_value = agg .count_data
241- elif isinstance (
242- agg , aggregation .aggregation_data .SumAggregationDataFloat ):
243- point .value .double_value = agg .sum_data
244- elif not isinstance (
245- agg , aggregation .aggregation_data
246- .LastValueAggregationData ): # pragma: NO COVER
227+ elif aggregation_type is aggregation .Type .SUM :
228+ if isinstance (v_data .view .measure , measure .MeasureInt ):
229+ # TODO: Add implementation of sum aggregation that does not
230+ # store it's data as a float.
231+ point .value .int64_value = int (agg .sum_data )
232+ if isinstance (v_data .view .measure , measure .MeasureFloat ):
233+ point .value .double_value = float (agg .sum_data )
234+ elif aggregation_type is aggregation .Type .LASTVALUE :
247235 if isinstance (v_data .view .measure , measure .MeasureInt ):
248236 point .value .int64_value = int (agg .value )
249- elif isinstance (v_data .view .measure , measure .MeasureFloat ):
237+ if isinstance (v_data .view .measure , measure .MeasureFloat ):
250238 point .value .double_value = float (agg .value )
251239 else :
252- point .value .string_value = str (tag_value [0 ])
240+ raise TypeError ("Unsupported aggregation type: %s" %
241+ type (v_data .view .aggregation ))
253242
254243 start = datetime .strptime (v_data .start_time , EPOCH_PATTERN )
255244 end = datetime .strptime (v_data .end_time , EPOCH_PATTERN )
@@ -262,8 +251,7 @@ def create_time_series_list(self, v_data, option_resource_type,
262251 secs = point .interval .end_time .seconds
263252 point .interval .end_time .nanos = int ((timestamp_end - secs ) * 10 ** 9 )
264253
265- if type (agg ) is not aggregation .aggregation_data .\
266- LastValueAggregationData : # pragma: NO COVER
254+ if aggregation_type is not aggregation .Type .LASTVALUE :
267255 if timestamp_start == timestamp_end :
268256 # avoiding start_time and end_time to be equal
269257 timestamp_start = timestamp_start - 1
@@ -471,25 +459,13 @@ def set_metric_labels(series, view, tag_values):
471459 "TagKeys and TagValues don't have same size."
472460 ) # pragma: NO COVER
473461
474- for ii , tag_value in enumerate (tag_values ):
475- if tag_value is not None :
476- metric_label = remove_non_alphanumeric (view .columns [ii ])
477- series .metric .labels [metric_label ] = tag_value
462+ for key , value in zip (view .columns , tag_values ):
463+ if value is not None :
464+ series .metric .labels [remove_non_alphanumeric (key )] = value
478465 series .metric .labels [OPENCENSUS_TASK ] = get_task_value ()
479466
480467
481468def remove_non_alphanumeric (text ):
482469 """ Remove characters not accepted in labels key
483470 """
484471 return str (re .sub ('[^0-9a-zA-Z ]+' , '' , text )).replace (" " , "" )
485-
486-
487- def as_float (value ):
488- """ Converts a value to a float if possible
489- On success, it returns (converted_value, True)
490- On failure, it returns (None, False)
491- """
492- try :
493- return float (value ), True
494- except Exception : # Catch all exception including ValueError
495- return None , False
0 commit comments