Incremental data transformation for ADLA using dbt to get data into Delta Lake tables.
dbt Docs · Azure Data Lake Analytics docs · Delta Lake
This is an opinionated dbt adapter that makes it easier to test and schedule ADLA jobs via the dbt CLI, without requiring an external orchestrator (such as Data Factory). It takes non-Delta Lake source data, lightly transforms it with SQL, and incrementally ingests it via ADLA compute into Delta Lake tables using quasi-ANSI-SQL syntax.
The adapter generates all non-SQL SCOPE syntax at compile time using dbt macros. As a result of this conscious design decision, the adapter discourages non-SQL constructs in model SQL, such as pre-processing imperative directives like #FOO. In the future, the adapter can/will use sqlglot to block non-SQL syntax like #FOO outright. The goal is to keep business logic as close to ANSI SQL as possible for portability across engines. The tradeoff is that the ADLA feature surface exposed by this adapter is limited to run-time syntax; ADLA compile-time syntax such as #FOO or #IFDEF is not supported.
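As a rough illustration of what such a guard could look like (a sketch only — the adapter describes this as future work, and would use sqlglot rather than a regex):

```python
import re

# Hypothetical guard: reject SCOPE pre-processing directives (e.g. #FOO,
# #IFDEF) before compiling model SQL into a SCOPE script. Anchoring on the
# start of a line avoids false positives on mid-line '#' such as 'zstd#11'.
DIRECTIVE_RE = re.compile(r"(?m)^\s*#\w+")

def assert_plain_sql(model_sql: str) -> None:
    """Raise if the model contains SCOPE compile-time directives."""
    match = DIRECTIVE_RE.search(model_sql)
    if match:
        raise ValueError(f"non-SQL directive not allowed: {match.group().strip()}")

assert_plain_sql("SELECT a FROM @data")  # passes silently
try:
    assert_plain_sql("#IFDEF DEBUG\nSELECT a FROM @data")
except ValueError as e:
    print(e)
```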
- Clean SQL models – write `SELECT ... FROM @data`; macros generate `#DECLARE`, `EXTRACT`, `INSERT INTO`
- Microbatch incremental – append-only by default; opt into idempotent `DELETE`+`INSERT` per partition with `delete_before_insert: true`. dbt retry/backfill built in
- Batch bin-packing – `days_per_batch` groups multiple days into a single SCOPE job to reduce ADLA overhead
- High-watermark skip – queries `MAX(partition_col)` from Delta via DuckDB and skips already-processed batches
- Smart WHERE injection – models with an existing `WHERE` clause get date filters merged with `AND`, not duplicated
- Declarative table properties – compression, checkpoint intervals via `scope_settings`
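The bin-packing behavior can be sketched as follows (an illustration of the semantics, not the adapter's code; function name is hypothetical):

```python
from datetime import date, timedelta

def bin_pack_days(start: date, end: date, days_per_batch: int) -> list[tuple[date, date]]:
    """Group consecutive days into (batch_start, batch_end) ranges so that
    each SCOPE job covers up to days_per_batch days."""
    batches = []
    cursor = start
    while cursor <= end:
        batch_end = min(cursor + timedelta(days=days_per_batch - 1), end)
        batches.append((cursor, batch_end))
        cursor = batch_end + timedelta(days=1)
    return batches

# A 30-day backlog with days_per_batch=15 produces 2 SCOPE jobs instead of 30:
jobs = bin_pack_days(date(2026, 4, 1), date(2026, 4, 30), 15)
print(len(jobs))  # 2
```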
SS (structured stream) files live on ADLS in date-partitioned directories. dbt run generates a SCOPE script per batch and submits each as an ADLA job. Each job reads only its date range (FileSet partition elimination), optionally deletes the target partition in Delta (when delete_before_insert: true), and inserts into a Delta table on ADLS.
The adapter detects existing Delta tables by checking ADLS for _delta_log/ directories. This tells dbt-core whether a model is running for the first time or incrementally:
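The detection decision reduces to checking for a `_delta_log/` child directory. A minimal sketch, with the ADLS listing call stubbed out as a pluggable function (the real adapter talks to ADLS):

```python
# Sketch of the table-detection logic. A Delta table exists at a path iff
# that path contains a _delta_log/ directory; that in turn decides whether
# dbt-core treats the model run as incremental or first-run.
def is_incremental_run(delta_table_path: str, list_dirs) -> bool:
    """list_dirs(path) -> iterable of child directory names on ADLS (stub)."""
    return "_delta_log" in set(list_dirs(delta_table_path))

# Simulated ADLS listings for illustration:
existing_table = lambda path: ["_delta_log", "event_year_date=20260401"]
fresh_path = lambda path: []

print(is_incremental_run("delta/my_model", existing_table))  # True  -> incremental
print(is_incremental_run("delta/my_model", fresh_path))      # False -> first run
```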
| Scenario | What runs |
|---|---|
| First run or `--full-refresh` | Every batch from `begin` (model config) through today |
| Incremental run (table exists) | Only the `lookback` most recent batches (default: yesterday + today) |
| Manual backfill | Exactly the range you pass via `--event-time-start` / `--event-time-end` |
The lookback parameter (default 1) controls how many recent batches to reprocess on each incremental run, catching late-arriving SS files. For gap recovery (e.g. backfilling a missed week), use the CLI flags.
High-watermark skip – on each incremental run the adapter queries MAX(partition_col) from the Delta transaction log via DuckDB (no ADLA compute). Batches whose partition value is ≤ the high watermark are emitted as no-ops, avoiding redundant SCOPE jobs. The result is cached once per dbt run invocation.
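The skip decision and per-invocation caching can be sketched like this (the watermark lookup is a stand-in for the adapter's DuckDB query against the Delta transaction log):

```python
from functools import lru_cache

@lru_cache(maxsize=None)  # evaluated once, then cached for the dbt run invocation
def high_watermark() -> str:
    # Stand-in for the DuckDB query over the Delta transaction log, e.g.
    # SELECT MAX(event_year_date) FROM <delta table> -- no ADLA compute needed.
    return "20260402"

def should_skip(batch_partition: str) -> bool:
    """Emit a batch as a no-op if its partition is <= the high watermark
    (i.e. its data is already present in the Delta table)."""
    return batch_partition <= high_watermark()

pending = [p for p in ["20260401", "20260402", "20260403"] if not should_skip(p)]
print(pending)  # ['20260403']
```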
```mermaid
flowchart TB
subgraph dbt["dbt-core – microbatch orchestrator"]
direction TB
Detect["Adapter checks ADLS<br/><i>_delta_log/ exists?</i>"]
Config["Model config<br/><i>begin, lookback, batch_size</i>"]
BatchCalc["Compute pending batches<br/>first run: begin → today<br/>incremental: lookback window"]
Detect --> BatchCalc
Config --> BatchCalc
end
subgraph ADLA["ADLA – runs one SCOPE script per batch"]
direction TB
S1["SET @@FeaturePreviews<br/>#DECLARE @startDate, @endDate"]
DDL["CREATE TABLE IF NOT EXISTS<br/>PARTITIONED BY event_year_date<br/>OPTIONS LAYOUT = DELTA"]
DEL["DELETE FROM @target<br/>WHERE partition in batch range<br/><i>only if delete_before_insert</i>"]
EXT["EXTRACT FROM SS files<br/>WHERE _date in batch range<br/>→ @data rowset"]
TX["SQL Transform – your dbt model (.sql)<br/>SELECT …, _date.ToString(…) AS partition_col<br/>FROM @data → @batch_data"]
INS["INSERT INTO @target<br/>SELECT * FROM @batch_data"]
S1 --> DDL --> DEL --> EXT --> TX --> INS
end
subgraph Storage["Azure Data Lake Storage"]
direction LR
subgraph Sources["Gen1 – SS source files"]
direction TB
SS1["/2026/04/01/<br/>20260401_*.ss"]
SS2["/2026/04/02/<br/>20260402_*.ss"]
SS3["/2026/04/03/<br/>20260403_*.ss"]
end
subgraph Target["Gen2 – Delta Lake table"]
direction TB
P1["event_year_date=20260401/<br/>part-*.parquet"]
P2["event_year_date=20260402/<br/>part-*.parquet"]
DL["_delta_log/"]
end
end
BatchCalc -- "one SCOPE script per batch<br/>(REST API submit + poll)" --> ADLA
EXT -. "reads SS files" .-> Sources
INS -- "writes partitions" --> Target
style DEL fill:#fee,stroke:#c00
style TX fill:#e8e0f8,stroke:#6a3cbc
style Detect fill:#e8f4e8,stroke:#2a2
```
On full refresh, every batch from begin to today runs and there is no DELETE step.
On incremental, only the lookback window runs. The DELETE step (red) is opt-in via delete_before_insert: true – when enabled, re-running the same date range replaces the partition rather than creating duplicates. When omitted (default), batches are append-only. Table detection (green) checks ADLS for _delta_log/ to determine if the model should run incrementally. Batches already present in Delta (per the high-watermark check) are skipped automatically.
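The effect of the opt-in DELETE step on the generated per-batch statements can be sketched like this (a simplified illustration, not the adapter's actual macro output):

```python
# delete_before_insert controls whether a DELETE precedes the INSERT for the
# batch's partition range. With it, re-runs replace the partition; without
# it, re-runs append and can duplicate rows.
def batch_statements(partition_col: str, start: str, end: str,
                     delete_before_insert: bool = False) -> list[str]:
    stmts = []
    if delete_before_insert:
        stmts.append(
            f'DELETE FROM @target WHERE {partition_col} BETWEEN "{start}" AND "{end}";'
        )
    stmts.append("INSERT INTO @target SELECT * FROM @batch_data;")
    return stmts

# Default: append-only (single INSERT per batch)
print(batch_statements("event_year_date", "20260401", "20260402"))
# Idempotent: DELETE the partition range, then INSERT
print(batch_statements("event_year_date", "20260401", "20260402",
                       delete_before_insert=True))
```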
The SCOPE jobs end up looking like this in ADLA:
```shell
curl -LsSf https://astral.sh/uv/install.sh | sh   # one-time install for uv
uv sync --extra dev                               # creates .venv and installs dbt-scope + dev deps
```

All sensitive values live in .env (see .env.example). The profile references them via env_var():
```yaml
# profiles.yml
my_project:
  target: dev
  outputs:
    dev:
      type: scope
      database: "{{ env_var('SCOPE_STORAGE_ACCOUNT') }}"
      schema: "{{ env_var('SCOPE_CONTAINER') }}"
      adla_account: "{{ env_var('SCOPE_ADLA_ACCOUNT') }}"
      storage_account: "{{ env_var('SCOPE_STORAGE_ACCOUNT') }}"
      container: "{{ env_var('SCOPE_CONTAINER') }}"
      delta_base_path: delta
      au: 100
      priority: 1
```

| dbt concept | SCOPE concept |
|---|---|
| `database` | Storage account name |
| `schema` | ADLS container |
| `table` | Full-refresh: `CREATE TABLE` + `INSERT INTO` |
| `incremental` | Microbatch: `INSERT` (append); opt-in `DELETE`+`INSERT` via `delete_before_insert` |
| model SQL | `SELECT` from `@data` (extracted SS rowset) |
```sql
{{ config(
    materialized='table',
    delta_location='abfss://ctr@acct.dfs.core.windows.net/delta/my_table',
    ss_source_path='/my/cosmos/path/to/MyStream',
    partition_by='event_year_date',
    scope_columns=[
        {'name': 'server_name', 'type': 'string'},
        {'name': 'event_year_date', 'type': 'string'}
    ],
    scope_settings={
        'microsoft.scope.compression': 'vorder:zstd#11',
        'delta.checkpointInterval': 5
    }
) }}

SELECT logical_server_name_DT_String AS server_name,
       _date.ToString("yyyyMMdd") AS event_year_date
FROM @data
```

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_year_date',
    batch_size='day',
    begin='2026-04-01',
    lookback=1,
    partition_by='event_year_date',
    delta_location='abfss://ctr@acct.dfs.core.windows.net/delta/my_model',
    ss_source_path='/my/cosmos/path/to/MyStream',
    days_per_batch=15,
    scope_columns=[
        {'name': 'server_name', 'type': 'string'},
        {'name': 'event_year_date', 'type': 'string'}
    ]
) }}

SELECT logical_server_name_DT_String AS server_name,
       _date.ToString("yyyyMMdd") AS event_year_date
FROM @data
```

Add `delete_before_insert: true` to DELETE the partition range before INSERT, making re-runs safe against duplicates:
```sql
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_year_date',
    batch_size='day',
    begin='2026-04-01',
    lookback=1,
    partition_by='event_year_date',
    delta_location='abfss://ctr@acct.dfs.core.windows.net/delta/my_model',
    ss_source_path='/my/cosmos/path/to/MyStream',
    delete_before_insert=true,
    days_per_batch=15,
    scope_columns=[
        {'name': 'server_name', 'type': 'string'},
        {'name': 'event_year_date', 'type': 'string'}
    ]
) }}

SELECT logical_server_name_DT_String AS server_name,
       _date.ToString("yyyyMMdd") AS event_year_date
FROM @data
```

Models that already have a WHERE clause work seamlessly – the adapter uses sqlglot to detect the existing WHERE and merges the batch date filter with AND:
```sql
{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_year_date',
    batch_size='day',
    begin='2026-04-01',
    lookback=1,
    partition_by=['event_year_date', 'edition'],
    delta_location='abfss://ctr@acct.dfs.core.windows.net/delta/my_filtered_model',
    ss_source_path='/my/cosmos/path/to/MyStream',
    scope_columns=[
        {'name': 'server_name', 'type': 'string'},
        {'name': 'edition', 'type': 'string'},
        {'name': 'event_year_date', 'type': 'string'}
    ]
) }}

SELECT logical_server_name_DT_String AS server_name,
       edition,
       _date.ToString("yyyyMMdd") AS event_year_date
FROM @data
WHERE edition == "Standard"
```

| Config | Default | Description |
|---|---|---|
| `delete_before_insert` | `false` | DELETE the partition range before INSERT for idempotent re-runs |
| `days_per_batch` | `1` | Days per SCOPE job. 15 → a 30-day backlog produces 2 jobs instead of 30 |
| `partition_by` | – | Single column name or list of columns. First column drives date partitioning |
| `lookback` | `1` | How many recent batches to reprocess, catching late-arriving SS files |
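The WHERE-merge semantics described above can be illustrated with a simplified sketch (string-based for brevity; the adapter itself parses the SQL with sqlglot rather than matching keywords):

```python
import re

def inject_date_filter(model_sql: str, date_filter: str) -> str:
    """Merge a batch date filter into model SQL: append with AND when a
    WHERE clause already exists, otherwise add a new WHERE clause.
    Illustration only -- the adapter uses sqlglot, not keyword matching."""
    if re.search(r"\bWHERE\b", model_sql, re.IGNORECASE):
        return f"{model_sql} AND ({date_filter})"
    return f"{model_sql} WHERE {date_filter}"

# Existing WHERE gets the date filter merged with AND, not duplicated:
print(inject_date_filter(
    'SELECT server_name FROM @data WHERE edition == "Standard"',
    '_date >= @startDate AND _date <= @endDate'))

# No WHERE: the date filter becomes the WHERE clause:
print(inject_date_filter(
    'SELECT server_name FROM @data',
    '_date >= @startDate AND _date <= @endDate'))
```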
`dbt retry` re-runs failed batches. `dbt run --event-time-start`/`--event-time-end` backfills a range.
See CONTRIBUTING.md.


