
Although Airflow can send an email upon task failure, sometimes that's not enough. You may have other communication channels like Slack, or you may just want to post to a social networking service like Twitter. In such cases, on_success_callback and on_failure_callback can be a good choice. These callback functions are called TaskStateChangeCallbacks.
A TaskStateChangeCallback is a Python callable which receives the Airflow Context as its only argument and returns None.1) So you can define your own TaskStateChangeCallbacks in your DAGs like this.
def my_callback(context):
    print('Callback works')
The context parameter isn't used in the function yet; it'll be discussed later on. Now this callback function can be attached to tasks like this.
with DAG(
    ...
) as dag:
    task = SimpleHttpOperator(
        ...
        on_success_callback=my_callback,
        on_failure_callback=my_callback,
    )
If you open the log of the task above, you can see the message printed out by the callback function.
Now that you know how to define and run a callback function upon task success or failure, the next step is calling webhooks from the callback function. There are many options. You can use the Python http.client2) module directly, or a higher-level module like requests3). But with Airflow we have another option: SimpleHttpOperator, which can also be used as an Airflow task. I'll assume that you already know enough to define your own; if not, have a look at the official documentation for an example. One big difference from defining a SimpleHttpOperator as a task belonging to a DAG is that you have to execute it yourself. So our callback function now looks like this.
def my_callback(context):
    webhook = SimpleHttpOperator(
        ...
    )
    webhook.execute(context=context)
Now you can see the webhook is being called upon task success or failure.
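If you'd rather not pull in an operator for this, the http.client module mentioned above is also enough for a minimal webhook call from the callback. A sketch, where the host `hooks.example.com`, the path `/airflow-events`, and the payload fields are my own assumptions, not anything Airflow prescribes:

```python
import http.client
import json

def build_payload(context):
    # Pull a couple of fields out of the Airflow context into a
    # JSON-serializable dict. Which fields you include is up to you.
    ti = context.get('task_instance')
    return {'dag_id': ti.dag_id, 'task_id': ti.task_id}

def my_callback(context):
    # POST the payload to a hypothetical webhook endpoint.
    conn = http.client.HTTPSConnection('hooks.example.com')
    conn.request(
        'POST', '/airflow-events',
        body=json.dumps(build_payload(context)),
        headers={'Content-Type': 'application/json'},
    )
    conn.getresponse()
    conn.close()
```

Keeping the payload construction in its own pure function makes it easy to test without a running Airflow instance or a network connection.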
Only one thing remains: tailoring the message as you want. When calling the webhook, you may want it to contain information related to the task execution. Now it's time to talk about the context parameter of the callback function.
Context is a dictionary which contains references to objects related to the task instance and is documented under the macros section of the API. The available keys can be found here. For example, you can print out various pieces of task information from the callback function like this:
import pendulum

def my_callback(context):
    print('Callback works')
    dt_fmt = 'YYYY-MM-DD HH:mm:ss zz'
    msg = """
DagID: {dag}
TaskID: {task}
Tags: {tags}
Start Time: {start_date}
End Time: {end_date}
""".format(
        task=context.get('task_instance').task_id,
        dag=context.get('task_instance').dag_id,
        tags=", ".join(context.get('dag').tags),
        start_date=pendulum.instance(context.get('task_instance').start_date).format(dt_fmt),
        end_date=pendulum.instance(context.get('task_instance').end_date).format(dt_fmt),
    )
    print(msg)
Then you should see output like the following in the task execution log.
Callback works
DagID: my-dag-id
TaskID: my-task-id
Tags: my-tag-1, my-tag-2
Start Time: 2022-02-13 20:33:54 UTC
End Time: 2022-02-13 20:34:55 UTC
Now you can make your own Airflow callback function which calls your webhooks on task success or failure.