
Commit 1cac176

MoteHue and priteau committed
Add OpenSearch as a v2 storage backend
To facilitate the switch from Elasticsearch to OpenSearch, the ES backend has been duplicated and renamed where appropriate to OpenSearch.

The OpenSearch implementation was modified in places for compatibility with OpenSearch 2.x, for example:

- remove mapping name from bulk API URL
- replace put_mapping by post_mapping

This will allow for the future removal of the Elasticsearch backend.

Change-Id: I88b0a30f66af13dad1bd75cde412d2880b4ead30
Co-Authored-By: Pierre Riteau <[email protected]>
(cherry picked from commit 964c670)
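As an illustration of the two compatibility changes listed above, the request shapes differ roughly as sketched below. This is a hedged sketch only: the actual URL construction lives in the backend's client module, which is not reproduced on this page, and the legacy Elasticsearch paths shown are assumptions based on the pre-7.x typed API rather than the exact code that was duplicated.

# Illustrative sketch only -- not the actual cloudkitty client code.
# It contrasts the request shapes implied by the commit message: the
# mapping name is dropped from the bulk URL, and the mapping is created
# with a POST to /_mapping instead of a typed PUT.

index = "cloudkitty"
mapping = "_doc"

# Legacy Elasticsearch-style endpoints (assumed, pre-7.x typed API):
legacy_bulk_url = "/{}/{}/_bulk".format(index, mapping)          # type in URL
legacy_put_mapping = ("PUT", "/{}/_mapping/{}".format(index, mapping))

# OpenSearch 2.x-compatible endpoints used by the new backend:
opensearch_bulk_url = "/{}/_bulk".format(index)                  # no mapping name
opensearch_post_mapping = ("POST", "/{}/_mapping".format(index))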
1 parent b016d44 commit 1cac176

14 files changed: +1349 −18 lines


.zuul.yaml

Lines changed: 13 additions & 0 deletions

@@ -94,6 +94,17 @@
         CLOUDKITTY_STORAGE_BACKEND: elasticsearch
         CLOUDKITTY_STORAGE_VERSION: 2
 
+- job:
+    name: cloudkitty-tempest-full-v2-storage-opensearch
+    parent: base-cloudkitty-v2-api-tempest-job
+    description: |
+      Job testing cloudkitty installation on devstack with python 3 and the
+      OpenSearch v2 storage driver and running tempest tests
+    vars:
+      devstack_localrc:
+        CLOUDKITTY_STORAGE_BACKEND: opensearch
+        CLOUDKITTY_STORAGE_VERSION: 2
+
 - job:
     name: cloudkitty-tox-bandit
     parent: openstack-tox
@@ -130,6 +141,8 @@
         - cloudkitty-tempest-full-v2-storage-influxdb
         - cloudkitty-tempest-full-v2-storage-elasticsearch:
             voting: false
+        - cloudkitty-tempest-full-v2-storage-opensearch:
+            voting: false
         - cloudkitty-tempest-full-v1-storage-sqlalchemy
         - cloudkitty-tempest-full-ipv6-only
         - cloudkitty-tox-bandit:

cloudkitty/common/config.py

Lines changed: 3 additions & 0 deletions

@@ -32,6 +32,7 @@
 import cloudkitty.storage.v1.hybrid.backends.gnocchi
 import cloudkitty.storage.v2.elasticsearch
 import cloudkitty.storage.v2.influx
+import cloudkitty.storage.v2.opensearch
 import cloudkitty.utils
 
 __all__ = ['list_opts']
@@ -70,6 +71,8 @@
         cloudkitty.storage.v2.influx.influx_storage_opts))),
     ('storage_elasticsearch', list(itertools.chain(
         cloudkitty.storage.v2.elasticsearch.elasticsearch_storage_opts))),
+    ('storage_opensearch', list(itertools.chain(
+        cloudkitty.storage.v2.opensearch.opensearch_storage_opts))),
     ('storage_gnocchi', list(itertools.chain(
         cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))),
     (None, list(itertools.chain(
cloudkitty/storage/v2/opensearch/__init__.py

Lines changed: 205 additions & 0 deletions (new file)

@@ -0,0 +1,205 @@
# Copyright 2019 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import datetime

from oslo_config import cfg
from oslo_log import log

from cloudkitty import dataframe
from cloudkitty.storage import v2 as v2_storage
from cloudkitty.storage.v2.opensearch import client as os_client
from cloudkitty.storage.v2.opensearch import exceptions
from cloudkitty.utils import tz as tzutils

LOG = log.getLogger(__name__)

CONF = cfg.CONF

OPENSEARCH_STORAGE_GROUP = 'storage_opensearch'

opensearch_storage_opts = [
    cfg.StrOpt(
        'host',
        help='OpenSearch host, along with port and protocol. '
             'Defaults to http://localhost:9200',
        default='http://localhost:9200'),
    cfg.StrOpt(
        'index_name',
        help='OpenSearch index to use. Defaults to "cloudkitty".',
        default='cloudkitty'),
    cfg.BoolOpt('insecure',
                help='Set to true to allow insecure HTTPS '
                     'connections to OpenSearch',
                default=False),
    cfg.StrOpt('cafile',
               help='Path of the CA certificate to trust for '
                    'HTTPS connections.',
               default=None),
    cfg.IntOpt('scroll_duration',
               help="Duration (in seconds) for which the OpenSearch scroll "
                    "contexts should be kept alive.",
               advanced=True,
               default=30, min=0, max=300),
]

CONF.register_opts(opensearch_storage_opts, OPENSEARCH_STORAGE_GROUP)

CLOUDKITTY_INDEX_MAPPING = {
    "dynamic_templates": [
        {
            "strings_as_keywords": {
                "match_mapping_type": "string",
                "mapping": {
                    "type": "keyword"
                }
            }
        }
    ],
    "dynamic": False,
    "properties": {
        "start": {"type": "date"},
        "end": {"type": "date"},
        "type": {"type": "keyword"},
        "unit": {"type": "keyword"},
        "qty": {"type": "double"},
        "price": {"type": "double"},
        "groupby": {"dynamic": True, "type": "object"},
        "metadata": {"dynamic": True, "type": "object"}
    },
}


class OpenSearchStorage(v2_storage.BaseStorage):

    def __init__(self, *args, **kwargs):
        super(OpenSearchStorage, self).__init__(*args, **kwargs)

        LOG.warning('The OpenSearch storage driver is experimental. '
                    'DO NOT USE IT IN PRODUCTION.')

        verify = not CONF.storage_opensearch.insecure
        if verify and CONF.storage_opensearch.cafile:
            verify = CONF.storage_opensearch.cafile

        self._conn = os_client.OpenSearchClient(
            CONF.storage_opensearch.host,
            CONF.storage_opensearch.index_name,
            "_doc",
            verify=verify)

    def init(self):
        r = self._conn.get_index()
        if r.status_code != 200:
            raise exceptions.IndexDoesNotExist(
                CONF.storage_opensearch.index_name)
        LOG.info('Creating mapping "_doc" on index {}...'.format(
            CONF.storage_opensearch.index_name))
        self._conn.post_mapping(CLOUDKITTY_INDEX_MAPPING)
        LOG.info('Mapping created.')

    def push(self, dataframes, scope_id=None):
        for frame in dataframes:
            for type_, point in frame.iterpoints():
                start, end = self._local_to_utc(frame.start, frame.end)
                self._conn.add_point(point, type_, start, end)

        self._conn.commit()

    @staticmethod
    def _local_to_utc(*args):
        return [tzutils.local_to_utc(arg) for arg in args]

    @staticmethod
    def _doc_to_datapoint(doc):
        return dataframe.DataPoint(
            doc['unit'],
            doc['qty'],
            doc['price'],
            doc['groupby'],
            doc['metadata'],
        )

    def _build_dataframes(self, docs):
        dataframes = {}
        nb_points = 0
        for doc in docs:
            source = doc['_source']
            start = tzutils.dt_from_iso(source['start'])
            end = tzutils.dt_from_iso(source['end'])
            key = (start, end)
            if key not in dataframes.keys():
                dataframes[key] = dataframe.DataFrame(start=start, end=end)
            dataframes[key].add_point(
                self._doc_to_datapoint(source), source['type'])
            nb_points += 1

        output = list(dataframes.values())
        output.sort(key=lambda frame: (frame.start, frame.end))
        return output

    def retrieve(self, begin=None, end=None,
                 filters=None,
                 metric_types=None,
                 offset=0, limit=1000, paginate=True):
        begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
                                        end or tzutils.get_next_month())
        total, docs = self._conn.retrieve(
            begin, end, filters, metric_types,
            offset=offset, limit=limit, paginate=paginate)
        return {
            'total': total,
            'dataframes': self._build_dataframes(docs),
        }

    def delete(self, begin=None, end=None, filters=None):
        self._conn.delete_by_query(begin, end, filters)

    @staticmethod
    def _normalize_time(t):
        if isinstance(t, datetime.datetime):
            return tzutils.utc_to_local(t)
        return tzutils.dt_from_iso(t)

    def _doc_to_total_result(self, doc, start, end):
        output = {
            'begin': self._normalize_time(doc.get('start', start)),
            'end': self._normalize_time(doc.get('end', end)),
            'qty': doc['sum_qty']['value'],
            'rate': doc['sum_price']['value'],
        }
        # Means we had a composite aggregation
        if 'key' in doc.keys():
            for key, value in doc['key'].items():
                if key == 'begin' or key == 'end':
                    # OpenSearch returns ts in milliseconds
                    value = tzutils.dt_from_ts(value // 1000)
                output[key] = value
        return output

    def total(self, groupby=None, begin=None, end=None, metric_types=None,
              filters=None, custom_fields=None, offset=0, limit=1000,
              paginate=True):
        begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
                                        end or tzutils.get_next_month())

        total, docs = self._conn.total(begin, end, metric_types, filters,
                                       groupby, custom_fields=custom_fields,
                                       offset=offset, limit=limit,
                                       paginate=paginate)
        return {
            'total': total,
            'results': [self._doc_to_total_result(doc, begin, end)
                        for doc in docs],
        }
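The client module imported above, cloudkitty.storage.v2.opensearch.client, is added by the same commit but is not shown in this excerpt. Based only on the calls made by OpenSearchStorage, a hedged sketch of the interface it needs to expose is given below; the parameter names, docstrings, and return types are assumptions derived from the call sites, not the actual signatures.

# Hedged interface sketch, derived from the call sites in OpenSearchStorage.
# Parameter names and return values are assumptions, not the real client.

class OpenSearchClient(object):

    def __init__(self, url, index_name, mapping_name, verify=True):
        """verify may be a bool or a CA bundle path (see the 'cafile' option)."""

    def get_index(self):
        """Return a response exposing .status_code for the configured index."""

    def post_mapping(self, mapping):
        """Create the index mapping via a POST request."""

    def add_point(self, point, type_, start, end):
        """Buffer a single datapoint for the next bulk commit."""

    def commit(self):
        """Flush buffered datapoints through the bulk API."""

    def retrieve(self, begin, end, filters, metric_types,
                 offset=0, limit=1000, paginate=True):
        """Return (total, docs), each doc carrying a '_source' dict."""

    def delete_by_query(self, begin, end, filters):
        """Delete the documents matching the given time range and filters."""

    def total(self, begin, end, metric_types, filters, groupby,
              custom_fields=None, offset=0, limit=1000, paginate=True):
        """Return (total, docs) with 'sum_qty' and 'sum_price' aggregations."""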
