Skip to content

Commit c5bcd43

Browse files
committed
fix: fix apmplus metrics
1 parent d18a011 commit c5bcd43

File tree

3 files changed

+51
-42
lines changed

3 files changed

+51
-42
lines changed

veadk/tracing/telemetry/exporters/apmplus_exporter.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414

1515
from typing import Any
1616

17-
from opentelemetry import metrics
17+
from opentelemetry import metrics as metrics_api
1818
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
1919
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
20-
from opentelemetry.sdk.metrics import MeterProvider
20+
from opentelemetry.sdk import metrics as metrics_sdk
2121
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
2222
from opentelemetry.sdk.resources import Resource
2323
from opentelemetry.sdk.trace.export import BatchSpanProcessor
@@ -26,6 +26,9 @@
2626

2727
from veadk.config import getenv
2828
from veadk.tracing.telemetry.exporters.base_exporter import BaseExporter
29+
from veadk.tracing.telemetry.metrics.opentelemetry_metrics import MeterUploader
30+
31+
# from veadk.tracing.telemetry.metrics.opentelemetry_metrics import meter_uploader_manager
2932
from veadk.utils.logger import get_logger
3033

3134
logger = get_logger(__name__)
@@ -68,15 +71,32 @@ def model_post_init(self, context: Any) -> None:
6871
endpoint=self.config.endpoint, insecure=True, headers=self.headers
6972
)
7073
self.processor = BatchSpanProcessor(self._exporter)
74+
self.meter_uploader = self._init_meter_uploader(exporter_id="apmplus")
7175

76+
def _init_meter_uploader(self, exporter_id: str) -> MeterUploader:
7277
# init meter
73-
resource = Resource.create()
74-
exporter = OTLPMetricExporter(endpoint=self.config.endpoint, headers=headers)
78+
exporter = OTLPMetricExporter(
79+
endpoint=self.config.endpoint, headers=self.headers
80+
)
7581
metric_reader = PeriodicExportingMetricReader(exporter)
76-
provider = MeterProvider(metric_readers=[metric_reader], resource=resource)
77-
metrics.set_meter_provider(provider)
7882

79-
# metrics.get_meter("veadk.apmplus.meter")
83+
global_metrics_provider = metrics_api.get_meter_provider()
84+
85+
if getattr(global_metrics_provider, "_sdk_config", None):
86+
global_resource = getattr(global_metrics_provider, "_sdk_config").resource
87+
else:
88+
global_resource = Resource.create()
89+
90+
new_resource = Resource.create(self.resource_attributes)
91+
merged_resource = global_resource.merge(new_resource)
92+
93+
provider = metrics_sdk.MeterProvider(
94+
metric_readers=[metric_reader], resource=merged_resource
95+
)
96+
metrics_api.set_meter_provider(provider)
97+
98+
meter_uploader = MeterUploader(exporter_id=exporter_id)
99+
return meter_uploader
80100

81101
@override
82102
def export(self) -> None:

veadk/tracing/telemetry/metrics/opentelemetry_metrics.py

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,33 +11,20 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
15-
import time
16-
14+
from google.adk.models import LlmResponse
15+
from opentelemetry import metrics
1716
from opentelemetry.metrics._internal import Meter
18-
from opentelemetry.sdk.metrics import MeterProvider
19-
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
2017

2118
from veadk.config import getenv
2219

23-
24-
class MeterContext:
25-
def __init__(
26-
self,
27-
meter: Meter,
28-
provider: MeterProvider,
29-
reader: PeriodicExportingMetricReader,
30-
):
31-
self.meter = meter
32-
self.provider = provider
33-
self.reader = reader
20+
METER_NAME_TEMPLATE = "veadk.{exporter_id}.meter"
3421

3522

3623
class MeterUploader:
37-
def __init__(self, meter_context: MeterContext):
38-
self.meter = meter_context.meter
39-
self.provider = meter_context.provider
40-
self.reader = meter_context.reader
24+
def __init__(self, exporter_id: str):
25+
self.meter: Meter = metrics.get_meter(
26+
METER_NAME_TEMPLATE.format(exporter_id=exporter_id)
27+
)
4128

4229
self.base_attributes = {
4330
"gen_ai_system": "volcengine",
@@ -57,17 +44,11 @@ def __init__(self, meter_context: MeterContext):
5744
unit="count",
5845
)
5946

60-
def record(self, prompt_tokens: list[int], completion_tokens: list[int]):
61-
self.llm_invoke_counter.add(len(completion_tokens), self.base_attributes)
62-
63-
for prompt_token in prompt_tokens:
64-
token_attributes = {**self.base_attributes, "gen_ai_token_type": "input"}
65-
self.token_usage.record(prompt_token, attributes=token_attributes)
66-
for completion_token in completion_tokens:
67-
token_attributes = {**self.base_attributes, "gen_ai_token_type": "output"}
68-
self.token_usage.record(completion_token, attributes=token_attributes)
69-
70-
def close(self):
71-
time.sleep(0.05)
72-
self.reader.force_flush()
73-
self.provider.shutdown()
47+
def record(self, llm_response: LlmResponse):
48+
input_token = llm_response.usage_metadata.prompt_token_count
49+
output_token = llm_response.usage_metadata.candidates_token_count
50+
self.llm_invoke_counter.add(1, self.base_attributes)
51+
token_attributes = {**self.base_attributes, "gen_ai_token_type": "input"}
52+
self.token_usage.record(input_token, attributes=token_attributes)
53+
token_attributes = {**self.base_attributes, "gen_ai_token_type": "output"}
54+
self.token_usage.record(output_token, attributes=token_attributes)

veadk/tracing/telemetry/telemetry.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@
1919
logger = get_logger(__name__)
2020

2121

22-
def trace_send_data(): ...
22+
def trace_send_data(invocation_context: InvocationContext, llm_response: LlmResponse):
23+
tracers = invocation_context.agent.tracers
24+
for tracer in tracers:
25+
for exporter in getattr(tracer, "exporters", []):
26+
if getattr(exporter, "meter_uploader", None):
27+
exporter.meter_uploader.record(llm_response)
2328

2429

2530
def set_common_attributes(
@@ -116,3 +121,6 @@ def trace_call_llm(
116121
for attr_name, attr_extractor in llm_attributes_mapping.items():
117122
response: ExtractorResponse = attr_extractor(params)
118123
ExtractorResponse.update_span(span, attr_name, response)
124+
125+
# Report meter
126+
trace_send_data(invocation_context, llm_response)

0 commit comments

Comments
 (0)