Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit 8ef3069

Browse files
[client] Connector queuing auto backpressure - Scheduler (#6325) (#698)
Co-authored-by: Helene Nguyen <[email protected]>
1 parent 264b5de commit 8ef3069

File tree

4 files changed

+493
-18
lines changed

4 files changed

+493
-18
lines changed

pycti/api/opencti_api_connector.py

Lines changed: 58 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,43 @@ class OpenCTIApiConnector:
1010
def __init__(self, api):
1111
self.api = api
1212

13+
def read(self, connector_id: str) -> Dict:
14+
"""Reading the connector and its details
15+
16+
:return: return all the connector details
17+
:rtype: dict
18+
"""
19+
self.api.app_logger.info("[INFO] Getting connector details ...")
20+
query = """
21+
query GetConnector($id: String!) {
22+
connector(id: $id) {
23+
id
24+
name
25+
active
26+
auto
27+
only_contextual
28+
connector_type
29+
connector_scope
30+
connector_state
31+
connector_queue_details {
32+
messages_number
33+
messages_size
34+
}
35+
updated_at
36+
created_at
37+
config {
38+
listen
39+
listen_exchange
40+
push
41+
push_exchange
42+
}
43+
built_in
44+
}
45+
}
46+
"""
47+
result = self.api.query(query, {"id": connector_id})
48+
return result["data"]["connector"]
49+
1350
def list(self) -> Dict:
1451
"""list available connectors
1552
@@ -41,27 +78,44 @@ def list(self) -> Dict:
4178
result = self.api.query(query)
4279
return result["data"]["connectorsForWorker"]
4380

44-
def ping(self, connector_id: str, connector_state: Any) -> Dict:
81+
def ping(
82+
self, connector_id: str, connector_state: Any, connector_info: Dict
83+
) -> Dict:
4584
"""pings a connector by id and state
4685
4786
:param connector_id: the connectors id
4887
:type connector_id: str
4988
:param connector_state: state for the connector
5089
:type connector_state:
90+
:param connector_info: all details connector
91+
:type connector_info: Dict
5192
:return: the response pingConnector data dict
5293
:rtype: dict
5394
"""
5495

5596
query = """
56-
mutation PingConnector($id: ID!, $state: String) {
57-
pingConnector(id: $id, state: $state) {
97+
mutation PingConnector($id: ID!, $state: String, $connectorInfo: ConnectorInfoInput ) {
98+
pingConnector(id: $id, state: $state, connectorInfo: $connectorInfo) {
5899
id
59100
connector_state
101+
connector_info {
102+
run_and_terminate
103+
buffering
104+
queue_threshold
105+
queue_messages_size
106+
next_run_datetime
107+
last_run_datetime
108+
}
60109
}
61110
}
62111
"""
63112
result = self.api.query(
64-
query, {"id": connector_id, "state": json.dumps(connector_state)}
113+
query,
114+
{
115+
"id": connector_id,
116+
"state": json.dumps(connector_state),
117+
"connectorInfo": connector_info,
118+
},
65119
)
66120
return result["data"]["pingConnector"]
67121

0 commit comments

Comments
 (0)