Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.

Commit ed445ca

Browse files
author
peritz
committed
Standardise admin entities to use kwargs approach
1 parent f88ebd9 commit ed445ca

File tree

4 files changed

+210
-119
lines changed

4 files changed

+210
-119
lines changed

pycti/entities/opencti_capability.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ def list(self) -> List[Dict]:
2727
self.opencti.admin_logger.info("Listing capabilities")
2828
query = (
2929
"""
30-
query Capabilities {
30+
query CapabilityList {
3131
capabilities(first: 500) {
3232
edges {
3333
node {

pycti/entities/opencti_group.py

Lines changed: 110 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,7 @@ def __init__(self, opencti):
148148
}
149149
"""
150150

151-
def list(self,
152-
first: int = 500,
153-
after: str = None,
154-
orderBy: str = None,
155-
orderMode: str = None,
156-
search: str = None,
157-
filters: dict = None,
158-
customAttributes: str = None,
159-
getAll: bool = False,
160-
withPagination: bool = False) -> List[Dict]:
151+
def list(self, **kwargs) -> List[Dict]:
161152
"""Lists groups based on a number of filters.
162153
163154
:param first: Retrieve this number of results. If 0
@@ -193,6 +184,16 @@ def list(self,
193184
:return: List of groups in dictionary representation.
194185
:rtype: list[dict]
195186
"""
187+
first = kwargs.get("first", 500)
188+
after = kwargs.get("after", None)
189+
orderBy = kwargs.get("orderBy", None)
190+
orderMode = kwargs.get("orderMode", None)
191+
search = kwargs.get("search", None)
192+
filters = kwargs.get("filters", None)
193+
customAttributes = kwargs.get("customAttributes", None)
194+
getAll = kwargs.get("getAll", False)
195+
withPagination = kwargs.get("withPagination", False)
196+
196197
if getAll:
197198
first = 100
198199

@@ -251,11 +252,7 @@ def list(self,
251252
return self.opencti.process_multiple(result["data"]["groups"],
252253
withPagination)
253254

254-
def read(self,
255-
id: str = None,
256-
filters: dict = None,
257-
search: str = None,
258-
customAttributes: str = None) -> dict:
255+
def read(self, **kwargs) -> Dict:
259256
"""Fetch a given group from OpenCTI
260257
261258
:param id: ID of the group to fetch
@@ -267,6 +264,10 @@ def read(self,
267264
:return: Representation of a group.
268265
:rtype: dict
269266
"""
267+
id = kwargs.get("id", None)
268+
filters = kwargs.get("filters", None)
269+
search = kwargs.get("search", None)
270+
customAttributes = kwargs.get("customAttributes", None)
270271
if id is not None:
271272
self.opencti.admin_logger.info(
272273
"Fetching group with ID", {"id": id})
@@ -293,15 +294,7 @@ def read(self,
293294
"[opencti_group] Missing parameters: id or filters")
294295
return None
295296

296-
def create(self,
297-
name: str,
298-
group_confidence_level: dict,
299-
description: str = None,
300-
default_assignation: bool = False,
301-
no_creators: bool = False,
302-
restrict_delete: bool = False,
303-
auto_new_marking: bool = False,
304-
customAttributes: str = None) -> dict:
297+
def create(self, **kwargs) -> dict:
305298
"""Create a group with required details
306299
307300
Groups can be configured after creation using other functions.
@@ -331,6 +324,21 @@ def create(self,
331324
:return: Representation of the group.
332325
:rtype: dict
333326
"""
327+
name = kwargs.get("name", None)
328+
group_confidence_level = kwargs.get("group_confidence_level", None)
329+
description = kwargs.get("description", None)
330+
default_assignation = kwargs.get("default_assignation", False)
331+
no_creators = kwargs.get("no_creators", False)
332+
restrict_delete = kwargs.get("restrict_delete", False)
333+
auto_new_marking = kwargs.get("auto_new_marking", False)
334+
customAttributes = kwargs.get("customAttributes", None)
335+
336+
if name is None or group_confidence_level is None:
337+
self.opencti.admin_logger.error(
338+
"[opencti_group] Missing parameters: name and "
339+
"group_confidence_level")
340+
return None
341+
334342
self.opencti.admin_logger.info(
335343
"Creating new group with parameters", {
336344
"name": name,
@@ -382,10 +390,7 @@ def delete(self, id: str):
382390
"""
383391
self.opencti.query(query, {"id": id})
384392

385-
def update_field(self,
386-
id: str,
387-
input: List[Dict],
388-
customAttributes: str = None) -> Dict:
393+
def update_field(self, **kwargs) -> Dict:
389394
"""Update a group using fieldPatch
390395
391396
:param id: ID of the group to update
@@ -397,6 +402,15 @@ def update_field(self,
397402
:return: Representation of a group
398403
:rtype: dict
399404
"""
405+
id = kwargs.get("id", None)
406+
input = kwargs.get("input", None)
407+
customAttributes = kwargs.get("customAttributes", None)
408+
409+
if id is None or input is None:
410+
self.opencti.admin_logger.error(
411+
"[opencti_group] Missing parameters: id and input")
412+
return None
413+
400414
self.opencti.admin_logger.info("Editing group with input", {
401415
"input": input})
402416
query = (
@@ -417,9 +431,7 @@ def update_field(self,
417431
return self.opencti.process_multiple_fields(
418432
result["data"]["groupEdit"]["fieldPatch"])
419433

420-
def add_member(self,
421-
id: str,
422-
user_id: str) -> dict:
434+
def add_member(self, **kwargs) -> dict:
423435
"""Add a member to a given group.
424436
425437
:param id: ID of the group to add a member to
@@ -429,6 +441,14 @@ def add_member(self,
429441
:return: Representation of the relationship
430442
:rtype: dict
431443
"""
444+
id = kwargs.get("id", None)
445+
user_id = kwargs.get("user_id", None)
446+
447+
if id is None or user_id is None:
448+
self.opencti.admin_logger.error(
449+
"[opencti_group] Missing parameters: id and user_id")
450+
return None
451+
432452
self.opencti.admin_logger.info(
433453
"Adding member to group", {"groupId": id, "userId": user_id}
434454
)
@@ -457,9 +477,7 @@ def add_member(self,
457477
return self.opencti.process_multiple_fields(
458478
result["data"]["groupEdit"]["relationAdd"])
459479

460-
def delete_member(self,
461-
id: str,
462-
user_id: str) -> dict:
480+
def delete_member(self, **kwargs) -> dict:
463481
"""Remove a given user from a group
464482
465483
:param id: ID to remove a user from
@@ -469,6 +487,14 @@ def delete_member(self,
469487
:return: Representation of the group after the member has been removed
470488
:rtype: dict
471489
"""
490+
id = kwargs.get("id", None)
491+
user_id = kwargs.get("user_id", None)
492+
493+
if id is None or user_id is None:
494+
self.opencti.admin_logger.error(
495+
"[opencti_group] Missing parameters: id and user_id")
496+
return None
497+
472498
self.opencti.admin_logger.info(
473499
"Removing member from group", {"groupId": id, "userId": user_id}
474500
)
@@ -489,9 +515,7 @@ def delete_member(self,
489515
return self.opencti.process_multiple_fields(
490516
result["data"]["groupEdit"]["relationDelete"])
491517

492-
def add_role(self,
493-
id: str,
494-
role_id: str) -> dict:
518+
def add_role(self, **kwargs) -> Dict:
495519
"""Add a role to a given group
496520
497521
:param id: ID to add a role to
@@ -501,6 +525,14 @@ def add_role(self,
501525
:return: Representation of the group after a role has been added
502526
:rtype: dict
503527
"""
528+
id = kwargs.get("id", None)
529+
role_id = kwargs.get("role_id", None)
530+
531+
if id is None or role_id is None:
532+
self.opencti.admin_logger.error(
533+
"[opencti_group] Missing parameters: id and role_id")
534+
return None
535+
504536
self.opencti.admin_logger.info(
505537
"Adding role to group", {"groupId": id, "roleId": role_id})
506538
query = (
@@ -527,9 +559,7 @@ def add_role(self,
527559
return self.opencti.process_multiple_fields(
528560
result["data"]["groupEdit"]["relationAdd"])
529561

530-
def delete_role(self,
531-
id: str,
532-
role_id: str) -> dict:
562+
def delete_role(self, **kwargs) -> Dict:
533563
"""Removes a role from a given group
534564
535565
:param id: ID to remove role from
@@ -539,6 +569,14 @@ def delete_role(self,
539569
:return: Representation of the group after role is removed
540570
:rtype: dict
541571
"""
572+
id = kwargs.get("id", None)
573+
role_id = kwargs.get("role_id", None)
574+
575+
if id is None or role_id is None:
576+
self.opencti.admin_logger.error(
577+
"[opencti_group] Missing parameters: id and role_id")
578+
return None
579+
542580
self.opencti.admin_logger.info(
543581
"Removing role from group", {"groupId": id, "roleId": role_id})
544582
query = (
@@ -558,10 +596,7 @@ def delete_role(self,
558596
return self.opencti.process_multiple_fields(
559597
result["data"]["groupEdit"]["relationDelete"])
560598

561-
def edit_default_marking(self,
562-
id: str,
563-
marking_ids: List[str],
564-
entity_type: str = "GLOBAL") -> dict:
599+
def edit_default_marking(self, **kwargs) -> Dict:
565600
"""Adds a default marking to the group.
566601
567602
:param id: ID of the group.
@@ -576,6 +611,15 @@ def edit_default_marking(self,
576611
:return: Group after adding the default marking.
577612
:rtype: dict
578613
"""
614+
id = kwargs.get("id", None)
615+
marking_ids = kwargs.get("marking_ids", None)
616+
entity_type = kwargs.get("entity_type", "GLOBAL")
617+
618+
if id is None or marking_ids is None:
619+
self.opencti.admin_logger.error(
620+
"[opencti_group] Missing parameters: id and marking_ids")
621+
return None
622+
579623
self.opencti.admin_logger.info(
580624
"Setting default markings for entity on group", {
581625
"markings": marking_ids,
@@ -607,18 +651,24 @@ def edit_default_marking(self,
607651
return self.opencti.process_multiple_fields(
608652
result["data"]["groupEdit"]["editDefaultMarking"])
609653

610-
def add_allowed_marking(self,
611-
id: str,
612-
marking_id: str) -> dict:
654+
def add_allowed_marking(self, **kwargs) -> Dict:
613655
"""Allow a group to access a marking
614656
615657
:param id: ID of group to authorise
616658
:type id: str
617-
:param marking: ID of marking to authorise
618-
:type marking: str
659+
:param marking_id: ID of marking to authorise
660+
:type marking_id: str
619661
:return: Relationship from the group to the marking definition
620662
:rtype: dict
621663
"""
664+
id = kwargs.get("id", None)
665+
marking_id = kwargs.get("marking_id", None)
666+
667+
if id is None or marking_id is None:
668+
self.opencti.admin_logger.error(
669+
"[opencti_group] Missing parameters: id and marking_id")
670+
return None
671+
622672
self.opencti.admin_logger.info(
623673
"Granting group access to marking definition", {
624674
"groupId": id, "markingId": marking_id
@@ -668,18 +718,24 @@ def add_allowed_marking(self,
668718
return self.opencti.process_multiple_fields(
669719
result["data"]["groupEdit"]["relationAdd"])
670720

671-
def delete_allowed_marking(self,
672-
id: str,
673-
marking_id: str) -> dict:
721+
def delete_allowed_marking(self, **kwargs) -> Dict:
674722
"""Removes access to a marking for a group
675723
676724
:param id: ID of group to forbid
677725
:type id: str
678-
:param marking: ID of marking to deny
679-
:type marking: str
726+
:param marking_id: ID of marking to deny
727+
:type marking_id: str
680728
:return: Group after denying access to marking definition
681729
:rtype: dict
682730
"""
731+
id = kwargs.get("id", None)
732+
marking_id = kwargs.get("marking_id", None)
733+
734+
if id is None or marking_id is None:
735+
self.opencti.admin_logger.error(
736+
"[opencti_group] Missing parameters: id and marking_id")
737+
return None
738+
683739
self.opencti.admin_logger.info(
684740
"Forbidding group access to marking definition", {
685741
"groupId": id, "markingId": marking_id

pycti/entities/opencti_role.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def list(self, **kwargs) -> List[Dict]:
100100

101101
query = (
102102
"""
103-
query Roles($first: Int, $after: ID, $orderBy: RolesOrdering, $orderMode: OrderingMode, $search: String) {
103+
query RoleList($first: Int, $after: ID, $orderBy: RolesOrdering, $orderMode: OrderingMode, $search: String) {
104104
roles(first: $first, after: $after, orderBy: $orderBy, orderMode: $orderMode, search: $search) {
105105
edges {
106106
node {
@@ -177,7 +177,7 @@ def read(self, **kwargs) -> Dict:
177177
self.opencti.admin_logger.info("Reading role", {"id": id})
178178
query = (
179179
"""
180-
query Role($id: String!) {
180+
query RoleRead($id: String!) {
181181
role(id: $id) {
182182
"""
183183
+ (self.properties if customAttributes is None
@@ -250,7 +250,7 @@ def create(self, **kwargs) -> Dict:
250250
})
251251
query = (
252252
"""
253-
mutation RoleAdd($input: RoleAddInput!) {
253+
mutation RoleCreate($input: RoleAddInput!) {
254254
roleAdd(input: $input) {
255255
"""
256256
+ (self.properties if customAttributes is None
@@ -308,7 +308,7 @@ def update_field(self, **kwargs) -> Dict:
308308
})
309309
query = (
310310
"""
311-
mutation RoleFieldPatch($id: ID!, $input: [EditInput]!) {
311+
mutation RoleUpdate($id: ID!, $input: [EditInput]!) {
312312
roleEdit(id: $id) {
313313
fieldPatch(input: $input) {
314314
"""
@@ -405,7 +405,7 @@ def delete_capability(self, **kwargs) -> Dict:
405405
})
406406
query = (
407407
"""
408-
mutation RoleDelCapability($id: ID!, $toId: StixRef!) {
408+
mutation RoleDeleteCapability($id: ID!, $toId: StixRef!) {
409409
roleEdit(id: $id) {
410410
relationDelete(toId: $toId, relationship_type: "has-capability") {
411411
"""

0 commit comments

Comments
 (0)