|
4 | 4 | */ |
5 | 5 | package aws.smithy.kotlin.runtime.telemetry.ism |
6 | 6 |
|
| 7 | +import aws.smithy.kotlin.runtime.ExperimentalApi |
7 | 8 | import aws.smithy.kotlin.runtime.collections.Attributes |
8 | 9 | import aws.smithy.kotlin.runtime.http.operation.OperationAttributes |
9 | 10 | import aws.smithy.kotlin.runtime.telemetry.context.Context |
10 | 11 | import aws.smithy.kotlin.runtime.telemetry.context.ContextManager |
11 | | -import aws.smithy.kotlin.runtime.telemetry.context.Scope |
12 | 12 | import aws.smithy.kotlin.runtime.telemetry.context.telemetryContext |
13 | 13 | import kotlin.coroutines.CoroutineContext |
14 | 14 |
|
15 | | -public interface SpanListener { |
16 | | - public fun onNewSpan(parentContext: Context?, name: String, attributes: Attributes): Context |
17 | | - public fun onCloseSpan(context: Context) |
| 15 | +internal interface MetricListener { |
| 16 | + fun onMetrics(context: Context, metrics: MetricRecord<*>) |
18 | 17 | } |
19 | 18 |
|
20 | | -public class IsmContextManager private constructor() : ContextManager { |
21 | | - public companion object { |
22 | | - public fun createWithScopeListener(): Pair<IsmContextManager, SpanListener> { |
23 | | - val manager = IsmContextManager() |
24 | | - val listener = object : SpanListener { |
25 | | - override fun onNewSpan(parentContext: Context?, name: String, attributes: Attributes) = |
26 | | - manager.onNewSpan(parentContext, name, attributes) |
| 19 | +internal interface SpanListener { |
| 20 | + fun onNewSpan(parentContext: Context?, name: String, attributes: Attributes): Context |
| 21 | + fun onCloseSpan(context: Context) |
| 22 | +} |
| 23 | + |
| 24 | +@OptIn(ExperimentalApi::class) |
| 25 | +internal class IsmContextManager internal constructor(private val sink: IsmMetricSink) : ContextManager { |
| 26 | + private val rootContext = RootContext() |
27 | 27 |
|
28 | | - override fun onCloseSpan(context: Context) = manager.onCloseSpan(context) |
| 28 | + override fun current(ctx: CoroutineContext): Context = |
| 29 | + ctx.telemetryContext.takeIf { it != Context.None } ?: rootContext |
| 30 | + |
| 31 | + internal val metricListener = object : MetricListener { |
| 32 | + override fun onMetrics(context: Context, metrics: MetricRecord<*>) { |
| 33 | + println("Listener received metrics on $context: $metrics") |
| 34 | + when (context) { |
| 35 | + is OperationContext -> context.records += metrics |
| 36 | + is ChildContext -> context.records += metrics |
29 | 37 | } |
30 | | - return manager to listener |
31 | 38 | } |
32 | 39 | } |
33 | 40 |
|
34 | | - private val rootContext = object : HierarchicalContext(null) { } |
35 | | - |
36 | | - override fun current(ctx: CoroutineContext): Context = ctx.telemetryContext ?: rootContext |
| 41 | + internal val spanListener = object : SpanListener { |
| 42 | + override fun onNewSpan(parentContext: Context?, name: String, attributes: Attributes): Context { |
| 43 | + println("Listener received new span on $parentContext: $name") |
| 44 | + return when (parentContext) { |
| 45 | + null, rootContext -> { |
| 46 | + val service = attributes.getOrNull(OperationAttributes.RpcService) |
| 47 | + val operation = attributes.getOrNull(OperationAttributes.RpcOperation) |
| 48 | + val sdkInvocationId = attributes.getOrNull(OperationAttributes.AwsInvocationId) |
37 | 49 |
|
38 | | - private fun onNewSpan(parentContext: Context?, name: String, attributes: Attributes): Context = |
39 | | - when (parentContext) { |
40 | | - rootContext -> { |
41 | | - val service = attributes.getOrNull(OperationAttributes.RpcService) |
42 | | - val operation = attributes.getOrNull(OperationAttributes.RpcOperation) |
43 | | - val sdkInvocationId = attributes.getOrNull(OperationAttributes.AwsInvocationId) |
44 | | - if (service != null && operation != null && sdkInvocationId != null) { |
45 | | - OperationContext(service, operation, sdkInvocationId) |
46 | | - } else { |
47 | | - OtherContext(parentContext) |
| 50 | + if (service == null || operation == null || sdkInvocationId == null) { |
| 51 | + OtherContext(name, parentContext) |
| 52 | + } else { |
| 53 | + OperationContext(name, rootContext, service, operation, sdkInvocationId) |
| 54 | + } |
48 | 55 | } |
49 | | - } |
50 | 56 |
|
51 | | - is OperationContext, is ChildContext -> ChildContext(parentContext) |
| 57 | + is HierarchicalContext -> ChildContext(name, parentContext) |
52 | 58 |
|
53 | | - else -> OtherContext(parentContext) |
| 59 | + else -> OtherContext(name, parentContext) |
| 60 | + } |
54 | 61 | } |
55 | 62 |
|
56 | | - private fun onCloseSpan(context: Context) = Unit |
57 | | - |
58 | | - private abstract inner class HierarchicalContext(val parent: Context?) : Context { |
59 | | - override fun makeCurrent(): Scope { |
60 | | - return IsmScope(this) |
| 63 | + override fun onCloseSpan(context: Context) { |
| 64 | + when (context) { |
| 65 | + is OperationContext -> publish(context) |
| 66 | + is ChildContext -> (context.parent as? HierarchicalContext)?.let { parentContext -> |
| 67 | + parentContext.children += context.name to context |
| 68 | + } |
| 69 | + } |
61 | 70 | } |
62 | 71 | } |
63 | 72 |
|
64 | | - private inner class OperationContext( |
65 | | - val service: String, |
66 | | - val operation: String, |
67 | | - val sdkInvocationId: String, |
68 | | - val records: MutableList<MetricRecord<*>> = mutableListOf(), |
69 | | - val childScopes: MutableMap<String, ScopeMetrics> = mutableMapOf(), |
70 | | - ) : HierarchicalContext(rootContext) |
71 | | - |
72 | | - private inner class ChildContext( |
73 | | - parent: Context, |
74 | | - val records: MutableList<MetricRecord<*>> = mutableListOf(), |
75 | | - val childScopes: MutableMap<String, ScopeMetrics> = mutableMapOf(), |
76 | | - ) : HierarchicalContext(parent) |
77 | | - |
78 | | - private inner class OtherContext(parent: Context?) : HierarchicalContext(parent) |
79 | | - |
80 | | - private inner class IsmScope(val context: Context) : Scope { |
81 | | - override fun close() = Unit |
| 73 | + private fun publish(context: OperationContext) { |
| 74 | + val opMetrics = OperationMetrics( |
| 75 | + context.service, |
| 76 | + context.operation, |
| 77 | + context.sdkInvocationId, |
| 78 | + context.records.toList(), |
| 79 | + context.children.mapValues { (_, child) -> child.toScopeMetrics() } |
| 80 | + ) |
| 81 | + sink.onInvocationComplete(opMetrics) |
82 | 82 | } |
83 | 83 | } |
| 84 | + |
| 85 | +private fun ChildContext.toScopeMetrics(): ScopeMetrics = ScopeMetrics( |
| 86 | + records.toList(), |
| 87 | + children.mapValues { (_, child) -> child.toScopeMetrics() } |
| 88 | +) |
0 commit comments