How to conditionally skip tasks in an Airflow DAG

Bartosz Mikulski

The more I use Airflow, the stranger the use cases I find. Recently, I implemented a DAG that we are going to use for data backfills.

The problems started with a simple question: what is the DAG supposed to do when there is nothing to backfill?

Sure, we can manually disable the DAG when we don’t need it, but I am willing to bet that someone will forget to pause it, and it is going to run when it is not supposed to do anything.

Another way to avoid problems is to make sure that all tasks in the DAG don’t break anything if the output already exists. It’s not ideal for two reasons.

First, the sole purpose of a backfill DAG is to have an easy way to overwrite files when we need to fix something. It is supposed to destroy the existing data and replace it with the freshly calculated values.

Second, at some point, someone is going to add another task to that DAG. I will bet my money again that the author of that task is not going to read the documentation or other task source code before making the change.

Use XCom to store the backfill date

I am not using the backfill feature available in Airflow because I need to pass more parameters than just an execution date.

Also, it annoys me that the backfill feature does not schedule all DAG runs at once. I want to run only one script that sets up the backfill and does nothing more. Fire and forget. No long-running screen sessions over an SSH connection to Airflow to make sure that the backfill schedules all dates.

Because of that, my backfill DAG reads the execution date from a DynamoDB table. Of course, after reading the backfill parameters, it marks the particular backfill as started in the DynamoDB to avoid reprocessing in case of DAG failures.

As you saw in the subsection title, I use the XCom feature to store the backfill parameters. There are two ways to do it: I can either return a value from the function running in the PythonOperator, which automatically assigns it to the task’s return_value XCom variable, or explicitly call the xcom_push function inside my function.

Let me show you an example in which I use both methods. Note that I need access to the task instance, so the function run by the PythonOperator takes the Airflow context as a parameter, and I have to set provide_context to True.
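(The original code listing is not reproduced here, so the sketch below is an approximation of what it could look like, assuming Airflow 1.10-era APIs to match the provide_context remark. The get_next_backfill_date helper and the task names are placeholders of mine, not taken from the article.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG("backfill", start_date=datetime(2019, 1, 1), schedule_interval=None)


def get_next_backfill_date():
    # Placeholder for the DynamoDB lookup described in the article.
    return {"date": "2019-01-01"}


def read_backfill_params(**context):
    backfill = get_next_backfill_date()

    # Method 1: push a value explicitly under a custom key.
    context["ti"].xcom_push(key="backfill_parameters", value=backfill)

    # Method 2: return a value; Airflow stores it automatically
    # under the task's "return_value" XCom key.
    return backfill["date"] if backfill else None


read_backfill_date = PythonOperator(
    task_id="read_backfill_date",
    python_callable=read_backfill_params,
    provide_context=True,  # Airflow 1.10.x; in 2.x the context is passed automatically
    dag=dag,
)
```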

Stopping DAG execution conditionally

Now, I have the function that returns the next backfill date or None if there is nothing to backfill. I am going to read the return_value and check whether it is None. If the backfill date is not defined, I have to stop the current DAG run.

In Airflow, we have Sensors to trigger tasks when we observe a desired external state. In this case, I am going to use the PythonSensor, which runs a Python function and continues running the DAG if the value returned by that function is truthy: boolean True or anything that produces True after being cast to a boolean.

Once again, my function needs access to the Airflow context to use XCom functions. I also need the name of the task that stored the variable in XCom and the variable name.
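(A minimal sketch of such a callable, assuming the XCom was written by the read_backfill_date task from the earlier example; the names are illustrative.)

```python
def backfill_date_defined(**context):
    # Pull the value stored by the upstream task; returns None if nothing was pushed.
    backfill_date = context["ti"].xcom_pull(
        task_ids="read_backfill_date",
        key="return_value",
    )
    # A truthy return value lets the DAG continue; a falsy one keeps the sensor poking.
    return backfill_date is not None
```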

Skip DAG instead of failing the run

By default, the sensor either continues the DAG or marks the DAG execution as failed. This is not what I want. If the DAG has nothing to backfill, it should skip all the remaining tasks, not fail the DAG. Fortunately, there is a simple configuration parameter that changes the sensor behavior. When I set soft_fail to True, it skips the tasks instead of marking them as failed.

PythonSensor waits for too long

The default BaseSensorOperator parameters (inherited by PythonSensor) are not ideal in this case. By default, it calls the Python function every minute for a week before it gives up! That would be OK if I were querying the DynamoDB table inside the function called by the PythonSensor. I don’t want to do that, so the sensor must not wait that long.

To change the maximum waiting time, I set the timeout parameter to 10 seconds. Strangely, even with that setting, the PythonSensor waits over a minute before it skips the tasks. It turns out that the timeout is evaluated only after the function is called, so if my function returns False, the task sleeps for 60 seconds before it re-evaluates the condition (configurable using the poke_interval parameter, 60 seconds by default).
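Putting it together, the sensor configuration could look like the sketch below. The import path and provide_context are assumptions for Airflow 1.10; in Airflow 2 the sensor lives in airflow.sensors.python and receives the context automatically.

```python
from airflow.contrib.sensors.python_sensor import PythonSensor  # Airflow 2.x: airflow.sensors.python

continue_if_backfill_defined = PythonSensor(
    task_id="continue_if_backfill_defined",
    python_callable=backfill_date_defined,
    provide_context=True,  # Airflow 1.10.x only
    soft_fail=True,        # skip downstream tasks instead of failing them
    timeout=10,            # give up after 10 seconds...
    poke_interval=5,       # ...and poke often enough that the timeout is actually checked
    dag=dag,
)
```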


Conditional Tasks in Airflow

Managing Conditional Tasks in Airflow

Abstract: In this article, we will explore how to manage conditional tasks in Airflow, a popular open-source platform for creating, scheduling, and monitoring workflows. We will cover the basics of Airflow, how to create and schedule tasks, and how to create and execute conditional tasks. We will also discuss best practices for managing conditional tasks in Airflow, and provide examples of how to use conditional tasks in real-world scenarios.

In data engineering, managing conditional tasks is a crucial aspect of orchestrating complex workflows. Apache Airflow is an open-source platform that enables data engineers to programmatically define and manage workflows. In this article, we will discuss how to manage conditional tasks in Airflow, providing an overview of the platform, its architecture, and its features.

What is Apache Airflow?

Apache Airflow is an open-source platform that allows data engineers to programmatically define, schedule, and monitor workflows. It provides a rich user interface that enables users to visualize pipelines, monitor their progress, and troubleshoot issues. Airflow is written in Python, making it easy for developers to write custom operators, sensors, and hooks. It supports a wide range of data sources, including SQL, NoSQL, and cloud-based storage.

Airflow Architecture

Airflow's architecture is based on a directed acyclic graph (DAG) that represents a workflow. A DAG is a collection of tasks and their dependencies, and it is defined as a Python script that specifies the tasks and their relationships. The DAG is executed by the Airflow scheduler, which runs on a separate process, and it is responsible for triggering the tasks based on their dependencies and schedules. The Airflow web server provides a user interface that enables users to monitor the progress of the DAGs and their tasks.

In Airflow, conditional tasks are commonly managed using the BranchPythonOperator and the ShortCircuitOperator. The BranchPythonOperator executes a Python callable that returns a string identifying the next task to be executed. The ShortCircuitOperator executes a Python callable and skips all downstream tasks if that callable returns a falsy value. In this section, we will discuss how to use these operators to manage conditional tasks in Airflow.

BranchPythonOperator

The BranchPythonOperator executes a Python callable that returns a string representing the ID of the next task to execute. It is used to manage conditional tasks, where the next task to be executed depends on the result of a Python function. The BranchPythonOperator can be defined as follows:
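(The article's listing is missing from this copy; the sketch below is a minimal example consistent with the surrounding description, using Airflow 2-style imports and a placeholder condition and task names.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator


def branch_func():
    # Placeholder condition: pick the branch based on the current hour.
    if datetime.now().hour < 12:
        return "task_a"
    return "task_b"


with DAG("branch_example", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    branch = BranchPythonOperator(task_id="branch", python_callable=branch_func)
    task_a = EmptyOperator(task_id="task_a")
    task_b = EmptyOperator(task_id="task_b")

    branch >> [task_a, task_b]
```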

In this example, the branch_func() function defines a condition that returns a string representing the next task to be executed. The BranchPythonOperator is then used to define the task that will execute the function. The task_id parameter is used to identify the task, and the python_callable parameter is used to specify the Python function to be executed.

ShortCircuitOperator

The ShortCircuitOperator runs a Python callable and skips all downstream tasks when that callable returns a falsy value. It is used to manage conditional tasks where the rest of the DAG should only run if a condition is met. The ShortCircuitOperator can be defined as follows:
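(Again, the original listing is missing; this is a minimal sketch with a placeholder condition, using Airflow 2-style imports.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator


def short_circuit_func():
    # Placeholder condition: only continue on weekdays.
    return datetime.now().weekday() < 5


with DAG("short_circuit_example", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    check = ShortCircuitOperator(task_id="check", python_callable=short_circuit_func)
    downstream = EmptyOperator(task_id="downstream")

    check >> downstream
```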

In this example, the short_circuit_func() function defines a condition that returns a boolean value. The ShortCircuitOperator is then used to define the task that will execute the function. The task_id parameter is used to identify the task, and the python_callable parameter is used to specify the Python function to be executed. If the function returns True, the DAG continues executing the downstream tasks; if it returns False, the downstream tasks are skipped.

Managing conditional tasks is a crucial aspect of orchestrating complex workflows in data engineering. Apache Airflow provides a powerful platform that enables data engineers to programmatically define, schedule, and monitor workflows. The BranchPythonOperator and the ShortCircuitOperator are two operators that enable data engineers to manage conditional tasks in Airflow. By using these operators, data engineers can create dynamic workflows that adapt to changing conditions and requirements.

  • Apache Airflow Documentation
  • Airflow Concepts



Branching in Airflow

When designing your data pipelines, you may encounter use cases that require more complex task flows than "Task A > Task B > Task C". For example, you may have a use case where you need to decide between multiple tasks to execute based on the results of an upstream task. Or you may have a case where part of your pipeline should only run under certain external conditions. Fortunately, Airflow has multiple options for building conditional logic and/or branching into your DAGs.

In this guide, you'll learn how you can use @task.branch (BranchPythonOperator) and @task.short_circuit (ShortCircuitOperator), other available branching operators, and additional resources to implement conditional logic in your Airflow DAGs.

There are multiple resources for learning about this topic. See also:

  • Astronomer Academy: Airflow: Branching module.

Assumed knowledge ​

To get the most out of this guide, you should have an understanding of:

  • Airflow operators. See Operators 101 .
  • Dependencies in Airflow. See Managing Dependencies in Apache Airflow .
  • Using Airflow decorators. See Introduction to Airflow decorators .

@task.branch (BranchPythonOperator) ​

One of the simplest ways to implement branching in Airflow is to use the @task.branch decorator, which is a decorated version of the BranchPythonOperator . @task.branch accepts any Python function as an input as long as the function returns a list of valid IDs for Airflow tasks that the DAG should run after the function completes.

In the following example we use a choose_branch function that returns one set of task IDs if the result is greater than 0.5 and a different set if the result is less than or equal to 0.5:
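(The guide's original listing is not included in this copy; the TaskFlow-style sketch below is an approximation, with placeholder task IDs.)

```python
import random

from airflow.decorators import task


@task.branch
def choose_branch():
    result = random.random()
    if result > 0.5:
        return ["task_a", "task_b"]  # one set of task IDs
    return ["task_c"]                # a different set for result <= 0.5
```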


In general, the @task.branch decorator is a good choice if your branching logic can be easily implemented in a simple Python function. Whether you want to use the decorated version or the traditional operator is a question of personal preference.

The code below shows a full example of how to use @task.branch in a DAG:
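(The full example is not reproduced here; the sketch below follows the description in the text, four randomly chosen branches with two tasks each and a join task, but the exact task layout and names are assumptions.)

```python
import random
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def branching_example():
    options = ["branch_a", "branch_b", "branch_c", "branch_d"]

    @task.branch
    def choose_branch(options):
        # Pick one branch at random and return the ID of its first task.
        return f"{random.choice(options)}_task_1"

    join = EmptyOperator(
        task_id="join",
        trigger_rule="none_failed_min_one_success",
    )

    branching = choose_branch(options)

    for option in options:
        t1 = EmptyOperator(task_id=f"{option}_task_1")
        t2 = EmptyOperator(task_id=f"{option}_task_2")
        branching >> t1 >> t2 >> join


branching_example()
```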

In this DAG, random.choice() returns one random option out of a list of four branches. In the following screenshot, where branch_b was randomly chosen, the two tasks in branch_b were successfully run while the others were skipped.

Branching Graph View

If you have downstream tasks that need to run regardless of which branch is taken, like the join task in the previous example, you need to update the trigger rule. The default trigger rule in Airflow is all_success , which means that if upstream tasks are skipped, then the downstream task will not run. In the previous example, none_failed_min_one_success is specified to indicate that the task should run as long as one upstream task succeeded and no tasks failed.

Finally, note that with the @task.branch decorator your Python function must return at least one task ID for whichever branch is chosen (i.e. it can't return nothing). If one of the paths in your branching should do nothing, you can use an EmptyOperator in that branch.

More examples using the @task.branch decorator can be found on the Astronomer Registry.

@task.short_circuit (ShortCircuitOperator) ​

Another option for implementing conditional logic in your DAGs is the @task.short_circuit decorator, which is a decorated version of the ShortCircuitOperator . This operator takes a Python function that returns True or False based on logic implemented for your use case. If True is returned, the DAG continues, and if False is returned, all downstream tasks are skipped.

@task.short_circuit is useful when you know that some tasks in your DAG should run only occasionally. For example, maybe your DAG runs daily, but some tasks should only run on Sundays. Or maybe your DAG orchestrates a machine learning model, and tasks that publish the model should only be run if a certain accuracy is reached after training. This type of logic can also be implemented with @task.branch , but that operator requires a task ID to be returned. Using the @task.short_circuit decorator can be cleaner in cases where the conditional logic equates to "run or not" as opposed to "run this or that".

The following DAG shows an example of how to implement @task.short_circuit :
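(The listing is omitted from this copy; this sketch matches the description below, one condition that is always true and one that is always false, with placeholder downstream tasks.)

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False)
def short_circuit_example():
    @task.short_circuit
    def condition_is_true():
        return True   # downstream tasks run

    @task.short_circuit
    def condition_is_false():
        return False  # downstream tasks are skipped

    condition_is_true() >> EmptyOperator(task_id="after_true")
    condition_is_false() >> EmptyOperator(task_id="after_false")


short_circuit_example()
```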

In this DAG there are two short circuits, one which always returns True and one which always returns False . When you run the DAG, you can see that tasks downstream of the True condition operator ran, while tasks downstream of the False condition operator were skipped.

Short Circuit

Another example using the ShortCircuitOperator can be found on the Astronomer Registry.

Other branch operators ​

Airflow offers a few other branching operators that work similarly to the BranchPythonOperator but for more specific contexts:

  • BranchSQLOperator : Branches based on whether a given SQL query returns true or false .
  • BranchDayOfWeekOperator : Branches based on whether the current day of week is equal to a given week_day parameter.
  • BranchDateTimeOperator : Branches based on whether the current time is between target_lower and target_upper times.
  • ExternalBranchPythonOperator : Branches based on a Python function like the BranchPythonOperator , but runs in a preexisting virtual environment like the ExternalPythonOperator (available in Airflow 2.7+).
  • BranchPythonVirtualenvOperator : Branches based on a Python function like the BranchPythonOperator , but runs in a newly created virtual environment like the PythonVirtualenvOperator (available in Airflow 2.8+). The environment can be cached by providing a venv_cache_path .

All of these operators take follow_task_ids_if_true and follow_task_ids_if_false parameters which provide the list of task(s) to include in the branch based on the logic returned by the operator.

Additional branching resources ​

There is much more to the BranchPythonOperator than simply choosing one task over another.

  • What if you want to trigger your tasks only on specific days? And not on holidays?
  • What if you want to trigger a DAG Run only if the previous one succeeded?

For more guidance and best practices on common use cases like the ones above, try out Astronomer's Academy Course on Branching for free today.


5 Complex task dependencies

This chapter covers:

  • Examining how to differentiate the order of task dependencies in an Airflow DAG.
  • Explaining how to use trigger rules to implement joins at specific points in an Airflow DAG.
  • Showing how to make conditional tasks in an Airflow DAG, which can be skipped under certain conditions.
  • Giving a basic idea of how trigger rules function in Airflow and how this affects the execution of your tasks.

In previous chapters, we’ve seen how to build a basic DAG and define simple dependencies between tasks. In this chapter, we will further explore exactly how task dependencies are defined in Airflow and how these capabilities can be used to implement more complex patterns including conditional tasks, branches and joins. Towards the end of the chapter we’ll also dive into XComs, which allow passing data between different tasks in a DAG run, and discuss the merits and drawbacks of using this type of approach.

Effective Use of Branching and Conditional Logic in Airflow


Branching with the BranchPythonOperator

The BranchPythonOperator is a versatile tool in Airflow that allows for dynamic branching in workflows. It works by executing a Python callable and expecting a task id or list of task ids to be returned. These task ids determine which subsequent task(s) should be run in the current execution path of the DAG.
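(The article's code listing is missing from this copy; the sketch below is a minimal reconstruction consistent with the description that follows, with a placeholder condition.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator


def branch_function():
    # Placeholder condition: branch on the day of the month.
    if datetime.now().day % 2 == 0:
        return "true_branch"
    return "false_branch"


with DAG("branching_demo", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    branching = BranchPythonOperator(task_id="branching", python_callable=branch_function)
    true_branch = EmptyOperator(task_id="true_branch")
    false_branch = EmptyOperator(task_id="false_branch")
    join = EmptyOperator(task_id="join", trigger_rule="none_failed_min_one_success")

    branching >> [true_branch, false_branch] >> join
```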

In the example above, branch_function evaluates a condition and returns the id of the next task to execute. The true_branch and false_branch tasks are executed based on the result of this function, eventually joining back together at the join task.

Implementing Conditional Logic with Sensors

Sensors are a special kind of operator in Airflow that will keep checking for a certain criterion and only proceed when the criterion is met. This can be seen as a form of conditional logic where the subsequent tasks are only triggered if certain conditions are true.

The FileSensor , for example, will check for the existence of a specified file and only allow the workflow to continue once it has been detected.
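For instance, a FileSensor gating a downstream task might look like the following sketch (the file path, connection ID, and intervals are placeholders):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor

with DAG("file_sensor_demo", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    wait_for_file = FileSensor(
        task_id="wait_for_file",
        filepath="/data/input/ready.csv",  # placeholder path
        fs_conn_id="fs_default",
        poke_interval=60,                  # check once per minute
        timeout=60 * 60,                   # give up after an hour
    )
    process = EmptyOperator(task_id="process_file")

    wait_for_file >> process
```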

Advanced Branching Techniques

To create more advanced branching logic, developers can combine multiple BranchPythonOperator s or even create custom operators. It's important to ensure that your branching logic aligns with your business logic needs and that it's tested thoroughly to prevent unexpected behavior in production environments.



Creating a Conditional Task in Airflow

Airflow is an open-source platform used for orchestrating complex workflows. It allows users to define, schedule, and monitor tasks as directed acyclic graphs (DAGs). One of the powerful features of Airflow is the ability to create conditional tasks, which allows for more flexibility and control in workflow execution. In this article, we will explore how to create a conditional task in Airflow using Python.

Understanding Conditional Tasks

Conditional tasks in Airflow are tasks that are executed based on certain conditions. These conditions can be defined using Python expressions, allowing for dynamic control over the workflow. Conditional tasks can be useful in scenarios where certain tasks should only be executed if specific conditions are met.

Let’s consider an example where we have a DAG that performs data processing. We want to execute a task to download data only if a certain file does not already exist. In this case, we can use a conditional task to check if the file exists and decide whether to execute the download task or skip it.

Creating a Conditional Task

To create a conditional task in Airflow, we can use the `BranchPythonOperator` and `PythonOperator` classes. The `BranchPythonOperator` allows us to define a Python function that returns the task ID of the next task to execute based on the condition. The `PythonOperator` is used to define the tasks that will be executed based on the condition.
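(The listing referenced below is missing from this copy; the sketch is a reconstruction based on the description, with a placeholder file path.)

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator

DATA_FILE = "/tmp/data.csv"  # placeholder path


def check_file_exists():
    # Return the task_id of the task that should run next.
    if os.path.exists(DATA_FILE):
        return "skip_download_task"
    return "download_task"


def download_data():
    print(f"Downloading data to {DATA_FILE}")


def skip_download():
    print("File already exists, skipping the download")


with DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule=None) as dag:
    check_file_exists_task = BranchPythonOperator(
        task_id="check_file_exists",
        python_callable=check_file_exists,
    )
    download_task = PythonOperator(task_id="download_task", python_callable=download_data)
    skip_download_task = PythonOperator(task_id="skip_download_task", python_callable=skip_download)

    check_file_exists_task >> [download_task, skip_download_task]
```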

In the above example, we define a DAG called ‘conditional_task_example’ with three tasks: `check_file_exists`, `download_task`, and `skip_download_task`. The `check_file_exists` task uses the `BranchPythonOperator` to determine whether to execute the `download_task` or the `skip_download_task` based on the condition defined in the `check_file_exists` function.

The `download_task` and `skip_download_task` are defined using the `PythonOperator` and their respective Python functions. These functions can contain any logic or operations that need to be performed as part of the task.

The `check_file_exists_task` is set as the starting point of the DAG, and its output determines the execution path of the subsequent tasks. If the condition in the `check_file_exists` function returns ‘skip_download_task’, the `skip_download_task` will be executed. Otherwise, the `download_task` will be executed.

Conditional tasks in Airflow provide a powerful way to control workflow execution based on dynamic conditions. By using the `BranchPythonOperator` and `PythonOperator`, we can create conditional tasks that allow for more flexibility and control in workflow design. This enables us to build complex and adaptive workflows that can handle various scenarios efficiently.

Example 1: Simple Conditional Task

In this example, we will create a simple conditional task in Airflow using the Python programming language. The task will run only if a certain condition is met.
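(The example code is not included in this copy; a minimal sketch of such a task, using a ShortCircuitOperator and a placeholder condition, could look like this.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator


def condition_is_met():
    # Placeholder condition: run only on the first day of the month.
    return datetime.now().day == 1


def do_work():
    print("Condition met, running the task")


with DAG("simple_conditional_task", start_date=datetime(2022, 1, 1), schedule="@daily") as dag:
    check = ShortCircuitOperator(task_id="check_condition", python_callable=condition_is_met)
    work = PythonOperator(task_id="conditional_task", python_callable=do_work)

    check >> work
```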

Example 2: Multiple Conditional Tasks

In this example, we will create multiple conditional tasks in Airflow. Each task will have its own condition and will run only if the condition is met.
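(Again, the original listing is missing; the sketch below shows two independently gated tasks with placeholder conditions.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator


def is_weekday():
    return datetime.now().weekday() < 5


def is_weekend():
    return datetime.now().weekday() >= 5


with DAG("multiple_conditional_tasks", start_date=datetime(2022, 1, 1), schedule="@daily") as dag:
    weekday_gate = ShortCircuitOperator(task_id="weekday_gate", python_callable=is_weekday)
    weekend_gate = ShortCircuitOperator(task_id="weekend_gate", python_callable=is_weekend)

    weekday_task = PythonOperator(task_id="weekday_task", python_callable=lambda: print("weekday work"))
    weekend_task = PythonOperator(task_id="weekend_task", python_callable=lambda: print("weekend work"))

    weekday_gate >> weekday_task
    weekend_gate >> weekend_task
```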

Reference Links:

  • Airflow Documentation
  • Conditional Triggers in Airflow
  • Stack Overflow: Conditional Task Execution in Apache Airflow

Conclusion:

Creating conditional tasks in Airflow allows you to control the execution flow based on specific conditions. By using Python programming, you can define the conditions and actions to be taken accordingly. This flexibility enables you to build complex workflows with dynamic task dependencies. With the help of Airflow’s powerful features and the ability to create conditional tasks, you can efficiently manage and automate your data pipelines.


python – How to create a conditional task in Airflow

Creating a conditional task in Airflow with Python involves using the `BranchPythonOperator` to create a task that determines the next task to execute based on a condition. Here are 8 examples with step-by-step explanations.

Example 1:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

def condition_task():
    if datetime.now().minute % 2 == 0:
        return "even_task"
    else:
        return "odd_task"

dag = DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule_interval="*/5 * * * *")

branch_operator = BranchPythonOperator(
    task_id="condition_check",
    python_callable=condition_task,
    dag=dag
)
```

In this example, we define a `BranchPythonOperator` with `condition_task` as the callable function. The `condition_task` checks if the current minute is even or odd and returns the task_id of the appropriate task ("even_task" or "odd_task").

Example 2:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

def condition_task():
    if datetime.now().day % 2 == 0:
        return "even_day_task"
    else:
        return "odd_day_task"

dag = DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule_interval="@daily")

branch_operator = BranchPythonOperator(
    task_id="condition_check",
    python_callable=condition_task,
    dag=dag
)
```

This example checks if the current day is even or odd, and returns the task_id of the appropriate task ("even_day_task" or "odd_day_task").

Example 3:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

def condition_task():
    if datetime.now().hour < 12:
        return "morning_task"
    else:
        return "afternoon_task"

dag = DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule_interval="@hourly")

branch_operator = BranchPythonOperator(
    task_id="condition_check",
    python_callable=condition_task,
    dag=dag
)
```

Here, the condition checks if the current hour is before 12 PM, and returns the task_id of either "morning_task" or "afternoon_task".

Example 4:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

def condition_task():
    if datetime.now().weekday() < 5:
        return "weekday_task"
    else:
        return "weekend_task"

dag = DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule_interval="@daily")

branch_operator = BranchPythonOperator(
    task_id="condition_check",
    python_callable=condition_task,
    dag=dag
)
```

This example checks if the current day is a weekday or a weekend and returns the task_id of the appropriate task ("weekday_task" or "weekend_task").

Example 5:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

def condition_task():
    if datetime.now().month == 12:
        return "december_task"
    else:
        return "not_december_task"

dag = DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule_interval="@monthly")

branch_operator = BranchPythonOperator(
    task_id="condition_check",
    python_callable=condition_task,
    dag=dag
)
```

Here, the condition checks if the current month is December and returns the task_id of either "december_task" or "not_december_task".

Example 6:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

def condition_task():
    if datetime.now().year == 2022:
        return "2022_task"
    else:
        return "other_year_task"

dag = DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule_interval="@yearly")

branch_operator = BranchPythonOperator(
    task_id="condition_check",
    python_callable=condition_task,
    dag=dag
)
```

This example checks if the current year is 2022 and returns the task_id of either "2022_task" or "other_year_task".

Example 7:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

def condition_task():
    if datetime.now().day % 7 == 0:
        return "weekly_task"
    else:
        return "not_weekly_task"

dag = DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule_interval="@daily")

branch_operator = BranchPythonOperator(
    task_id="condition_check",
    python_callable=condition_task,
    dag=dag
)
```

In this example, the condition checks if the current day is a multiple of 7 (i.e., end of the week) and returns the task_id of either "weekly_task" or "not_weekly_task".

Example 8:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator

def condition_task():
    if datetime.now().hour < 6:
        return "night_task"
    elif datetime.now().hour < 12:
        return "morning_task"
    elif datetime.now().hour < 18:
        return "afternoon_task"
    else:
        return "evening_task"

dag = DAG("conditional_task_example", start_date=datetime(2022, 1, 1), schedule_interval="@hourly")

branch_operator = BranchPythonOperator(
    task_id="condition_check",
    python_callable=condition_task,
    dag=dag
)
```

Here, the condition checks the current hour and returns the task_id based on the hour of the day ("night_task", "morning_task", "afternoon_task", or "evening_task").

In all these examples, the `BranchPythonOperator` executes the `condition_task` function and returns the task_id of the next task to execute. Make sure to define the respective tasks after the `branch_operator` to complete the conditional workflow. Remember, these examples are just illustrations to showcase the usage of a conditional task in Airflow. You can modify the conditions based on your requirements and add more complexity to the logic.

Similar Posts

Python – python indentation error: [duplicate].

It seems you have encountered an indentation error in your Python code. Indentation errors occur when the spaces/tabs used for indentation are not consistent or incorrect. In Python, proper indentation is crucial as it defines the block of code within a function, loop, or conditional statement. Indentation errors can be fixed by ensuring that the…

python – listing png files in folder [duplicate]

To list all PNG files in a folder using Python, you can use the `glob` module. Here’s an example with a step-by-step explanation: 1. Import the `glob` module: 2. Specify the folder path where your PNG files are located: 3. Use the `glob` function with a wildcard pattern to find all PNG files in the…

python – How to append a new row to an old CSV file in Python?

To append a new row to an existing CSV file in Python, you can use the csv module in the following steps: 1. Import the csv module: 2. Open the CSV file in ‘append’ mode using the `open()` function: In the above code, `’a’` is the mode for appending to a file, and `newline=”` is…

python – NaN in mapper – name 'nan' is not defined

The error message “NaN in mapper – name ‘nan’ is not defined” is typically encountered when attempting to use the NaN value from the numpy library without first importing it. NaN stands for “Not a Number” and represents an undefined or unrepresentable value in mathematical operations. To resolve the error, you need to import the…

python – Make Javascript do List Comprehension

In Python, list comprehension allows you to create lists in a concise and readable way. JavaScript, however, does not have built-in support for list comprehension. Nevertheless, you can achieve similar functionality using array methods and functional programming techniques. Here are eight examples of how you can replicate list comprehension in JavaScript: 1. Squaring Numbers: Python…

python – Why does the floating-point value of 4*0.1 look nice in Python 3 but 3*0.1 doesn't?

The reason why the floating-point value of `4*0.1` may look nicer in Python 3 compared to `3*0.1` is due to how floating-point arithmetic works in the language. Python uses the IEEE 754 standard for floating-point arithmetic, which is a binary representation. However, numbers with a finite decimal representation cannot always be represented exactly in binary…


A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.

Here’s a basic example DAG:
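(The documentation shows this DAG as a graph image; the sketch below is one possible code shape for it, using EmptyOperator placeholders, not the documentation's exact listing.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG("basic_dag", start_date=datetime(2022, 1, 1), schedule="@daily") as dag:
    a = EmptyOperator(task_id="a")
    b = EmptyOperator(task_id="b")
    c = EmptyOperator(task_id="c")
    d = EmptyOperator(task_id="d")

    a >> [b, c] >> d
```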

[Figure: basic-dag, an example DAG graph]

It defines four Tasks - A, B, C, and D - and dictates the order in which they have to run, and which tasks depend on what others. It will also say how often to run the DAG - maybe “every 5 minutes starting tomorrow”, or “every day since January 1st, 2020”.

The DAG itself doesn’t care about what is happening inside the tasks; it is merely concerned with how to execute them - the order to run them in, how many times to retry them, if they have timeouts, and so on.

Declaring a DAG ¶

There are three ways to declare a DAG - either you can use a context manager, which will add the DAG to anything inside it implicitly:
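For example (a minimal sketch, not the documentation's exact listing):

```python
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")
```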

Or, you can use a standard constructor, passing the DAG into any operators you use:
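(Sketch; imports as in the previous example.)

```python
my_dag = DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
```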

Or, you can use the @dag decorator to turn a function into a DAG generator :
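(Sketch; datetime and EmptyOperator imports as above.)

```python
from airflow.decorators import dag


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()
```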

DAGs are nothing without Tasks to run, and those will usually come in the form of either Operators , Sensors or TaskFlow .

Task Dependencies ¶

A Task/Operator does not usually live alone; it has dependencies on other tasks (those upstream of it), and other tasks depend on it (those downstream of it). Declaring these dependencies between tasks is what makes up the DAG structure (the edges of the directed acyclic graph ).

There are two main ways to declare individual task dependencies. The recommended one is to use the >> and << operators:
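(Snippet sketch; first_task through fourth_task are assumed to be tasks already defined in a DAG.)

```python
first_task >> [second_task, third_task]
third_task << fourth_task
```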

Or, you can also use the more explicit set_upstream and set_downstream methods:

There are also shortcuts to declaring more complex dependencies. If you want to make two lists of tasks depend on all parts of each other, you can’t use either of the approaches above, so you need to use cross_downstream :
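(Snippet sketch; op1 through op4 are assumed to be tasks already defined in a DAG.)

```python
from airflow.models.baseoperator import cross_downstream

# Equivalent to: [op1, op2] >> op3 and [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])
```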

And if you want to chain together dependencies, you can use chain :

Chain can also do pairwise dependencies for lists the same size (this is different from the cross dependencies created by cross_downstream !):
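(Snippet sketch showing both the linear and the pairwise form; op1 through op6 are assumed to be tasks already defined in a DAG.)

```python
from airflow.models.baseoperator import chain

# A simple linear chain: op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# Pairwise dependencies between equal-length lists:
# op1 >> op2 >> op4 >> op6 and op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
```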

Loading DAGs ¶

Airflow loads DAGs from Python source files, which it looks for inside its configured DAG_FOLDER . It will take each file, execute it, and then load any DAG objects from that file.

This means you can define multiple DAGs per Python file, or even spread one very complex DAG across multiple Python files using imports.

Note, though, that when Airflow comes to load DAGs from a Python file, it will only pull any objects at the top level that are a DAG instance. For example, take this DAG file:
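(A sketch of such a file, matching the explanation that follows; the DAG names are placeholders.)

```python
from datetime import datetime

from airflow import DAG

dag_1 = DAG("this_dag_will_be_discovered", start_date=datetime(2022, 1, 1), schedule=None)


def my_function():
    dag_2 = DAG("but_this_dag_will_not", start_date=datetime(2022, 1, 1), schedule=None)


my_function()
```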

While both DAG constructors get called when the file is accessed, only dag_1 is at the top level (in the globals() ), and so only it is added to Airflow. dag_2 is not loaded.

When searching for DAGs inside the DAG_FOLDER , Airflow only considers Python files that contain the strings airflow and dag (case-insensitively) as an optimization.

To consider all Python files instead, disable the DAG_DISCOVERY_SAFE_MODE configuration flag.

You can also provide an .airflowignore file inside your DAG_FOLDER , or any of its subfolders, which describes patterns of files for the loader to ignore. It covers the directory it’s in plus all subfolders underneath it. See .airflowignore below for details of the file syntax.

In the case where the .airflowignore file does not meet your needs and you want a more flexible way to control whether a Python file needs to be parsed by Airflow, you can plug in your own callable by setting might_contain_dag_callable in the config file. Note that this callable will replace the default Airflow heuristic, i.e. checking whether the strings airflow and dag (case-insensitively) appear in the file.

Running DAGs ¶

DAGs will run in one of two ways:

  • When they are triggered either manually or via the API
  • On a defined schedule, which is defined as part of the DAG

DAGs do not require a schedule, but it’s very common to define one. You define it via the schedule argument, like this:
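(Minimal sketch:)

```python
from datetime import datetime

from airflow import DAG

with DAG("my_daily_dag", schedule="@daily", start_date=datetime(2022, 1, 1)):
    ...
```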

The schedule argument takes any value that is a valid Crontab schedule value, so you could also do:
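(For example, the equivalent cron expression; imports as in the previous sketch.)

```python
with DAG("my_daily_dag", schedule="0 0 * * *", start_date=datetime(2022, 1, 1)):
    ...
```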

For more information on schedule values, see DAG Run .

If schedule is not enough to express the DAG’s schedule, see Timetables . For more information on logical date , see Data Interval and What does execution_date mean? .

Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a DAG Run . DAG Runs can run in parallel for the same DAG, and each has a defined data interval, which identifies the period of data the tasks should operate on.

As an example of why this is useful, consider writing a DAG that processes a daily set of experimental data. It’s been rewritten, and you want to run it on the previous 3 months of data—no problem, since Airflow can backfill the DAG and run copies of it for every day in those previous 3 months, all at once.

Those DAG Runs will all have been started on the same actual day, but each DAG run will have one data interval covering a single day in that 3 month period, and that data interval is all the tasks, operators and sensors inside the DAG look at when they run.

In much the same way a DAG instantiates into a DAG Run every time it’s run, Tasks specified inside a DAG are also instantiated into Task Instances along with it.

A DAG run will have a start date when it starts, and an end date when it ends. This period describes the time when the DAG actually ‘ran.’ Aside from the DAG run’s start and end date, there is another date called logical date (formerly known as execution date), which describes the intended time a DAG run is scheduled or triggered. The reason why this is called logical is because of the abstract nature of it having multiple meanings, depending on the context of the DAG run itself.

For example, if a DAG run is manually triggered by the user, its logical date would be the date and time of which the DAG run was triggered, and the value should be equal to DAG run’s start date. However, when the DAG is being automatically scheduled, with certain schedule interval put in place, the logical date is going to indicate the time at which it marks the start of the data interval, where the DAG run’s start date would then be the logical date + scheduled interval.

DAG Assignment ¶

Note that every single Operator/Task must be assigned to a DAG in order to run. Airflow has several ways of calculating the DAG without you passing it explicitly:

If you declare your Operator inside a with DAG block

If you declare your Operator inside a @dag decorator

If you put your Operator upstream or downstream of an Operator that has a DAG

Otherwise, you must pass it into each Operator with dag= .

Default Arguments ¶

Often, many Operators inside a DAG need the same set of default arguments (such as their retries ). Rather than having to specify this individually for every Operator, you can instead pass default_args to the DAG when you create it, and it will auto-apply them to any operator tied to it:
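(A minimal sketch of this pattern:)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="my_dag",
    start_date=datetime(2022, 1, 1),
    schedule="@daily",
    default_args={"retries": 2},  # applied to every operator in this DAG
):
    op = EmptyOperator(task_id="hello")
    print(op.retries)  # 2
```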

The DAG decorator ¶

New in version 2.0.

As well as the more traditional ways of declaring a single DAG using a context manager or the DAG() constructor, you can also decorate a function with @dag to turn it into a DAG generator function:

airflow/example_dags/example_dag_decorator.py [source]

As well as being a new way of making DAGs cleanly, the decorator also sets up any parameters you have in your function as DAG parameters, letting you set those parameters when triggering the DAG . You can then access the parameters from Python code, or from {{ context.params }} inside a Jinja template .

Airflow will only load DAGs that appear in the top level of a DAG file. This means you cannot just declare a function with @dag - you must also call it at least once in your DAG file and assign it to a top-level object, as you can see in the example above.

Control Flow ¶

By default, a DAG will only run a Task when all the Tasks it depends on are successful. There are several ways of modifying this, however:

Branching - select which Task to move onto based on a condition

Trigger Rules - set the conditions under which a DAG will run a task

Setup and Teardown - define setup and teardown relationships

Latest Only - a special form of branching that only runs on DAGs running against the present

Depends On Past - tasks can depend on themselves from a previous run

Branching ¶

You can make use of branching in order to tell the DAG not to run all dependent tasks, but instead to pick and choose one or more paths to go down. This is where the @task.branch decorator comes in.

The @task.branch decorator is much like @task , except that it expects the decorated function to return an ID to a task (or a list of IDs). The specified task is followed, while all other paths are skipped. It can also return None to skip all downstream tasks.

The task_id returned by the Python function has to reference a task directly downstream from the @task.branch decorated task.

When a Task is downstream of both the branching operator and downstream of one or more of the selected tasks, it will not be skipped:

[Figure: branch_note, a task downstream of both the branching task and a chosen branch]

The paths of the branching task are branch_a , join and branch_b . Since join is a downstream task of branch_a , it will still be run, even though it was not returned as part of the branch decision.

The @task.branch can also be used with XComs allowing branching context to dynamically decide what branch to follow based on upstream tasks. For example:
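(A sketch of this pattern, not the documentation's exact listing; the produced value and task names are placeholders.)

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime(2022, 1, 1), schedule=None, catchup=False)
def xcom_branch_example():
    @task
    def produce_value():
        return 42  # stored in XCom as the task's return_value

    @task.branch
    def branch_on_value(value):
        # The upstream XCom value is passed in as a plain argument.
        return "big_value" if value > 10 else "small_value"

    value = produce_value()
    branch_on_value(value) >> [
        EmptyOperator(task_id="big_value"),
        EmptyOperator(task_id="small_value"),
    ]


xcom_branch_example()
```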

If you wish to implement your own operators with branching functionality, you can inherit from BaseBranchOperator , which behaves similarly to @task.branch decorator but expects you to provide an implementation of the method choose_branch .

The @task.branch decorator is recommended over directly instantiating BranchPythonOperator in a DAG. The latter should generally only be subclassed to implement a custom operator.

As with the callable for @task.branch , this method can return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. It can also return None to skip all downstream tasks.

Similar to the @task.branch decorator for regular Python code, there are also branch decorators that use a virtual environment ( @task.branch_virtualenv ) or an external Python interpreter ( @task.branch_external_python ).

Latest Only ¶

Airflow’s DAG Runs are often run for a date that is not the same as the current date - for example, running one copy of a DAG for every day in the last month to backfill some data.

There are situations, though, where you don’t want to let some (or all) parts of a DAG run for a previous date; in this case, you can use the LatestOnlyOperator .

This special Operator skips all tasks downstream of itself if you are not on the “latest” DAG run (if the wall-clock time right now is between its execution_time and the next scheduled execution_time, and it was not an externally-triggered run).

Here’s an example:

airflow/example_dags/example_latest_only_with_trigger.py [source]
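(The referenced example file is not reproduced here; the sketch below follows the description that comes after it, but the schedule and exact wiring are assumptions.)

```python
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="latest_only_with_trigger",
    schedule=datetime.timedelta(hours=4),
    start_date=datetime.datetime(2022, 1, 1),
    catchup=False,
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")  # default trigger_rule: all_success
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1
    [task1, task2] >> task3
    [task1, task2] >> task4
```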

In the case of this DAG:

task1 is directly downstream of latest_only and will be skipped for all runs except the latest.

task2 is entirely independent of latest_only and will run in all scheduled periods

task3 is downstream of task1 and task2 and because of the default trigger rule being all_success will receive a cascaded skip from task1 .

task4 is downstream of task1 and task2 , but it will not be skipped, since its trigger_rule is set to all_done .

[Figure: latest_only_with_trigger, graph view of the example DAG]

Depends On Past ¶

You can also say a task can only run if the previous run of the task in the previous DAG Run succeeded. To use this, you just need to set the depends_on_past argument on your Task to True .

Note that if you are running the DAG at the very start of its life—specifically, its first ever automated run—then the Task will still run, as there is no previous run to depend on.

Trigger Rules ¶

By default, Airflow will wait for all upstream (direct parents) tasks for a task to be successful before it runs that task.

However, this is just the default behaviour, and you can control it using the trigger_rule argument to a Task. The options for trigger_rule are:

all_success (default): All upstream tasks have succeeded

all_failed : All upstream tasks are in a failed or upstream_failed state

all_done : All upstream tasks are done with their execution

all_skipped : All upstream tasks are in a skipped state

one_failed : At least one upstream task has failed (does not wait for all upstream tasks to be done)

one_success : At least one upstream task has succeeded (does not wait for all upstream tasks to be done)

one_done : At least one upstream task succeeded or failed

none_failed : All upstream tasks have not failed or upstream_failed - that is, all upstream tasks have succeeded or been skipped

none_failed_min_one_success : All upstream tasks have not failed or upstream_failed , and at least one upstream task has succeeded.

none_skipped : No upstream task is in a skipped state - that is, all upstream tasks are in a success , failed , or upstream_failed state

always : No dependencies at all, run this task at any time

You can also combine this with the Depends On Past functionality if you wish.

It’s important to be aware of the interaction between trigger rules and skipped tasks, especially tasks that are skipped as part of a branching operation. You almost never want to use all_success or all_failed downstream of a branching operation .

Skipped tasks will cascade through trigger rules all_success and all_failed , and cause them to skip as well. Consider the following DAG:
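(A sketch consistent with the description below, using a branch that always chooses branch_a; the listing is not the documentation's exact code.)

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

with DAG("branch_without_trigger", start_date=datetime(2022, 1, 1), schedule="@once") as dag:
    @task.branch
    def branching():
        return "branch_a"

    branch_a = EmptyOperator(task_id="branch_a")
    follow_branch_a = EmptyOperator(task_id="follow_branch_a")
    branch_false = EmptyOperator(task_id="branch_false")
    join = EmptyOperator(task_id="join")  # default trigger_rule="all_success", so it gets skipped

    branching() >> [branch_a, branch_false]
    branch_a >> follow_branch_a
    [follow_branch_a, branch_false] >> join
```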

join is downstream of follow_branch_a and branch_false . The join task will show up as skipped because its trigger_rule is set to all_success by default, and the skip caused by the branching operation cascades down to skip a task marked as all_success .

[Figure: branch_without_trigger, the join task is skipped]

By setting trigger_rule to none_failed_min_one_success in the join task, we can instead get the intended behaviour:

[Figure: branch_with_trigger, the join task runs]

Setup and teardown ¶

In data workflows it’s common to create a resource (such as a compute resource), use it to do some work, and then tear it down. Airflow provides setup and teardown tasks to support this need.

Please see main article Setup and Teardown for details on how to use this feature.

Dynamic DAGs ¶

Since a DAG is defined by Python code, there is no need for it to be purely declarative; you are free to use loops, functions, and more to define your DAG.

For example, here is a DAG that uses a for loop to define some tasks:
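(A minimal sketch of the pattern, not the documentation's exact listing.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG("loop_example", start_date=datetime(2022, 1, 1), schedule="@daily") as dag:
    first = EmptyOperator(task_id="first")
    last = EmptyOperator(task_id="last")

    for option in ["branch_a", "branch_b", "branch_c"]:
        t = EmptyOperator(task_id=option)
        first >> t >> last
```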

In general, we advise you to try and keep the topology (the layout) of your DAG tasks relatively stable; dynamic DAGs are usually better used for dynamically loading configuration options or changing operator options.

DAG Visualization ¶

If you want to see a visual representation of a DAG, you have two options:

You can load up the Airflow UI, navigate to your DAG, and select “Graph”

You can run airflow dags show , which renders it out as an image file

We generally recommend you use the Graph view, as it will also show you the state of all the Task Instances within any DAG Run you select.

Of course, as you develop out your DAGs they are going to get increasingly complex, so we provide a few ways to modify these DAG views to make them easier to understand.

TaskGroups ¶

A TaskGroup can be used to organize tasks into hierarchical groups in Graph view. It is useful for creating repeating patterns and cutting down visual clutter.

Unlike SubDAGs , TaskGroups are purely a UI grouping concept. Tasks in TaskGroups live on the same original DAG, and honor all the DAG settings and pool configurations.

Dependency relationships can be applied across all tasks in a TaskGroup with the >> and << operators. For example, the following code puts task1 and task2 in TaskGroup group1 and then puts both tasks upstream of task3 :
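(A sketch of that layout, not the documentation's exact listing.)

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG("task_group_example", start_date=datetime(2022, 1, 1), schedule="@daily") as dag:
    with TaskGroup("group1") as group1:
        task1 = EmptyOperator(task_id="task1")
        task2 = EmptyOperator(task_id="task2")

    task3 = EmptyOperator(task_id="task3")

    group1 >> task3
```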

TaskGroup also supports default_args like DAG, it will overwrite the default_args in DAG level:

If you want to see a more advanced use of TaskGroup, you can look at the example_task_group_decorator.py example DAG that comes with Airflow.

By default, child tasks/TaskGroups have their IDs prefixed with the group_id of their parent TaskGroup. This helps to ensure uniqueness of group_id and task_id throughout the DAG.

To disable the prefixing, pass prefix_group_id=False when creating the TaskGroup, but note that you will now be responsible for ensuring every single task and group has a unique ID of its own.

When using the @task_group decorator, the decorated-function’s docstring will be used as the TaskGroups tooltip in the UI except when a tooltip value is explicitly supplied.

Edge Labels ¶

As well as grouping tasks into groups, you can also label the dependency edges between different tasks in the Graph view - this can be especially useful for branching areas of your DAG, so you can label the conditions under which certain branches might run.

To add labels, you can use them directly inline with the >> and << operators:

Or, you can pass a Label object to set_upstream / set_downstream :

Here’s an example DAG which illustrates labeling different branches:

[Figure: edge_label_example, a DAG with labeled branch edges]

airflow/example_dags/example_branch_labels.py [source]

DAG & Task Documentation ¶

It’s possible to add documentation or notes to your DAGs & task objects that are visible in the web interface (“Graph” & “Tree” for DAGs, “Task Instance Details” for tasks).

There are a set of special task attributes that get rendered as rich content if defined:

Please note that for DAGs, doc_md is the only attribute interpreted. For DAGs it can contain a string or the reference to a template file. Template references are recognized by str ending in .md . If a relative path is supplied it will start from the folder of the DAG file. Also the template file must exist or Airflow will throw a jinja2.exceptions.TemplateNotFound exception.

This is especially useful if your tasks are built dynamically from configuration files, as it allows you to expose the configuration that led to the related tasks in Airflow:

SubDAGs are deprecated, hence TaskGroups are always the preferred choice.

Sometimes, you will find that you are regularly adding exactly the same set of tasks to every DAG, or you want to group a lot of tasks into a single, logical unit. This is what SubDAGs are for.

For example, here’s a DAG that has a lot of parallel tasks in two sections:

[Figure: subdag_before, a DAG with two sections of parallel tasks]

We can combine all of the parallel task-* operators into a single SubDAG, so that the resulting DAG resembles the following:

[Figure: subdag_after, the parallel tasks collapsed into SubDAGs]

Note that SubDAG operators should contain a factory method that returns a DAG object. This will prevent the SubDAG from being treated like a separate DAG in the main UI - remember, if Airflow sees a DAG at the top level of a Python file, it will load it as its own DAG . For example:

airflow/example_dags/subdags/subdag.py [source]

This SubDAG can then be referenced in your main DAG file:

airflow/example_dags/example_subdag_operator.py [source]

You can zoom into a SubDagOperator from the graph view of the main DAG to show the tasks contained within the SubDAG:

[Figure: subdag_zoom, the tasks contained within the SubDAG]

Some other tips when using SubDAGs:

By convention, a SubDAG’s dag_id should be prefixed by the name of its parent DAG and a dot ( parent.child )

You should share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once , the SubDAG will succeed without having done anything.

Clearing a SubDagOperator also clears the state of the tasks within it.

Marking success on a SubDagOperator does not affect the state of the tasks within it.

Refrain from using Depends On Past in tasks within the SubDAG as this can be confusing.

You can specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot.

See airflow/example_dags for a demonstration.

Parallelism is not honored by SubDagOperator, and so resources could be consumed by SubDagOperators beyond any limits you may have set.

TaskGroups vs SubDAGs ¶

SubDAGs, while serving a similar purpose as TaskGroups, introduce both performance and functional issues due to their implementation.

The SubDagOperator starts a BackfillJob, which ignores existing parallelism configurations, potentially oversubscribing the worker environment.

SubDAGs have their own DAG attributes. When the SubDAG’s DAG attributes are inconsistent with its parent DAG, unexpected behavior can occur.

You are unable to see the “full” DAG in one view, as SubDAGs exist as full-fledged DAGs.

SubDAGs introduce all sorts of edge cases and caveats. This can disrupt user experience and expectations.

TaskGroups, on the other hand, are a better option, given that they are purely a UI grouping concept. All tasks within a TaskGroup still behave as any other tasks outside of it.

These points summarize the core differences between the two constructs.

Packaging DAGs ¶

While simpler DAGs are usually only in a single Python file, it is not uncommon that more complex DAGs might be spread across multiple files and have dependencies that should be shipped with them (“vendored”).

You can either do this all inside of the DAG_FOLDER , with a standard filesystem layout, or you can package the DAG and all of its Python files up as a single zip file. For instance, you could ship two DAGs along with a dependency they need as a zip file with the following contents:
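As a sketch, such a zip could be assembled like this (file names are illustrative; the DAG files must sit at the root of the archive, with the vendored package alongside them):

```python
# Package two DAG files plus a vendored helper package into a single zip
# that can be dropped into DAG_FOLDER. Paths are hypothetical and must
# exist on disk for this to run.
import zipfile

with zipfile.ZipFile("my_dags.zip", "w") as zf:
    zf.write("my_dag1.py")
    zf.write("my_dag2.py")
    zf.write("package1/__init__.py")    # vendored pure-Python dependency
    zf.write("package1/functions.py")
```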

Note that packaged DAGs come with some caveats:

They cannot be used if you have pickling enabled for serialization

They cannot contain compiled libraries (e.g. libz.so ), only pure Python

They will be inserted into Python’s sys.path and importable by any other code in the Airflow process, so ensure the package names don’t clash with other packages already installed on your system.

In general, if you have a complex set of compiled dependencies and modules, you are likely better off using the Python virtualenv system and installing the necessary packages on your target systems with pip .

.airflowignore ¶

An .airflowignore file specifies the directories or files in DAG_FOLDER or PLUGINS_FOLDER that Airflow should intentionally ignore. Airflow supports two syntax flavors for patterns in the file, as specified by the DAG_IGNORE_FILE_SYNTAX configuration parameter ( added in Airflow 2.3 ): regexp and glob .

The default DAG_IGNORE_FILE_SYNTAX is regexp to ensure backwards compatibility.

For the regexp pattern syntax (the default), each line in .airflowignore specifies a regular expression pattern, and directories or files whose names (not DAG id) match any of the patterns would be ignored (under the hood, Pattern.search() is used to match the pattern). Use the # character to indicate a comment; all characters on a line following a # will be ignored.

As with most regexp matching in Airflow, the regexp engine is re2 , which explicitly doesn’t support many advanced features; please check its documentation for more information.

With the glob syntax, the patterns work just like those in a .gitignore file:

The * character will match any number of characters, except /

The ? character will match any single character, except /

The range notation, e.g. [a-zA-Z] , can be used to match one of the characters in a range

A pattern can be negated by prefixing with ! . Patterns are evaluated in order so a negation can override a previously defined pattern in the same file or patterns defined in a parent directory.

A double asterisk ( ** ) can be used to match across directories. For example, **/__pycache__/ will ignore __pycache__ directories in each sub-directory to infinite depth.

If there is a / at the beginning or middle (or both) of the pattern, then the pattern is relative to the directory level of the particular .airflowignore file itself. Otherwise the pattern may also match at any level below the .airflowignore level.

The .airflowignore file should be put in your DAG_FOLDER . For example, you can prepare a .airflowignore file using the regexp syntax with patterns such as project_a and tenant_[\d] (one pattern per line).

Or, equivalently, in the glob syntax with patterns such as **/*project_a* and tenant_[0-9]* .

Then files like project_a_dag_1.py , TESTING_project_a.py , tenant_1.py , project_a/dag_1.py , and tenant_1/dag_1.py in your DAG_FOLDER would be ignored (If a directory’s name matches any of the patterns, this directory and all its subfolders would not be scanned by Airflow at all. This improves efficiency of DAG finding).

The scope of a .airflowignore file is the directory it is in plus all its subfolders. You can also prepare .airflowignore file for a subfolder in DAG_FOLDER and it would only be applicable for that subfolder.

DAG Dependencies ¶

Added in Airflow 2.1

While dependencies between tasks in a DAG are explicitly defined through upstream and downstream relationships, dependencies between DAGs are a bit more complex. In general, there are two ways in which one DAG can depend on another:

triggering - TriggerDagRunOperator

waiting - ExternalTaskSensor
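A minimal sketch of both patterns (DAG ids, task ids, and schedules are illustrative; import paths assume Airflow 2.x):

```python
import pendulum

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="upstream_controller",  # illustrative name
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    # triggering: kick off a run of another DAG
    TriggerDagRunOperator(
        task_id="trigger_downstream",
        trigger_dag_id="downstream_dag",  # hypothetical target DAG id
    )

with DAG(
    dag_id="downstream_waiter",  # illustrative name
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    # waiting: block until a task in another DAG finishes for the same logical date
    ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="upstream_controller",
        external_task_id="trigger_downstream",
        poke_interval=60,
    )
```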

An additional difficulty is that one DAG could wait for or trigger several runs of the other DAG with different data intervals. The DAG Dependencies view ( Menu -> Browse -> DAG Dependencies ) helps visualize dependencies between DAGs. The dependencies are calculated by the scheduler during DAG serialization, and the webserver uses them to build the dependency graph.

The dependency detector is configurable, so you can implement your own logic different from the default DependencyDetector .

DAG pausing, deactivation and deletion ¶

DAGs have several states when it comes to being “not running”. DAGs can be paused, deactivated, and finally all metadata for the DAG can be deleted.

A DAG can be paused via the UI when it is present in the DAGS_FOLDER and the scheduler has stored it in the database, but the user has chosen to disable it via the UI. The “pause” and “unpause” actions are available via the UI and API. A paused DAG is not scheduled by the scheduler, but you can still trigger it via the UI for manual runs. In the UI, you can see paused DAGs in the Paused tab; the DAGs that are un-paused can be found in the Active tab. When a DAG is paused, any running tasks are allowed to complete and all downstream tasks are put into the “scheduled” state. When the DAG is unpaused, any “scheduled” tasks will begin running according to the DAG logic. DAGs with no “scheduled” tasks will begin running according to their schedule.

A DAG can be deactivated (do not confuse this with the Active tab in the UI) by removing it from the DAGS_FOLDER . When the scheduler parses the DAGS_FOLDER and misses a DAG that it had seen before and stored in the database, it will set it as deactivated. The metadata and history of the DAG are kept for deactivated DAGs, and when the DAG is re-added to the DAGS_FOLDER it will be activated again and the history will be visible. You cannot activate/deactivate a DAG via the UI or API; this can only be done by removing files from the DAGS_FOLDER . Once again - no data for historical runs of the DAG is lost when it is deactivated by the scheduler. Note that the Active tab in the Airflow UI refers to DAGs that are both Activated and Not paused, so this might initially be a little confusing.

You can’t see deactivated DAGs in the UI - you can sometimes see their historical runs, but when you try to view the details of those runs you will see an error saying that the DAG is missing.

You can also delete the DAG metadata from the metadata database using the UI or API, but this does not always make the DAG disappear from the UI - which might also be a bit confusing at first. If the DAG is still in the DAGS_FOLDER when you delete the metadata, the DAG will re-appear as the scheduler parses the folder; only the historical run information for the DAG will be removed.

This all means that if you want to actually delete a DAG and all of its historical metadata, you need to do it in three steps:

pause the DAG

delete the historical metadata from the database, via UI or API

delete the DAG file from the DAGS_FOLDER and wait until it becomes inactive


