
Orchestrate dbt with Airflow on Spark: A Complete Guide

This guide demonstrates how to orchestrate scalable dbt pipelines on Ilum's Apache Spark cluster using Apache Airflow and Astronomer Cosmos. You will learn how to build a robust medallion architecture (Bronze → Silver → Gold) using Kubernetes-native compute for efficient data transformation. We will cover automatic DAG generation, data quality testing, and incremental processing strategies.

Tip

For a comprehensive deep-dive into the architecture, benefits, and strategic considerations of running dbt on Spark with Airflow, see our detailed blog post: Orchestrate dbt on Spark with Airflow: A Guide to Modern Data Engineering on Ilum.

Prerequisites for Airflow and dbt on Ilum

  • Ilum version 6.6.2 or later
  • Airflow enabled with the 3.1.1-dbt image
  • Spark SQL (Thrift Server) or Spark Connect enabled
  • Basic familiarity with dbt and Airflow concepts

dbt-Airflow-Spark Architecture Overview

The integration combines four key components:

| Component | Technology |
| --- | --- |
| Compute Engine | Apache Spark on Ilum (Kubernetes-native) |
| Data Modeling | dbt-spark (dbt-core + Spark adapter) |
| Orchestration | Apache Airflow 3.1 with KubernetesExecutor |
| DAG Generation | Astronomer Cosmos |

Workflow: Git repository → gitSync → Airflow → Cosmos (auto-generates DAG) → Spark SQL / Spark Connect → Ilum Spark Cluster

Architecture
┌──────────────────────────────────────────────────────────────────┐
│ Gitea Repository                                                 │
│ airflow.git/ilum_dbt_project/                                    │
│ ├── dbt_project.yml                                              │
│ ├── seeds/crypto_prices_raw.csv                                  │
│ └── models/{bronze,silver,gold}/*.sql                            │
└──────────────────────────────────────────────────────────────────┘
                                 │ gitSync
                                 ▼
┌──────────────────────────────────────────────────────────────────┐
│ Airflow 3.1 (Kubernetes)                                         │
│ ┌─────────────────────────────────────────────────────────────┐  │
│ │ Astronomer Cosmos                                           │  │
│ │ - Parses dbt project                                        │  │
│ │ - Generates DAG with tasks for each model + test            │  │
│ │ - Maintains dbt ref() dependencies                          │  │
│ └─────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────┘
                                 │
             ┌───────────────────┴───────────────────┐
             ▼                                       ▼
  ┌─────────────────────┐                 ┌─────────────────────┐
  │ Thrift Server       │                 │ Spark Connect       │
  │ (ilum-sql-thrift)   │                 │ (job-xxx-driver)    │
  │ Port: 10009         │                 │ Port: 15002         │
  └─────────────────────┘                 └─────────────────────┘
             │                                       │
             └───────────────────┬───────────────────┘
                                 ▼
┌──────────────────────────────────────────────────────────────────┐
│ Ilum Spark Cluster                                               │
│ Executes SQL queries, creates managed tables                     │
└──────────────────────────────────────────────────────────────────┘

Quick Start: Deploying dbt Pipelines

1. Enable Airflow with dbt Support

Install Ilum with Airflow and dbt pre-configured:

helm install
helm install ilum ilum/ilum \
--set ilum-hive-metastore.enabled=true \
--set ilum-core.metastore.enabled=true \
--set ilum-core.metastore.type=hive \
--set ilum-core.sql.enabled=true \
--set ilum-sql.enabled=true \
--set airflow.enabled=true \
--set airflow.images.airflow.tag=3.1.1-dbt
Pre-loaded Example

When using the 3.1.1-dbt image tag, the complete dbt project and DAG described in this guide are automatically pre-loaded into the internal Gitea repository and synced to Airflow. You can trigger the example pipeline immediately after installation.

2. Access Airflow

Navigate to the Airflow UI via the Ilum console. The default credentials are typically admin:admin.

3. Verify the Connection

Starting from Ilum 6.6.2, the spark_thrift_default connection is automatically configured. Verify it exists:

Admin → Connections → Search for spark_thrift_default

If missing, create it manually:

| Field | Value |
| --- | --- |
| Connection ID | spark_thrift_default |
| Connection Type | Spark or spark_sql |
| Host | ilum-sql-thrift-binary.<NAMESPACE>.svc.cluster.local |
| Port | 10009 |
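
Alternatively, the same connection can be created with the Airflow CLI from inside a scheduler or webserver pod. A minimal sketch, assuming the generic spark connection type and your own namespace:

Create Connection via CLI
airflow connections add spark_thrift_default \
  --conn-type spark \
  --conn-host ilum-sql-thrift-binary.<NAMESPACE>.svc.cluster.local \
  --conn-port 10009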

4. Trigger the DAG

Find ilum_dbt_thrift_pipeline in the Airflow UI and trigger it manually. Monitor the Graph View to see the auto-generated task dependencies.
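
If you prefer the command line, the same DAG can be triggered from inside an Airflow pod; a quick sketch:

Trigger via CLI
airflow dags trigger ilum_dbt_thrift_pipeline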

Figure 1: The Airflow DAG automatically generated by Astronomer Cosmos, reflecting the dbt model dependencies.


Project Structure

The dbt project follows a standard medallion architecture:

Directory Structure
ilum_dbt_project/
├── dbt_project.yml
├── packages.yml
├── seeds/
│   └── crypto_prices_raw.csv
└── models/
    ├── bronze/
    │   └── crypto_prices_bronze.sql
    ├── silver/
    │   └── crypto_prices_silver_daily.sql
    ├── gold/
    │   └── crypto_prices_gold_latest.sql
    └── schema.yml

dbt Configuration

dbt_project.yml

The central configuration file defines project metadata, paths, and layer-specific settings:

dbt_project.yml
name: "ilum_dbt_project"
version: "1.0.0"
config-version: 2

profile: "ilum_dbt_project"  # must match ProfileConfig.profile_name

model-paths: ["models"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
test-paths: ["tests"]

models:
  ilum_dbt_project:
    +materialized: table
    bronze:
      +tags: ["bronze"]
    silver:
      +tags: ["silver"]
    gold:
      +tags: ["gold"]

seeds:
  ilum_dbt_project:
    +column_types:
      date: date
      symbol: string
      price_usd: double
      volume_usd: double
      market_cap_usd: double
    crypto_prices_raw:
      +pre-hook:
        - "{{ drop_this_seed() }}"

Key features:

  • Tags enable selective execution: dbt run --select tag:gold
  • Column types ensure correct Spark table schemas
  • Pre-hook drop_this_seed() provides idempotent seed loading for demos (a sketch of the macro follows)
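
The macro body is not shown in the project listing above. A minimal sketch of what such a macro might look like, assuming it simply drops the seed's target table so every load starts from a clean state:

macros/drop_this_seed.sql
{% macro drop_this_seed() %}
    {# Renders to a single SQL statement executed by the pre-hook:
       drops this seed's target table if it exists, so the seed can
       be fully reloaded on every run. #}
    drop table if exists {{ this }}
{% endmacro %}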

Medallion Architecture Layers

Bronze Layer: Type Normalization

File: models/bronze/crypto_prices_bronze.sql

Converts raw seed data into typed, normalized tables with incremental processing:

models/bronze/crypto_prices_bronze.sql
{{ config(
    materialized='incremental',
    unique_key=['date', 'symbol']
) }}

select
    cast(date as date) as date,
    upper(symbol) as symbol,
    cast(price_usd as double) as price_usd,
    cast(volume_usd as double) as volume_usd,
    cast(market_cap_usd as double) as market_cap_usd
from {{ ref('crypto_prices_raw') }}

{% if is_incremental() %}
where date > (select max(date) from {{ this }})
{% endif %}

Benefits:

  • Only processes new records after initial load
  • Reduces compute costs for large datasets
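
The initial run builds the full table; subsequent runs only insert rows newer than the current max(date). If the table ever needs to be rebuilt from scratch (for example after a schema change), dbt's full-refresh flag recreates it; a usage sketch:

Full Refresh
dbt run --select crypto_prices_bronze --full-refresh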

Silver Layer: Data Enrichment

File: models/silver/crypto_prices_silver_daily.sql

Adds 7-day moving averages using Spark window functions:

models/silver/crypto_prices_silver_daily.sql
{{ config(materialized="table") }}

select
    date,
    symbol,
    price_usd,
    volume_usd,
    market_cap_usd,
    avg(price_usd) over (
        partition by symbol
        order by date
        rows between 6 preceding and current row
    ) as price_usd_7d_avg,
    avg(volume_usd) over (
        partition by symbol
        order by date
        rows between 6 preceding and current row
    ) as volume_usd_7d_avg
from {{ ref('crypto_prices_bronze') }}

Gold Layer: Business-Ready Views

File: models/gold/crypto_prices_gold_latest.sql

Produces analytics-ready data showing the latest 30 days per symbol:

models/gold/crypto_prices_gold_latest.sql
{{ config(materialized="table") }}

with ranked as (
    select
        *,
        row_number() over (
            partition by symbol
            order by date desc
        ) as rn
    from {{ ref('crypto_prices_silver_daily') }}
)

select
    date,
    symbol,
    price_usd,
    price_usd_7d_avg,
    volume_usd_7d_avg,
    market_cap_usd
from ranked
where rn <= 30

Data Quality Tests

Define tests in models/schema.yml to create quality gates:

models/schema.yml
version: 2

seeds:
  - name: crypto_prices_raw
    columns:
      - name: date
        tests: [not_null]
      - name: symbol
        tests: [not_null]
      - name: price_usd
        tests: [not_null]

models:
  - name: crypto_prices_bronze
    description: "Bronze layer - typed and normalized crypto prices."
    columns:
      - name: date
        tests: [not_null]
      - name: symbol
        tests: [not_null]
      - name: price_usd
        tests: [not_null]

In Airflow: Cosmos converts each test into a separate task that blocks downstream models if it fails.
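
You can also run these quality gates locally while developing; a quick sketch using dbt's node selection:

Run Tests Locally
dbt test --select crypto_prices_bronze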


Generating Airflow DAGs with Cosmos

File: dags/ilum_dbt_thrift.py

dags/ilum_dbt_thrift.py
from datetime import datetime, timedelta
import os
from airflow.configuration import conf
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SparkThriftProfileMapping

DAGS_FOLDER = conf.get("core", "dags_folder")
DBT_PROJECT_PATH = os.path.join(DAGS_FOLDER, "ilum_dbt_project")
DBT_BIN = "/home/airflow/.local/bin/dbt"

profile_config = ProfileConfig(
    profile_name="ilum_dbt_project",
    target_name="prod",
    profile_mapping=SparkThriftProfileMapping(
        conn_id="spark_thrift_default",
        profile_args={
            "schema": "default",
            "threads": 4,
        },
    ),
)

dbt_dag = DbtDag(
    project_config=ProjectConfig(
        dbt_project_path=DBT_PROJECT_PATH,
    ),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        dbt_executable_path=DBT_BIN,
    ),
    dag_id="ilum_dbt_thrift_pipeline",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        "owner": "data-team",
        "retries": 2,
        "retry_delay": timedelta(minutes=5),
    },
)

How it works:

  • SparkThriftProfileMapping uses the Airflow connection spark_thrift_default
  • Cosmos scans the dbt project and auto-generates tasks for each model, seed, and test
  • Dependencies mirror dbt's ref() relationships
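
Cosmos can also render only a slice of the project into a DAG via RenderConfig's select argument, reusing the tags defined in dbt_project.yml. A sketch that could be added to the same DAG file (the dag_id here is hypothetical):

dags/ilum_dbt_thrift.py (addition)
from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ProjectConfig, RenderConfig

# DBT_PROJECT_PATH, DBT_BIN, and profile_config are defined earlier in the file.
gold_only_dag = DbtDag(
    project_config=ProjectConfig(dbt_project_path=DBT_PROJECT_PATH),
    profile_config=profile_config,
    execution_config=ExecutionConfig(dbt_executable_path=DBT_BIN),
    # Render only the models tagged "gold" (plus their tests) into this DAG.
    render_config=RenderConfig(select=["tag:gold"]),
    dag_id="ilum_dbt_gold_only",  # hypothetical dag_id
    schedule=None,  # manual trigger only
    start_date=datetime(2024, 1, 1),
    catchup=False,
)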

Thrift vs Spark Connect

| Aspect | Thrift Server | Spark Connect |
| --- | --- | --- |
| Endpoint | Central SQL server (shared) | Per-job endpoint (isolated) |
| Use case | Multiple tools sharing one endpoint | Isolated compute per project |
| Protocol | JDBC/Thrift | gRPC (native Spark API) |
| Connection | Stable service name | Dynamic job-based URL |
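
Under the hood, SparkThriftProfileMapping renders a dbt profiles.yml at runtime. For reference, a roughly equivalent hand-written profile (a sketch, following the dbt-spark thrift method):

profiles.yml (equivalent)
ilum_dbt_project:
  target: prod
  outputs:
    prod:
      type: spark
      method: thrift
      host: ilum-sql-thrift-binary.<NAMESPACE>.svc.cluster.local
      port: 10009
      schema: default
      threads: 4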

Tracking Data Lineage and Dependencies

Once the pipeline runs successfully, tables are stored in the Hive Metastore and accessible across Ilum components:

  • Ilum SQL: Query tables directly (see the sketch after this list)
  • Jupyter Notebooks: Analyze data interactively
  • Spark Jobs: Use as input for other pipelines
  • Lineage View: Visualize table dependencies in Ilum UI
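
For example, after a successful run you can inspect the gold table from Ilum SQL; a sketch, assuming the default schema configured in profile_args:

Query the Gold Table
select symbol, date, price_usd, price_usd_7d_avg
from default.crypto_prices_gold_latest
order by symbol, date desc
limit 10;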

Figure 2: The Ilum Data Lineage view visualizing the full medallion architecture (Bronze → Silver → Gold) and table dependencies.


Key Benefits

| Feature | Benefit |
| --- | --- |
| No dbt Cloud | Fully open-source, no subscription costs |
| Medallion pattern | Clean data architecture (bronze → silver → gold) |
| Incremental models | Process only new data, reducing compute costs |
| Quality gates | dbt tests block downstream models when data fails |
| KubernetesExecutor | Each task runs isolated in a separate pod |
| gitSync | Code changes auto-deployed from Gitea |
| Auto DAG generation | Cosmos creates tasks from dbt models automatically |
| Full lineage | Track model dependencies in the Airflow UI and Ilum |

Troubleshooting


Connection Errors

If you see failed to resolve sockaddr errors in dbt logs:

[Errno -2] Name or service not known

Solution: Verify the Thrift service exists:

Verify Thrift Service
kubectl get svc -n <NAMESPACE> | grep thrift

Ensure the connection host matches the service name exactly.
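
To test DNS resolution from inside an Airflow pod directly, something like the following can help (a sketch; substitute your scheduler pod name and namespace):

Test DNS Resolution
kubectl exec -it <airflow-scheduler-pod> -n <NAMESPACE> -- \
  python -c "import socket; print(socket.gethostbyname('ilum-sql-thrift-binary.<NAMESPACE>.svc.cluster.local'))"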

DAG Not Appearing

If the DAG doesn't show up in Airflow:

  1. Check gitSync logs to ensure the dbt project is synced (see the sketch after this list)
  2. Verify the DBT_PROJECT_PATH points to the correct directory
  3. Look for parsing errors in Airflow logs
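
A quick way to check the sync, assuming the standard git-sync sidecar container name used by the Airflow Helm chart:

Check gitSync Logs
kubectl logs <airflow-pod-with-gitsync> -c git-sync -n <NAMESPACE>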

Tests Failing

If dbt tests fail unexpectedly:

  1. Check the test task logs in Airflow
  2. Query the table directly via Ilum SQL to verify data quality (see the sketch below)
  3. Adjust test thresholds or fix upstream data issues
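
For the not_null tests defined in schema.yml, the offending rows can be located directly; a sketch, assuming the default schema:

Find Failing Rows
select *
from default.crypto_prices_bronze
where date is null or symbol is null or price_usd is null;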

Additional Resources