Risk of excessive logical plan growth (Catalyst) with VTL operations in Trevas-Spark on wide datasets #437

@MiguelRosaTauroni

Description

Hi again guys 😄

We are evaluating VTL statements close to the complexity we expect in production, under demanding conditions, in order to detect potential runtime issues and measure performance. Because of the problems we experienced in the past (#413), we have revisited the Catalyst output to check whether similar issues could arise under more complex conditions, both in the number of columns in the input datasets and in the complexity of the VTL statements.

Our preliminary conclusion is that the Trevas engine passes the usual test scenarios; however, in more complex situations we see a risk of runtime errors associated with the size/width of the logical plan.

Current status

So far we have not encountered new errors related to Catalyst logical plan calculation in this test batch. However, when inspecting the results, we already observe significant plan expansion under the tested conditions. In previous tests, with equivalent input datasets and comparable queries in Spark SQL and VTL, the Catalyst output (number of nodes, chained Projects, depth, etc.) was notably larger with Trevas-VTL. We believe that as both the number of columns and the complexity of the VTL operations increase, the risk of error increases as well.

Motivation

Our main concern is that, based on past experience, we have found no way to recover from a runtime error that occurs during logical plan calculation. That is why we previously contributed a change to your code (#413) to reduce the number of chained Projects generated by Catalyst. Those Projects appeared systematically whenever a SparkDataset was created, and their impact grew with the number of columns in the input DataFrame. That original issue is solved and the fix is working well.

Driver resources and Catalyst

We understand that Catalyst cannot be given dedicated memory of its own, since it runs within the driver process. In managed environments, driver memory can be increased (e.g., by changing the driver class/size or parameters such as spark.driver.memory). Nevertheless, when the plan grows too large, plan calculation may still become impractical. Therefore, we prefer to act on plan generation (minimizing consecutive Projects and intermediate renames) rather than relying solely on resource scaling.

Conditions of the current test batch

  • Trevas Version: v1.11.0
  • VTL script engine: Spark
  • Environment: AWS Glue v5.0
  • Execution resources: Irrelevant (tested with various configurations)
  • Archetype of VTL statements (we understand that there are different approaches to address the same situation and we are aligning with the functional team to reach best practices in statement design, aiming for optimal performance):
party := m10_prty
[keep
    m10_prty_col_a
    , m10_prty_col_b
    , m10_prty_col_c
    , m10_prty_col_d
    , m10_prty_col_e
    , m10_prty_col_f
    , m10_prty_col_g
]
[rename
    m10_prty_col_c to party_col_h
    , m10_prty_col_d to party_col_i
    , m10_prty_col_e to party_col_j
    , m10_prty_col_f to party_col_k
    , m10_prty_col_g to party_col_l
]
[calc is_intragroup := 0]
[calc identifier party_col_h := party_col_h]
;

deposit := m10_dpst
[keep
    m10_dpst_col_a
    , m10_dpst_col_b
    , m10_dpst_col_c
    , m10_dpst_col_d
    , m10_dpst_col_e
    , m10_dpst_col_f
    , m10_dpst_col_g
    , m10_dpst_col_h
    , m10_dpst_col_i
    , m10_dpst_col_j
    , m10_dpst_col_k
    , m10_dpst_col_l
    , m10_dpst_col_m
    , m10_dpst_col_n
]
[rename
    m10_dpst_col_c to deposit_col_o
    , m10_dpst_col_d to deposit_col_p
    , m10_dpst_col_e to deposit_col_q
    , m10_dpst_col_f to deposit_col_r
    , m10_dpst_col_g to deposit_col_s
    , m10_dpst_col_h to deposit_col_t
    , m10_dpst_col_i to deposit_col_u
    , m10_dpst_col_j to deposit_col_v
    , m10_dpst_col_k to deposit_col_w
    , m10_dpst_col_l to deposit_col_x
    , m10_dpst_col_m to deposit_col_y
    , m10_dpst_col_n to deposit_col_z
]
[calc identifier deposit_col_p := deposit_col_p]
;

deposit_and_party := left_join(deposit, party using m10_dpst_col_a, m10_dpst_col_b, deposit_col_p);

deposit_mgo_0 := deposit_and_party
[calc
    oa_country := substr(m10_dpst_col_b, 1, 2)
    , counterpart_activity_two_digits := substr(party_col_j, 1, 2)
    , ibsi_instrument_flag := 1
    , domestic_currency := if deposit_col_y = "CURR1" and party_col_k in
    {
        "C1", "C2", "C3", "C4", "C5"
    }
    then 1
    else 0
]
[rename
    party_col_i to counterpart_col_a
    , party_col_k to counterpart_col_b
    , party_col_h to counterpart_col_c
    , party_col_l to counterpart_col_d
]
;

deposit_mgo_1 := deposit_mgo_0[calc
    bs_count_sector_1a := case when (counterpart_col_c = "ID001") and (counterpart_col_a = "S121") then "1110"
        when (counterpart_col_c <> "ID001") and (counterpart_col_a = "S121") then "1120"
        when (counterpart_col_a in {"S122", "S122_A"}) then "1211"
        else ""
    , bs_count_sector_1b := case when (counterpart_col_a in {"S121", "S122"}) then "00BK" else "00NB"
    , bs_count_sector_1c := case when (counterpart_col_a not_in {"S121", "S122"}) then "00NR" else ""
    , bs_count_sector_1d := case when (counterpart_col_c = "ID002") then "0099" else ""
    , bs_count_sector_1e := case when (counterpart_col_a = "S125_A") then "2271"
        when (counterpart_col_a = "S125_E") then "2272"
        else ""
]
.....

Input datasets:

  • m10_prty = 19 columns and 67,460,082 rows
  • m10_dpst = 73 columns and 134,839,646 rows

Plan inspection in Spark UI

In the SQL / DataFrame tab of the Spark UI the DAGs are visible, but the Catalyst information is not always shown in full. When it is not directly available, we have been able to recover the Parsed / Analyzed / Optimized / Physical plan sections from the page HTML by opening the browser developer tools (F12) and exploring the embedded body/payload.

Catalyst content (observations and a small sample as evidence)

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#5469L]
+- Project [deposit_col_z#4803 AS deposit_col_z#5246, counterpart_col_c#4804 AS counterpart_col_c#5247, deposit_col_w#4805 AS deposit_col_w#5248, counterpart_col_a#4806 AS counterpart_col_a#5249, counterpart_col_d#4807 AS counterpart_col_d#5250, deposit_col_o#4808 AS deposit_col_o#5251, party_col_j#4809 AS party_col_j#5252, is_intragroup#4810L AS is_intragroup#5253L, m10_dpst_col_b#4811 AS m10_dpst_col_b#5254, deposit_col_q#4812 AS deposit_col_q#5255, deposit_col_x#4813 AS deposit_col_x#5256, deposit_col_t#4814 AS deposit_col_t#5257, deposit_col_y#4815 AS deposit_col_y#5258, counterpart_col_b#4816 AS counterpart_col_b#5259, deposit_col_v#4817L AS deposit_col_v#5260L, deposit_col_u#4818 AS deposit_col_u#5261, deposit_col_r#4819 AS deposit_col_r#5262, deposit_col_s#4820 AS deposit_col_s#5263, m10_dpst_col_a#4821 AS m10_dpst_col_a#5264, oa_country#4822 AS oa_country#5265, counterpart_activity_two_digits#4823 AS counterpart_activity_two_digits#5266, ibsi_instrument_flag#4824L AS ibsi_instrument_flag#5267L, domestic_currency#4825L AS domestic_currency#5268L, bs_count_sector_1a#4826 AS bs_count_sector_1a#5269, ... 50 more fields]
   +- Project [deposit_col_z#4803, counterpart_col_c#4804, deposit_col_w#4805, counterpart_col_a#4806, counterpart_col_d#4807, deposit_col_o#4808, party_col_j#4809, is_intragroup#4810L, m10_dpst_col_b#4811, deposit_col_q#4812, deposit_col_x#4813, deposit_col_t#4814, deposit_col_y#4815, counterpart_col_b#4816, deposit_col_v#4817L, deposit_col_u#4818, deposit_col_r#4819, deposit_col_s#4820, m10_dpst_col_a#4821, oa_country#4822, counterpart_activity_two_digits#4823, ibsi_instrument_flag#4824L, domestic_currency#4825L, bs_count_sector_1a#4826, ... 50 more fields]
      +- Project [deposit_col_z#4803, counterpart_col_c#4804, deposit_col_w#4805, counterpart_col_a#4806, counterpart_col_d#4807, deposit_col_o#4808, party_col_j#4809, is_intragroup#4810L, m10_dpst_col_b#4811, deposit_col_q#4812, deposit_col_x#4813, deposit_col_t#4814, deposit_col_y#4815, counterpart_col_b#4816, deposit_col_v#4817L, deposit_col_u#4818, deposit_col_r#4819, deposit_col_s#4820, m10_dpst_col_a#4821, oa_country#4822, counterpart_activity_two_digits#4823, ibsi_instrument_flag#4824L, domestic_currency#4825L, bs_count_sector_1a#4826, ... 50 more fields]
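
To compare plans between runs without reading the dump by eye, the nesting can be reduced to a few numbers. The following is a minimal sketch of a hypothetical helper: the names PlanStats, Stats and summarize are ours for illustration and are not part of Trevas or Spark. It assumes the plan text uses the usual "+- " / ":- " tree prefixes with three characters of indentation per level, as in the excerpt above, and that one plan section is passed at a time.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Trevas or Spark): summarizes a plan dump pasted as text. */
final class PlanStats {

  /** Simple result holder. */
  static final class Stats {
    final int projectCount;        // number of Project operators found
    final int maxDepth;            // deepest tree level seen (root line = 0)
    final int longestProjectChain; // longest run of directly nested Projects

    Stats(int projectCount, int maxDepth, int longestProjectChain) {
      this.projectCount = projectCount;
      this.maxDepth = maxDepth;
      this.longestProjectChain = longestProjectChain;
    }
  }

  // Assumed line shape: N indentation units ("   " or ":  "),
  // then the branch marker "+- " or ":- ", then the operator text.
  private static final Pattern NODE = Pattern.compile("^((?:[ :] {2})*)[+:]- (.*)$");

  static Stats summarize(String planText) {
    int projects = 0;
    int maxDepth = 0;
    int run = 0;
    int longest = 0;
    int prevDepth = -1;
    for (String line : planText.split("\\R")) {
      Matcher m = NODE.matcher(line);
      boolean isChild = m.matches();
      // Lines without a branch marker (root operator, headers) count as depth 0.
      int depth = isChild ? m.group(1).length() / 3 + 1 : 0;
      String operator = isChild ? m.group(2) : line;
      maxDepth = Math.max(maxDepth, depth);
      if (operator.equals("Project") || operator.startsWith("Project ")) {
        projects++;
        // Extend the run only when this Project sits directly under the previous one.
        run = (run > 0 && depth == prevDepth + 1) ? run + 1 : 1;
        longest = Math.max(longest, run);
      } else {
        run = 0;
      }
      prevDepth = depth;
    }
    return new Stats(projects, maxDepth, longest);
  }
}
```

Applied to the four-line excerpt above (the Aggregate and its three nested Projects), this would report three Project nodes forming a single chain of length three. The chain metric only looks at consecutive lines, so for plans with joins it is an approximation per branch rather than an exact tree measure.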

Based on the extracted information and the number of nested Projects, we have started to outline a possible action plan. While analyzing the behavior of the keep statement, following up on issue #430, we reviewed the following code (we may not have fully understood the intended design philosophy and, if our hypothesis is incorrect, we would appreciate clarification):

InseeFr/Trevas/vtl-engine/src/main/java/fr/insee/vtl/engine/visitors/ClauseVisitor.java

@Override
public DatasetExpression visitKeepOrDropClause(VtlParser.KeepOrDropClauseContext ctx) {
  // Normalize to keep operation.
  var keep = ctx.op.getType() == VtlParser.KEEP;
  var names = ctx.componentID().stream().map(ClauseVisitor::getName).collect(Collectors.toSet());
  List<String> columnNames =
      datasetExpression.getDataStructure().values().stream()
          .map(Dataset.Component::getName)
          .filter(name -> keep == names.contains(name))
          .collect(Collectors.toList());

  return processingEngine.executeProject(datasetExpression, columnNames);
}

InseeFr/Trevas/vtl-spark/src/main/java/fr/insee/vtl/spark/SparkProcessingEngine.java

@Override
public DatasetExpression executeProject(DatasetExpression expression, List<String> columnNames) {
  SparkDataset dataset = asSparkDataset(expression);

  List<Column> columns = columnNames.stream().map(Column::new).collect(Collectors.toList());
  Seq<Column> columnSeq = iterableAsScalaIterable(columns).toSeq();

  // Project in spark.
  Dataset<Row> result = dataset.getSparkDataset().select(columnSeq);

  return new SparkDatasetExpression(new SparkDataset(result, getRoleMap(dataset)), expression);
}

We understand that each invocation of visitKeepOrDropClause normalizes to keep and delegates to executeProject, which applies a select(...) on the underlying Dataset and returns a new SparkDatasetExpression while preserving roles. The wrapper itself should not be the main cost, but each select introduces a Project, and if multiple operations and/or renames are chained, the logical plan grows.

This same situation occurs in other functions within SparkProcessingEngine which, by design, also tend to introduce additional Projects:

InseeFr/Trevas/vtl-spark/src/main/java/fr/insee/vtl/spark/SparkProcessingEngine.java

@Override
public DatasetExpression executeCalc(
    DatasetExpression expression,
    Map<String, ResolvableExpression> expressions,
    Map<String, Role> roles,
    Map<String, String> expressionStrings) {
  SparkDataset dataset = asSparkDataset(expression);
  Dataset<Row> ds = dataset.getSparkDataset();

  // Rename all the columns to avoid conflicts (static single assignment).
  Map<String, String> aliasesToName = new HashMap<>();
  Map<String, ResolvableExpression> renamedExpressions = new LinkedHashMap<>();
  Map<String, String> renamedExpressionString = new LinkedHashMap<>();
  for (var name : expressions.keySet()) {
    String alias = name + "_" + aliasesToName.size();
    renamedExpressions.put(alias, expressions.get(name));
    renamedExpressionString.put(alias, expressionStrings.get(name));
    aliasesToName.put(alias, name);
  }

  // First pass with interpreted spark expressions
  Dataset<Row> interpreted = executeCalcInterpreted(ds, renamedExpressionString);

  // Execute the rest using the resolvable expressions
  Dataset<Row> evaluated = executeCalcEvaluated(interpreted, renamedExpressions);

  // Rename the columns back to their original names
  Dataset<Row> renamed = rename(evaluated, aliasesToName);

  // Create the new role map.
  var roleMap = getRoleMap(dataset);
  roleMap.putAll(roles);

  return new SparkDatasetExpression(new SparkDataset(renamed, roleMap), expression);
}

@Override
public DatasetExpression executeFilter(
    DatasetExpression expression, ResolvableExpression filter, String filterText) {
  SparkDataset dataset = asSparkDataset(expression);

  Dataset<Row> ds = dataset.getSparkDataset();
  try {
    Dataset<Row> result = ds.filter(filterText);
    return new SparkDatasetExpression(new SparkDataset(result, getRoleMap(dataset)), expression);
  } catch (Exception e) {
    SparkFilterFunction filterFunction = new SparkFilterFunction(filter);
    Dataset<Row> result = ds.filter(filterFunction);
    return new SparkDatasetExpression(new SparkDataset(result, getRoleMap(dataset)), expression);
  }
}

@Override
public DatasetExpression executeRename(DatasetExpression expression, Map<String, String> fromTo) {
  SparkDataset dataset = asSparkDataset(expression);

  var result = rename(dataset.getSparkDataset(), fromTo);

  var originalRoles = getRoleMap(dataset);
  var renamedRoles = new LinkedHashMap<>(originalRoles);
  for (Map.Entry<String, String> fromToEntry : fromTo.entrySet()) {
    renamedRoles.put(fromToEntry.getValue(), originalRoles.get(fromToEntry.getKey()));
  }

  return new SparkDatasetExpression(new SparkDataset(result, renamedRoles), expression);
}

public Dataset<Row> rename(Dataset<Row> dataset, Map<String, String> fromTo) {
  List<Column> columns = new ArrayList<>();
  for (String name : dataset.columns()) {
    if (fromTo.containsKey(name)) {
      columns.add(col(name).as(fromTo.get(name)));
    } else if (!fromTo.containsValue(name)) {
      columns.add(col(name));
    }
  }
  return dataset.select(iterableAsScalaIterable(columns).toSeq());
}

What we infer from our understanding

  • In workloads with wide datasets and many VTL transformations, the pattern calc → rename → keep/drop → … can chain Projects and produce very large plans.
  • Although Spark has rules to collapse adjacent Projects, these do not always apply (e.g., due to expression complexity or intermediate aliasing/dependencies).
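
As an illustration of what minimizing consecutive Projects could look like for the purely structural clauses (keep, drop, rename), consecutive steps can be folded into a single source-to-output column mapping before anything is sent to Spark. The sketch below is hypothetical: ProjectionFuser and its methods are names introduced here, not Trevas code, and it deliberately does not cover calc, which introduces new expressions. The rename step mirrors the behavior of the rename helper quoted above, including the fact that a column whose name collides with a rename target is dropped.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch (not part of Trevas): folds keep/rename steps into one projection. */
final class ProjectionFuser {

  // Source column name -> current output name, kept in column order.
  private final Map<String, String> projection = new LinkedHashMap<>();

  ProjectionFuser(List<String> inputColumns) {
    for (String column : inputColumns) {
      projection.put(column, column);
    }
  }

  /** Keeps only the columns whose current output name is listed. */
  ProjectionFuser keep(Set<String> names) {
    projection.values().removeIf(output -> !names.contains(output));
    return this;
  }

  /** Renames columns by their current output name, mirroring the quoted rename helper. */
  ProjectionFuser rename(Map<String, String> fromTo) {
    Iterator<Map.Entry<String, String>> it = projection.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, String> entry = it.next();
      String output = entry.getValue();
      if (fromTo.containsKey(output)) {
        entry.setValue(fromTo.get(output));
      } else if (fromTo.containsValue(output)) {
        // Same rule as the quoted helper: a column colliding with a rename target is dropped.
        it.remove();
      }
    }
    return this;
  }

  /** Final mapping: one entry per selected source column and the name it ends up with. */
  Map<String, String> result() {
    return new LinkedHashMap<>(projection);
  }
}
```

With such a mapping, a run of keep and rename clauses could in principle be issued as one select of aliased columns instead of one select per clause. Whether this fits the engine's design, and how it would interact with role handling, is exactly the kind of question we would like your view on.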

We would like to know your perspective on this matter and to better understand the reasoning behind the design choices made when implementing the engine. This would help us decide whether to pursue refactoring tasks or to define a contingency plan in case such errors occur in the near future. We are concerned that the issue could arise without a viable way to resolve it within a reasonable timeframe.

Thank you in advance, and apologies for the length of this issue description! If you need any additional details, please let us know.

Best regards, and congratulations on the great work! 🥇
