Skip to content

Commit 3f1205a

Browse files
committed
Basic analysis and indexing
1 parent 9645d1d commit 3f1205a

File tree

3 files changed

+243
-28
lines changed

3 files changed

+243
-28
lines changed

tsa/public/views.py

Lines changed: 64 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
# -*- coding: utf-8 -*-
22
"""Public section, including homepage and signup."""
3-
from collections import defaultdict
4-
from flask import abort, Blueprint, jsonify, render_template, request, url_for
3+
import logging
4+
import redis
55
import rfc3987
6-
from tsa.tasks import hello, analyze
6+
from atenvironment import environment
7+
from collections import defaultdict
8+
from flask import abort, Blueprint, current_app, jsonify, render_template, request, url_for
9+
from tsa.tasks import hello, system_check, analyze, analyze_upload
710

811
blueprint = Blueprint('public', __name__, static_folder='../static')
912

@@ -13,40 +16,78 @@ def home():
1316
"""Landing page."""
1417
return render_template('public/landing.html')
1518

16-
@blueprint.route('/api/v1/test/base')
def test_basic():
    """Liveness probe: answer directly, touching no worker or backend."""
    greeting = "Hello world!"
    return greeting
1923

20-
@blueprint.route('/api/v1/test/job')
def test_celery():
    """Round-trip a trivial task through the Celery worker and return its result."""
    result = hello.delay()
    return result.get()
2429

25-
@blueprint.route('/api/1/analyze')
26-
def api_analyze():
30+
31+
@blueprint.route('/api/v1/test/system')
def test_system():
    """Chain system_check with hello to exercise the full task pipeline."""
    outcome = (system_check.s() | hello.si()).delay().get()
    logging.getLogger(__name__).info(f"System check result: {outcome!s}")
    return str(outcome)
37+
38+
39+
@blueprint.route('/api/v1/analyze', methods=['GET'])
def api_analyze_iri():
    """Analyze the dataset identified by the ``?iri=`` query parameter.

    The optional ``?etl=`` flag (default 1) selects the LP-ETL pipeline
    path inside the analyze task. Returns the analysis result as JSON;
    responds 400 on a missing or syntactically invalid IRI.
    """
    iri = request.args.get('iri', None)
    etl = bool(int(request.args.get('etl', 1)))

    current_app.logger.info("ETL: " + str(etl))

    # rfc3987.match raises TypeError when given None, so a missing ?iri=
    # must be rejected explicitly instead of surfacing as a 500.
    if iri is None or not rfc3987.match(iri):
        abort(400)
    return jsonify(analyze.delay(iri, etl).get())
3350

34-
@blueprint.route('/api/1/analyze/status/<task_id>')
35-
def check_status(task_id):
36-
task = analyze.AsyncResult(task_id)
37-
def default(value):
38-
return { 'state': value.state, 'status': str(value.info) }
3951

40-
return jsonify(defaultdict(default,
41-
PENDING={ 'state': task.state, 'status': 'Pending' },
42-
SUCCESS={ 'state': task.state, 'status': 'Completed' },
43-
FAILURE={ 'state': task.state, 'status': 'Failed' }
44-
)[task.state])
52+
@blueprint.route('/api/v1/analyze', methods=['POST'])
@environment('REDIS')
def api_analyze_upload(redis_url):
    """Analyze RDF files posted as multipart uploads.

    Each upload is streamed into redis under a fresh UUID key (60 s TTL,
    so workers must consume it promptly) and one analyze_upload task is
    dispatched per file via a Celery group. Returns the collected task
    results as JSON.
    """
    # Local imports: the original code used uuid and group without any
    # import in scope, which raised NameError on first request.
    import uuid
    from celery import group

    etl = bool(int(request.args.get('etl', 1)))

    def read_in_chunks(file_object, chunk_size=1024):
        """Lazily yield file_object piece by piece (default 1 KiB)."""
        while True:
            data = file_object.read(chunk_size)
            if not data:
                break
            yield data

    keys = []
    mimes = []
    r = redis.StrictRedis.from_url(redis_url, charset="utf-8", decode_responses=True)
    # request.files is a dict-like MultiDict: iterating it directly yields
    # field names (plain strings) with no .mimetype or .read — we need the
    # FileStorage values.
    for file in request.files.values():
        key = str(uuid.uuid4())
        keys.append(key)
        mimes.append(file.mimetype)
        for piece in read_in_chunks(file):
            r.append(key, piece)
        r.expire(key, 60)

    g = group(analyze_upload.s(k, m, etl) for k, m in zip(keys, mimes))
    return jsonify(g.apply_async().get())
4579

46-
@blueprint.route('/api/v1/query')
@environment('REDIS')
def index(redis_url):
    """Return the indexed values recorded for the ``?iri=`` parameter.

    Responds 400 on a missing/invalid IRI and 404 when the IRI has not
    been indexed (or its keys have already expired).
    """
    r = redis.StrictRedis.from_url(redis_url, charset="utf-8", decode_responses=True)
    iri = request.args.get('iri', None)
    # Validate before use: both "str + None" and rfc3987.match(None)
    # raise TypeError, which previously produced a 500 for a missing IRI.
    if iri is None or not rfc3987.match(iri):
        abort(400)
    current_app.logger.info("Querying for: " + iri)
    if not r.exists(iri):
        abort(404)
    return jsonify([str(x) for x in r.smembers(iri)])

tsa/settings.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ class Config(object):
1919
CACHE_TYPE = 'redis' # Can be "memcached", "redis", etc.
2020
SQLALCHEMY_TRACK_MODIFICATIONS = False
2121
WEBPACK_MANIFEST_PATH = 'webpack/manifest.json'
22-
CELERY_BROKER_URL = 'redis://redis:6379/0'
23-
CELERY_RESULT_BACKEND = 'redis://redis:6379/0'
22+
CELERY_BROKER_URL = os.environ.get('REDIS', 'redis://redis:6379/0')
23+
CELERY_RESULT_BACKEND = os.environ.get('REDIS', 'redis://redis:6379/0')
2424
REDIS_HOST = 'redis'
2525
REDIS_PORT = 6379
2626
REDIS_DB = 0
2727
CACHE_KEY_PREFIX = 'fcache'
2828
CACHE_REDIS_HOST = 'redis'
2929
CACHE_REDIS_PORT = '6379'
30-
CACHE_REDIS_URL = 'redis://redis:6379/0'
30+
CACHE_REDIS_URL = os.environ.get('REDIS', 'redis://redis:6379/0')
3131
SENTRY_CONFIG = {
3232
'dsn': 'https://9df1f926d1854fa4884d1f0ce9489a0b@sentry.io/1304923',
3333
'release': RELEASE,

tsa/tasks.py

Lines changed: 176 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,184 @@
11
"""Celery tasks invoked from the API endpoints."""
2+
import json
3+
import logging
4+
import rdflib
5+
import redis
6+
import requests
7+
from atenvironment import environment
8+
from rdflib import URIRef
9+
from urllib.parse import urlparse
10+
from urllib.error import URLError
11+
from tsa.analyzer import Analyzer
212
from tsa.celery import celery
13+
from tsa.transformation import PipelineFactory
14+
15+
@celery.task
@environment('ETL', 'VIRTUOSO')
def system_check(etl, virtuoso):
    """Probe the LP-ETL and Virtuoso endpoints, raising on any HTTP error.

    Injected env: ETL (LP-ETL base URL) and VIRTUOSO (Virtuoso base URL).
    """
    log = logging.getLogger(__name__)
    log.info("System check started")

    # LP-ETL: any non-2xx answer aborts the check via raise_for_status.
    log.info(f"Testing LP-ETL, URL: {etl!s}")
    requests.get(etl).raise_for_status()

    # Virtuoso: probe its SPARQL endpoint the same way.
    virtuoso_url = f"{virtuoso!s}/sparql"
    log.info(f"Testing virtuoso, URL: {virtuoso_url}")
    requests.get(virtuoso_url).raise_for_status()

    log.info("System check successful")
28+
329

430
@celery.task
def hello():
    """Trivial task used by the health-check endpoints."""
    greeting = "Hello world!"
    return greeting
733

34+
35+
@celery.task
def analyze(iri, etl=True):
    """Analyze the dataset at *iri*.

    With etl=True the transform -> poll -> inspect LP-ETL chain is
    dispatched asynchronously (this call returns None). Otherwise the
    resource is fetched and parsed directly, its triples are indexed
    into redis as a side effect, and the Analyzer result is returned.
    """
    log = logging.getLogger(__name__)
    log.info(f"Analyzing {iri!s}")
    if etl:
        (transform.s(iri) | poll.s() | inspect.s()).apply_async()
    else:
        guess = rdflib.util.guess_format(iri)
        if guess is None:
            r = requests.head(iri)
            r.raise_for_status()
            guess = r.headers.get('content-type')
            # Content-Type may carry parameters ("text/turtle; charset=..."),
            # which rdflib does not accept as a parser format — keep only
            # the bare media type.
            if guess is not None:
                guess = guess.split(';')[0].strip()
        g = rdflib.ConjunctiveGraph()
        log.info(f"Guessing format to be {guess!s}")
        g.parse(iri, format=guess)
        a = Analyzer()
        index(g, iri)
        return a.analyze(g)
53+
54+
@environment('REDIS')
def index(g, source_iri, redis_cfg):
    """Index every triple of graph *g* into redis for reverse lookup.

    For each (s, p, o) the other two terms plus the source IRI are added
    to the set keyed by each term, and the full triple to the set keyed
    by the source IRI. All keys expire after one hour.
    """
    r = redis.StrictRedis.from_url(redis_cfg)
    pipe = r.pipeline()
    exp = 60 * 60  # 1 hour
    # Loop-invariant: the original re-stringified source_iri on every
    # iteration; hoist it out of the loop.
    source_iri = str(source_iri)
    for (s, p, o) in g:
        s = str(s)
        p = str(p)
        o = str(o)

        pipe.sadd(s, source_iri, p, o)
        pipe.sadd(p, source_iri, s, o)
        pipe.sadd(o, source_iri, p, s)
        pipe.sadd(source_iri, s, p, o)

        pipe.expire(s, exp)
        pipe.expire(p, exp)
        pipe.expire(o, exp)
        pipe.expire(source_iri, exp)
    pipe.execute()
75+
76+
@celery.task
@environment('REDIS')
def analyze_upload(key, mime, etl, redis_cfg):
    """Analyze an uploaded file stored in redis under *key*.

    Parses the stored content as *mime* and returns the Analyzer result.
    Uploads larger than ~1 MB are dropped (key deleted, returns None).
    The etl flag is currently unused on this path.
    """
    log = logging.getLogger(__name__)
    r = redis.StrictRedis.from_url(redis_cfg)
    if r.strlen(key) < 1024 * 1024:  # approx 1MB
        g = rdflib.ConjunctiveGraph()
        g.parse(data=r.get(key), format=mime)
        a = Analyzer()
        return a.analyze(g)
    else:
        # logger.warn is a deprecated alias; use warning().
        log.warning(f"Not analyzing an upload as it's too big: {key!s}")
        r.delete(key)
89+
90+
91+
@celery.task
def inspect(iri):
    """Fetch the graph at *iri* and return the Analyzer's report.

    NOTE: the task name shadows the stdlib ``inspect`` module within
    this file; kept as-is because the API chain references it by name.
    """
    # The original created an unused module logger here; removed.
    g = rdflib.ConjunctiveGraph()
    g.parse(iri)
    a = Analyzer()
    return a.analyze(g)
98+
99+
100+
@celery.task
@environment('ETL', 'VIRTUOSO', 'DBA_PASSWORD')
def transform(iri, etl, virtuoso, dbaPass):
    """Create an LP-ETL pipeline for *iri* and trigger its execution.

    Builds the JSON-LD pipeline via PipelineFactory (loading results into
    Virtuoso), POSTs it to LP-ETL, extracts the created pipeline IRI from
    the TriG response, starts an execution, and returns that execution's
    IRI for polling.
    """
    log = logging.getLogger(__name__)

    log.info(f"Prepare pipeline for {iri!s}")
    factory = PipelineFactory()
    parsed = urlparse(virtuoso)
    virtuoso_cfg = {'server': parsed.hostname, 'port': 1111, 'user': 'dba', 'password': dbaPass, 'iri': iri}
    pipeline_def = json.dumps(factory.createPipeline(iri, virtuoso_cfg))

    log.info(f"Pipeline:\n{pipeline_def!s}")

    # Register the pipeline with LP-ETL.
    response = requests.post(f"{etl!s}/resources/pipelines", files={'pipeline': pipeline_def})
    response.raise_for_status()

    # The creation response is TriG; pull out the pipeline's IRI.
    graph = rdflib.ConjunctiveGraph()
    graph.parse(data=response.text, format="trig")
    pipeline = graph.value(object=URIRef("http://linkedpipes.com/ontology/Pipeline"), predicate=rdflib.namespace.RDF.type)
    log.info(f"Pipeline IRI: {pipeline!s}")

    # Kick off an execution of the freshly created pipeline.
    response = requests.post(f"{etl!s}/resources/executions?pipeline={pipeline}")
    response.raise_for_status()
    log.info(f"Execution trigger result:\n{response.json()!s}")
    return f"{etl!s}/resources/executions/{response.json()['iri'].split('/')[-1]}"
129+
130+
131+
@celery.task(bind=True, retry_backoff=True, max_retries=None, default_retry_delay=30, time_limit=60*60)
def poll(self, iri):
    """Poll an LP-ETL execution IRI until it finishes or fails.

    Retries itself (with backoff, up to the 1 h time limit) while the
    execution is running; raises EtlJobFailed on a failed execution.
    """
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # NOTE(review): cleanup() requires the pipeline IRI as its first
        # argument, but none is passed here — confirm which IRI should be
        # deleted and pass it via args=[...].
        cleanup.apply_async()
    self.after_return = after_return

    log = logging.getLogger(__name__)
    log.info(f"Polling {iri!s}")

    r = requests.get(iri + "/overview")
    content = r.text
    log.info(content)
    r.raise_for_status()

    j = json.loads(content)
    if j['status']['@id'] == "http://etl.linkedpipes.com/resources/status/failed":
        log.error("Execution failed")

        # Best effort: attach the ETL log to our own log before failing.
        try:
            r = requests.get(iri + "/logs")
            r.raise_for_status()
            log.error("ETL log:\n" + r.text)
        # The original caught a bare HTTPError name that was never
        # imported (NameError at raise time); use the requests exception.
        except requests.exceptions.HTTPError as e:
            raise EtlJobFailed(r) from e

        raise EtlJobFailed(r)
    elif not (j['status']['@id'] == "http://etl.linkedpipes.com/resources/status/finished"):
        log.info("Execution is not finished yet")
        self.retry()
    else:
        # The original logged an undefined name `g` here, so every
        # successful poll crashed with NameError. Result extraction is
        # still a stub; log the overview we already have instead.
        log.info(f"Execution finished:\n{content!s}")
        result = ""
        return result
165+
166+
8167
@celery.task
@environment('ETL')
def cleanup(iri, etl):
    """Delete the LP-ETL pipeline identified by *iri*.

    Injected env: ETL (LP-ETL base URL). Raises on a non-2xx answer.
    """
    log = logging.getLogger(__name__)
    log.info(f"Deleting {iri!s}")

    response = requests.delete(f"{etl!s}/pipelines?iri={iri!s}")
    response.raise_for_status()

    log.info(f"Pipeline {iri!s} deleted")
177+
178+
179+
class EtlError(Exception):
    """Base class for all ETL-related errors raised by these tasks."""


class EtlJobFailed(EtlError):
    """Raised when an LP-ETL pipeline execution reports failure."""

0 commit comments

Comments
 (0)