Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbt_internal_packages/dbt-adapters/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
name: dbt
180 changes: 180 additions & 0 deletions dbt_internal_packages/dbt-adapters/macros/adapters/apply_grants.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
{# ------- BOOLEAN MACROS --------- #}

{#
-- COPY GRANTS
-- When a relational object (view or table) is replaced in this database,
-- do previous grants carry over to the new object? This may depend on:
-- whether we use alter-rename-swap versus CREATE OR REPLACE
-- user-supplied configuration (e.g. copy_grants on Snowflake)
-- By default, play it safe, assume TRUE: that grants ARE copied over.
-- This means dbt will first "show" current grants and then calculate diffs.
-- It may require an additional query than is strictly necessary,
-- but better safe than sorry.
#}

-- funcsign: () -> bool
{% macro copy_grants() %}
{{ return(adapter.dispatch('copy_grants', 'dbt')()) }}
{% endmacro %}

-- funcsign: () -> bool
{% macro default__copy_grants() %}
{{ return(True) }}
{% endmacro %}


{#
-- SUPPORT MULTIPLE GRANTEES PER DCL STATEMENT
-- Does this database support 'grant {privilege} to {grantee_1}, {grantee_2}, ...'
-- Or must these be separate statements:
-- `grant {privilege} to {grantee_1}`;
-- `grant {privilege} to {grantee_2}`;
-- By default, pick the former, because it's what we prefer when available.
#}

-- funcsign: () -> bool
{% macro support_multiple_grantees_per_dcl_statement() %}
{{ return(adapter.dispatch('support_multiple_grantees_per_dcl_statement', 'dbt')()) }}
{% endmacro %}

-- funcsign: () -> bool
{%- macro default__support_multiple_grantees_per_dcl_statement() -%}
{{ return(True) }}
{%- endmacro -%}

-- funcsign: (optional[relation], optional[bool]) -> bool
{% macro should_revoke(existing_relation, full_refresh_mode=True) %}

{% if not existing_relation %}
{#-- The table doesn't already exist, so no grants to copy over --#}
{{ return(False) }}
{% elif full_refresh_mode %}
{#-- The object is being REPLACED -- whether grants are copied over depends on the value of user config --#}
{{ return(copy_grants()) }}
{% else %}
{#-- The table is being merged/upserted/inserted -- grants will be carried over --#}
{{ return(True) }}
{% endif %}

{% endmacro %}

{# ------- DCL STATEMENT TEMPLATES --------- #}

-- funcsign: (relation) -> string
{% macro get_show_grant_sql(relation) %}
{{ return(adapter.dispatch("get_show_grant_sql", "dbt")(relation)) }}
{% endmacro %}

-- funcsign: (relation) -> string
{% macro default__get_show_grant_sql(relation) %}
show grants on {{ relation.render() }}
{% endmacro %}

-- funcsign: (relation, string, list[string]) -> string
{% macro get_grant_sql(relation, privilege, grantees) %}
{{ return(adapter.dispatch('get_grant_sql', 'dbt')(relation, privilege, grantees)) }}
{% endmacro %}

-- funcsign: (relation, string, list[string]) -> string
{%- macro default__get_grant_sql(relation, privilege, grantees) -%}
grant {{ privilege }} on {{ relation.render() }} to {{ grantees | join(', ') }}
{%- endmacro -%}


-- funcsign: (relation, string, list[string]) -> string
{% macro get_revoke_sql(relation, privilege, grantees) %}
{{ return(adapter.dispatch('get_revoke_sql', 'dbt')(relation, privilege, grantees)) }}
{% endmacro %}

-- funcsign: (relation, string, list[string]) -> string
{%- macro default__get_revoke_sql(relation, privilege, grantees) -%}
revoke {{ privilege }} on {{ relation.render() }} from {{ grantees | join(', ') }}
{%- endmacro -%}


{# ------- RUNTIME APPLICATION --------- #}
-- funcsign: (relation, dict[string, list[string]], (relation, string, list[string]) -> string) -> list[string]
{% macro get_dcl_statement_list(relation, grant_config, get_dcl_macro) %}
{{ return(adapter.dispatch('get_dcl_statement_list', 'dbt')(relation, grant_config, get_dcl_macro)) }}
{% endmacro %}

-- funcsign: (relation, dict[string, list[string]], (relation, string, list[string]) -> string) -> list[string]
{%- macro default__get_dcl_statement_list(relation, grant_config, get_dcl_macro) -%}
{#
-- Unpack grant_config into specific privileges and the set of users who need them granted/revoked.
-- Depending on whether this database supports multiple grantees per statement, pass in the list of
-- all grantees per privilege, or (if not) template one statement per privilege-grantee pair.
-- `get_dcl_macro` will be either `get_grant_sql` or `get_revoke_sql`
#}
{%- set dcl_statements = [] -%}
{%- for privilege, grantees in grant_config.items() %}
{%- if support_multiple_grantees_per_dcl_statement() and grantees -%}
{%- set dcl = get_dcl_macro(relation, privilege, grantees) -%}
{%- do dcl_statements.append(dcl) -%}
{%- else -%}
{%- for grantee in grantees -%}
{% set dcl = get_dcl_macro(relation, privilege, [grantee]) %}
{%- do dcl_statements.append(dcl) -%}
{% endfor -%}
{%- endif -%}
{%- endfor -%}
{{ return(dcl_statements) }}
{%- endmacro %}

-- funcsign: (list[string]) -> string
{% macro call_dcl_statements(dcl_statement_list) %}
{{ return(adapter.dispatch("call_dcl_statements", "dbt")(dcl_statement_list)) }}
{% endmacro %}

-- funcsign: (list[string]) -> string
{% macro default__call_dcl_statements(dcl_statement_list) %}
{#
-- By default, supply all grant + revoke statements in a single semicolon-separated block,
-- so that they're all processed together.

-- Some databases do not support this. Those adapters will need to override this macro
-- to run each statement individually.
#}
{% call statement('grants') %}
{% for dcl_statement in dcl_statement_list %}
{{ dcl_statement }};
{% endfor %}
{% endcall %}
{% endmacro %}


-- funcsign: (relation, optional[dict[string, list[string]]], bool) -> string
{% macro apply_grants(relation, grant_config, should_revoke) %}
{{ return(adapter.dispatch("apply_grants", "dbt")(relation, grant_config, should_revoke)) }}
{% endmacro %}

-- funcsign: (relation, optional[dict[string, list[string]]], bool) -> string
{% macro default__apply_grants(relation, grant_config, should_revoke) %}
{#-- If grant_config is {} or None, this is a no-op --#}
{% if grant_config %}
{% if should_revoke %}
{#-- We think previous grants may have carried over --#}
{#-- Show current grants and calculate diffs --#}
{% set current_grants_table = run_query(get_show_grant_sql(relation)) %}
{% set current_grants_dict = adapter.standardize_grants_dict(current_grants_table) %}
{% set needs_granting = diff_of_two_dicts(grant_config, current_grants_dict) %}
{% set needs_revoking = diff_of_two_dicts(current_grants_dict, grant_config) %}
{% if not (needs_granting or needs_revoking) %}
{{ log('On ' ~ relation.render() ~': All grants are in place, no revocation or granting needed.')}}
{% endif %}
{% else %}
{#-- We don't think there's any chance of previous grants having carried over. --#}
{#-- Jump straight to granting what the user has configured. --#}
{% set needs_revoking = {} %}
{% set needs_granting = grant_config %}
{% endif %}
{% if needs_granting or needs_revoking %}
{% set revoke_statement_list = get_dcl_statement_list(relation, needs_revoking, get_revoke_sql) %}
{% set grant_statement_list = get_dcl_statement_list(relation, needs_granting, get_grant_sql) %}
{% set dcl_statement_list = revoke_statement_list + grant_statement_list %}
{% if dcl_statement_list %}
{{ call_dcl_statements(dcl_statement_list) }}
{% endif %}
{% endif %}
{% endif %}
{% endmacro %}
151 changes: 151 additions & 0 deletions dbt_internal_packages/dbt-adapters/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
-- funcsign: (relation) -> optional[agate_table]
{% macro get_columns_in_relation(relation) -%}
{{ return(adapter.dispatch('get_columns_in_relation', 'dbt')(relation)) }}
{% endmacro %}

-- funcsign: (relation) -> optional[agate_table]
{% macro default__get_columns_in_relation(relation) -%}
{{ exceptions.raise_not_implemented(
'get_columns_in_relation macro not implemented for adapter '+adapter.type()) }}
{% endmacro %}

{# helper for adapter-specific implementations of get_columns_in_relation #}
-- funcsign: (agate_table) -> list[api.column]
{% macro sql_convert_columns_in_relation(table) -%}
{% set columns = [] %}
{% for row in table %}
{% do columns.append(api.Column(*row)) %}
{% endfor %}
{{ return(columns) }}
{% endmacro %}

-- funcsign: (string, optional[string]) -> string
{% macro get_empty_subquery_sql(select_sql, select_sql_header=none) -%}
{{ return(adapter.dispatch('get_empty_subquery_sql', 'dbt')(select_sql, select_sql_header)) }}
{% endmacro %}

{#
Builds a query that results in the same schema as the given select_sql statement, without necessitating a data scan.
Useful for running a query in a 'pre-flight' context, such as model contract enforcement (assert_columns_equivalent macro).
#}
-- funcsign: (string, optional[string]) -> string
{% macro default__get_empty_subquery_sql(select_sql, select_sql_header=none) %}
{%- if select_sql_header is not none -%}
{{ select_sql_header }}
{%- endif -%}
select * from (
{{ select_sql }}
) as __dbt_sbq
where false
limit 0
{% endmacro %}

-- funcsign: (dict[string, api.column]) -> string
{% macro get_empty_schema_sql(columns) -%}
{{ return(adapter.dispatch('get_empty_schema_sql', 'dbt')(columns)) }}
{% endmacro %}

-- funcsign: (dict[string, api.column]) -> string
{% macro default__get_empty_schema_sql(columns) %}
{%- set col_err = [] -%}
{%- set col_naked_numeric = [] -%}
select
{% for i in columns %}
{%- set col = columns[i] -%}
{%- if col['data_type'] is not defined -%}
{%- do col_err.append(col['name']) -%}
{#-- If this column's type is just 'numeric' then it is missing precision/scale, raise a warning --#}
{#-- TYPE CHECK: col['data_type'] is optional[string] but user have this constraint --#}
{%- elif col['data_type'].strip().lower() in ('numeric', 'decimal', 'number') -%}
{%- do col_naked_numeric.append(col['name']) -%}
{%- endif -%}
{% set col_name = adapter.quote(col['name']) if col.get('quote') else col['name'] %}
{{ cast('null', col['data_type']) }} as {{ col_name }}{{ ", " if not loop.last }}
{%- endfor -%}
{%- if (col_err | length) > 0 -%}
{{ exceptions.column_type_missing(column_names=col_err) }}
{%- elif (col_naked_numeric | length) > 0 -%}
{{ exceptions.warn("Detected columns with numeric type and unspecified precision/scale, this can lead to unintended rounding: " ~ col_naked_numeric ~ "`") }}
{%- endif -%}
{% endmacro %}

-- funcsign: (string, optional[string]) -> list[base_column]
{% macro get_column_schema_from_query(select_sql, select_sql_header=none) -%}
{% set columns = [] %}
{# -- Using an 'empty subquery' here to get the same schema as the given select_sql statement, without necessitating a data scan.#}
{% set sql = get_empty_subquery_sql(select_sql, select_sql_header) %}
{% set column_schema = adapter.get_column_schema_from_query(sql) %}
{{ return(column_schema) }}
{% endmacro %}

-- here for back compat
-- funcsign: (string) -> list[string]
{% macro get_columns_in_query(select_sql) -%}
{{ return(adapter.dispatch('get_columns_in_query', 'dbt')(select_sql)) }}
{% endmacro %}

-- funcsign: (string) -> list[string]
{% macro default__get_columns_in_query(select_sql) %}
{% call statement('get_columns_in_query', fetch_result=True, auto_begin=False) -%}
{{ get_empty_subquery_sql(select_sql) }}
{% endcall %}
{{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }}
{% endmacro %}

-- funcsign: (relation, string, string) -> string
{% macro alter_column_type(relation, column_name, new_column_type) -%}
{{ return(adapter.dispatch('alter_column_type', 'dbt')(relation, column_name, new_column_type)) }}
{% endmacro %}

-- funcsign: (relation, string, string) -> string
{% macro default__alter_column_type(relation, column_name, new_column_type) -%}
{#
1. Create a new column (w/ temp name and correct type)
2. Copy data over to it
3. Drop the existing column (cascade!)
4. Rename the new column to existing column
#}
{%- set tmp_column = column_name + "__dbt_alter" -%}

{% call statement('alter_column_type') %}
alter table {{ relation.render() }} add column {{ adapter.quote(tmp_column) }} {{ new_column_type }};
update {{ relation.render() }} set {{ adapter.quote(tmp_column) }} = {{ adapter.quote(column_name) }};
alter table {{ relation.render() }} drop column {{ adapter.quote(column_name) }} cascade;
alter table {{ relation.render() }} rename column {{ adapter.quote(tmp_column) }} to {{ adapter.quote(column_name) }}
{% endcall %}

{% endmacro %}


-- funcsign: (relation, optional[list[base_column]], optional[list[base_column]]) -> string
{% macro alter_relation_add_remove_columns(relation, add_columns = none, remove_columns = none) -%}
{{ return(adapter.dispatch('alter_relation_add_remove_columns', 'dbt')(relation, add_columns, remove_columns)) }}
{% endmacro %}

-- funcsign: (relation, optional[list[base_column]], optional[list[base_column]]) -> string
{% macro default__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}
{% if remove_columns is none %}
{% set remove_columns = [] %}
{% endif %}

{% set sql -%}

alter {{ relation.type }} {{ relation.render() }}

{% for column in add_columns %}
add column {{ column.name }} {{ column.data_type }}{{ ',' if not loop.last }}
{% endfor %}{{ ',' if add_columns and remove_columns }}

{% for column in remove_columns %}
drop column {{ column.name }}{{ ',' if not loop.last }}
{% endfor %}

{%- endset -%}

{% do run_query(sql) %}

{% endmacro %}
36 changes: 36 additions & 0 deletions dbt_internal_packages/dbt-adapters/macros/adapters/freshness.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- funcsign: (string, string, optional[string]) -> any
{% macro collect_freshness(source, loaded_at_field, filter) %}
{{ return(adapter.dispatch('collect_freshness', 'dbt')(source, loaded_at_field, filter))}}
{% endmacro %}

-- funcsign: (string, string, optional[string]) -> any
{% macro default__collect_freshness(source, loaded_at_field, filter) %}
{% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%}
select
max({{ loaded_at_field }}) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
from {{ source }}
{% if filter %}
where {{ filter }}
{% endif %}
{% endcall %}
{{ return(load_result('collect_freshness')) }}
{% endmacro %}

-- funcsign: (string, string) -> any
{% macro collect_freshness_custom_sql(source, loaded_at_query) %}
{{ return(adapter.dispatch('collect_freshness_custom_sql', 'dbt')(source, loaded_at_query))}}
{% endmacro %}

-- funcsign: (string, string) -> any
{% macro default__collect_freshness_custom_sql(source, loaded_at_query) %}
{% call statement('collect_freshness_custom_sql', fetch_result=True, auto_begin=False) -%}
with source_query as (
{{ loaded_at_query }}
)
select
(select * from source_query) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
{% endcall %}
{{ return(load_result('collect_freshness_custom_sql')) }}
{% endmacro %}
Loading