diff --git a/synda/sdt/sdenqueue.py b/synda/sdt/sdenqueue.py index 053d994c..cd62c37c 100755 --- a/synda/sdt/sdenqueue.py +++ b/synda/sdt/sdenqueue.py @@ -29,7 +29,7 @@ import sddatasetdao import sdutils import sdtimestamp -from sdtypes import Dataset,File +from sdtypes import Dataset,File,Selection import sdsqlutils import sdpostpipelineutils import sdtime @@ -181,6 +181,7 @@ def add_file(f): ) f.dataset_id = add_dataset(f) + f.searchapi_host = Selection.searchapi_host f.status = TRANSFER["status"]['waiting'] f.crea_date = sdtime.now() diff --git a/synda/sdt/sdfiledao.py b/synda/sdt/sdfiledao.py index b3014c28..04b6046b 100755 --- a/synda/sdt/sdfiledao.py +++ b/synda/sdt/sdfiledao.py @@ -32,8 +32,10 @@ def update_transfer_last_access_date(i__date,i__transfer_id,conn=sddb.conn): c.close() def add_file(file,commit=True,conn=sddb.conn): - keys_to_insert=['status', 'crea_date', 'url', 'local_path', 'filename', 'file_functional_id', 'tracking_id', 'priority', 'checksum', 'checksum_type', 'size', 'variable', 'project', 'model', 'data_node', 'dataset_id', 'insertion_group_id', 'timestamp'] - # for future:, 'searchapi_host'] + keys_to_insert = [ + 'status', 'crea_date', 'url', 'local_path', 'filename', 'file_functional_id', 'tracking_id', + 'priority', 'checksum', 'checksum_type', 'size', 'variable', 'project', 'model', + 'data_node', 'dataset_id', 'insertion_group_id', 'timestamp', 'searchapi_host'] if not Internal().is_processes_get_files_caching: return sdsqlutils.insert(file,keys_to_insert,commit,conn) @@ -324,7 +326,7 @@ def update_file(_file, commit=True, conn=sddb.conn): if next_url_on_error: keys.append('url') - # for future: keys.append('searchapi_host') + keys.append('searchapi_host') rowcount = sdsqlutils.update( _file, diff --git a/synda/sdt/sdnexturl.py b/synda/sdt/sdnexturl.py index f0233640..e060ab3c 100755 --- a/synda/sdt/sdnexturl.py +++ b/synda/sdt/sdnexturl.py @@ -70,6 +70,7 @@ def run(tr): def next_url(tr,conn): 
-all_urlps=get_urls(tr.file_functional_id) # [[url1,protocol1],[url2,protocol2],...] + all_urlps=get_urls(tr.file_functional_id,tr.searchapi_host,tr.url) + # ... looks like [[url1,protocol1],[url2,protocol2],...] sdlog.info("SDNEXTUR-006","all_urpls= %s"%(all_urlps,)) c = conn.cursor() fus = c.execute("SELECT url FROM failed_url WHERE file_id="+ @@ -94,14 +95,14 @@ def next_url(tr,conn): raise sdexception.NextUrlNotFoundException() -def get_urls(file_functional_id): +def get_urls(file_functional_id, searchapi_host, old_url): """returns a prioritized list of [url,protocol] where each url can supply the specified file""" try: result=sdquicksearch.run( parameter=['limit=4','fields=%s'%url_fields,'type=File','instance_id=%s'% file_functional_id], - post_pipeline_mode=None ) + post_pipeline_mode=None, index_host=searchapi_host ) except Exception as e: sdlog.debug("SDNEXTUR-015", "exception %s. instance_id=%s"%(e,file_functional_id)) raise e @@ -114,7 +115,7 @@ def get_urls(file_functional_id): result=sdquicksearch.run( parameter=['limit=4','fields=%s'%url_fields,'type=File','instance_id=%s'% file_functional_id+'*'], - post_pipeline_mode=None ) + post_pipeline_mode=None, index_host=searchapi_host ) li=result.get_files() sdlog.info("SDNEXTUR-017","sdquicksearch 2nd call %s sets of file urls: %s"%(len(li),li)) # result looks like @@ -133,28 +134,37 @@ def get_urls(file_functional_id): # The search for //None bypasses an issue with the SOLR lookup where there is no # url_gridftp possibility. - return prioritize_urlps( urlps ) + return prioritize_urlps( urlps, old_url ) url_fields = ','.join(URL_FIELDS) # used for the sdquicksearch call above -def prioritize_urlps( urlps ): +def prioritize_urlps( urlps, old_url ): """Orders a list urlps so that the highest-priority urls come first. urlps is a list of - lists of the form [url,protocol]. First, GridFTP urls are preferred over everything else. - Then, prefer some data nodes over others.""" + lists of the form [url,protocol]. 
+ Some data nodes are preferred over others. Then, GridFTP is preferred over http.""" + # Formerly, I prioritized the other way; but experience shows that many data nodes which + # officially support GridFTP, don't usually have it working. + # Note also that within this function a "high priority" url has a low priority number. + # That's just for programming convenience. + def priprotocol(protocol): if protocol.find('gridftp')>0: return 0 if protocol.find('http')>0: return 1 return 2 def priurl(url): if url.find('llnl')>0: return 0 - if url.find('ceda')>0: return 1 - if url.find('dkrz')>0: return 2 - if url.find('ipsl')>0: return 3 - if url.find('nci')>0: return 4 - return 5 - return sorted( urlps, key=(lambda urlp: (priprotocol(urlp[1]), priurl(urlp[0]))) ) + if url.find('gridftp.ipsl')>0: return 1 + if url.find('vesg.ipsl')>0: return 2 + if url.find('ceda')>0: return 3 + if url.find('dkrz')>0: return 4 + if url.find('nci')>0: return 5 + if old_url.find('lasg')<0 and url.find('lasg')>0: + return 99 # Never fall back to this very slow data node; but changing protocol is ok. 
+ return 6 + urlps_cleaned = [ urlp for urlp in urlps if priurl(urlp[0])<99 ] + return sorted( urlps_cleaned, key=(lambda urlp: ( priurl(urlp[0]), priprotocol(urlp[1]))) ) if __name__ == '__main__': diff --git a/synda/sdt/sdparse.py b/synda/sdt/sdparse.py index aa84b0c0..0bcdefc9 100755 --- a/synda/sdt/sdparse.py +++ b/synda/sdt/sdparse.py @@ -21,6 +21,7 @@ import sdprint import sdi18n import sdearlystreamutils +import sdlog from synda.source.config.file.selection.models import Config as SelectionConfig from synda.source.config.file.selection.constants import PENDING_PARAMETER @@ -130,6 +131,16 @@ def build(buffer, load_default=None): # set default_selection as parent of project_default_selection project_default_selection.parent = default_selection + if selection.filename is not None: + #sdlog.info('SDPARSE-0001','selection.filename=%s'%selection.filename) + #sdlog.info('SDPARSE-0002','searchapi_host facet=%s'%selection.facets.get('searchapi_host')) + if selection.facets.get('searchapi_host') is not None and\ + len(selection.facets['searchapi_host'])>0 and\ + selection.facets['searchapi_host'][0] is not None: + #sdlog.info('SDPARSE-0003','selection %s has searchapi_host=%s'% + # (selection.filename,selection.facets['searchapi_host'])) + Selection.searchapi_host = selection.facets['searchapi_host'][0] + return selection diff --git a/synda/sdt/sdpipeline.py b/synda/sdt/sdpipeline.py index 58018cea..c60ed807 100755 --- a/synda/sdt/sdpipeline.py +++ b/synda/sdt/sdpipeline.py @@ -68,6 +68,8 @@ def build_queries(stream=None,selection=None,path=None,parameter=None,index_host if selection is None: buffer=sdbuffer.get_selection_file_buffer(path=path,parameter=parameter) selection=sdparse.build(buffer,load_default=load_default) + if index_host is not None: + selection.facets['searchapi_host'] = [index_host] stream=selection.merge_facets() diff --git a/synda/sdt/sdtypes.py b/synda/sdt/sdtypes.py index 87542143..c3486680 100755 --- a/synda/sdt/sdtypes.py +++ 
b/synda/sdt/sdtypes.py @@ -76,6 +76,7 @@ def __str__(self): class Selection(): + searchapi_host = None # a recently user-specified searchapi_host def __init__(self, **kw): # sub-selections list (a selection can contain facets groups, but can also contain other selections) self.childs = []