Skip to content

Commit d19ae5d

Browse files
authored
Merge pull request #6 from actinia-org/send_job
Send PC from cloudevent to actinia
2 parents f49a779 + e9355f7 commit d19ae5d

File tree

11 files changed

+199
-91
lines changed

11 files changed

+199
-91
lines changed

.github/workflows/test.yml

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,16 @@ jobs:
3535
integration-tests:
3636
runs-on: ubuntu-latest
3737
steps:
38-
- name: Checkout
39-
uses: actions/checkout@v4
40-
# with:
41-
# path: "."
42-
- name: Set up Docker Buildx
43-
uses: docker/setup-buildx-action@v3
44-
- name: Replace run integration test command
45-
run: |
46-
sed -i "s+# RUN make test+RUN make integrationtest+g" docker/Dockerfile
47-
- name: Integration tests of actinia-cloudevent-plugin
48-
id: docker_build
49-
uses: docker/build-push-action@v6
50-
with:
51-
push: false
52-
tags: actinia-cloudevent-plugin-test:alpine
53-
context: .
54-
file: docker/Dockerfile
55-
no-cache: true
56-
# pull: true
38+
- uses: actions/checkout@v4
39+
- name: Start containers
40+
run: docker compose -f "docker/docker-compose.yml" up -d --build
41+
- name: List running docker
42+
run: docker ps
43+
- name: Docker logs actinia-cloudevent
44+
run: docker logs docker-actinia-cloudevent-1
45+
- name: Docker logs actinia-core
46+
run: docker logs docker-actinia-core-1
47+
- name: Run integration test
48+
run: docker exec -t docker-actinia-cloudevent-1 make integrationtest
49+
- name: Stop containers
50+
run: docker compose -f "docker/docker-compose.yml" down

config/mount/sample.ini

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
[ACTINIA]
2+
processing_base_url = http://actinia-core:8088/api/v3
3+
use_actinia_modules = True
4+
user = actinia-gdi
5+
password = actinia-gdi
6+
17
[EVENTRECEIVER]
28
url = http://localhost:3000/
39

docker/docker-compose.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,17 @@ services:
1212
ports:
1313
- "5000:5000"
1414
# network_mode: "host"
15+
16+
actinia-core:
17+
image: mundialis/actinia:2.12.1
18+
depends_on:
19+
- valkey
20+
21+
valkey:
22+
image: valkey/valkey:8.1-alpine
23+
command: [
24+
"sh",
25+
"-c",
26+
'docker-entrypoint.sh
27+
--requirepass pass'
28+
]

ruff.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@ lint.ignore = ["ANN001", "ANN201", "ANN202", "E501", "FA102", "N802", "N816", "P
2222
[lint.per-file-ignores]
2323
"tests/testsuite.py" = [ "PLC0415",]
2424
"src/actinia_cloudevent_plugin/resources/logging.py" = ["A005",]
25-
"src/actinia_cloudevent_plugin/resources/config.py" = ["SIM102",]
25+
"src/actinia_cloudevent_plugin/resources/config.py" = ["SIM102","S105"]
2626
"tests/integrationtests/test_cloudevent.py" = ["PLR2004",]

src/actinia_cloudevent_plugin/api/cloudevent.py renamed to src/actinia_cloudevent_plugin/api/cloudevents.py

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828

2929
from actinia_cloudevent_plugin.apidocs import cloudevent
3030
from actinia_cloudevent_plugin.core.cloudevents import (
31-
cloud_event_to_process_chain,
3231
receive_cloud_event,
3332
send_binary_cloud_event,
3433
# send_structured_cloud_event,
34+
start_actinia_job,
3535
)
3636
from actinia_cloudevent_plugin.model.response_models import (
3737
SimpleStatusCodeResponseModel,
@@ -70,33 +70,32 @@ def post(self) -> SimpleStatusCodeResponseModel:
7070
# Transform postbody to cloudevent
7171
event_received = receive_cloud_event()
7272
# With received process chain start actinia process + return cloudevent
73-
actinia_job = cloud_event_to_process_chain(event_received)
74-
# URL to which the generated cloudevent is sent
75-
url = EVENTRECEIVER.url
76-
# TODO: binary or structured cloud event?
77-
# From https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message
78-
# A "structured-mode message" is one where the entire event (attributes and data)
79-
# are encoded in the message body, according to a specific event format.
80-
# A "binary-mode message" is one where the event data is stored in the message body,
81-
# and event attributes are stored as part of message metadata.
82-
# Often, binary mode is used when the producer of the CloudEvent wishes to add the
83-
# CloudEvent's metadata to an existing event without impacting the message's body.
84-
# In most cases a CloudEvent encoded as a binary-mode message will not break an
85-
# existing receiver's processing of the event because the message's metadata
86-
# typically allows for extension attributes.
87-
# In other words, a binary formatted CloudEvent would work for both
88-
# a CloudEvents enabled receiver as well as one that is unaware of CloudEvents.
73+
actinia_resp = start_actinia_job(event_received)
74+
queue_name = actinia_resp["queue"]
75+
8976
try:
90-
event_returned = send_binary_cloud_event(
77+
# TODO: Send event to JobSink
78+
# TODO: Configure JobSink URL
79+
# url = TODO
80+
# new_event = send_binary_cloud_event(
81+
# event_received,
82+
# queue_name,
83+
# url,
84+
# )
85+
86+
# Send event to configured broker
87+
# TODO: binary or structured cloud event?
88+
url = EVENTRECEIVER.url
89+
new_event = send_binary_cloud_event(
9190
event_received,
92-
actinia_job,
91+
actinia_resp,
9392
url,
9493
)
9594
return SimpleStatusCodeResponseModel(
9695
status=204,
9796
message=self.msg.replace("<EVENT1>", event_received["id"])
98-
.replace("<EVENT2>", event_returned["id"])
99-
.replace("<ACTINIA_JOB>", actinia_job),
97+
.replace("<EVENT2>", new_event["id"])
98+
.replace("<ACTINIA_JOB>", queue_name),
10099
)
101100
except ConnectionError as e:
102101
return f"Connection ERROR when returning cloudevent: {e}"

src/actinia_cloudevent_plugin/core/cloudevents.py

Lines changed: 109 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,15 @@
2323
__maintainer__ = "mundialis GmbH & Co. KG"
2424

2525

26+
import json
27+
2628
import requests
2729
from cloudevents.conversion import to_binary, to_structured
2830
from cloudevents.http import CloudEvent, from_http
2931
from flask import request
32+
from requests.auth import HTTPBasicAuth
33+
34+
from actinia_cloudevent_plugin.resources.config import ACTINIA, EVENTRECEIVER
3035

3136

3237
def receive_cloud_event():
@@ -43,64 +48,122 @@ def receive_cloud_event():
4348
return event
4449

4550

46-
def cloud_event_to_process_chain(event) -> str:
47-
"""Return queue name for process chain of event."""
48-
# (Remove ruff-exception, when pc variable used)
49-
pc = event.get_data()["list"][0] # noqa: F841
50-
# !! TODO !!: pc to job
51+
def start_actinia_job(event) -> str:
52+
"""Return actinia response for process chain of event."""
53+
pc = event.get_data()
5154
# NOTE: as standalone app -> consider for queue name creation
52-
# HTTP POST pc to actinia-module plugin processing endpoint
53-
# # # include an identifier for grouping cloudevents of same actinia process (?)
54-
# # # (e.g. new metadata field "queue_name", or within data, or use existign id)
55-
# -> actinia core returns resource-url, including resource_id (and queue name)
56-
# (queuename = xx_<resource_id>; if configured accordingly within actinia -> each job own queue)
57-
# via knative jobsink: start actinia worker (with queue name)
58-
# (https://knative.dev/docs/eventing/sinks/job-sink/#usage)
59-
# e.g. HTTP POST with queue name
60-
# kubectl run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
61-
# -H "content-type: application/json" \
62-
# -H "ce-specversion: 1.0" \
63-
# -H "ce-source: my/curl/command" \
64-
# -H "ce-type: my.demo.event" \
65-
# -H "ce-id: 123" \
66-
# -d '{"details":"queuename"}' \
67-
# http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
68-
return "<queue_name>_<resource_id>" # queue name and resource id
69-
70-
71-
def send_binary_cloud_event(event, actinia_job, url):
72-
"""Return posted binary event with actinia_job."""
73-
attributes = {
74-
"specversion": event["specversion"],
75-
"source": "/actinia-cloudevent-plugin",
76-
"type": "com.mundialis.actinia.process.started",
77-
"subject": event["subject"],
78-
"datacontenttype": "application/json",
79-
}
80-
data = {"actinia_job": actinia_job}
8155

82-
event = CloudEvent(attributes, data)
83-
headers, body = to_binary(event)
84-
# send event
85-
requests.post(url, headers=headers, data=body)
86-
87-
return event
56+
# TODO: Define ce attribute for possible mapset.
57+
# Also in actiniaproject divided by "/" or "."?
58+
project = event.get_attributes().get("actiniaproject")
59+
mapset = None
60+
if "." in project:
61+
project = project.split(".")[0]
62+
mapset = project.split(".")[1]
63+
64+
url = f"{ACTINIA.processing_base_url}/projects/{project}/"
65+
if not mapset:
66+
# emphemeral processing
67+
if ACTINIA.use_actinia_modules:
68+
url += "processing_export"
69+
else:
70+
url += "processing_async_export"
71+
# persistent processing
72+
elif ACTINIA.use_actinia_modules:
73+
url += f"mapsets/{mapset}/processing"
74+
else:
75+
url += f"mapsets/{mapset}/processing_async/"
76+
77+
postkwargs = dict()
78+
postkwargs["headers"] = {"content-type": "application/json; charset=utf-8"}
79+
postkwargs["auth"] = HTTPBasicAuth(ACTINIA.user, ACTINIA.password)
80+
postkwargs["data"] = json.dumps(pc)
81+
82+
resp = requests.post(
83+
url,
84+
**postkwargs,
85+
)
86+
87+
# Part of resp:
88+
# 'message' = 'Resource accepted'
89+
# 'queue' = 'job_queue_resource_id-cddae7bb-b4fa-4249-aec4-2a646946ff36'
90+
# 'resource_id' = 'resource_id-cddae7bb-b4fa-4249-aec4-2a646946ff36'
91+
# 'status' = 'accepted'
92+
# 'urls' = {
93+
# 'resources': [],
94+
# 'status':
95+
# 'http://actinia-dev:8088/api/v3/resources/actinia-gdi/
96+
# resource_id-cddae7bb-b4fa-4249-aec4-2a646946ff36'}
97+
98+
return json.loads(resp.text)
99+
100+
101+
def send_binary_cloud_event(event, queue_name, url):
102+
"""Return posted binary event with actinia_job."""
103+
return send_cloud_event(
104+
mode="binary",
105+
version=event["specversion"],
106+
cetype="com.mundialis.actinia.process.startworker",
107+
subject=event["subject"],
108+
actiniaqueuename=queue_name,
109+
url=url,
110+
)
88111

89112

90113
def send_structured_cloud_event(event, actinia_job, url):
91114
"""Return posted structured event with actinia_job."""
115+
# TODO: adjust to queue name
116+
return send_cloud_event(
117+
mode="structured",
118+
version=event["specversion"],
119+
cetype="com.mundialis.actinia.process.started",
120+
subject=event["subject"],
121+
data={"actinia_job": actinia_job},
122+
url=url,
123+
)
124+
125+
126+
def send_cloud_event(
127+
mode="binary",
128+
version="1.0",
129+
cetype="com.mundialis.actinia.process.started",
130+
subject="nc_spm_08/PERMANENT",
131+
actiniaqueuename=None,
132+
data="{}",
133+
url=None,
134+
):
135+
"""Post event and return it."""
136+
if url is None:
137+
url = EVENTRECEIVER.url
138+
92139
attributes = {
93-
"specversion": event["specversion"],
140+
"specversion": version,
94141
"source": "/actinia-cloudevent-plugin",
95-
"type": "com.mundialis.actinia.process.started",
96-
"subject": event["subject"],
142+
"type": cetype,
143+
"subject": subject,
97144
"datacontenttype": "application/json",
145+
"actiniaqueuename": actiniaqueuename,
98146
}
99-
data = {"actinia_job": actinia_job}
100147

101148
event = CloudEvent(attributes, data)
102-
headers, body = to_structured(event)
103-
# send event
149+
150+
# From https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#message
151+
# A "structured-mode message" is one where the entire event (attributes and data)
152+
# are encoded in the message body, according to a specific event format.
153+
# A "binary-mode message" is one where the event data is stored in the message body,
154+
# and event attributes are stored as part of message metadata.
155+
# Often, binary mode is used when the producer of the CloudEvent wishes to add the
156+
# CloudEvent's metadata to an existing event without impacting the message's body.
157+
# In most cases a CloudEvent encoded as a binary-mode message will not break an
158+
# existing receiver's processing of the event because the message's metadata
159+
# typically allows for extension attributes.
160+
# In other words, a binary formatted CloudEvent would work for both
161+
# a CloudEvents enabled receiver as well as one that is unaware of CloudEvents.
162+
if mode == "binary":
163+
headers, body = to_binary(event)
164+
else:
165+
headers, body = to_structured(event)
166+
104167
requests.post(url, headers=headers, data=body)
105168

106169
return event

src/actinia_cloudevent_plugin/endpoints.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
from flask_restful_swagger_2 import Api
2727

28-
from actinia_cloudevent_plugin.api.cloudevent import Cloudevent
28+
from actinia_cloudevent_plugin.api.cloudevents import Cloudevent
2929
from actinia_cloudevent_plugin.api.hooks import Hooks
3030

3131

src/actinia_cloudevent_plugin/resources/config.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@
3434
GENERATED_CONFIG = DEFAULT_CONFIG_PATH + "/actinia-cloudevent-plugin.cfg"
3535

3636

37+
class ACTINIA:
38+
"""Default config for actinia processing."""
39+
40+
processing_base_url = "http://localhost:3000/"
41+
use_actinia_modules = True
42+
user = "actinia-gdi"
43+
password = "actinia-gdi"
44+
45+
3746
class EVENTRECEIVER:
3847
"""Default config for cloudevent receiver."""
3948

@@ -73,6 +82,23 @@ def __init__(self) -> None:
7382
config.write(configfile)
7483
print("Configuration written to " + GENERATED_CONFIG)
7584

85+
# ACTINIA
86+
if config.has_section("ACTINIA"):
87+
if config.has_option("ACTINIA", "processing_base_url"):
88+
ACTINIA.processing_base_url = config.get(
89+
"ACTINIA",
90+
"processing_base_url",
91+
)
92+
if config.has_option("ACTINIA", "use_actinia_modules"):
93+
ACTINIA.use_actinia_modules = config.getboolean(
94+
"ACTINIA",
95+
"use_actinia_modules",
96+
)
97+
if config.has_option("ACTINIA", "user"):
98+
ACTINIA.user = config.get("ACTINIA", "user")
99+
if config.has_option("ACTINIA", "password"):
100+
ACTINIA.password = config.get("ACTINIA", "password")
101+
76102
# LOGGING
77103
if config.has_section("LOGCONFIG"):
78104
if config.has_option("LOGCONFIG", "logfile"):

tests/examples/cloudevent_example.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
"source" : "/apps/ui",
55
"type":"com.mundialis.actinia.process.send",
66
"time":"2025-03-28T10:28:48Z",
7-
"subject" : "nc_spm_08/PERMANENT",
7+
"subject" : "myid",
8+
"actiniaproject":"nc_spm_08",
9+
"actiniaqueuename":"",
810
"datacontenttype":"application/json",
911
"data":{
1012
"list": [

tests/examples/cloudevent_example_return.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
"source" : "/actinia-cloudevent-plugin",
55
"type":"com.mundialis.actinia.process.started",
66
"time":"2025-03-28T10:30:48Z",
7-
"subject" : "nc_spm_08/PERMANENT",
7+
"subject" : "myid",
8+
"actiniaproject":"nc_spm_08",
9+
"actiniaqueuename":"",
810
"datacontenttype":"application/json",
911
"data":{
1012
"actinia_status_url": "todo, if needed"

0 commit comments

Comments
 (0)