55 */
66package com .linkedin .datastream .server ;
77
8+ import java .net .URI ;
9+ import java .net .URISyntaxException ;
810import java .time .Duration ;
911import java .time .Instant ;
1012import java .util .ArrayList ;
@@ -52,6 +54,18 @@ public class EventProducer implements DatastreamEventProducer {
5254 public static final String CONFIG_FLUSH_INTERVAL_MS = "flushIntervalMs" ;
5355 public static final String CONFIG_ENABLE_PER_TOPIC_METRICS = "enablePerTopicMetrics" ;
5456 public static final String CONFIG_ENABLE_PER_TOPIC_EVENT_LATENCY_METRICS = "enablePerTopicEventLatencyMetrics" ;
57+ /**
58+ * When enabled, emits per-source-database throughput attribution metrics keyed as
59+ * {@code EventProducer.db.<databaseName>.bytesProducedRate} and
60+ * {@code EventProducer.db.<databaseName>.eventProduceRate}.
61+ * The per-database metrics apply only to CDC connectors whose source URI uses a single-slash scheme
62+ * (e.g. {@code espresso:/}, {@code mysql:/}, {@code tidb:/}). Double-slash URIs (e.g. {@code kafka://})
63+ * produce no database metrics; aggregate and per-connector {@code bytesProducedRate} are still emitted.
64+ *
65+ * <p><b>Cardinality warning:</b> each distinct database name creates a new metric series.
66+ * Enable only when the set of source databases is bounded and well-understood.
67+ */
68+ public static final String CONFIG_ENABLE_THROUGHPUT_METRICS = "enableThroughputAttributionMetrics" ;
5569
5670 // Default flush interval. It is intentionally kept at a low frequency. If a particular connector wants
5771 // a more frequent flush (high-traffic connectors), it can perform that on its own.
@@ -79,7 +93,9 @@ public class EventProducer implements DatastreamEventProducer {
7993 private static final String EVENTS_PRODUCED_OUTSIDE_SLA = "eventsProducedOutsideSla" ;
8094 private static final String EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA = "eventsProducedOutsideAlternateSla" ;
8195 private static final String DROPPED_SENT_FROM_SERIALIZATION_ERROR = "droppedSentFromSerializationError" ;
96+ static final String BYTES_PRODUCED_RATE = "bytesProducedRate" ;
8297 private static final String AGGREGATE = "aggregate" ;
98+
8399 private static final String DEFAULT_AVAILABILITY_THRESHOLD_SLA_MS = "60000" ; // 1 minute
84100 private static final String DEFAULT_AVAILABILITY_THRESHOLD_ALTERNATE_SLA_MS = "180000" ; // 3 minutes
85101 private static final String DEFAULT_WARN_LOG_LATENCY_ENABLED = "false" ;
@@ -109,6 +125,9 @@ public class EventProducer implements DatastreamEventProducer {
109125 private final boolean _skipMessageOnSerializationErrors ;
110126 private final boolean _enablePerTopicMetrics ;
111127 private final boolean _enablePerTopicEventLatencyMetrics ;
128+ private final boolean _enableThroughputMetrics ;
129+ // Cached source database name parsed from the connection string at construction time (null for non-CDC sources)
130+ private final String _sourceDatabase ;
112131 private final Duration _flushInterval ;
113132 private final Function <DatastreamTask , Set <String >> _throughputViolatingTopicsProvider ;
114133
@@ -188,6 +207,12 @@ public EventProducer(DatastreamTask task, TransportProvider transportProvider, C
188207 Boolean .parseBoolean (config .getProperty (CONFIG_ENABLE_PER_TOPIC_EVENT_LATENCY_METRICS ,
189208 Boolean .FALSE .toString ()));
190209
210+ _enableThroughputMetrics =
211+ Boolean .parseBoolean (config .getProperty (CONFIG_ENABLE_THROUGHPUT_METRICS , Boolean .FALSE .toString ()));
212+
213+ String [] sourceParts = getSourcePathParts ();
214+ _sourceDatabase = (sourceParts != null && sourceParts .length > 1 ) ? sourceParts [1 ] : null ;
215+
191216 _logger .info ("Created event producer with customCheckpointing={}" , customCheckpointing );
192217
193218 _dynamicMetricsManager = DynamicMetricsManager .getInstance ();
@@ -281,10 +306,17 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord
281306 record .setEventsSendTimestamp (System .currentTimeMillis ());
282307 long recordEventsSourceTimestamp = record .getEventsSourceTimestamp ();
283308 long recordEventsSendTimestamp = record .getEventsSendTimestamp ().orElse (0L );
309+ final long numSerializedBytes = record .getEvents ().stream ()
310+ .mapToLong (e -> {
311+ long keySize = e .key ().filter (k -> k instanceof byte []).map (k -> (long ) ((byte []) k ).length ).orElse (0L );
312+ long valSize = e .value ().filter (v -> v instanceof byte []).map (v -> (long ) ((byte []) v ).length ).orElse (0L );
313+ return keySize + valSize ;
314+ })
315+ .sum ();
284316 if (isBroadcast ) {
285317 broadcastMetadata = _transportProvider .broadcast (destination , record ,
286318 (metadata , exception ) -> onSendCallback (metadata , exception , sendEventCallback , recordEventsSourceTimestamp ,
287- recordEventsSendTimestamp ));
319+ recordEventsSendTimestamp , numSerializedBytes ));
288320 _logger .debug ("Broadcast completed with {}" , broadcastMetadata );
289321 if (broadcastMetadata .isMessageSerializationError ()) {
290322 _logger .warn ("Broadcast of record {} to destination {} failed because of serialization error." ,
@@ -293,7 +325,7 @@ private DatastreamRecordMetadata helperSendOrBroadcast(DatastreamProducerRecord
293325 } else {
294326 _transportProvider .send (destination , record ,
295327 (metadata , exception ) -> onSendCallback (metadata , exception , sendEventCallback , recordEventsSourceTimestamp ,
296- recordEventsSendTimestamp ));
328+ recordEventsSendTimestamp , numSerializedBytes ));
297329 }
298330 } catch (Exception e ) {
299331 String errorMessage = String .format ("Failed to send the event %s exception %s" , record , e );
@@ -365,7 +397,8 @@ private void performSlaRelatedLogging(DatastreamRecordMetadata metadata, long ev
365397 * per DatastreamProducerRecord (i.e. by the number of events within the record), only increment all metrics by 1
366398 * to avoid overcounting.
367399 */
368- private void reportMetrics (DatastreamRecordMetadata metadata , long eventsSourceTimestamp , long eventsSendTimestamp ) {
400+ private void reportMetrics (DatastreamRecordMetadata metadata , long eventsSourceTimestamp , long eventsSendTimestamp ,
401+ long numBytes ) {
369402 // If per-topic metrics are enabled, use topic as key for metrics; else, use datastream name as the key
370403 String datastreamName = getDatastreamName ();
371404
@@ -413,6 +446,7 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT
413446 }
414447 _dynamicMetricsManager .createOrUpdateMeter (MODULE , AGGREGATE , EVENT_PRODUCE_RATE , 1 );
415448 _dynamicMetricsManager .createOrUpdateMeter (MODULE , _datastreamTask .getConnectorType (), EVENT_PRODUCE_RATE , 1 );
449+ reportThroughputAttributionMetrics (numBytes );
416450 }
417451
418452 /**
@@ -424,7 +458,7 @@ private void reportMetrics(DatastreamRecordMetadata metadata, long eventsSourceT
424458 * to avoid overcounting.
425459 */
426460 private void reportMetricsForThroughputViolatingTopics (DatastreamRecordMetadata metadata , long eventsSourceTimestamp ,
427- long eventsSendTimestamp ) {
461+ long eventsSendTimestamp , long numBytes ) {
428462 String topicOrDatastreamName = _enablePerTopicMetrics ? metadata .getTopic () : getDatastreamName ();
429463 // Treat all events within this record equally (assume same timestamp)
430464 if (eventsSourceTimestamp > 0 ) {
@@ -457,6 +491,7 @@ private void reportMetricsForThroughputViolatingTopics(DatastreamRecordMetadata
457491 }
458492 _dynamicMetricsManager .createOrUpdateMeter (MODULE , AGGREGATE , EVENT_PRODUCE_RATE , 1 );
459493 _dynamicMetricsManager .createOrUpdateMeter (MODULE , _datastreamTask .getConnectorType (), EVENT_PRODUCE_RATE , 1 );
494+ reportThroughputAttributionMetrics (numBytes );
460495 }
461496
462497 // Report Event Latency metrics for aggregate, connector and topic/datastream
@@ -492,7 +527,7 @@ private void reportSendLatencyMetrics(DatastreamRecordMetadata metadata, long se
492527 }
493528
494529 private void onSendCallback (DatastreamRecordMetadata metadata , Exception exception , SendCallback sendCallback ,
495- long eventSourceTimestamp , long eventSendTimestamp ) {
530+ long eventSourceTimestamp , long eventSendTimestamp , long numBytes ) {
496531
497532 SendFailedException sendFailedException = null ;
498533
@@ -505,9 +540,9 @@ private void onSendCallback(DatastreamRecordMetadata metadata, Exception excepti
505540 // Reporting separate metrics for throughput violating topics.
506541
507542 if (_throughputViolatingTopicsProvider .apply (_datastreamTask ).contains (metadata .getUndecoratedTopic ())) {
508- reportMetricsForThroughputViolatingTopics (metadata , eventSourceTimestamp , eventSendTimestamp );
543+ reportMetricsForThroughputViolatingTopics (metadata , eventSourceTimestamp , eventSendTimestamp , numBytes );
509544 } else {
510- reportMetrics (metadata , eventSourceTimestamp , eventSendTimestamp );
545+ reportMetrics (metadata , eventSourceTimestamp , eventSendTimestamp , numBytes );
511546 }
512547 }
513548 } catch (Exception e ) {
@@ -601,10 +636,36 @@ public String toString() {
601636 return String .format ("EventProducer producerId=%d" , _producerId );
602637 }
603638
639+ private void reportThroughputAttributionMetrics (long numBytes ) {
640+ if (!_enableThroughputMetrics ) {
641+ return ;
642+ }
643+ if (_sourceDatabase != null ) {
644+ _dynamicMetricsManager .createOrUpdateMeter (MODULE , "db." + _sourceDatabase , BYTES_PRODUCED_RATE , numBytes );
645+ _dynamicMetricsManager .createOrUpdateMeter (MODULE , "db." + _sourceDatabase , EVENT_PRODUCE_RATE , 1 );
646+ }
647+ _dynamicMetricsManager .createOrUpdateMeter (MODULE , AGGREGATE , BYTES_PRODUCED_RATE , numBytes );
648+ _dynamicMetricsManager .createOrUpdateMeter (MODULE , _datastreamTask .getConnectorType (), BYTES_PRODUCED_RATE , numBytes );
649+ }
650+
604651 private String getDatastreamName () {
605652 return _datastreamTask .getDatastreams ().get (0 ).getName ();
606653 }
607654
655+ // Returns path segments ["CLUSTER", "DATABASE", "TABLE"] for CDC single-slash URIs, null for BMM double-slash URIs.
656+ // Consistent with MySqlKafkaSource, TiDBKafkaSource, and EspressoSource parsing in brooklin-li-common.
657+ private String [] getSourcePathParts () {
658+ try {
659+ URI uri = new URI (_datastreamTask .getDatastreamSource ().getConnectionString ());
660+ if (uri .getAuthority () != null ) {
661+ return null ; // double-slash URI (e.g. kafka://host/topic) — no cluster/database segments
662+ }
663+ return uri .getPath ().substring (1 ).split ("/" );
664+ } catch (URISyntaxException e ) {
665+ return null ;
666+ }
667+ }
668+
608669 /**
609670 * Get the list of metrics maintained by the event producer
610671 */
@@ -615,6 +676,7 @@ public static List<BrooklinMetricInfo> getMetricInfos() {
615676 metrics .add (new BrooklinCounterInfo (METRICS_PREFIX + EVENTS_PRODUCED_WITHIN_ALTERNATE_SLA ));
616677 metrics .add (new BrooklinCounterInfo (METRICS_PREFIX + TOTAL_EVENTS_PRODUCED ));
617678 metrics .add (new BrooklinMeterInfo (METRICS_PREFIX + EVENT_PRODUCE_RATE ));
679+ metrics .add (new BrooklinMeterInfo (METRICS_PREFIX + BYTES_PRODUCED_RATE ));
618680 metrics .add (new BrooklinCounterInfo (METRICS_PREFIX + EVENTS_PRODUCED_OUTSIDE_SLA ));
619681 metrics .add (new BrooklinCounterInfo (METRICS_PREFIX + EVENTS_PRODUCED_OUTSIDE_ALTERNATE_SLA ));
620682 metrics .add (new BrooklinCounterInfo (METRICS_PREFIX + DROPPED_SENT_FROM_SERIALIZATION_ERROR ));
0 commit comments