Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion synda/sdt/sdenqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import sddatasetdao
import sdutils
import sdtimestamp
from sdtypes import Dataset,File
from sdtypes import Dataset,File,Selection
import sdsqlutils
import sdpostpipelineutils
import sdtime
Expand Down Expand Up @@ -181,6 +181,7 @@ def add_file(f):
)

f.dataset_id = add_dataset(f)
f.searchapi_host = Selection.searchapi_host
f.status = TRANSFER["status"]['waiting']
f.crea_date = sdtime.now()

Expand Down
8 changes: 5 additions & 3 deletions synda/sdt/sdfiledao.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ def update_transfer_last_access_date(i__date,i__transfer_id,conn=sddb.conn):
c.close()

def add_file(file,commit=True,conn=sddb.conn):
keys_to_insert=['status', 'crea_date', 'url', 'local_path', 'filename', 'file_functional_id', 'tracking_id', 'priority', 'checksum', 'checksum_type', 'size', 'variable', 'project', 'model', 'data_node', 'dataset_id', 'insertion_group_id', 'timestamp']
# for future:, 'searchapi_host']
keys_to_insert = [
'status', 'crea_date', 'url', 'local_path', 'filename', 'file_functional_id', 'tracking_id',
'priority', 'checksum', 'checksum_type', 'size', 'variable', 'project', 'model',
'data_node', 'dataset_id', 'insertion_group_id', 'timestamp', 'searchapi_host']

if not Internal().is_processes_get_files_caching:
return sdsqlutils.insert(file,keys_to_insert,commit,conn)
Expand Down Expand Up @@ -324,7 +326,7 @@ def update_file(_file, commit=True, conn=sddb.conn):

if next_url_on_error:
keys.append('url')
# for future: keys.append('searchapi_host')
keys.append('searchapi_host')

rowcount = sdsqlutils.update(
_file,
Expand Down
37 changes: 24 additions & 13 deletions synda/sdt/sdnexturl.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ def run(tr):

def next_url(tr,conn):
all_urlps=get_urls(tr.file_functional_id) # [[url1,protocol1],[url2,protocol2],...]
all_urlps=get_urls(tr.file_functional_id,tr.searchapi_host,tr.url)
# ... looks like [[url1,protocol1],[url2,protocol2],...]
sdlog.info("SDNEXTUR-006","all_urpls= %s"%(all_urlps,))
c = conn.cursor()
fus = c.execute("SELECT url FROM failed_url WHERE file_id="+
Expand All @@ -94,14 +96,14 @@ def next_url(tr,conn):
raise sdexception.NextUrlNotFoundException()


def get_urls(file_functional_id):
def get_urls(file_functional_id, searchapi_host, old_url):
"""returns a prioritized list of [url,protocol] where each url can supply the specified file"""

try:
result=sdquicksearch.run(
parameter=['limit=4','fields=%s'%url_fields,'type=File','instance_id=%s'%
file_functional_id],
post_pipeline_mode=None )
post_pipeline_mode=None, index_host=searchapi_host )
except Exception as e:
sdlog.debug("SDNEXTUR-015", "exception %s. instance_id=%s"%(e,file_functional_id))
raise e
Expand All @@ -114,7 +116,7 @@ def get_urls(file_functional_id):
result=sdquicksearch.run(
parameter=['limit=4','fields=%s'%url_fields,'type=File','instance_id=%s'%
file_functional_id+'*'],
post_pipeline_mode=None )
post_pipeline_mode=None, index_host=searchapi_host )
li=result.get_files()
sdlog.info("SDNEXTUR-017","sdquicksearch 2nd call %s sets of file urls: %s"%(len(li),li))
# result looks like
Expand All @@ -133,28 +135,37 @@ def get_urls(file_functional_id):
# The search for //None bypasses an issue with the SOLR lookup where there is no
# url_gridftp possibility.

return prioritize_urlps( urlps )
return prioritize_urlps( urlps, old_url )


url_fields = ','.join(URL_FIELDS) # used for the sdquicksearch call above


def prioritize_urlps( urlps, old_url ):
    """Order the list urlps so that the highest-priority urls come first and return it.

    urlps is a list of lists of the form [url, protocol].  old_url is the url of the
    transfer that just failed; it is used only to decide whether falling back to the
    very slow 'lasg' data node is acceptable (it is, only if we were already on lasg).

    Some data nodes are preferred over others.  Then, GridFTP is preferred over http.
    Formerly, the prioritization went the other way; but experience shows that many
    data nodes which officially support GridFTP don't usually have it working.
    Note also that within this function a "high priority" url has a LOW priority
    number — that's just for programming convenience.
    """

    def priprotocol(protocol):
        # NOTE(review): '.find(...) > 0' (not >= 0) assumes the match never starts at
        # index 0 — presumably protocol is a field name like 'url_gridftp'/'url_http';
        # confirm against the caller before changing.
        if protocol.find('gridftp')>0: return 0
        if protocol.find('http')>0: return 1
        return 2

    def priurl(url):
        # Data-node preference order; lower number = tried first.
        if url.find('llnl')>0: return 0
        if url.find('gridftp.ipsl')>0: return 1
        if url.find('vesg.ipsl')>0: return 2
        if url.find('ceda')>0: return 3
        if url.find('dkrz')>0: return 4
        if url.find('nci')>0: return 5
        if old_url.find('lasg')<0 and url.find('lasg')>0:
            # Never fall back to this very slow data node; but if we were already
            # downloading from it, changing protocol on the same node is ok.
            return 99
        return 6

    # Drop urls marked unusable (priority 99) before sorting.
    urlps_cleaned = [ urlp for urlp in urlps if priurl(urlp[0])<99 ]
    return sorted( urlps_cleaned, key=(lambda urlp: ( priurl(urlp[0]), priprotocol(urlp[1]))) )


if __name__ == '__main__':
Expand Down
11 changes: 11 additions & 0 deletions synda/sdt/sdparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import sdprint
import sdi18n
import sdearlystreamutils
import sdlog

from synda.source.config.file.selection.models import Config as SelectionConfig
from synda.source.config.file.selection.constants import PENDING_PARAMETER
Expand Down Expand Up @@ -130,6 +131,16 @@ def build(buffer, load_default=None):
# set default_selection as parent of project_default_selection
project_default_selection.parent = default_selection

if selection.filename is not None:
#sdlog.info('SDPARSE-0001','selection.filename=%s'%selection.filename)
#sdlog.info('SDPARSE-0002','searchapi_host facet=%s'%selection.facets.get('searchapi_host'))
if selection.facets.get('searchapi_host') is not None and\
len(selection.facets['searchapi_host'])>0 and\
selection.facets['searchapi_host'][0] is not None:
#sdlog.info('SDPARSE-0003','selection %s has searchapi_host=%s'%
# (selection.filename,selection.facets['searchapi_host']))
Selection.searchapi_host = selection.facets['searchapi_host'][0]

return selection


Expand Down
2 changes: 2 additions & 0 deletions synda/sdt/sdpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ def build_queries(stream=None,selection=None,path=None,parameter=None,index_host
if selection is None:
buffer=sdbuffer.get_selection_file_buffer(path=path,parameter=parameter)
selection=sdparse.build(buffer,load_default=load_default)
if index_host is not None:
selection.facets['searchapi_host'] = index_host

stream=selection.merge_facets()

Expand Down
1 change: 1 addition & 0 deletions synda/sdt/sdtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __str__(self):


class Selection():
searchapi_host = None # a recently user-specified searchapi_host
def __init__(self, **kw):
# sub-selections list (a selection can contain facets groups, but can also contain other selections)
self.childs = []
Expand Down