2323except ImportError :
2424 from elasticsearch import VERSION as es_VERSION , Elasticsearch
2525
26- from pbench import es_index
26+ from pbench import tstos , es_index , es_put_template
2727
2828
2929_VERSION_ = "0.2.0.0"
@@ -33,8 +33,10 @@ _DEBUG = 0
3333# global - defaults to normal dict, ordered dict for unittests
3434_dict_const = dict
3535
36- # 30 second timeout talking to Elasticsearch
37- _read_timeout = 30
36+ # 100,000 minute timeout talking to Elasticsearch; basically we just don't
37+ # want to timeout waiting for Elasticsearch and then have to retry, as that
38+ # can add undue burden to the Elasticsearch cluster.
39+ _read_timeout = 100000 * 60
3840
3941# All indexing uses "create" (instead of "index") to avoid updating
4042# existing records, allowing us to detect duplicates.
@@ -112,16 +114,12 @@ def fetch_mapping(mapping_fn):
112114 raise MappingFileError ("Invalid mapping file: %s" % mapping_fn )
113115 return keys [0 ], mapping
114116
115- def es_template (es , options , INDEX_PREFIX , INDEX_VERSION , config ):
117+ def es_template (es , options , INDEX_PREFIX , INDEX_VERSION , config , dbg = 0 ):
116118 """Load the various Elasticsearch index templates required by pbench.
117119
118120 We first load the pbench-run index templates, and then we construct and
119121 load the templates for each tool data index.
120122 """
121- if not es :
122- # Don't do anything if we don't have an Elasticsearch client object.
123- return
124-
125123 index_settings = _dict_const (
126124 gateway = _dict_const (
127125 local = _dict_const (
@@ -181,7 +179,7 @@ def es_template(es, options, INDEX_PREFIX, INDEX_VERSION, config):
181179 for toolname ,frag in tool_mapping_frags .items ():
182180 tool_skel = copy .deepcopy (skel )
183181 tool_skel ['properties' ][toolname ] = frag
184- tool_mapping = _dict_const (("pbench-tool-data-%s" % toolname , tool_skel ))
182+ tool_mapping = _dict_const ([ ("pbench-tool-data-%s" % toolname , tool_skel )] )
185183 tool_template_body = _dict_const (
186184 template = '%s.tool-data-%s.*' % (INDEX_PREFIX , toolname ),
187185 settings = index_settings ,
@@ -192,7 +190,7 @@ def es_template(es, options, INDEX_PREFIX, INDEX_VERSION, config):
192190 results_mappings = _dict_const ()
193191 for mapping_fn in glob .iglob (os .path .join (MAPPING_DIR , "result-*.json" )):
194192 key , mapping = fetch_mapping (mapping_fn )
195- if options . debug_unittest :
193+ if dbg > 0 :
196194 print ("fetch_mapping: {}" .format (key ))
197195 from pprint import pprint ;
198196 pprint (mapping )
@@ -204,25 +202,26 @@ def es_template(es, options, INDEX_PREFIX, INDEX_VERSION, config):
204202 settings = index_settings ,
205203 mappings = results_mappings )
206204
207- if not es :
208- # Don't do anything if we don't have an Elasticsearch client object.
209- print (run_template_name , json .dumps (run_template_body , indent = 4 , sort_keys = True ))
210- for name , body in tool_templates :
211- print (name , json .dumps (body , indent = 4 , sort_keys = True ))
212- return
213-
214205 # FIXME: We should query to see if a template exists, and only
215206 # create it if it does not, or if the version of it is older than
216207 # expected.
217208 try :
218209 # Pbench Run template
219- res = es .indices .put_template (name = run_template_name , body = run_template_body )
210+ beg , end , retries = es_put_template (es , name = run_template_name , body = run_template_body )
211+ successes = 1
220212 # Pbench Tool Data templates
221213 for name , body in tool_templates :
222214 tool_template_name = '%s.tool-data-%s' % (INDEX_PREFIX ,name )
223- res = es .indices .put_template (name = tool_template_name , body = body )
215+ _ , end , retry_count = es_put_template (es , name = tool_template_name , body = body )
216+ retries += retry_count
217+ successes += 1
224218 except Exception as e :
225219 raise TemplateError (e )
220+ else :
221+ print ("\t done templates (end ts: %s, duration: %.2fs,"
222+ " successes: %d, retries: %d)" % (
223+ tstos (end ), end - beg , successes , retries ))
224+ sys .stdout .flush ()
226225
227226
228227###########################################################################
@@ -1821,7 +1820,7 @@ def main(options, args):
18211820
18221821 # Create the various index templates
18231822 if options .debug_time_operations : ts ("es_template" )
1824- es_template (es , options , INDEX_PREFIX , INDEX_VERSION , config )
1823+ es_template (es , options , INDEX_PREFIX , INDEX_VERSION , config , dbg = _DEBUG )
18251824
18261825 # returns 0 or 1
18271826 if options .indexing_errors :
@@ -1831,6 +1830,12 @@ def main(options, args):
18311830 with open (ie_filename , "w" ) as fp :
18321831 if options .debug_time_operations : ts ("es_index" )
18331832 res = es_index (es , actions , fp , dbg = _DEBUG )
1833+ beg , end , successes , duplicates , failures , retries = res
1834+ print ("\t done indexing (end ts: %s, duration: %.2fs,"
1835+ " successes: %d, duplicates: %d, failures: %d, retries: %d)" % (
1836+ tstos (end ), end - beg , successes , duplicates , failures , retries ))
1837+ sys .stdout .flush ()
1838+ res = 1 if failures > 0 else 0
18341839
18351840 except ConfigFileNotSpecified as e :
18361841 print (e , file = sys .stderr )
0 commit comments