Conversation

@zhangshenghang
Member

Purpose of this pull request

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@github-actions github-actions bot added the e2e label Jan 5, 2026
@corgy-w corgy-w requested a review from liugddx January 7, 2026 14:05
corgy-w previously approved these changes Jan 7, 2026
Contributor

@corgy-w corgy-w left a comment

LGTM cc @hawk9821

return "";
}

Pattern pattern = Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not recommended to put the Pattern inside the method.
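
A minimal sketch of that suggestion, hoisting the compiled pattern to a class-level constant (the constant name here is illustrative):

```java
// Pattern is immutable and thread-safe, so compile it once per class
// rather than on every call.
private static final Pattern VERTEX_IDENTIFIER_PATTERN =
        Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");
```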

Copilot AI left a comment

Pull request overview

This pull request refactors job metrics handling to support multiple sinks/sources with the same table name by adding unique identifiers (e.g., "Sink[0]", "Source[0]") to metric keys. This prevents metric collision when different sinks write to tables with identical names.

Key changes:

  • Enhanced metrics aggregation logic to extract and use plugin identifiers from DAG information
  • Modified metric key format to include source/sink identifiers for disambiguation
  • Updated all related tests and E2E tests to expect the new metric key format with identifiers

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 11 comments.

| File | Description |
| --- | --- |
| BaseService.java | Core refactoring: added JobDAGInfo parameter to getJobMetrics, implemented identifier extraction and mapping logic, refactored metric processing to use qualified keys |
| JobConfigParser.java | Changed visibility of action name creation methods from package-private to public to enable test usage |
| BaseServiceTableMetricsTest.java | Added new test for multiple sinks scenario, updated existing tests to pass JobDAGInfo parameter |
| RestApiIT.java | Updated metric key assertions to include Source/Sink identifier prefixes |
| MultiTableMetricsIT.java | Updated metric key assertions to use qualified names with identifiers |
| CommittedMetricsIT.java | Updated metric key assertions to use qualified names with identifiers |


```java
int arraySize = metricNode.size();

if (arraySize == sinkIdentifiers.size()) {
    ObjectMapper mapper = new ObjectMapper();
```

Copilot AI Jan 8, 2026

A new ObjectMapper instance is created inside the loop at line 424, which is inefficient. ObjectMapper instantiation is relatively expensive and should be reused. Consider creating the ObjectMapper instance once before the loop (at the method level or as a static/instance field) to improve performance, especially when processing multiple sink identifiers.
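
A minimal sketch of that suggestion, assuming a shared field is acceptable in BaseService (ObjectMapper is thread-safe once configured):

```java
// Reused across all metric-processing calls instead of being
// instantiated per invocation.
private static final ObjectMapper MAPPER = new ObjectMapper();
```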
```java
if (arraySize == sinkIdentifiers.size()) {
    ObjectMapper mapper = new ObjectMapper();
    for (int i = 0; i < arraySize; i++) {
        String sinkIdentifier = sinkIdentifiers.get(i);
```

Copilot AI Jan 8, 2026

The variable 'sinkIdentifier' at line 426 is misleadingly named. This variable holds an identifier that could be for either a source or a sink, depending on the context. The name should reflect this dual purpose. Consider renaming it to 'identifier' to avoid confusion.
Comment on lines +49 to 58

```diff
+public static String createSourceActionName(int configIndex, String pluginName) {
     return String.format("Source[%s]-%s", configIndex, pluginName);
 }

-static String createSinkActionName(int configIndex, String pluginName, String table) {
+public static String createSinkActionName(int configIndex, String pluginName, String table) {
     return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
 }

-static String createTransformActionName(int configIndex, String pluginName) {
+public static String createTransformActionName(int configIndex, String pluginName) {
     return String.format("Transform[%s]-%s", configIndex, pluginName);
```

Copilot AI Jan 8, 2026

The methods createSourceActionName, createSinkActionName, and createTransformActionName are changed from package-private to public visibility. While this change enables their use in tests, there is no documentation explaining their purpose, parameters, or expected usage. Consider adding JavaDoc comments to these newly public methods to help external callers understand how to use them correctly, especially since they are now part of the public API of the class.
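
For illustration, a hedged JavaDoc sketch for one of the newly public helpers; the wording is a suggestion, only the format string comes from the PR:

```java
/**
 * Builds the unique action name for a sink defined in the job config,
 * e.g. createSinkActionName(0, "Jdbc", "users") returns "Sink[0]-Jdbc-users".
 *
 * @param configIndex zero-based index of the sink block in the job config
 * @param pluginName  connector plugin name
 * @param table       target table name
 * @return the formatted action name "Sink[index]-pluginName-table"
 */
public static String createSinkActionName(int configIndex, String pluginName, String table) {
    return String.format("Sink[%s]-%s-%s", configIndex, pluginName, table);
}
```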
```java
Map<String, List<String>> tableToSinkIdentifiersMap = new HashMap<>();
if (jobDAGInfo != null && jobDAGInfo.getVertexInfoMap() != null) {
    for (VertexInfo vertexInfo : jobDAGInfo.getVertexInfoMap().values()) {
        String identifier = extractSinkIdentifier(vertexInfo.getConnectorType());
```

Copilot AI Jan 8, 2026

The condition at line 273 lacks a clarifying comment. The check 'identifier.equals(vertexInfo.getConnectorType())' is used to detect when the extractSinkIdentifier method failed to extract a pattern (meaning it returned the original input unchanged). This logic is not immediately obvious to readers. Consider adding a comment explaining that this condition filters out vertices where no identifier pattern was found, or refactor extractSinkIdentifier to return an Optional to make the intent clearer.

Suggested change

```diff
 String identifier = extractSinkIdentifier(vertexInfo.getConnectorType());
+// When extractSinkIdentifier cannot find a pattern, it returns the original
+// connectorType. In that case (identifier equals connectorType) we skip this
+// vertex because no specific identifier was extracted for metrics grouping.
```
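
A hedged sketch of the Optional-returning alternative mentioned above (method and constant names are illustrative):

```java
// An empty Optional makes "no identifier found" explicit instead of
// signalling it by echoing the input back to the caller.
private Optional<String> extractVertexIdentifier(String vertexName) {
    if (vertexName == null) {
        return Optional.empty();
    }
    Matcher matcher = VERTEX_IDENTIFIER_PATTERN.matcher(vertexName);
    return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
}
```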
Comment on lines 323 to +375

```diff
 processMetric(
-        metricName, tableName, metricNode, tableMetricsMaps);
+        metricName,
+        tableName,
+        metricNode,
+        tableMetricsMaps,
+        identifiersMap);
```

Copilot AI Jan 8, 2026

Potential null pointer issue: When identifiersMap is null (which happens when a metric name contains "#" but doesn't start with TableSource, Source, TableSink, or Sink), the method processMetric is still called with a null map at line 375. Inside processMetric at line 415, calling .get(tableName) on a null map will throw a NullPointerException. While current metric names appear to always match the expected prefixes, this creates a fragile dependency. Consider adding a null check at line 415 (e.g., 'if (tableToSinkIdentifiersMap != null)') or validating that identifiersMap is not null before calling processMetric to make the code more robust.
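
A minimal guard along the lines the comment suggests, reusing the names from the snippets around it (sketch only):

```java
// Treat a null identifier map as "no identifiers known" rather than
// dereferencing it and risking a NullPointerException.
java.util.List<String> sinkIdentifiers =
        tableToSinkIdentifiersMap == null
                ? null
                : tableToSinkIdentifiersMap.get(tableName);
```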
```java
    return;
}

java.util.List<String> sinkIdentifiers = tableToSinkIdentifiersMap.get(tableName);
```

Copilot AI Jan 8, 2026

The variable 'sinkIdentifiers' at line 415 is misleadingly named. This variable can contain either source or sink identifiers depending on which map is passed in (tableToSourceIdentifiersMap or tableToSinkIdentifiersMap). Using the name 'sinkIdentifiers' when it might actually contain source identifiers makes the code confusing. Consider renaming it to 'identifiers' to accurately reflect what it contains.
```java
    }
}

private String extractSinkIdentifier(String vertexName) {
```

Copilot AI Jan 8, 2026

The method name 'extractSinkIdentifier' is misleading because it can extract identifiers for Source, Sink, or Transform plugins, not just sinks. The regex pattern on line 500 explicitly matches all three types. Consider renaming the method to 'extractPluginIdentifier' or 'extractVertexIdentifier' to accurately reflect its broader purpose.

Suggested change

```diff
-private String extractSinkIdentifier(String vertexName) {
+private String extractPluginIdentifier(String vertexName) {
```
Comment on lines 417 to 443

```java
if (sinkIdentifiers != null
        && !sinkIdentifiers.isEmpty()
        && metricNode.isArray()
        && sinkIdentifiers.size() > 1) {
    int arraySize = metricNode.size();

    if (arraySize == sinkIdentifiers.size()) {
        ObjectMapper mapper = new ObjectMapper();
        for (int i = 0; i < arraySize; i++) {
            String sinkIdentifier = sinkIdentifiers.get(i);
            String metricKey = sinkIdentifier + "." + tableName;

            try {
                String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
                JsonNode arrayNode = mapper.readTree(json);
                putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
            } catch (JsonProcessingException e) {
                putMetricToMap(metricName, metricKey, metricNode.get(i), tableMetricsMaps);
            }
        }
        return;
    }
}

String metricKey = tableName;
if (sinkIdentifiers != null && !sinkIdentifiers.isEmpty()) {
    metricKey = sinkIdentifiers.get(0) + "." + tableName;
```

Copilot AI Jan 8, 2026

The condition at line 420 checks if sinkIdentifiers.size() is greater than 1, but the subsequent check at line 423 verifies if arraySize equals sinkIdentifiers.size(). This means when there's only one identifier (sinkIdentifiers.size() == 1), the code skips to line 441 and uses sinkIdentifiers.get(0). However, when sinkIdentifiers.size() > 1 but arraySize != sinkIdentifiers.size(), the code also falls through to line 441 and only uses the first identifier, potentially losing data. Consider adding a warning log or handling this mismatch case more explicitly to alert developers when the array size doesn't match the expected number of identifiers.
```java
if (vertexInfo.getType() == PluginType.SOURCE) {
    targetMap = tableToSourceIdentifiersMap;
} else if (vertexInfo.getType() == PluginType.SINK) {
    targetMap = tableToSinkIdentifiersMap;
```

Copilot AI Jan 8, 2026

The code at lines 276-281 only handles SOURCE and SINK plugin types, ignoring TRANSFORM types. However, the extractSinkIdentifier method on line 271 explicitly extracts Transform identifiers (as shown in the regex pattern on line 500). This inconsistency could lead to confusion. If Transform metrics are not expected to be processed in this context, consider updating the regex pattern to exclude Transform. If Transform metrics should be handled, add a case for PluginType.TRANSFORM.

Suggested change

```diff
     targetMap = tableToSinkIdentifiersMap;
+} else if (vertexInfo.getType() == PluginType.TRANSFORM) {
+    // Currently, transform plugin metrics are not aggregated by table
+    // in this method. We still call extractSinkIdentifier for all
+    // plugin types, but intentionally do not populate a target map
+    // for TRANSFORM here.
```
Comment on lines 430 to 431

```java
String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
JsonNode arrayNode = mapper.readTree(json);
```

Copilot AI Jan 8, 2026

The JSON serialization and deserialization at lines 430-431 appears inefficient. The code converts a single JsonNode element to a JSON string, wraps it in brackets to create an array string, and then parses it back to a JsonNode array. This round-trip conversion is unnecessary overhead. Consider creating the array node directly using Jackson's API (e.g., mapper.createArrayNode().add(metricNode.get(i))) instead of string manipulation and re-parsing.
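
A sketch of the direct construction the comment proposes, assuming `mapper` is the surrounding ObjectMapper and `ArrayNode` is imported from com.fasterxml.jackson.databind.node:

```java
// Build the single-element array directly; no string round-trip and
// no JsonProcessingException to handle.
ArrayNode arrayNode = mapper.createArrayNode().add(metricNode.get(i));
putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
```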
Co-authored-by: Guangdong Liu <804167098@qq.com>
@davidzollo
Contributor

davidzollo commented Jan 26, 2026

Good job. Here is the result from Claude Code that you can refer to.

### Issue 1: Breaking Change Not Documented

**Problem**: This PR introduces a breaking change to the metrics API format but does not document it in incompatible-changes.md.

Impact:

  • All existing monitoring dashboards (Grafana) will break
  • Prometheus alerting rules will fail
  • Custom monitoring integrations will need updates
  • Users upgrading will experience silent monitoring failures

Required Actions:

  1. Update docs/zh/concept/incompatible-changes.md (Chinese)
  2. Update docs/en/concept/incompatible-changes.md (English)
  3. Provide migration guide with before/after examples
  4. Update PR description with clear breaking change warning

Suggested Documentation Content:

## v2.x.x

### Breaking Change: Metrics API Format Change

**Affected Version**: 2.x.x+
**Affected Component**: REST API, Metrics System

**Description**:
To support multiple sinks/sources processing the same table, the metric key format
has been changed from `{tableName}` to `{VertexIdentifier}.{tableName}`.

**Before**:
```json
{
  "TableSinkWriteCount": {
    "fake.user_table": "15"
  }
}
```

**After**:
```json
{
  "TableSinkWriteCount": {
    "Sink[0].fake.user_table": "10",
    "Sink[1].fake.user_table": "5"
  }
}
```

Migration Guide:

  1. Update Grafana dashboard queries to use new metric key format
  2. Update Prometheus alerting rules
  3. If compatibility needed, add configuration: metrics.use-legacy-format=true

**Files to Update**:
- `docs/zh/concept/incompatible-changes.md`
- `docs/en/concept/incompatible-changes.md`
- PR description

---

### Issue 2: Array Size Mismatch Handling is Flawed  **HIGH**

**Location**: [BaseService.java:417-446](seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java#L417-L446)

**Problem**: When metric array size doesn't match the number of sinks, the code blindly uses the first sink identifier, which can lead to incorrect metric attribution.

**Current Code**:
```java
if (arraySize == sinkIdentifiers.size()) {
    // Handle matched case
    // ...
    return;
}

// If sizes don't match, blindly use first identifier
String metricKey = tableName;
if (sinkIdentifiers != null && !sinkIdentifiers.isEmpty()) {
    metricKey = sinkIdentifiers.get(0) + "." + tableName;  // ❌ Wrong!
}
```

Why This is Critical:

  • Scenario: 2 sinks configured, but only Sink[1] has reported metrics yet
  • Current Behavior: Labels the metric as Sink[0].table (WRONG)
  • Actual: The metric is from Sink[1]
  • Impact: Monitoring data is incorrect, misleading the operations team

Common Scenarios for Size Mismatch:

  1. Sink startup delay (very common in production)
  2. Sink failure and restart
  3. Network latency causing delayed metric reporting
  4. Different parallelism across sinks

**Suggested Fix**:

```java
if (sinkIdentifiers != null
        && !sinkIdentifiers.isEmpty()
        && metricNode.isArray()
        && sinkIdentifiers.size() > 1) {
    int arraySize = metricNode.size();
    // Declared before the branches (not inside the first one) so the
    // partial-match branch below can use it as well.
    ObjectMapper mapper = new ObjectMapper();

    if (arraySize == sinkIdentifiers.size()) {
        // Perfect match: assign by index
        for (int i = 0; i < arraySize; i++) {
            String sinkIdentifier = sinkIdentifiers.get(i);
            String metricKey = sinkIdentifier + "." + tableName;

            try {
                String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
                JsonNode arrayNode = mapper.readTree(json);
                putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
            } catch (JsonProcessingException e) {
                putMetricToMap(metricName, metricKey, metricNode.get(i), tableMetricsMaps);
            }
        }
        return;
    } else if (arraySize > 0 && arraySize < sinkIdentifiers.size()) {
        // Partial match: log warning and assign by index for available metrics
        log.warn("Metric array size mismatch for table {}: expected {} sinks, got {} metrics. "
                + "Some sinks may not be reporting metrics yet. This could indicate: "
                + "1) Sink startup delay, 2) Sink failure, 3) Network latency",
                tableName, sinkIdentifiers.size(), arraySize);

        // Assign available metrics by index position
        for (int i = 0; i < arraySize; i++) {
            String sinkIdentifier = sinkIdentifiers.get(i);
            String metricKey = sinkIdentifier + "." + tableName;
            try {
                String json = "[" + mapper.writeValueAsString(metricNode.get(i)) + "]";
                JsonNode arrayNode = mapper.readTree(json);
                putMetricToMap(metricName, metricKey, arrayNode, tableMetricsMaps);
            } catch (JsonProcessingException e) {
                putMetricToMap(metricName, metricKey, metricNode.get(i), tableMetricsMaps);
            }
        }
        return;
    } else if (arraySize > sinkIdentifiers.size()) {
        // More metrics than expected sinks - serious configuration issue
        log.error("Invalid metric array size for table {}: received {} metrics but only {} sinks configured. "
                + "This indicates a serious configuration or collection error.",
                tableName, arraySize, sinkIdentifiers.size());
        // Fall through to default handling
    }
}

// Default/fallback handling for single value or unmatched scenarios
String metricKey = tableName;
if (sinkIdentifiers != null && !sinkIdentifiers.isEmpty()) {
    if (metricNode.isArray() && metricNode.size() == 1) {
        log.debug("Single metric value for table {} with {} configured sinks, assigning to first sink {}",
                tableName, sinkIdentifiers.size(), sinkIdentifiers.get(0));
        metricKey = sinkIdentifiers.get(0) + "." + tableName;
    } else {
        log.warn("Cannot reliably determine sink assignment for table {} metric (isArray={}, size={}), "
                + "using table name only to avoid incorrect attribution",
                tableName, metricNode.isArray(),
                metricNode.isArray() ? metricNode.size() : "N/A");
    }
}

putMetricToMap(metricName, metricKey, metricNode, tableMetricsMaps);
```

Key Improvements:

  1. Handle partial match scenario (some sinks not reporting yet)
  2. Add comprehensive logging for debugging
  3. Explicitly handle array size > sink count (configuration error)
  4. Fall back to table name when assignment is unreliable

### Issue 3: JSON Exception Silently Drops All Metrics **HIGH**

**Location**: BaseService.java:349-388

**Problem**: When JSON parsing fails, the method returns an empty metrics map without any logging, causing complete silent failure of metrics collection.

**Current Code**:

```java
try {
    JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);

    jobMetricsStr.fieldNames().forEachRemaining(metricName -> {
        // Process metrics
    });

    aggregateMetrics(...);

} catch (JsonProcessingException e) {
    return metricsMap;  // ❌ Returns empty map, no logging!
}
```

Impact:

  • Complete loss of monitoring visibility when JSON parsing fails
  • No way to diagnose the issue without logs
  • Production job appears to have no metrics (misleading)

**Suggested Fix**:

```java
try {
    JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);

    jobMetricsStr.fieldNames().forEachRemaining(metricName -> {
        try {
            if (metricName.contains("#")) {
                String tableName =
                        TablePath.of(metricName.split("#")[1]).getFullName();
                JsonNode metricNode = jobMetricsStr.get(metricName);

                Map<String, java.util.List<String>> identifiersMap = null;
                if (metricName.startsWith("TableSource")
                        || metricName.startsWith("Source")) {
                    identifiersMap = tableToSourceIdentifiersMap;
                } else if (metricName.startsWith("TableSink")
                        || metricName.startsWith("Sink")) {
                    identifiersMap = tableToSinkIdentifiersMap;
                }

                processMetric(
                        metricName,
                        tableName,
                        metricNode,
                        tableMetricsMaps,
                        identifiersMap);
            }
        } catch (Exception e) {
            // Don't let one metric failure kill all metrics
            log.error("Failed to process individual metric '{}': {}. Continuing with other metrics.",
                    metricName, e.getMessage(), e);
        }
    });

    aggregateMetrics(...);

} catch (JsonProcessingException e) {
    log.error("Failed to parse job metrics JSON: {}. Raw input (first 500 chars): {}",
            e.getMessage(),
            jobMetrics != null && jobMetrics.length() > 500
                ? jobMetrics.substring(0, 500) + "..."
                : jobMetrics,
            e);
    return metricsMap;
} catch (Exception e) {
    log.error("Unexpected error while processing job metrics: {}", e.getMessage(), e);
    return metricsMap;
}
```

Key Improvements:

  1. Log detailed error with raw JSON sample for debugging
  2. Wrap individual metric processing to prevent one failure from killing all metrics
  3. Add catch for unexpected exceptions
  4. Truncate long JSON in logs to avoid log explosion

## Important Issues (Strongly Recommended)

### Issue 4: Method Naming is Inaccurate **MEDIUM**

**Location**: BaseService.java:495-506

**Problem**: Method named extractSinkIdentifier but actually extracts Source, Sink, AND Transform identifiers.

**Current Code**:

```java
private String extractSinkIdentifier(String vertexName) {
    if (vertexName == null) {
        return "";
    }

    Pattern pattern = Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");
    Matcher matcher = pattern.matcher(vertexName);
    if (matcher.find()) {
        return matcher.group(1);
    }
    return vertexName;
}
```

**Suggested Fix**:

```java
/**
 * Extracts the vertex identifier (Source[n], Sink[n], or Transform[n]) from a vertex name.
 * <p>
 * This method is used to distinguish metrics from multiple vertices processing the same table.
 * For example, if two sinks write to "user_table", their metrics are distinguished as
 * "Sink[0].user_table" and "Sink[1].user_table".
 * </p>
 *
 * @param vertexName the vertex name from JobDAGInfo, typically in format
 *                   "pipeline-1 [Sink[0]-console-MultiTableSink]"
 * @return the extracted identifier (e.g., "Sink[0]"), or original name if no match found
 */
private String extractVertexIdentifier(String vertexName) {
    if (vertexName == null || vertexName.isEmpty()) {
        log.debug("Null or empty vertex name provided");
        return "";
    }

    Matcher matcher = VERTEX_IDENTIFIER_PATTERN.matcher(vertexName);
    if (matcher.find()) {
        String identifier = matcher.group(1);
        log.trace("Extracted vertex identifier '{}' from '{}'", identifier, vertexName);
        return identifier;
    }

    log.debug("Failed to extract vertex identifier from '{}', using original name", vertexName);
    return vertexName;
}
```

Also Update:

  • Rename all call sites: extractSinkIdentifier → extractVertexIdentifier
  • Update variable names for clarity

### Issue 5: Regex Pattern Not Cached **MEDIUM** (Performance)

**Location**: BaseService.java:500

**Problem**: Pattern.compile() is called on every invocation, which is expensive.

**Current Code**:

```java
Pattern pattern = Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");
```

Impact:

  • In large DAGs (100+ vertices), this adds noticeable overhead
  • Pattern compilation is expensive (regex parsing + optimization)
  • Called once per vertex per metrics query

**Suggested Fix**:

```java
// Add as class-level constant
private static final Pattern VERTEX_IDENTIFIER_PATTERN =
    Pattern.compile("((?:Sink|Source|Transform)\\[\\d+\\])");

private String extractVertexIdentifier(String vertexName) {
    if (vertexName == null || vertexName.isEmpty()) {
        return "";
    }

    Matcher matcher = VERTEX_IDENTIFIER_PATTERN.matcher(vertexName);
    if (matcher.find()) {
        return matcher.group(1);
    }
    return vertexName;
}
```

### Issue 6: Missing Critical Logging **MEDIUM**

**Problem**: Key decision points have no logging, making production issues difficult to diagnose.

Missing Logs:

  1. When identifier extraction fails
  2. When array size doesn't match sink count
  3. When metrics are assigned to specific sinks
  4. When falling back to table name only

Suggested Additions: (Already included in Issue #2 fix above)


### Issue 7: Missing Edge Case Tests **MEDIUM**

**Problem**: Test coverage doesn't include critical edge cases that will occur in production.

Missing Test Scenarios:

```java
@Test
public void testMetricsWithArraySizeMismatch_FewerMetricsThanSinks() throws Exception {
    // Scenario: 2 sinks configured, but only 1 has reported metrics (common during startup)
    String jobMetrics =
            "{"
                    + "\"SinkWriteCount#fake.user_table\": [{\"value\": 100}],"  // Only 1 metric
                    + "\"SinkWriteCount\": [{\"value\": 100}]"
                    + "}";

    JobDAGInfo dagInfo = createDAGInfoWithMultipleSinks();  // 2 sinks

    Map<String, Object> result =
            (Map<String, Object>) getJobMetricsMethod.invoke(jobInfoService, jobMetrics, dagInfo);

    Map<String, Object> tableSinkCount = (Map<String, Object>) result.get("TableSinkWriteCount");

    // Should have exactly 1 entry for Sink[0]
    Assertions.assertEquals(1, tableSinkCount.size());
    Assertions.assertTrue(tableSinkCount.containsKey("Sink[0].fake.user_table"));
    Assertions.assertEquals(100L, tableSinkCount.get("Sink[0].fake.user_table"));

    // Should NOT have entry for Sink[1] (not reporting yet)
    Assertions.assertFalse(tableSinkCount.containsKey("Sink[1].fake.user_table"));
}

@Test
public void testMetricsWithMalformedJSON() throws Exception {
    // Scenario: Invalid JSON from metrics collection
    String malformedMetrics = "{\"SinkWriteCount#table\": [invalid}";

    JobDAGInfo dagInfo = createDAGInfoWithMultipleSinks();

    Map<String, Object> result =
            (Map<String, Object>) getJobMetricsMethod.invoke(jobInfoService, malformedMetrics, dagInfo);

    // Should return empty map, but not crash
    Assertions.assertNotNull(result);
    // Verify error was logged (would require log capture framework)
}

@Test
public void testMetricsWithNullJobDAGInfo() throws Exception {
    // Scenario: Job just started, DAG info not available yet
    String jobMetrics =
            "{"
                    + "\"SinkWriteCount#fake.user_table\": [{\"value\": 100}],"
                    + "\"SinkWriteCount\": [{\"value\": 100}]"
                    + "}";

    Map<String, Object> result =
            (Map<String, Object>) getJobMetricsMethod.invoke(jobInfoService, jobMetrics, null);

    Map<String, Object> tableSinkCount = (Map<String, Object>) result.get("TableSinkWriteCount");

    // Should fall back to table name only
    Assertions.assertTrue(tableSinkCount.containsKey("fake.user_table"));
}

@Test
public void testMetricsWithEmptyVertexName() throws Exception {
    // Scenario: Malformed vertex info with empty connector type
    // ... create DAG with empty vertex name ...
}

@Test
public void testMetricsPerformanceWithLargeDAG() throws Exception {
    // Scenario: Large-scale production job with 100+ vertices
    JobDAGInfo largeDAG = createLargeDAG(100, 1000);  // 100 vertices, 1000 tables

    String jobMetrics = createMockMetrics(1000);  // Large metrics JSON

    long startTime = System.currentTimeMillis();
    Map<String, Object> result =
            (Map<String, Object>) getJobMetricsMethod.invoke(jobInfoService, jobMetrics, largeDAG);
    long duration = System.currentTimeMillis() - startTime;

    Assertions.assertTrue(duration < 200,
        "Metrics processing should complete within 200ms, but took " + duration + "ms");
}
```

## Optional Improvements (Quality Enhancement)

### Issue 8: Missing Javadoc Comments **LOW**

**Problem**: Core methods lack proper Javadoc documentation.

Methods Missing Javadoc:

  • getJobMetrics
  • processMetric
  • putMetricToMap
  • extractSinkIdentifier / extractVertexIdentifier

Example:

```java
/**
 * Retrieves and processes job metrics, adding vertex identifiers for multi-sink scenarios.
 * <p>
 * When multiple sinks process the same table, this method distinguishes their metrics by
 * prepending the vertex identifier (e.g., "Sink[0].table", "Sink[1].table").
 * </p>
 *
 * @param jobMetrics raw metrics JSON string from the job
 * @param jobDAGInfo DAG information containing vertex details (can be null for backward compatibility)
 * @return map of processed metrics with proper identifiers
 */
private Map<String, Object> getJobMetrics(String jobMetrics, JobDAGInfo jobDAGInfo) {
    // ...
}
```
