Skip to content

Commit 47a99b5

Browse files
authored
Merge pull request #545 from GSA/add-opensearch
Add opensearch
2 parents a04fb0c + 4789347 commit 47a99b5

File tree

7 files changed

+1719
-665
lines changed

7 files changed

+1719
-665
lines changed

docker-compose.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,19 @@ services:
2525
interval: 5s
2626
timeout: 5s
2727
retries: 5
28+
opensearch:
29+
image: opensearchproject/opensearch:2.11.1
30+
environment:
31+
discovery.type: single-node
32+
ports:
33+
- "9200:9200"
34+
- "9600:9600"
35+
healthcheck:
36+
test: ['CMD-SHELL', 'curl -sk -u admin:admin https://localhost:9200/_cluster/health?wait_for_status=yellow\&timeout=1s | grep -q ''"timed_out":false''']
37+
interval: 5s
38+
timeout: 5s
39+
retries: 12
40+
start_period: 10s
2841
transformer:
2942
image: ghcr.io/gsa/mdtranslator:latest
3043
restart: always
@@ -45,11 +58,14 @@ services:
4558
depends_on:
4659
db:
4760
condition: service_healthy
61+
opensearch:
62+
condition: service_healthy
4863
transformer:
4964
condition: service_healthy
5065
volumes:
5166
- .:/app
5267
environment:
68+
OPENSEARCH_HOST: opensearch
5369
DATABASE_URI: postgresql+psycopg://${DATABASE_USER}:${DATABASE_PASSWORD}@db:${DATABASE_PORT}/${DATABASE_NAME}
5470
CF_SERVICE_USER: ${CF_SERVICE_USER}
5571
CF_SERVICE_AUTH: ${CF_SERVICE_AUTH}

harvester/harvest.py

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ class HarvestSource:
9999
internal_records: dict = field(default_factory=lambda: {}, repr=False)
100100

101101
deletions: set = field(default_factory=lambda: set(), repr=False)
102+
_opensearch: object = field(default=None, init=False, repr=False)
103+
_opensearch_initialized: bool = field(default=False, init=False, repr=False)
102104

103105
def __post_init__(self) -> None:
104106
self._db_interface: HarvesterDBInterface = db_interface
@@ -154,6 +156,27 @@ def records(self, value) -> None:
154156
def reporter(self):
155157
return self._reporter
156158

159+
@property
160+
def opensearch(self):
161+
if self._opensearch_initialized:
162+
return self._opensearch
163+
164+
self._opensearch_initialized = True
165+
opensearch_host = os.getenv("OPENSEARCH_HOST")
166+
if not opensearch_host:
167+
self._opensearch = None
168+
return None
169+
170+
try:
171+
from harvester.opensearch import OpenSearchInterface
172+
173+
self._opensearch = OpenSearchInterface.from_environment()
174+
except Exception as e:
175+
logger.exception("Failed to initialize OpenSearch client: %s", e)
176+
self._opensearch = None
177+
178+
return self._opensearch
179+
157180
@property
158181
def source_attrs(self) -> List:
159182
return self._source_attrs
@@ -1051,6 +1074,41 @@ def _dataset_payload(self, metadata: dict) -> dict:
10511074

10521075
return payload
10531076

1077+
def _index_dataset_in_opensearch(self, dataset) -> None:
1078+
client = self.harvest_source.opensearch
1079+
if client is None or dataset is None:
1080+
return
1081+
try:
1082+
succeeded, failed, errors = client.index_datasets([dataset])
1083+
if failed:
1084+
logger.error(
1085+
"OpenSearch indexing failed for dataset %s (slug %s): %s",
1086+
dataset.id,
1087+
dataset.slug,
1088+
errors,
1089+
)
1090+
except Exception as e:
1091+
logger.exception(
1092+
"OpenSearch indexing error for dataset %s (slug %s): %s",
1093+
dataset.id,
1094+
dataset.slug,
1095+
e,
1096+
)
1097+
1098+
def _delete_dataset_from_opensearch(self, dataset) -> None:
1099+
client = self.harvest_source.opensearch
1100+
if client is None or dataset is None:
1101+
return
1102+
try:
1103+
client.delete_dataset_by_id(dataset.id)
1104+
except Exception as e:
1105+
logger.exception(
1106+
"OpenSearch delete error for dataset %s (slug %s): %s",
1107+
dataset.id,
1108+
dataset.slug,
1109+
e,
1110+
)
1111+
10541112
def sync(self):
10551113
try:
10561114
if self.status == "error":
@@ -1068,13 +1126,21 @@ def sync(self):
10681126
if self.action in ("create", "update") and metadata is not None:
10691127
dataset_payload = self._dataset_payload(metadata)
10701128
if self.action == "create":
1071-
self._insert_dataset_with_unique_slug(dataset_payload)
1129+
dataset = self._insert_dataset_with_unique_slug(dataset_payload)
10721130
else:
1073-
self.harvest_source.db_interface.upsert_dataset(dataset_payload)
1131+
dataset = self.harvest_source.db_interface.upsert_dataset(
1132+
dataset_payload
1133+
)
1134+
self._index_dataset_in_opensearch(dataset)
10741135
elif self.action == "delete" and self.dataset_slug:
1075-
self.harvest_source.db_interface.delete_dataset_by_slug(
1136+
dataset = self.harvest_source.db_interface.get_dataset_by_slug(
10761137
self.dataset_slug
10771138
)
1139+
deleted = self.harvest_source.db_interface.delete_dataset_by_slug(
1140+
self.dataset_slug
1141+
)
1142+
if deleted:
1143+
self._delete_dataset_from_opensearch(dataset)
10781144

10791145
return True
10801146

@@ -1103,13 +1169,12 @@ def update_self_in_db(self) -> bool:
11031169
data,
11041170
)
11051171

1106-
def _insert_dataset_with_unique_slug(self, dataset_payload: dict) -> None:
1172+
def _insert_dataset_with_unique_slug(self, dataset_payload: dict) -> object:
11071173
"""Persist a dataset, retrying with a unique slug when needed."""
11081174

11091175
while True:
11101176
try:
1111-
self.harvest_source.db_interface.insert_dataset(dataset_payload)
1112-
return
1177+
return self.harvest_source.db_interface.insert_dataset(dataset_payload)
11131178
except IntegrityError as error:
11141179
if not self._is_slug_unique_violation(error):
11151180
raise

0 commit comments

Comments (0)