@@ -9,58 +9,26 @@ import com.cosmotech.api.events.RunStart
99import com.cosmotech.api.metrics.DownSamplingAggregationType
1010import com.cosmotech.api.metrics.PersistentMetric
1111import com.cosmotech.api.metrics.PersitentMetricType
12- import com.cosmotech.run.WORKFLOW_TYPE_LABEL
13- import com.cosmotech.run.service.WORKFLOW_TYPE_RUN
14- import com.cosmotech.run.workflow.WorkflowContextData
15- import com.cosmotech.run.workflow.WorkflowService
1612import com.cosmotech.runner.domain.Runner
1713import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
1814import org.springframework.context.event.EventListener
19- import org.springframework.scheduling.annotation.Scheduled
2015import org.springframework.stereotype.Service
2116
22- private const val RUNNING_STATUS = " Running"
23-
2417private const val SERVICE_NAME = " run"
25-
2618private const val ORGANIZATION_ID_LABEL = " organizationId"
2719private const val WORKSPACE_ID_LABEL = " workspaceId"
2820private const val RUNNER_ID_LABEL = " runnerId"
29-
30- private const val DEFAULT_EMPTY_WORKFLOW_LABEL = " none"
3121private const val TOTAL_QUALIFIER = " total"
32- private const val TS_RUNNING_WORKFLOW_NAME = " running"
3322private const val TS_RUN_WORKFLOW_NAME = " run"
3423
3524@Service
3625@ConditionalOnProperty(
3726 name = [" csm.platform.metrics.enabled" ], havingValue = " true" , matchIfMissing = false )
3827internal class RunMetrics (
39- private val workflowService : WorkflowService ,
4028 private val eventPublisher : CsmEventPublisher ,
4129 private val csmPlatformProperties : CsmPlatformProperties ,
4230) {
4331
44- @Scheduled(fixedDelay = 10000 )
45- fun publishCurrentRunningRunner () {
46- if (! csmPlatformProperties.metrics.enabled) return
47-
48- val countRunningWorkflows = getRunningWorkflowsCount()
49- val labels =
50- mutableMapOf (
51- " usage" to " licensing" ,
52- )
53- // Global
54- publishRunningLabeledMetric(
55- countRunningWorkflows.values.sum(), TS_RUNNING_WORKFLOW_NAME , labels)
56-
57- // By Organization, Workspace, Runner
58- countRunningWorkflows.forEach { (workflowContext, workflowCount) ->
59- publishOrganizationWorkspaceAndRunnerRunningLabeledMetricIfAny(
60- workflowContext!! , workflowCount, TS_RUNNING_WORKFLOW_NAME , labels)
61- }
62- }
63-
6432 @EventListener(RunStart ::class )
6533 fun onRunStart (event : RunStart ) {
6634 if (! csmPlatformProperties.metrics.enabled) return
@@ -93,15 +61,6 @@ internal class RunMetrics(
9361 publishRunStartLabeledMetric(name, labels)
9462 }
9563
96- private fun getRunningWorkflowsCount (): Map <WorkflowContextData ?, Int > {
97- val runWorkflows =
98- this .workflowService.findWorkflowStatusByLabel(" $WORKFLOW_TYPE_LABEL =$WORKFLOW_TYPE_RUN " )
99- return runWorkflows
100- .filter { it.status == RUNNING_STATUS }
101- .groupingBy { it.contextData }
102- .eachCount()
103- }
104-
10564 internal fun publishRunStartLabeledMetric (name : String , labels : Map <String , String >) {
10665 val metric =
10766 PersistentMetric (
@@ -117,46 +76,4 @@ internal class RunMetrics(
11776 )
11877 eventPublisher.publishEvent(PersistentMetricEvent (this , metric))
11978 }
120-
121- internal fun publishOrganizationWorkspaceAndRunnerRunningLabeledMetricIfAny (
122- workflowContext : WorkflowContextData ,
123- workflowCount : Int ,
124- name : String ,
125- labels : MutableMap <String , String >
126- ) {
127- var existingName = name
128- val organizationId = workflowContext.organizationId!!
129- if (organizationId != DEFAULT_EMPTY_WORKFLOW_LABEL ) {
130- labels[ORGANIZATION_ID_LABEL ] = organizationId
131- existingName + = " :$organizationId "
132- publishRunningLabeledMetric(workflowCount, existingName, labels)
133- val workspaceId = workflowContext.workspaceId!!
134- if (workspaceId != DEFAULT_EMPTY_WORKFLOW_LABEL ) {
135- labels[WORKSPACE_ID_LABEL ] = workspaceId
136- existingName + = " :$workspaceId "
137- publishRunningLabeledMetric(workflowCount, existingName, labels)
138- val runnerId = workflowContext.runnerId!!
139- if (runnerId != DEFAULT_EMPTY_WORKFLOW_LABEL ) {
140- labels[RUNNER_ID_LABEL ] = runnerId
141- existingName + = " :$runnerId "
142- publishRunningLabeledMetric(workflowCount, existingName, labels)
143- }
144- }
145- }
146- }
147-
148- internal fun publishRunningLabeledMetric (value : Int , name : String , labels : Map <String , String >) {
149- val metric =
150- PersistentMetric (
151- service = SERVICE_NAME ,
152- name = name,
153- value = value.toDouble(),
154- qualifier = TOTAL_QUALIFIER ,
155- labels = labels,
156- type = PersitentMetricType .GAUGE ,
157- downSampling = true ,
158- downSamplingAggregation = DownSamplingAggregationType .MAX ,
159- )
160- eventPublisher.publishEvent(PersistentMetricEvent (this , metric))
161- }
16279}
0 commit comments