11"""Celery tasks invoked from the API endpoints."""
22import json
33import logging
4+ from urllib .parse import urlparse
5+
46import rdflib
57import redis
68import requests
79from atenvironment import environment
810from rdflib import URIRef
9- from urllib .parse import urlparse
10- from urllib .error import URLError
11+
1112from tsa .analyzer import Analyzer
1213from tsa .celery import celery
1314from tsa .transformation import PipelineFactory
1415
16+
@celery.task
@environment('ETL', 'VIRTUOSO')
def system_check(etl, virtuoso):
    """Probe the LP-ETL and Virtuoso HTTP endpoints.

    Issues a GET against each service and raises (via
    ``raise_for_status``) if either responds with an error status.
    """
    log = logging.getLogger(__name__)
    log.info('System check started')

    sparql_endpoint = f'{virtuoso!s}/sparql'
    for label, url in (('LP-ETL', etl), ('virtuoso', sparql_endpoint)):
        log.info(f'Testing {label}, URL: {url!s}')
        requests.get(url).raise_for_status()

    log.info('System check successful')
2830
2931
@celery.task
def hello():
    """Trivial liveness task: return a constant greeting."""
    greeting = 'Hello world!'
    return greeting
3335
3436
3537@celery .task
3638def analyze (iri , etl = True ):
3739 log = logging .getLogger (__name__ )
38- log .info (f" Analyzing { iri !s} " )
40+ log .info (f' Analyzing { iri !s} ' )
3941 if etl :
4042 (transform .s (iri ) | poll .s () | inspect .s ()).apply_async ()
4143 else :
@@ -45,17 +47,18 @@ def analyze(iri, etl=True):
4547 r .raise_for_status ()
4648 guess = r .headers .get ('content-type' )
4749 g = rdflib .ConjunctiveGraph ()
48- log .info (f" Guessing format to be { guess !s} " )
50+ log .info (f' Guessing format to be { guess !s} ' )
4951 g .parse (iri , format = guess )
5052 a = Analyzer ()
5153 index (g , iri )
5254 return a .analyze (g )
5355
56+
5457@environment ('REDIS' )
5558def index (g , source_iri , redis_cfg ):
5659 r = redis .StrictRedis .from_url (redis_cfg )
5760 pipe = r .pipeline ()
58- exp = 60 * 60 # 1H
61+ exp = 60 * 60 # 1H
5962 for (s , p , o ) in g :
6063 s = str (s )
6164 p = str (p )
@@ -73,24 +76,24 @@ def index(g, source_iri, redis_cfg):
7376 pipe .expire (source_iri , exp )
7477 pipe .execute ()
7578
79+
@celery.task
@environment('REDIS')
def analyze_upload(key, mime, etl, redis_cfg):
    """Analyze an uploaded RDF document stored in Redis under *key*.

    :param key: Redis key holding the raw uploaded bytes.
    :param mime: serialization format hint passed to rdflib's parser.
    :param etl: unused here; kept for signature compatibility with callers.
    :param redis_cfg: Redis connection URL (injected by @environment).
    :return: the Analyzer result dict, or None when the upload is rejected.
    """
    log = logging.getLogger(__name__)
    r = redis.StrictRedis.from_url(redis_cfg)
    if r.strlen(key) >= 1024 * 1024:  # approx 1MB — refuse oversized uploads
        # logger.warn is deprecated; warning() is the supported spelling
        log.warning(f"Not analyzing an upload as it's too big: {key!s}")
        r.delete(key)
        return None
    g = rdflib.ConjunctiveGraph()
    g.parse(data=r.get(key), format=mime)
    a = Analyzer()
    return a.analyze(g)
8993
9094
9195@celery .task
9296def inspect (iri ):
93- log = logging .getLogger (__name__ )
9497 g = rdflib .ConjunctiveGraph ()
9598 g .parse (iri )
9699 a = Analyzer ()
@@ -99,81 +102,84 @@ def inspect(iri):
99102
@celery.task
@environment('ETL', 'VIRTUOSO', 'DBA_PASSWORD')
def transform(iri, etl, virtuoso, dba_pass):
    """Create an LP-ETL pipeline for *iri* and trigger its execution.

    Builds a JSON-LD pipeline pointing at the Virtuoso instance, POSTs it
    to the LP-ETL ``/resources/pipelines`` endpoint, extracts the created
    pipeline IRI from the TriG response, and starts an execution.

    :param iri: IRI of the distribution to transform.
    :param etl: base URL of the LP-ETL instance (injected by @environment).
    :param virtuoso: base URL of the Virtuoso instance (injected).
    :param dba_pass: Virtuoso ``dba`` password (injected).
    :return: URL of the created execution, suitable for poll().
    :raises requests.HTTPError: when either LP-ETL call fails.
    """
    log = logging.getLogger(__name__)
    # create pipeline and call to start executions
    # prepare JSON-LD pipeline

    log.info(f'Prepare pipeline for {iri!s}')
    pf = PipelineFactory()
    p = urlparse(virtuoso)
    # port 1111 is Virtuoso's native ISQL port used by the loader component
    pipeline = json.dumps(pf.create_pipeline(iri, {'server': p.hostname,
                                                   'port': 1111,
                                                   'user': 'dba',
                                                   'password': dba_pass,
                                                   'iri': iri}))

    log.info(f'Pipeline:\n{pipeline!s}')

    # create the pipeline
    r = requests.post(f'{etl!s}/resources/pipelines', files={'pipeline': pipeline})
    r.raise_for_status()

    g = rdflib.ConjunctiveGraph()
    g.parse(data=r.text, format='trig')

    # the subject typed as lp:Pipeline is the IRI of the created pipeline
    pipeline = g.value(object=URIRef('http://linkedpipes.com/ontology/Pipeline'), predicate=rdflib.namespace.RDF.type)
    log.info(f'Pipeline IRI: {pipeline!s}')

    # POST /resources/executions
    r = requests.post(f'{etl!s}/resources/executions?pipeline={pipeline}')
    r.raise_for_status()
    # parse the response body once instead of calling r.json() twice
    execution = r.json()
    log.info(f'Execution trigger result:\n{execution!s}')
    return f"{etl!s}/resources/executions/{execution['iri'].split('/')[-1]}"
129136
130137
@celery.task(bind=True, retry_backoff=True, max_retries=None, default_retry_delay=30, time_limit=60 * 60)
def poll(self, iri):
    """Poll an LP-ETL execution at *iri* until it finishes or fails.

    Retries itself (unbounded, with backoff, 1h time limit) while the
    execution is still running.

    :param iri: URL of the LP-ETL execution resource.
    :return: the result URI (currently always the empty string) on success.
    :raises EtlJobFailed: when the execution reports the failed status.
    """
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        cleanup.apply_async()
    # NOTE(review): after_return is patched onto the bound task so cleanup is
    # scheduled whenever this task returns; cleanup() takes an iri argument
    # that is not passed here — confirm the intended call signature.
    self.after_return = after_return

    log = logging.getLogger(__name__)
    log.info(f'Polling {iri!s}')

    r = requests.get(iri + '/overview')
    content = r.text
    # log the raw overview body before raising on HTTP errors
    log.info(content)
    r.raise_for_status()

    j = json.loads(content)
    status_id = j['status']['@id']
    if status_id == 'http://etl.linkedpipes.com/resources/status/failed':
        log.error('Execution failed')

        try:
            r = requests.get(iri + '/logs')
            r.raise_for_status()
            log.error('ETL log:\n' + r.text)
        except requests.HTTPError as e:
            # could not fetch the ETL log; still surface the failed job
            raise EtlJobFailed(r) from e

        raise EtlJobFailed(r)
    elif status_id != 'http://etl.linkedpipes.com/resources/status/finished':
        log.info('Execution is not finished yet')
        self.retry()
    else:
        # get result uri
        result = ''
        return result
165171
166172
@celery.task
@environment('ETL')
def cleanup(iri, etl):
    """Delete pipeline *iri* from the LP-ETL instance at *etl*."""
    log = logging.getLogger(__name__)
    log.info(f'Deleting {iri!s}')

    response = requests.delete(f'{etl!s}/pipelines?iri={iri!s}')
    response.raise_for_status()

    log.info(f'Pipeline {iri!s} deleted')
177183
178184
179185class EtlError (Exception ):
0 commit comments