Astro 10/2/2023, 5:13:53 PM
You can run a dbt model from Apache Airflow by using the BashOperator or the DbtTaskGroup class from the Cosmos package. Here's an example of how to use the BashOperator to run a dbt model:
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG('dbt_dag', start_date=datetime(2022, 1, 1)) as dag:
    run_dbt = BashOperator(
        task_id='run_dbt',
        bash_command='dbt run --profiles-dir ~/.dbt/ --project-dir ~/my_dbt_project/'
    )
```
In this example, `dbt run` is the command to run the dbt model. The `--profiles-dir` flag indicates the directory where the profiles.yml file is located, and the `--project-dir` flag indicates the directory of your dbt project.
Alternatively, you can use the `DbtTaskGroup` class from the Cosmos package to create a task group from the models in your dbt project. Dependencies between your dbt models are automatically turned into dependencies between Airflow tasks. Here's an example:
```python
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
from airflow import DAG
from datetime import datetime

with DAG('dbt_dag', start_date=datetime(2022, 1, 1)) as dag:
    dbt_task_group = DbtTaskGroup(
        project_config=ProjectConfig("/path/to/my_dbt_project"),
        profile_config=ProfileConfig(
            profile_name="my_profile",
            target_name="my_target",
            profile_mapping=PostgresUserPasswordProfileMapping(
                conn_id="my_postgres_dbt",
                profile_args={"schema": "public"},
            ),
        )
    )
```
In this example, the `DbtTaskGroup` class uses the `ProjectConfig` to specify the directory of your dbt project and the `ProfileConfig` to specify the profile and target to use for your dbt project. The `PostgresUserPasswordProfileMapping` class is used to map your Airflow connection to a dbt profile.
Human 10/2/2023, 5:13:26 PM
how can I run a dbt model from Airflow?
Sources
`model1.sql`:
```sql
SELECT '{{ var("my_name") }}' as name
```
`model2.sql`:
```sql
SELECT * FROM {{ ref('model1') }}
```
`model1.sql` selects the variable `my_name`. `model2.sql` depends on `model1.sql` and selects everything from the upstream model.
You should now have the following structure within your Astro project:
```text
.
└── dags
    └── dbt
        └── my_simple_dbt_project
            ├── dbt_project.yml
            └── models
                ├── model1.sql
                └── model2.sql
```
Step 3: Create an Airflow connection to your data warehouse
Cosmos allows you to apply Airflow connections to your dbt project.
1. Start Airflow by running `astro dev start`.
2. In the Airflow UI, go to Admin -> Connections and click +.
3. Create a new connection named `db_conn`. Select the connection type and supply parameters based on the data warehouse you are using. For a Postgres connection, enter the following information:
   - Connection ID: `db_conn`
   - Connection Type: `Postgres`
   - Host: Your Postgres host address.
   - Schema: Your Postgres database.
   - Login: Your Postgres login username.
   - Password: Your Postgres password.
   - Port: Your Postgres port.
:::info
If a connection type for your database isn't available, you might need to make it available by adding the relevant provider package to `requirements.txt` and running `astro dev restart`.
:::
Step 4: Write your Airflow DAG
The DAG you'll write uses Cosmos to create tasks from existing dbt models and the PostgresOperator to query a table that was created. You can add more upstream and downstream tasks to embed the dbt project within other actions in your data ecosystem.
1. In your `dags` folder, create a file called `my_simple_dbt_dag.py`.
2. Copy and paste the following DAG code into the file: {cosmos_dag} This DAG uses the `DbtTaskGroup` class from the Cosmos package to create a task group from the models in your dbt project. Dependencies between your dbt models are automatically turned into dependencies between Airflow tasks. Make sure to add your own values for `YOUR_NAME`, `DB_NAME`, and `SCHEMA_NAME`. Using the `vars` keyword in the dictionary provided to the `operator_args` parameter, you can inject variables into the dbt project. This DAG injects `YOUR_NAME` for the `my_name` variable. If your dbt project contains dbt tests, they will be run directly after a model has completed. A sketch of what this DAG can look like is shown after this list.
3. Run the DAG manually by clicking the play button and view the DAG in the graph view. Double-click the task groups to expand them and see all tasks.
4. Check the XCom returned by the `query_table` task to see your name in the `model2` table.
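The actual DAG code comes from the `{cosmos_dag}` code sample and is not reproduced in this excerpt. As a rough sketch of what such a file can look like (the project path, profile name, target name, and start date below are assumptions rather than the tutorial's exact values):

```python
# Hypothetical sketch of my_simple_dbt_dag.py; replace the placeholder constants
# with your own values.
from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
from pendulum import datetime

YOUR_NAME = "YOUR_NAME"
DB_NAME = "DB_NAME"
SCHEMA_NAME = "SCHEMA_NAME"
CONNECTION_ID = "db_conn"  # the connection created in Step 3
DBT_PROJECT_PATH = "/usr/local/airflow/dags/dbt/my_simple_dbt_project"  # assumed path

profile_config = ProfileConfig(
    profile_name="default",  # assumed profile name
    target_name="dev",       # assumed target name
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id=CONNECTION_ID,
        profile_args={"schema": SCHEMA_NAME},
    ),
)


@dag(start_date=datetime(2023, 8, 1), schedule=None, catchup=False)
def my_simple_dbt_dag():
    # Renders each dbt model as its own Airflow task and injects YOUR_NAME
    # for the my_name dbt variable via operator_args.
    transform_data = DbtTaskGroup(
        group_id="transform_data",
        project_config=ProjectConfig(DBT_PROJECT_PATH),
        profile_config=profile_config,
        operator_args={"vars": {"my_name": YOUR_NAME}},
    )

    # Queries the table created by model2 so the result shows up in XCom.
    query_table = PostgresOperator(
        task_id="query_table",
        postgres_conn_id=CONNECTION_ID,
        sql=f"SELECT * FROM {DB_NAME}.{SCHEMA_NAME}.model2",
    )

    transform_data >> query_table


my_simple_dbt_dag()
```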
:::info
The DbtTaskGroup class populates an Airflow task group with Airflow tasks created from dbt models inside of a normal DAG. To directly define a full DAG containing only dbt models, use the DbtDag class, as shown in the Cosmos documentation.
:::
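For illustration, a minimal DbtDag might look like the sketch below, reusing the same hypothetical project path and profile configuration as above (this is not code from the tutorial):

```python
from cosmos import DbtDag, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping
from pendulum import datetime

# The whole DAG is generated from the dbt project; every model becomes a task.
my_dbt_dag = DbtDag(
    dag_id="my_dbt_dag",
    start_date=datetime(2023, 8, 1),
    schedule=None,
    catchup=False,
    project_config=ProjectConfig("/usr/local/airflow/dags/dbt/my_simple_dbt_project"),  # assumed path
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id="db_conn",
            profile_args={"schema": "public"},  # assumed schema
        ),
    ),
)
```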
Congratulations! You've run a DAG using Cosmos to automatically create tasks from dbt models. You can learn more about how to configure Cosmos in the Cosmos documentation.
Apache Airflow, often hailed as the “Swiss Army knife” of data engineering, is an open-source platform that enables the creation, scheduling, and monitoring of complex data pipelines. A typical pipeline is responsible for extracting, transforming, and then loading data - this is where the name “ETL” comes from. Airflow has, and always will have, strong support for each of these data operations through its rich ecosystem of providers and operators.
In recent years, dbt Core has emerged as a popular transformation tool in the data engineering and analytics communities. And this is for good reason: it offers users a simple, intuitive SQL interface backed by rich functionality and software engineering best practices. Many data teams have adopted dbt to support their transformation workloads, and dbt’s popularity is fast-growing.
The best part: Airflow and dbt are a match made in heaven. dbt equips data analysts with the right tool and capabilities to express their transformations. Data engineers can then take these dbt transformations and schedule them reliably with Airflow, putting the transformations in the context of upstream data ingestion. By using them together, data teams can have the best of both worlds: dbt’s analytics-friendly interfaces and Airflow’s rich support for arbitrary Python execution and end-to-end state management of the data pipeline.
Airflow and dbt: a short history
Despite the obvious benefit of using dbt to run transformations in Airflow, there has not been a method of running dbt in Airflow that’s become ubiquitous. For a long time, Airflow users would use the BashOperator to call the dbt CLI and execute a dbt project. While this worked, it never felt like a complete solution: with this approach, Airflow has no visibility into what it’s executing and treats the dbt project like a black box. When dbt models fail, Airflow doesn’t know why; a user has to spend time manually digging through logs and hopping between systems to understand what happened. When the issue is fixed, the entire project has to be restarted, wasting time and compute re-running models that have already run successfully. There are also Python dependency conflicts between Airflow and dbt that make installing dbt in the same environment very challenging.
In 2020, Astronomer partnered with Updater to release a series of three blog posts on integrating dbt projects into Airflow in a more “Airflow-native” way by parsing dbt’s manifest.json file and constructing an Airflow DAG as part of a CI/CD process. This was certainly a more powerful approach and solved some of the growing pains called out in the initial blog post. However, this approach was not scalable: an end user would download code (either directly from the blog post or from a corresponding GitHub repository) and manage it themselves. As improvements or bug fixes were made to the code, there was no way to push updates to end users. Similarly, the code was somewhat opinionated: it worked very well for the use case it solved, but if you wanted to do something more complex or tailored to your use case, you were better off writing your own solution.
Cosmos: the next generation of running dbt in Airflow
Our team has worked with countless customers to set up Airflow and dbt. At a company-wide hackathon in December, we decided to materialize our experience in the form of an Airflow provider.
We gave it a fun, Astronomer-themed name - Cosmos - and because of overwhelming demand from our customers and the open source community, we’ve decided to continue developing the original idea, taking it from hack-week project to production-ready package. Cosmos, which is Apache 2.0 licensed, can be used with any version of Airflow 2.3 or greater. In just the last six months it’s grown to 17,000 downloads per month, almost 200 GitHub stars, and 35+ contributors from Astronomer, Cosmos users, and the wider Airflow community.
title: "Orchestrate dbt Cloud jobs with Airflow" sidebar_label: "Tutorial" id: airflow-dbt-cloud sidebar_custom_props: { icon: 'img/integrations/dbt.png' }
import CodeBlock from '@theme/CodeBlock';
import airflow_dbt_simple from '!!raw-loader!../code-samples/dags/airflow-dbt-cloud/airflow_dbt_simple.py';
dbt Cloud is a managed service that provides a hosted architecture to run dbt, a tool that helps you build interdependent SQL models for in-warehouse data transformation.
The dbt Cloud Airflow provider allows users to orchestrate and execute actions in dbt Cloud as DAGs. Running dbt with Airflow ensures a reliable, scalable environment for models, as well as the ability to trigger models based on upstream dependencies in your data ecosystem.
:::info
For a tutorial on how to use the open-source dbt Core package with Airflow, see Orchestrate dbt Core with Cosmos.
:::
Assumed knowledge
To get the most out of this tutorial, make sure you have an understanding of:
- The basics of dbt. See Getting started with dbt Cloud.
- Airflow fundamentals, such as writing DAGs and defining tasks. See Get started with Apache Airflow.
- Airflow operators. See Operators 101.
Time to complete
This tutorial takes approximately 30 minutes to complete.
Prerequisites
- The Astro CLI.
- A dbt Cloud account. A 14-day free trial is available.
- Access to a data warehouse supported by dbt Cloud. View the dbt documentation for an up-to-date list of adapters.
Step 1: Configure your Astro project
An Astro project contains all of the files you need to run Airflow locally.
1. Create a new Astro project:

   ```sh
   $ mkdir astro-dbt-cloud-tutorial && cd astro-dbt-cloud-tutorial
   $ astro dev init
   ```

2. Add the dbt Cloud provider to your `requirements.txt` file:

   ```text
   apache-airflow-providers-dbt-cloud
   ```

3. Run the following command to start your Astro project:

   ```sh
   $ astro dev start
   ```
Step 2: Configure a dbt connection
1. In the Airflow UI, go to Admin -> Connections and click +.
2. Create a new connection named `dbt_conn` and choose the `dbt Cloud` connection type. Configure the following values for the connection:
   - Tenant: The URL under which your API cloud is hosted. The default value is `cloud.getdbt.com`.
   - Account ID: (Optional) The default dbt account to use with this connection.
   - API Token: A dbt user token.
Step 3: Configure a dbt Cloud job
In the dbt Cloud UI, create one dbt Cloud job. The contents of this job do not matter for this tutorial. Optionally, you can use the jaffle shop example from dbt's Quickstart documentation. Copy the dbt Cloud `job_id` for use in the next step.
Step 4: Write a dbt Cloud DAG
1. In your `dags` folder, create a file called `check_before_running_dbt_cloud_job.py`.
2. Copy the following code into the file, making sure to replace `` with the `job_id` you copied. {airflow_dbt_simple} This DAG shows a simple implementation of using the DbtCloudRunJobOperator and DbtCloudHook. The DAG consists of two tasks:
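The two tasks themselves come from the `{airflow_dbt_simple}` code sample and aren't included in this excerpt. As a minimal sketch of just the triggering side (the DAG id, start date, and job ID below are placeholders, and the check-before-running logic referenced by the file name is omitted):

```python
from airflow.decorators import dag
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from pendulum import datetime

DBT_CLOUD_CONN_ID = "dbt_conn"  # the connection created in Step 2
JOB_ID = 12345  # placeholder: replace with your dbt Cloud job_id


@dag(start_date=datetime(2023, 8, 1), schedule=None, catchup=False)
def simple_dbt_cloud_dag():
    # Triggers the dbt Cloud job and, by default, waits for it to finish.
    DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id=DBT_CLOUD_CONN_ID,
        job_id=JOB_ID,
        check_interval=60,
        timeout=3600,
    )


simple_dbt_cloud_dag()
```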
Alternative ways to run dbt Core with Airflow
While using Cosmos is recommended, there are several other ways to run dbt Core with Airflow.
Using the BashOperator
You can use the BashOperator to execute specific dbt commands. It's recommended to run `dbt-core` and the dbt adapter for your database in a virtual environment because there often are dependency conflicts between dbt and other packages.
The DAG below uses the BashOperator to activate the virtual environment and execute `dbt_run` for a dbt project.
{airflow_dbt_bashoperator}
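The `{airflow_dbt_bashoperator}` code sample isn't reproduced here. A minimal sketch of the pattern, assuming a virtual environment at `/usr/local/airflow/dbt_venv` and a dbt project at `/usr/local/airflow/dbt/my_simple_dbt_project` (both hypothetical paths), could look like:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

DBT_VENV = "/usr/local/airflow/dbt_venv"  # hypothetical venv with dbt-core and the adapter installed
DBT_PROJECT_DIR = "/usr/local/airflow/dbt/my_simple_dbt_project"  # hypothetical project location

with DAG(
    "dbt_bashoperator_dag",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    dbt_run = BashOperator(
        task_id="dbt_run",
        # Activate the isolated environment, then run the whole project.
        bash_command=(
            f"source {DBT_VENV}/bin/activate && "
            f"dbt run --project-dir {DBT_PROJECT_DIR} --profiles-dir {DBT_PROJECT_DIR}"
        ),
    )
```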
Using the BashOperator to run `dbt run` and other dbt commands can be useful during development. However, running dbt at the project level has a couple of issues:
- There is low observability into what execution state the project is in.
- Failures are absolute and require all models in a project to be run again, which can be costly.
Using a manifest file
Using a dbt-generated `manifest.json` file gives you more visibility into the steps dbt is running in each task. This file is generated in the target directory of your dbt project and contains its full representation. For more information on this file, see the dbt documentation.
You can learn more about a manifest-based dbt and Airflow project structure, view example code, and read about the DbtDagParser in a 3-part blog post series on Building a Scalable Analytics Architecture With Airflow and dbt.
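As a rough illustration of the manifest-based idea (this is not the code from the blog series; the paths are hypothetical), a DAG can parse `manifest.json`, create one task per model, and recreate dbt's dependencies as task dependencies:

```python
import json
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

DBT_PROJECT_DIR = "/usr/local/airflow/dbt/my_simple_dbt_project"  # hypothetical project path
MANIFEST_PATH = f"{DBT_PROJECT_DIR}/target/manifest.json"  # generated by dbt compile/run

with DAG("dbt_manifest_dag", start_date=datetime(2023, 1, 1), schedule=None, catchup=False) as dag:
    with open(MANIFEST_PATH) as f:
        manifest = json.load(f)

    # One task per dbt model, keyed by the manifest node id (e.g. "model.my_project.model1").
    tasks = {}
    for node_id, node in manifest["nodes"].items():
        if node["resource_type"] == "model":
            tasks[node_id] = BashOperator(
                task_id=node["name"],
                bash_command=(
                    f"dbt run --select {node['name']} "
                    f"--project-dir {DBT_PROJECT_DIR} --profiles-dir {DBT_PROJECT_DIR}"
                ),
            )

    # Recreate model-to-model dependencies as Airflow task dependencies.
    for node_id, node in manifest["nodes"].items():
        if node["resource_type"] != "model":
            continue
        for upstream_id in node["depends_on"]["nodes"]:
            if upstream_id in tasks:
                tasks[upstream_id] >> tasks[node_id]
```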
TITLE: Airflow + DBT Configuration
DATE: 2021-09-21 18:25:53.907000
BY: 12179015
SCORE: 2
I'm starting to learn dbt and I'm a long-time Airflow user. I've built, run, and tested a dbt model and I've pushed it to a remote repo. Now, I want to pull this remote repo into another repo where my Airflow DAGs are. My Airflow structure is the following:
~/airflow
- .git
- dags/
But I have some questions:
- I read that git submodule is the best way to pull my dbt repo into my Airflow repo. Am I correct?
- Into which location should I pull my dbt repo?
- If I build another dbt project, how should I configure dbt to support two projects?
I hope this is clear, but let me know if you need any details. Thanks for your help, guys!
DATE: 2022-07-30 13:45:56.810000
BY: 14850064
SCORE: 0
I understand that:
- You have Airflow jobs already running on a machine where you have dbt installed.
- You have your dbt project in another repo, which you want to add to your Airflow repo inside the DAGs folder so you can trigger dbt jobs.
You can indeed do it using [subtree](https://gitcheatsheet.org/how-to/git-subtree-add).
Once you have this, if dbt is installed, Airflow can run dbt DAGs.
TITLE: Is it possible to trigger a Databricks job from Airflow, which will consume a Python project I deployed?
DATE: 2022-10-19 13:46:16.327000
BY: 10535123
SCORE: 1
[Following this tutorial](https://www.databricks.com/blog/2017/07/19/integrating-apache-airflow-with-databricks.html), I can give the task a path to a notebook I want to run in Databricks -
```python
notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
        'notebook_path': '/Users/airflow@example.com/PrepareData',
    },
}
```
But is there a way to tell the task to take a Python project (or maybe a path to a wheel file?) from S3 or Artifactory and run it instead of a notebook?
How can I make it work so it will be ready for production? I want to create a process so that after I push my change to the git repo, a CI/CD process will build the project, deploy it to S3/Artifactory, and trigger a Databricks job from Airflow, which consumes the project I deployed. Is it possible?
DATE: 2022-10-21 12:08:09.413000
BY: 18627
SCORE: 1
The Airflow provider for Databricks supports all task/job types provided by the Databricks REST API. If you want to run a Python file, then you can use the `spark_python_task` parameter ([doc](https://airflow.apache.org/docs/apache-airflow-providers-databricks/stable/_api/airflow/providers/databricks/operators/databricks/index.html#airflow.providers.databricks.operators.databricks.DatabricksSubmitRunOperator)) to specify a path to the file. If you need to run a wheel file, then you can either use the `spark_submit_task`, or provide a `python_wheel_task` object inside the `json` parameter that is used to fill the data for submission to the REST API, as we don't support this task yet in Airflow. Refer to the [Databricks REST API](https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit) docs for more information on what parameters of this task you need to specify.
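To make the `spark_python_task` option concrete, here is a hedged sketch (the cluster spec, connection ID, and file path are placeholders, not values from the original posts):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

# Placeholder one-time cluster spec; adjust to your workspace.
new_cluster = {
    "spark_version": "11.3.x-scala2.12",
    "node_type_id": "i3.xlarge",
    "num_workers": 2,
}

with DAG("databricks_python_file", start_date=datetime(2023, 1, 1), schedule=None, catchup=False) as dag:
    run_python_file = DatabricksSubmitRunOperator(
        task_id="run_python_file",
        databricks_conn_id="databricks_default",
        new_cluster=new_cluster,
        # Run a Python file (for example one uploaded to DBFS or S3) instead of a notebook.
        spark_python_task={"python_file": "dbfs:/path/to/my_script.py"},
    )
```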
10535123 on 2022-10-22 10:17:05.257000 [Score: 0]: Thank you very much for your answer. Can you please provide an example? I really couldn't understand the Databricks doc on how to submit a job properly via spark-submit. For example, they mentioned that "In the new_cluster specification, libraries and spark_conf are not supported. Instead, use --jars and --py-files to add Java and Python libraries and --conf to set the Spark configuration.", but didn't specify in which script I should add these --jars, --py-files, and --conf parameters - in the `new_cluster` JSON or in the `parameters` JSON?
DATE: 2023-02-02 13:26:59.490000
BY: 16861953
SCORE: 0
Like Alex Ott said, the `python_wheel_task` argument is not available yet in `DatabricksSubmitRunOperator`.
However, you can do it using the (multi) `tasks` argument, like below:
```python
trigger_wheel = DatabricksSubmitRunOperator(
    task_id='trigger_wheel',
    tasks=[{
        "task_key": 'my_task',
        "python_wheel_task": {
            'package_name': 'wheel_package_name',
            'entry_point': 'wheel_entry_point',
        },
        "new_cluster": cluster_config,
        "libraries": [{"whl": "dbfs:/.../wheel_package_name-0.0.1-py3-none-any.whl"}],
    }]
)
```
My example uses DBFS, but of course you can use a wheel that is on S3. Just make sure your cluster has access to it.
You can also check out Databricks DBX. This allows you to develop in an IDE and easily deploy your package as a Databricks job (e.g., a job that executes a wheel).
“With Cosmos, we could focus more on the analytical aspects and less on the operational overhead of coordinating tasks. The ability to achieve end-to-end automation and detailed monitoring within Airflow significantly improved our data pipeline’s reliability, reproducibility, and overall efficiency.”
Péter Szécsi, Information Technology Architect, Data Engineer, Hungarian Post
Airflow is an extremely powerful and fully functional orchestrator, and one of the driving principles behind Cosmos is to take full advantage of that functionality. To do so, Airflow needs to understand what it’s executing. Cosmos gives Airflow that visibility by expanding your dbt project into a Task Group (using the DbtTaskGroup class) or a full DAG (using the DbtDag class). If one of your dbt models fails, you can immediately drill into the specific task that corresponds to the model, troubleshoot the issue, and retry the model. Once the model is successful, your project continues running as if nothing happened.
We’ve also built a tight coupling with Airflow’s connection management functionality. dbt requires a user to supply a profiles.yml file on execution with credentials to connect to your database. However, most of the time, Airflow is already interacting with that database through an Airflow connection. Cosmos translates your Airflow connection into a dbt profile at runtime, meaning that you don’t have to manage two separate sets of credentials. You can also take advantage of secrets backends to manage your dbt profiles this way!
It’s easy to use
“Astronomer Cosmos has allowed us to seamlessly orchestrate our dbt projects using Apache Airflow for our start-up. The ability to render dbt models as individual tasks and run tests after a model has been materialized has been valuable for lineage tracking and verifying data quality. I was impressed with how quickly we could take our existing dbt projects and set up an Airflow DAG using Cosmos.”
Justin Bandoro, Senior Data Engineer at Kevala Inc.
Running your dbt projects in Airflow shouldn’t be difficult; it should “just work”. Cosmos is designed to be a drop-in replacement for your current dbt Airflow tasks. All you need to do is import the class from Cosmos, point it at your dbt project, and tell it where to find credentials. That’s it! In most cases, it takes less than 10 lines of code to set up. Here’s an example that uses Cosmos to render dbt’s jaffle_shop (their equivalent of a “hello world”) project and execute it against an Airflow Postgres connection:
```python
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

# then, in your DAG

jaffle_shop = DbtTaskGroup(
    project_config=ProjectConfig("/path/to/jaffle_shop"),
    profile_config=ProfileConfig(
        profile_name="my_profile",
        target_name="my_target",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id="my_postgres_dbt",
            profile_args={"schema": "public"},
        ),
    )
)
```
While it’s easy to set up, it’s also extremely flexible and can be customized to your specific use case. There’s a whole set of configuration options you can look at to find out more, and we’re actively adding more. For example, you can break up your dbt project into multiple sub-projects based on tags, you can configure the testing behavior to run tests after each model so you don’t run extra queries if the tests fail early, and you can run your dbt models using Kubernetes and Docker for extra isolation. Most importantly, Cosmos has native support for executing dbt in a Python virtual environment. If you’ve tried to set up Airflow and dbt together, chances are you’ve experienced dependency hell. Airflow and dbt share Python requirements (looking at you, Jinja) and they’re typically not compatible with each other; Cosmos solves this by letting you install dbt into a virtual environment and execute your models using that environment.
And it runs better on Astro!