Skip to content

Commit d357735

Browse files
authored
Remove es driver (#527)
* Add direct es instance to purgatory instead of plugin object. * Improve error messages and standardise API responses (#520) * Aqua test coverage (#556) * Add permission checking after decryption. * Rename ddo/query endpoint to query. * Improve exception handling on the es query endpoint. * README changes.
1 parent 51a5b3d commit d357735

32 files changed

+2332
-1054
lines changed

README.md

Lines changed: 148 additions & 80 deletions
Large diffs are not rendered by default.

aquarius/app/assets.py

Lines changed: 149 additions & 266 deletions
Large diffs are not rendered by default.

aquarius/app/chains.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66
import json
77
import logging
88

9-
from flask import Blueprint, Response
9+
from flask import Blueprint, jsonify
10+
from aquarius.app.es_instance import ElasticsearchInstance
1011
from aquarius.log import setup_logging
1112
from aquarius.myapp import app
12-
from oceandb_driver_interface import OceanDb
1313

1414
setup_logging()
1515
chains = Blueprint("chains", __name__)
1616
logger = logging.getLogger("aquarius")
17-
es_instance = OceanDb(app.config["AQUARIUS_CONFIG_FILE"]).plugin
17+
es_instance = ElasticsearchInstance(app.config["AQUARIUS_CONFIG_FILE"])
1818

1919

2020
@chains.route("/list", methods=["GET"])
@@ -30,16 +30,16 @@ def get_chains_list():
3030
description: No chains are present
3131
"""
3232
try:
33-
chains = es_instance.driver.es.get(
34-
index=f"{es_instance.driver.db_index}_plus", id="chains", doc_type="_doc"
33+
chains = es_instance.es.get(
34+
index=f"{es_instance.db_index}_plus", id="chains", doc_type="_doc"
3535
)["_source"]
36-
return Response(json.dumps(chains), 200, content_type="application/json")
36+
return json.dumps(chains)
3737
except (elasticsearch.exceptions.NotFoundError, KeyError):
38-
logger.error("Cannot get chains list")
39-
return Response("No chains found", 404)
38+
logger.error("Cannot get chains list.")
39+
return jsonify(error="No chains found."), 404
4040
except Exception as e:
41-
logger.error(f"Cannot get chains list: {str(e)}")
42-
return Response("No chains found", 404)
41+
logger.error(f"Error in get_chains_list: {str(e)}")
42+
return jsonify(error=f"Error retrieving chains: {str(e)}."), 404
4343

4444

4545
@chains.route("/status/<chain_id>", methods=["GET"])
@@ -61,17 +61,17 @@ def get_index_status(chain_id):
6161
description: This chainId is not indexed.
6262
"""
6363
try:
64-
last_block_record = es_instance.driver.es.get(
65-
index=f"{es_instance.driver.db_index}_plus",
64+
last_block_record = es_instance.es.get(
65+
index=f"{es_instance.db_index}_plus",
6666
id="events_last_block_" + str(chain_id),
6767
doc_type="_doc",
6868
)["_source"]
69-
return Response(
70-
json.dumps(last_block_record), 200, content_type="application/json"
71-
)
69+
return json.dumps(last_block_record)
7270
except (elasticsearch.exceptions.NotFoundError, KeyError):
73-
logger.error(f"Cannot get index status for chain {chain_id}")
74-
return Response(f"{chain_id} is not indexed", 404)
71+
logger.error(f"Cannot get index status for chain {chain_id}. Chain not found.")
72+
return jsonify(error=f"Chain {chain_id} is not indexed."), 404
7573
except Exception as e:
76-
logger.error(f"Cannot get index status for chain {chain_id}: {str(e)}")
77-
return Response(f"{chain_id} is not indexed", 404)
74+
logger.error(
75+
f"Cannot get index status for chain {chain_id}. Error encountered is: {str(e)}"
76+
)
77+
return jsonify(error=f"Error retrieving chain {chain_id}: {str(e)}."), 404

aquarius/app/dao.py

Lines changed: 0 additions & 128 deletions
This file was deleted.

aquarius/app/es_instance.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
#
2+
# Copyright 2021 Ocean Protocol Foundation
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
import os
6+
import logging
7+
import time
8+
from elasticsearch import Elasticsearch
9+
from elasticsearch.exceptions import NotFoundError
10+
11+
from aquarius.app.es_mapping import es_mapping
12+
13+
_DB_INSTANCE = None
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
def get_value(value, env_var, default, config=None):
    """Resolve a single configuration setting.

    Lookup order (first hit wins):

    1. the environment variable ``env_var`` (returned as the raw string),
    2. the entry ``config[value]`` when ``config`` is given and contains it,
    3. ``default``.

    :param value: key to look up in ``config`` (e.g. ``"db.hostname"``).
    :param env_var: name of the environment variable that overrides the config.
    :param default: fallback returned when neither source provides a value.
    :param config: optional container supporting ``in`` and ``[]`` lookups.
    :return: the resolved setting; callers are responsible for type coercion.
    """
    # Read the environment once so the membership test and the returned value
    # cannot disagree.
    env_value = os.getenv(env_var)
    if env_value is not None:
        return env_value

    if config is not None and value in config:
        return config[value]

    return default
26+
27+
28+
class ElasticsearchInstance(object):
    """Wrapper around an Elasticsearch client bound to a single index.

    Connection settings are resolved through ``get_value`` (environment
    variable first, then ``config``, then a built-in default). The raw client
    is exposed as ``es`` and the index name as ``db_index``.
    """

    def __init__(self, config=None):
        """Create the client, wait until the server answers, ensure the index.

        :param config: optional settings container forwarded to ``get_value``.
        :raises ValueError: if the ``db.ssl`` / ``db.verify_certs`` settings
            cannot be interpreted as booleans.

        Errors raised while connecting or creating the index are logged and
        swallowed (best effort), so construction itself does not fail on them.
        """
        host = get_value("db.hostname", "DB_HOSTNAME", "localhost", config)
        port = int(get_value("db.port", "DB_PORT", 9200, config))
        username = get_value("db.username", "DB_USERNAME", "elastic", config)
        password = get_value("db.password", "DB_PASSWORD", "changeme", config)
        index = get_value("db.index", "DB_INDEX", "oceandb", config)
        ssl = self.str_to_bool(get_value("db.ssl", "DB_SSL", "false", config))
        verify_certs = self.str_to_bool(
            get_value("db.verify_certs", "DB_VERIFY_CERTS", "false", config)
        )
        ca_certs = get_value("db.ca_cert_path", "DB_CA_CERTS", None, config)
        client_key = get_value("db.client_key", "DB_CLIENT_KEY", None, config)
        client_cert = get_value("db.client_cert_path", "DB_CLIENT_CERT", None, config)
        self._index = index
        # Define the attribute up front so the ``es`` property exists even if
        # the client constructor below raises.
        self._es = None
        try:
            self._es = Elasticsearch(
                [host],
                http_auth=(username, password),
                port=port,
                use_ssl=ssl,
                verify_certs=verify_certs,
                ca_certs=ca_certs,
                # Certificate and key must go to their matching keyword;
                # they were previously swapped.
                client_cert=client_cert,
                client_key=client_key,
                maxsize=1000,
            )
            # Block until the server responds to a ping.
            while self._es.ping() is False:
                logger.info("Trying to connect...")
                time.sleep(5)

            # ignore=400 is presumably there to tolerate "index already
            # exists" responses -- confirm against the client documentation.
            self._es.indices.create(index=index, ignore=400, body=es_mapping)

        except Exception as e:
            logger.error(f"Exception trying to connect... {e}")

    @property
    def es(self):
        """Return the underlying Elasticsearch client."""
        return self._es

    @property
    def db_index(self):
        """Return the name of the index this instance operates on."""
        return self._index

    @staticmethod
    def str_to_bool(s):
        """Convert a ``"true"``/``"false"`` setting to a ``bool``.

        Real booleans are returned unchanged; strings are compared
        case-insensitively after stripping surrounding whitespace.

        :param s: value to convert.
        :return: the boolean value.
        :raises ValueError: if ``s`` is neither a bool nor "true"/"false".
        """
        if isinstance(s, bool):
            return s

        normalized = str(s).strip().lower()
        if normalized == "true":
            return True
        if normalized == "false":
            return False

        raise ValueError(
            f"Cannot interpret {s!r} as a boolean; expected 'true' or 'false'."
        )

    def write(self, obj, resource_id=None):
        """Write obj in elasticsearch.

        :param obj: value to be written in elasticsearch.
        :param resource_id: id for the resource.
        :return: id of the transaction.
        :raises ValueError: if a document with ``resource_id`` already exists.
        """
        logger.debug("elasticsearch::write::{}".format(resource_id))
        if resource_id is not None:
            if self.es.exists(index=self.db_index, id=resource_id, doc_type="_doc"):
                raise ValueError(
                    'Resource "{}" already exists, use update instead'.format(
                        resource_id
                    )
                )

        return self.es.index(
            index=self.db_index,
            id=resource_id,
            body=obj,
            doc_type="_doc",
            refresh="wait_for",
        )["_id"]

    def read(self, resource_id):
        """Read object in elasticsearch using the resource_id.

        :param resource_id: id of the object to be read.
        :return: the ``_source`` of the stored document.
        """
        logger.debug("elasticsearch::read::{}".format(resource_id))
        return self.es.get(index=self.db_index, id=resource_id, doc_type="_doc")[
            "_source"
        ]

    def update(self, obj, resource_id):
        """Update object in elasticsearch using the resource_id.

        :param obj: new value
        :param resource_id: id of the object to be updated.
        :return: id of the object.
        """
        logger.debug("elasticsearch::update::{}".format(resource_id))
        return self.es.index(
            index=self.db_index,
            id=resource_id,
            body=obj,
            doc_type="_doc",
            refresh="wait_for",
        )["_id"]

    def delete_all(self):
        """Delete every document matched by a ``match_all`` query.

        NOTE(review): the query targets ``"_all"`` rather than
        ``self.db_index``, so it is not limited to this instance's index.
        Confirm that this is intended before calling it outside of tests.
        """
        q = """{
            "query" : {
                "match_all" : {}
            }
        }"""
        self.es.delete_by_query("_all", q)

    def delete(self, resource_id):
        """Delete an object from elasticsearch.

        :param resource_id: id of the object to be deleted.
        :return: the response of the client's ``delete`` call.
        :raises ValueError: if no document with ``resource_id`` exists.
        """
        logger.debug("elasticsearch::delete::{}".format(resource_id))
        if not self.es.exists(index=self.db_index, id=resource_id, doc_type="_doc"):
            raise ValueError(f"Resource {resource_id} does not exists")

        return self.es.delete(index=self.db_index, id=resource_id, doc_type="_doc")

    def count(self):
        """Return the number of documents in the index, or 0 if none/unknown."""
        count_result = self.es.count(index=self.db_index)
        if count_result is not None and count_result.get("count", 0) > 0:
            return count_result["count"]

        return 0

    def get(self, asset_id):
        """Fetch an asset and return it only when it is listed.

        :param asset_id: id of the asset document.
        :return: the asset, or ``None`` when ``is_listed`` is falsy for its
            ``"service"`` entries.
        :raises NotFoundError: re-raised after logging when the asset is missing.
        """
        try:
            asset = self.read(asset_id)
        except NotFoundError:
            logger.info(f"Asset with id {asset_id} was not found in ES.")
            raise

        except Exception as e:
            logger.error(f"get: {str(e)}")
            raise

        if asset is None or not self.is_listed(asset["service"]):
            return None

        return asset

    @staticmethod
    def is_listed(services):
        """Return the ``isListed`` flag of the first metadata service that has one.

        :param services: iterable of service dicts with ``"type"`` and
            ``"attributes"`` keys.
        :return: the stored ``isListed`` value, or ``None`` when no metadata
            service carries ``attributes.curation.isListed``.
        """
        for service in services:
            if (
                service["type"] == "metadata"
                and "curation" in service["attributes"]
                and "isListed" in service["attributes"]["curation"]
            ):
                return service["attributes"]["curation"]["isListed"]

        # No metadata service declared the flag; callers treat this as falsy.
        return None

0 commit comments

Comments
 (0)