1
1
from typing import Iterable
2
2
3
- from ..metrics import Metric , Metrics
3
+ from ..metrics import (
4
+ INGEST_HOOKS_EXECUTED ,
5
+ NODES_UPSERTED ,
6
+ RELATIONSHIPS_UPSERTED ,
7
+ TIME_TO_LIVE_OPERATIONS ,
8
+ Metric ,
9
+ Metrics ,
10
+ )
4
11
from ..model import IngestionHook , Node , RelationshipWithNodes , TimeToLiveConfiguration
5
12
from .query_executor import (
6
13
OperationOnNodeIdentity ,
@@ -14,12 +21,45 @@ class QueryExecutorWithStatistics(QueryExecutor):
14
21
15
22
def __init__ (self , inner : QueryExecutor ) -> None :
16
23
self .inner = inner
24
+ self .node_metric_by_type : dict [str , Metric ] = {}
25
+ self .relationship_metric_by_relationship_type : dict [str , Metric ] = {}
26
+
27
+ def _get_or_create_node_metric (self , node_type : str ) -> Metric :
28
+ """Get or create a metric for a node type."""
29
+ if node_type not in self .node_metric_by_type :
30
+ self .node_metric_by_type [node_type ] = Metric (
31
+ f"nodes_upserted_by_type_{ node_type } " ,
32
+ f"Number of nodes upserted by type { node_type } " ,
33
+ )
34
+ return self .node_metric_by_type [node_type ]
35
+
36
+ def _get_or_create_relationship_metric (self , relationship_type : str ) -> Metric :
37
+ """Get or create a metric for a relationship type."""
38
+ if relationship_type not in self .relationship_metric_by_relationship_type :
39
+ self .relationship_metric_by_relationship_type [relationship_type ] = Metric (
40
+ f"relationships_upserted_by_relationship_type_{ relationship_type } " ,
41
+ f"Number of relationships upserted by relationship type { relationship_type } " ,
42
+ )
43
+ return self .relationship_metric_by_relationship_type [relationship_type ]
17
44
18
45
async def upsert_nodes_in_bulk_with_same_operation (
19
46
self , operation : OperationOnNodeIdentity , nodes : Iterable [Node ]
20
47
):
21
48
await self .inner .upsert_nodes_in_bulk_with_same_operation (operation , nodes )
22
- Metrics .get ().increment (Metric .NODES_UPSERTED , len (nodes ))
49
+
50
+ # Tally node types
51
+ node_type_counts : dict [str , int ] = {}
52
+ total_nodes = 0
53
+ for node in nodes :
54
+ node_type_counts [node .type ] = node_type_counts .get (node .type , 0 ) + 1
55
+ total_nodes += 1
56
+
57
+ # Increment metrics in bulk
58
+ metrics = Metrics .get ()
59
+ metrics .increment (NODES_UPSERTED , total_nodes )
60
+ for node_type , count in node_type_counts .items ():
61
+ metric = self ._get_or_create_node_metric (node_type )
62
+ metrics .increment (metric , count )
23
63
24
64
async def upsert_relationships_in_bulk_of_same_operation (
25
65
self ,
@@ -29,15 +69,31 @@ async def upsert_relationships_in_bulk_of_same_operation(
29
69
await self .inner .upsert_relationships_in_bulk_of_same_operation (
30
70
shape , relationships
31
71
)
32
- Metrics .get ().increment (Metric .RELATIONSHIPS_UPSERTED , len (relationships ))
72
+
73
+ # Tally relationship types
74
+ relationship_type_counts : dict [str , int ] = {}
75
+ total_relationships = 0
76
+ for relationship in relationships :
77
+ rel_type = relationship .relationship .type
78
+ relationship_type_counts [rel_type ] = (
79
+ relationship_type_counts .get (rel_type , 0 ) + 1
80
+ )
81
+ total_relationships += 1
82
+
83
+ # Increment metrics in bulk
84
+ metrics = Metrics .get ()
85
+ metrics .increment (RELATIONSHIPS_UPSERTED , total_relationships )
86
+ for rel_type , count in relationship_type_counts .items ():
87
+ metric = self ._get_or_create_relationship_metric (rel_type )
88
+ metrics .increment (metric , count )
33
89
34
90
async def perform_ttl_op (self , config : TimeToLiveConfiguration ):
35
91
await self .inner .perform_ttl_op (config )
36
- Metrics .get ().increment (Metric . TIME_TO_LIVE_OPERATIONS )
92
+ Metrics .get ().increment (TIME_TO_LIVE_OPERATIONS )
37
93
38
94
async def execute_hook (self , hook : IngestionHook ):
39
95
await self .inner .execute_hook (hook )
40
- Metrics .get ().increment (Metric . INGEST_HOOKS_EXECUTED )
96
+ Metrics .get ().increment (INGEST_HOOKS_EXECUTED )
41
97
42
98
async def finish (self ):
43
99
await self .inner .finish ()
0 commit comments