
[Bug]: incremental_predicates option has no effect #306

@quangbilly79

Description

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

The incremental_predicates option has no effect at all.
For reference, this is how the dbt documentation describes the incremental_predicates option:

    incremental_strategy: merge
    # this limits the scan of the existing table to the last 7 days of data
    incremental_predicates: ["DBT_INTERNAL_DEST.session_start > dateadd(day, -7, current_date)"]
    # `incremental_predicates` accepts a list of SQL statements.
    # `DBT_INTERNAL_DEST` and `DBT_INTERNAL_SOURCE` are the standard aliases for the target table and temporary table, respectively, during an incremental run using the merge strategy.

My model.sql:
{{
config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy = 'merge',
incremental_predicates = ["DBT_INTERNAL_DEST.id >= 2"]
)
}}
select
*
from hive_source.test_dbt.TestDBTSourceMergeComplex1

Below is the SQL MERGE query generated by dbt-dremio, taken from the --debug log:

    merge into "hive_source"."test_dbt"."testdbtdesmergecomplex1" as DBT_INTERNAL_DEST
        using "hive_source"."test_dbt"."testdbtdesmergecomplex1__dbt_tmp" as DBT_INTERNAL_SOURCE
        on (
                DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
            )


    when matched then update set
        "id" = DBT_INTERNAL_SOURCE."id","data" = DBT_INTERNAL_SOURCE."data","data1" = DBT_INTERNAL_SOURCE."data1"


    when not matched then insert
        ("id", "data", "data1")
    values
        (DBT_INTERNAL_SOURCE.id,DBT_INTERNAL_SOURCE.data,DBT_INTERNAL_SOURCE.data1)

As you can see, the predicate never makes it into the ON clause. I would expect something like:

        on (
                DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
                and DBT_INTERNAL_DEST.id >= 2
            )

I also checked the macro source file Lib\site-packages\dbt\include\dremio\macros\materializations\incremental\strategies.sql (it is a Jinja/SQL file, not a .py file).
It contains the following code that handles incremental_predicates:

{% macro dremio__get_incremental_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
    {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set merge_update_columns = config.get('merge_update_columns') -%}
    {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
    {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
    {%- set sql_header = config.get('sql_header', none) -%}

    {% if unique_key %}
        {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
            {% for key in unique_key %}
                {% set this_key_match %}
                    DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
                {% endset %}
                {% do predicates.append(this_key_match) %}
            {% endfor %}
        {% else %}
            {% set unique_key_match %}
                DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
            {% endset %}
            {% do predicates.append(unique_key_match) %}
        {% endif %}
    {% else %}
        {% do predicates.append('FALSE') %}
    {% endif %}

    {{ sql_header if sql_header is not none }}

    merge into {{ target }} as DBT_INTERNAL_DEST
        using {{ source }} as DBT_INTERNAL_SOURCE
        on {{"(" ~ predicates | join(") and (") ~ ")"}}

    {% if unique_key %}
    when matched then update set
        {% for column_name in update_columns -%}
            {{ column_name }} = DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {%- endif %}
        {%- endfor %}
    {% endif %}

    when not matched then insert
        ({{ dest_cols_csv }})
    values
        ({% for column_name in dest_columns | map(attribute="name") -%}
            DBT_INTERNAL_SOURCE.{{ column_name }}
            {%- if not loop.last %}, {%- endif %}
        {%- endfor %})
{%- endmacro %}

As you can see, incremental_predicates is copied into the predicates list, which is then joined into the ON clause here:

    merge into {{ target }} as DBT_INTERNAL_DEST
        using {{ source }} as DBT_INTERNAL_SOURCE
        on {{"(" ~ predicates | join(") and (") ~ ")"}}
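To check my reading of the macro, here is a small Python emulation of how it assembles the ON clause. It is only a sketch that mirrors the Jinja logic above. The helper name build_on_clause is hypothetical and not part of dbt-dremio, and the real rendered SQL contains extra whitespace from the `{% set %}` blocks that is omitted here.

```python
def build_on_clause(unique_key, incremental_predicates=None):
    """Hypothetical helper mirroring dremio__get_incremental_merge_sql.

    Illustration only; this is not dbt-dremio code.
    """
    # Same as: [] if incremental_predicates is none else [] + incremental_predicates
    predicates = [] if incremental_predicates is None else [] + list(incremental_predicates)

    if unique_key:
        if isinstance(unique_key, (list, tuple)):
            # Composite key: one equality predicate per key column
            for key in unique_key:
                predicates.append(f"DBT_INTERNAL_SOURCE.{key} = DBT_INTERNAL_DEST.{key}")
        else:
            predicates.append(f"DBT_INTERNAL_SOURCE.{unique_key} = DBT_INTERNAL_DEST.{unique_key}")
    else:
        # No unique key: nothing matches, so every source row is inserted
        predicates.append("FALSE")

    # Same as: "(" ~ predicates | join(") and (") ~ ")"
    return "(" + ") and (".join(predicates) + ")"


print(build_on_clause("id", ["DBT_INTERNAL_DEST.id >= 2"]))
# (DBT_INTERNAL_DEST.id >= 2) and (DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id)
```

If the macro ever received the list from my config, the user-supplied predicates would come first in the ON clause, followed by the unique-key match. That is the behavior I expected from the macro.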

I tried adding some debug lines to the incremental\strategies.sql file:

{% macro dremio__get_incremental_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) -%}
    -- DEBUG: incremental_predicates raw = {{ incremental_predicates | tojson }}
    {%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
    -- DEBUG: predicates after init = {{ predicates | tojson }}
    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

It turns out incremental_predicates is passed to the macro as null:

    -- DEBUG: incremental_predicates raw = null-- DEBUG: predicates after init = []

Why? Which files are responsible for passing this option to strategies.sql?
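The null value suggests the option is dropped before the strategy macro is called, in whatever code reads the model config and invokes dremio__get_incremental_merge_sql. This is my assumption; I have not confirmed it in the dbt-dremio source. The sketch below models the two cases in Python. A plain dict stands in for the model config, and init_predicates, model_config, dropped, and forwarded are hypothetical names.

```python
import json


def init_predicates(incremental_predicates):
    # Mirrors: [] if incremental_predicates is none else [] + incremental_predicates
    return [] if incremental_predicates is None else [] + list(incremental_predicates)


# Hypothetical stand-in for the model's config(...) block
model_config = {"incremental_predicates": ["DBT_INTERNAL_DEST.id >= 2"]}

# Caller that never forwards the option: the macro sees None (tojson renders null)
dropped = init_predicates(None)

# Caller that reads the option from the config and forwards it to the macro
forwarded = init_predicates(model_config.get("incremental_predicates"))

print(json.dumps(None), dropped)  # null []
print(forwarded)                  # ['DBT_INTERNAL_DEST.id >= 2']
```

The first case reproduces exactly what my debug lines printed. This is why I suspect the materialization does not read incremental_predicates from the config and pass it into the strategy call.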

I am currently using dbt-dremio version 1.9.0.

Expected Behavior

No response

Steps To Reproduce

No response

Environment

- OS: Windows
- dbt-dremio: 1.9.0
- Dremio Software: 26.0.0
- Dremio Cloud:

Relevant log output
