Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit c36c424

Browse files
author
Samuel Hassine
committed
[client] Fix synchronization between platforms
1 parent 63540fb commit c36c424

File tree

8 files changed

+108
-43
lines changed

8 files changed

+108
-43
lines changed

pycti/connector/opencti_connector_helper.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,18 +219,23 @@ def run(self):
219219

220220

221221
class ListenStream(threading.Thread):
222-
def __init__(self, helper, callback, url, token, verify_ssl):
222+
def __init__(self, helper, callback, url, token, verify_ssl, start_timestamp):
223223
threading.Thread.__init__(self)
224224
self.helper = helper
225225
self.callback = callback
226226
self.url = url
227227
self.token = token
228228
self.verify_ssl = verify_ssl
229+
self.start_timestamp = start_timestamp
229230

230231
def run(self):
231232
current_state = self.helper.get_state()
232233
if current_state is None:
233-
current_state = {"connectorLastEventId": "-"}
234+
current_state = {
235+
"connectorLastEventId": str(self.start_timestamp) + "-0"
236+
if self.start_timestamp is not None and len(self.start_timestamp) > 0
237+
else "-"
238+
}
234239
self.helper.set_state(current_state)
235240

236241
# If URL and token are provided, likely consuming a remote stream
@@ -462,13 +467,16 @@ def listen_stream(
462467
url=None,
463468
token=None,
464469
verify_ssl=None,
470+
start_timestamp=None,
465471
) -> None:
466472
"""listen for messages and register callback function
467473
468474
:param message_callback: callback function to process messages
469475
"""
470476

471-
listen_stream = ListenStream(self, message_callback, url, token, verify_ssl)
477+
listen_stream = ListenStream(
478+
self, message_callback, url, token, verify_ssl, start_timestamp
479+
)
472480
listen_stream.start()
473481

474482
def get_opencti_url(self):
@@ -515,11 +523,12 @@ def send_stix2_bundle(self, bundle, **kwargs) -> list:
515523
work_id = kwargs.get("work_id", self.work_id)
516524
entities_types = kwargs.get("entities_types", None)
517525
update = kwargs.get("update", False)
526+
event_version = kwargs.get("event_version", None)
518527

519528
if entities_types is None:
520529
entities_types = []
521530
stix2_splitter = OpenCTIStix2Splitter()
522-
bundles = stix2_splitter.split_bundle(bundle)
531+
bundles = stix2_splitter.split_bundle(bundle, True, event_version)
523532
if len(bundles) == 0:
524533
raise ValueError("Nothing to import")
525534
if work_id is not None:

pycti/entities/opencti_stix_core_relationship.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,10 @@ def update_field(self, **kwargs):
535535
id = kwargs.get("id", None)
536536
key = kwargs.get("key", None)
537537
value = kwargs.get("value", None)
538+
if isinstance(value, list):
539+
value = [str(v) for v in value]
540+
else:
541+
value = str(value)
538542
if id is not None and key is not None and value is not None:
539543
self.opencti.log(
540544
"info",

pycti/entities/opencti_stix_cyber_observable.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,10 @@ def update_field(self, **kwargs):
10781078
key = kwargs.get("key", None)
10791079
value = kwargs.get("value", None)
10801080
operation = kwargs.get("operation", "replace")
1081-
1081+
if isinstance(value, list):
1082+
value = [str(v) for v in value]
1083+
else:
1084+
value = str(value)
10821085
if id is not None and key is not None and value is not None:
10831086
self.opencti.log(
10841087
"info", "Updating Stix-Observable {" + id + "} field {" + key + "}."

pycti/entities/opencti_stix_domain_object.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,10 @@ def update_field(self, **kwargs):
577577
key = kwargs.get("key", None)
578578
value = kwargs.get("value", None)
579579
operation = kwargs.get("operation", "replace")
580-
580+
if isinstance(value, list):
581+
value = [str(v) for v in value]
582+
else:
583+
value = str(value)
581584
if id is not None and key is not None and value is not None:
582585
self.opencti.log(
583586
"info", "Updating Stix-Domain-Object {" + id + "} field {" + key + "}."

pycti/utils/opencti_stix2.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1490,9 +1490,14 @@ def import_bundle(self, stix_bundle, update=False, types=None) -> List:
14901490
raise ValueError("JSON data type is not a STIX2 bundle")
14911491
if "objects" not in stix_bundle or len(stix_bundle["objects"]) == 0:
14921492
raise ValueError("JSON data objects is empty")
1493+
event_version = (
1494+
stix_bundle["x_opencti_event_version"]
1495+
if "x_opencti_event_version" in stix_bundle
1496+
else None
1497+
)
14931498

14941499
stix2_splitter = OpenCTIStix2Splitter()
1495-
bundles = stix2_splitter.split_bundle(stix_bundle, False)
1500+
bundles = stix2_splitter.split_bundle(stix_bundle, False, event_version)
14961501
# Import every elements in a specific order
14971502
imported_elements = []
14981503

@@ -1503,10 +1508,12 @@ def import_bundle(self, stix_bundle, update=False, types=None) -> List:
15031508
if bundle["x_opencti_event_version"] == "1":
15041509
if "x_data_update" in item:
15051510
self.stix2_update.process_update_v1(item)
1511+
continue
15061512
elif bundle["x_opencti_event_version"] == "2":
1507-
if "x_opencti_patch":
1513+
if "x_opencti_patch" in item:
15081514
self.stix2_update.process_update_v2(item)
1509-
elif item["type"] == "relationship":
1515+
continue
1516+
if item["type"] == "relationship":
15101517
self.import_relationship(item, update, types)
15111518
elif item["type"] == "sighting":
15121519
# Resolve the to

pycti/utils/opencti_stix2_splitter.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def enlist_element(self, item_id, raw_data):
3535
self.cache_index[item_id] = item # Put in cache
3636
return nb_deps
3737

38-
def split_bundle(self, bundle, use_json=True) -> list:
38+
def split_bundle(self, bundle, use_json=True, event_version=None) -> list:
3939
"""splits a valid stix2 bundle into a list of bundles
4040
:param bundle: valid stix2 bundle
4141
:type bundle:
@@ -70,11 +70,11 @@ def by_dep_size(elem):
7070

7171
self.elements.sort(key=by_dep_size)
7272
for entity in self.elements:
73-
bundles.append(self.stix2_create_bundle([entity], use_json))
73+
bundles.append(self.stix2_create_bundle([entity], use_json, event_version))
7474
return bundles
7575

7676
@staticmethod
77-
def stix2_create_bundle(items, use_json):
77+
def stix2_create_bundle(items, use_json, event_version=None):
7878
"""create a stix2 bundle with items
7979
8080
:param items: valid stix2 items
@@ -91,4 +91,6 @@ def stix2_create_bundle(items, use_json):
9191
"spec_version": "2.1",
9292
"objects": items,
9393
}
94+
if event_version is not None:
95+
bundle["x_opencti_event_version"] = event_version
9496
return json.dumps(bundle) if use_json else bundle

0 commit comments

Comments
 (0)