|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
| 15 | +import atexit |
15 | 16 | import logging
|
16 | 17 | import threading
|
17 | 18 | from typing import Dict, Sequence, Tuple, Type
|
18 | 19 |
|
19 | 20 | from opentelemetry import metrics as metrics_api
|
| 21 | +from opentelemetry.sdk.metrics.export import ( |
| 22 | + ConsoleMetricsExporter, |
| 23 | + MetricsExporter, |
| 24 | +) |
20 | 25 | from opentelemetry.sdk.metrics.export.aggregate import Aggregator
|
21 | 26 | from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
|
| 27 | +from opentelemetry.sdk.metrics.export.controller import PushController |
22 | 28 | from opentelemetry.sdk.resources import Resource
|
23 | 29 | from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
|
24 | 30 |
|
@@ -449,24 +455,64 @@ class MeterProvider(metrics_api.MeterProvider):
|
449 | 455 | Args:
|
450 | 456 | stateful: Indicates whether meters created are going to be stateful
|
451 | 457 | resource: Resource for this MeterProvider
|
| 458 | + shutdown_on_exit: Register an atexit hook to shut down when the |
| 459 | + application exists |
452 | 460 | """
|
453 | 461 |
|
454 | 462 | def __init__(
|
455 |
| - self, stateful=True, resource: Resource = Resource.create_empty(), |
| 463 | + self, |
| 464 | + stateful=True, |
| 465 | + resource: Resource = Resource.create_empty(), |
| 466 | + shutdown_on_exit: bool = True, |
456 | 467 | ):
|
457 | 468 | self.stateful = stateful
|
458 | 469 | self.resource = resource
|
| 470 | + self._controllers = [] |
| 471 | + self._exporters = set() |
| 472 | + self._atexit_handler = None |
| 473 | + if shutdown_on_exit: |
| 474 | + self._atexit_handler = atexit.register(self.shutdown) |
459 | 475 |
|
460 | 476 | def get_meter(
|
461 | 477 | self,
|
462 | 478 | instrumenting_module_name: str,
|
463 | 479 | instrumenting_library_version: str = "",
|
464 | 480 | ) -> "metrics_api.Meter":
|
| 481 | + """See `opentelemetry.metrics.MeterProvider`.get_meter.""" |
465 | 482 | if not instrumenting_module_name: # Reject empty strings too.
|
466 |
| - raise ValueError("get_meter called with missing module name.") |
| 483 | + instrumenting_module_name = "ERROR:MISSING MODULE NAME" |
| 484 | + logger.error("get_meter called with missing module name.") |
467 | 485 | return Meter(
|
468 | 486 | self,
|
469 | 487 | InstrumentationInfo(
|
470 |
| - instrumenting_module_name, instrumenting_library_version |
| 488 | + instrumenting_module_name, instrumenting_library_version, |
471 | 489 | ),
|
472 | 490 | )
|
| 491 | + |
| 492 | + def start_pipeline( |
| 493 | + self, |
| 494 | + meter: metrics_api.Meter, |
| 495 | + exporter: MetricsExporter = None, |
| 496 | + interval: float = 15.0, |
| 497 | + ) -> None: |
| 498 | + """Method to begin the collect/export pipeline. |
| 499 | +
|
| 500 | + Args: |
| 501 | + meter: The meter to collect metrics from. |
| 502 | + exporter: The exporter to export metrics to. |
| 503 | + interval: The collect/export interval in seconds. |
| 504 | + """ |
| 505 | + if not exporter: |
| 506 | + exporter = ConsoleMetricsExporter() |
| 507 | + self._exporters.add(exporter) |
| 508 | + # TODO: Controller type configurable? |
| 509 | + self._controllers.append(PushController(meter, exporter, interval)) |
| 510 | + |
| 511 | + def shutdown(self) -> None: |
| 512 | + for controller in self._controllers: |
| 513 | + controller.shutdown() |
| 514 | + for exporter in self._exporters: |
| 515 | + exporter.shutdown() |
| 516 | + if self._atexit_handler is not None: |
| 517 | + atexit.unregister(self._atexit_handler) |
| 518 | + self._atexit_handler = None |
0 commit comments