Workflow-менеджмент с Apache Airflow
Для чего нужна оркестрация
Если ваша работа с данными в хранилище сводится к тому, чтобы сделать ad-hoc экспорт в Excel, то наверняка, что в оркестрации нет смысла. Однако если компания получает инсайты из данных, принадлежащих ей или это компания, построенная вокруг обработки данных, то процесс обязательно будет намного сложнее.
К примеру, ваша компания обрабатывает большие массивы данных и предоставляет своим клиентам ценные советы: в таком случае процесс может выглядеть следующим образом. Каждую ночь в S3-бакете у вашего провайдера данных появляются файлы с «сырыми» данными. Но так получилось, что ваша компания выбрала Google Cloud Platform, потому в первую очередь необходимо перенести эти файлы в корпоративный Cloud Storage. После этого данные необходимо загрузить в хранилище данных BigQuery, где будет происходить дальнейшая обработка. Уже в BigQuery сложными SQL скриптами вы фильтруете ошибочные записи, производите агрегацию и обогащение данных.
Представим, что полученные данные необходимо каким-то образом провалидировать через сторонний API, но данных так много, что этот процесс нужно распараллелить, поэтому вам необходимо создать Dataproc кластер, куда будет отправляться определенный файл с кодом на Spark, а результат сохранить в специальную таблицу BigQuery. Когда результирующая таблица уже обновлена, вам необходимо триггернуть обновление Tableau дашборда, который уже подключен к этой же таблице, чтобы утром конечный пользователь видел новые данные.
Возможны и гораздо более сложные пайплайны, где одни пайплайны зависят от других.
Именно для того чтобы описанный выше процесс происходил по графику и в определенной последовательности, вам нужен workflow менеджер. Последние несколько лет де-факто стандартным инструментом считается Apache Airflow.
История
Airflow появился на свет в качестве внутреннего проекта компании Airbnb в 2014 году, но с самого начала он был open-source. Автором проекта является Максим Бушемин, который также разработал BI-инструмент с открытым кодом — Apache Superset. Уже в 2016 году проект переходит в Apache Incubator, а в 2019 году становится top-level проектом Apache Software Foundation. Проект написан на Python, на нем и описывается вся последовательность задач.
Компоненты
Конфигурация зависит от того, какое количество параллельных задач должно выполнять этот инструмент. Уже от конфигурации зависит какие компоненты вам необходимы. Обязательные компоненты:
- Metadata database — это база данных, где Airflow хранит всю необходимую цель-информацию о текущих и предыдущих задачах, их статусе и результате выполнения. Рекомендуется использовать СУБД Postgres для более стабильной и эффективной работы, однако, при необходимости можно подключить MySQL, MSSQL и SQLite (с ограничениями).
- Scheduler — компонент системы, парящий файлы с описанными пайплайнами и, при необходимости, передает на исполнение в Executor.
- Webserver — это приложение, использующее Flask и запускаемое через gunicorn. Основная роль этого компонента – это графическое отображение пайплайна и хода его выполнения.
Компоненты, зависящие от конфигурации и задач:
- Triggerer — по сути, является имплементацией event-loop для асинхронных операторов, впервые появившихся в версии 2.2. Сейчас таких операторов очень мало, поэтому если вы не планируете их использовать, то нет необходимости запускать компоненту Triggerer.
- Worker – это модифицированный воркер из библиотеки Celery. Вы можете запустить столько воркеров, сколько вам нужно, но для этого нужно указать CeleryExecutor .
Исполнение
Очевидно, что Python-код, описывающий задачи и их порядок, должен где-то выполняться. Именно этим и занимается Executor. Airflow поддерживает несколько типов исполнителей:
Тип Executor’а | Где выполняются задачи |
SequentialExecutor | Локально, в основном потоке выполнения |
LocalExecutor | Локально, в отдельных процессах операционной системы |
CeleryExecutor | В Celery воркерах |
DaskExecutor | В узлах Dask кластера |
KubernetesExecutor | В Kubernetes подах |
Для production-а чаще всего используются CeleryExecutor и KubernetesExecutor. Это нужно учесть и при построении пайплайна, чтобы между отдельными задачами не было зависимостей, кроме прописанных в самом пайплайне. То есть каждая задача будет выполняться в собственной изолированной среде и, скорее всего, в разных физических компьютерах, поэтому последовательность задач скачивания файла на диск и загрузить файл в облачное хранилище не будет рабочей.
Архитектуры
Как вы уже догадались, Apache Airflow – это достаточно гибкий инструмент, поэтому его можно конфигурировать точно под конкретные требования. Однако существуют две общеупотребительные архитектуры: single-node и multi-node.
Установка
Есть несколько разных способов установить и запустить Airflow.
- Через менеджер пакетов PIP.
Наверное, это самый удобный способ. Сначала придется установить все необходимые зависимости, затем настроить и запустить СУБД по вашему выбору. Конечно, вы можете выбрать SQLite, однако тогда придется использовать SequentialExecutor и пожертвовать параллельностью выполнения задач. После этого желательно создать и активировать виртуальное окружение для Python и уже тогда можно приступать к установке и запуску Airflow компонентов.
1 2 3 |
python -m pip install apache-airflow airflow webserver airflow scheduler |
Трудно перечислить, сколько может возникнуть нюансов при этом варианте. Если вы используете Linux или MacOS, то наверняка проблем будет не так много. Однако для Windows придется самостоятельно компилировать и устанавливать некоторые дополнительные библиотеки, поэтому лучше сразу использовать WSL .
2. Через отдельные Docker-контейнеры.
Наверное, что этот метод – не самый удобный для установки на локальной машине, но может подойти, если вам нужно вручную настраивать Airflow-кластер на своих bare-metal серверах.
1 2 3 |
docker run … postgres docker run … apache/airflow scheduler docker run … apache/airflow webserver |
3. Через docker-compose.
Это наиболее удобный способ локального запуска. Для этого нужно скомпоновать свой docker-compose . yaml файл, или воспользоваться уже готовыми вариантами и выполнить
1 |
docker-compose up |
Более подробно об этом способе написано в одном из официальных туториалов .
4. Через Astronomer CLI.
Последний способ, который я бы хотел назвать, не универсален, но он прекрасно подходит для локальной разработки, особенно если ваша компания использует managed-версию Apache Airflow от компании Astronomer. Для начала работы необходимо установить Astronomer CLI и выполнить:
1 2 |
astro dev init astro dev start |
Кроме удобства настройки локальной разработки через Astronomer CLI очень легко деплоить.
Managed версии
Учитывая популярность этого инструмента, появление managed-версий у облачных провайдеров было только вопросом времени. Сейчас Airflow находится на Google Cloud Platform под названием Cloud Composer и на AWS как Managed Workflows for Apache Airflow (MWAA). Также облачную версию предлагает провайдер Cloud Data Lake Qubole.
Отдельно хотелось бы рассмотреть компанию Astronomer, основанную и в которой работают топ-контрибьюторы опенсорсного проекта. Также эта компания проводит много вебинаров, посвященных этому инструменту, и имеет два подготовительных курса по сертификации с Apache Airflow.
Интерфейс
Галерею скриншотов можно посмотреть здесь .
Концепт
Перейдем к рассмотрению основных сущностей, которые нужно знать, когда вы почитаете работу по Airflow.
Сущности
Основной сущностью workflow является DAG (directed acyclic graph) – ориентированный ациклический граф, объединяющий все задачи в один пайплайн. Понятие DAG-а встречается и в других популярных инструментах, например в Apache Spark.
Узлы этого графа — Task , являющийся экземпляром класса Operator . Операторы условно можно разделить на:
- Action operator — выполняет определенное действие, как DataprocSubmitSparkJobOperatorи
- Transfer operator — переносит данные из одного места в другое, как S3ToGCSOperator
- Sensor operator — ожидает определенного события, как BigQueryTablePartitionExistenceSensor
В основном пайплайны запускаются с определенной регулярностью, поэтому существует понятие Task Instance — экземпляр определенного оператора с меткой времени, когда произошел запуск пайплайна. Для примера:
- PythonOperator – выполняет определенную python функцию.
- BashOperator – запускает bash команду или скрипт.
- PostgresOperator – выполняет SQL запрос в PostgreSQL.
- KubernetesPodOperator – запускает pod на кластере и выполняет определенные команды.
А через веб-интерфейс есть возможность создавать Variables вроде переменных окружения и Connections – данные необходимые для подключения для внешних ресурсов.
Обе сущности хранятся в базе метаданных и доступны из всех компонентов системы, причем пароли для Connection хранятся в зашифрованном виде.
Также нужно упомянуть и о Hook – интерфейсе для работы с внешними сервисами. Хуки – это обертки над популярными библиотеками для работы с определенным инструментом, например, хуки для работы с реляционными СУБД используют SQLAlchemy, а хуки для работы с AWS используют библиотеку boto3.
Хуков и операторов сейчас насчитывается около 850, поэтому актуальна идея реестра для удобного поиска. Как раз это недавно было имплементировано в Astronomer Registry. Но даже если вы не нашли нужный вам оператор, то вы можете с легкостью написать свой, следуя классу BaseOperator.
Дополнительные библиотеки
Airflow имеет модульную структуру и для полноценной работы в определенной конфигурации требуется дополнительные библиотеки, которые могут устанавливаться вместе с Airflow. Например, если вы хотите настроить CeleryExecutor, вам необходимо также установить библиотеки celery и redis, если вы хотите настроить другую аутентификацию, то вам необходимы соответствующие библиотеки. Это все core extras.
Сейчас Airflow поддерживает работу с более чем 50 сторонними ресурсами, но нет смысла их все одновременно устанавливать. То есть, если для работы вам нужна только интеграция с Snowflake и AWS, то вы устанавливаете соответствующие интеграции и не ставите библиотеки для работы, например, с Oracle, Neo4j и MongoDB. Это providers extras.
Более подробно об этом в официальной документации.
Создание DAG-ов
Основные моменты
Мы пришли к основному вопросу: как выглядит пайплайн в Airflow.
Сначала импортируем необходимые библиотеки и функции.
1 2 3 4 5 |
import requests import pandas as pd from pathlib import Path from airflow. models import DAG from airflow. Operators . python import PythonOperator |
Далее напишем две python-функции, которые производят определенные действия. Но хочу уточнить, что такой подход нежелателен потому, что чаще всего для production используется CeleryExecutor или KubernetesExecutor, поэтому вторая функция вернет ошибку, потому что в ее окружении не будет файла.
1 2 3 4 5 6 7 8 9 |
def download_data_fn(): url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv' resp = requests. get (url) Path ( 'titanic.csv' ). write_text (resp.content. decode ()) def pivot_data_fn (): df=pd. read_csv ( 'titanic.csv' ) df=df. pivot_table (index= 'Sex' , columns= 'Pclass' , values= 'Name' , aggfunc= 'count' ) df. reset_index (). to_csv ( 'titanic_pivoted.csv' ) |
Ну и дальше создаем экземпляр DAG-а и экземпляры операторов – таски. Первое условие dag_id
должно быть уникальным в пределах всего деплоймента. Вторая – все task_id
также должны быть уникальными в пределах одного DAG-а.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<span class="hljs-function">with <span class="hljs-title">DAG ( <span class="hljs-params">dag_id= 'titanic_dag' , schedule_interval= '*/9 * * * *' ) as dag: download_data = PythonOperator( task_id= 'download_data' , python_callable=download_data_fn, dag=dag, ) pivot_data = PythonOperator( task_id= 'pivot_data' , python_callable=pivot_data_fn, dag=dag, ) </span></span></span> |
Начиная со второй версии Airflow, нет необходимости давать на вход оператору экземпляр DAG-а, если оператор находится в контексте DAG-а.
Следующим шагом указываем зависимости между тасками, то есть порядок их выполнения.
1 2 3 4 |
download_data >> pivot_data # pivot_data << download_data # download_data .set_downstream(pivot_data) # pivot_data.set_upstream(download_data) |
Замечу, что каждый из вышеуказанных вариантов создает ту же зависимость, и вы можете использовать любой из них. Рекомендуется использовать либо только вызовы метода, либо только битшифт оператор.
Написанный файл должен быть в директории, где Airflow ищет DAG. По умолчанию это $AIRFLOW_HOME/dags. После этого уже scheduler будет ставить этот пайплайн в очередь исполнения, а Executor будет выполнять этот пайплайн каждые девять минут.
Taskflow API
Также в Airflow 2.0 появилась возможность значительно упростить использование функций.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import requests import pandas as pd from pathlib import Path from airflow. models import DAG from airflow. decorators import task with DAG (dag_id= 'titanic_dag' , schedule_interval= '*/9 * * * *' ) as <span class="hljs-attr">dag : @task def download_data (): url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv' resp = requests. get (url) Path ( 'titanic.csv' ). write_text (resp. content . decode ()) @task def pivot_data (): df=pd. read_csv ( 'titanic.csv' ) df=df. pivot_table (index= 'Sex' , columns= 'Pclass' , values= 'Name' , aggfunc= 'count' ) df. reset_index (). to_csv ( 'titanic_pivoted.csv' ) </span> |
В этом примере декоратор @task принимает на вход python-функцию и возвращает экземпляр PythonOperator. При этом зависимости прописываются следующими способами:
1 2 3 4 |
pivot_data(download_data()) # download_data_op = download_data() # pivot_data_op = pivot_data() # download_data_op >> pivot_data_op |
Немного о start_date и schedule_interval
Почти у каждого, кто только начинает работу с Airflow, возникают проблемы с пониманием интервалов и дат. Перед тем, как начать, стоит уточнить, что Airflow использует только UTC в процессе работы.
Каждый DAG принимает такие параметры как start_date и schedule_interval . start_date, определяет, с какого времени можно запускать пайплайн, а schedule_interval показывает, с какой периодичностью необходимо запускать. Интересно, что schedule_interval может быть относительным, если мы указали timedelta или абсолютным, если это cron expression.
В первом случае, запуск пайплана состоится в start_date + schedule_interval , но execution_date будет равна start_date . Во втором же случае логика scheduler-а следующая – первый execution_date – это первая дата и время после start_date , удовлетворяющая условиям schedule_interval , а первый запуск пайплайна, соответственно, это следующие такие дата и время.
Чтобы описать расписание, изображенное на рисунке выше, мы можем использовать:
- schedule_interval = timedelta(hours=1) и start_date = datetime(2021, 01, 26, 5, 0, 0)
- schedule_interval = ‘0 * * * *’, а start_date может быть в любое время от datetime(2021, 01, 26, 4, 0, 1) до datetime(2021, 01, 26, 5, 0, 0)
Из-за определенной путаницы, разработчики решили отказаться от этого названия и приняли решение, что у каждого запуска будут два параметра data_interval_start и data_interval_end . Первый показывает, когда последний раз запускался пайплайн, а второй – текущий запуск.
XComs
Что делать, если между задачами дополнительные зависимости, а не только условие, что «Задача Б» должна запускаться после «Задачи Б»?
Эту проблему решают XComs ( cross-task comminications ). Благодаря этому механизму задача может записывать в базу метаданных любые данные, а другая задача – их читать. Возьмем за основу прошлый пример, но в этом случае допустим, что название файла может быть произвольным и будет генерироваться первой задачей.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
def download_data_fn(**context): filename = 'titanic.csv' url = 'https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv' resp = requests. get (url) Path (filename). write_text (resp.content. decode ()) #context[ 'ti' ]. xcom_push (key= 'filename' , value=filename) # option 1 return filename # option 2 def pivot_data_fn (ti, **context): #filename=ti. xcom_pull (task_ids=[ 'download_data' ], key= 'filename' ) # option 1 filename = ti. xcom_pull (task_ids=[ 'download_data' ], key= 'return_value' ) # option 2 df = pd. read_csv (filename) df=df. pivot_table (index= 'Sex' , columns= 'Pclass' , values= 'Name' , aggfunc= 'count' ) df. reset_index (). to_csv ( 'titanic_pivoted.csv' ) with DAG (dag_id= 'titanic_dag' , schedule_interval= '*/9 * * * *' ) as dag: download_data = PythonOperator ( task_id= 'download_data' , python_callable=download_data_fn, provide_context=True, ) pivot_data = PythonOperator ( task_id= 'pivot_data' , python_callable=pivot_data_fn, provide_context=True, ) download_data >> pivot_data |
Как видим, существует несколько способов записывать данные в XCom, однако следует быть внимательными и передавать через этот механизм лишь небольшие данные. Во-первых, это замедлит работу базы, а превышение определенных лимитов, зависящих от СУБД, вызовет ошибку, во-вторых – следует помнить, что Airflow – это оркестратор, и он не предназначен для обработки данных.
Недостатки
Трудно ошибиться, если сказать, что у каждого довольно сложного инструмента есть определенные особенности, к которым можно привыкнуть, и определенные недостатки, очень мешающие. Для меня таким недостатком есть сложный процесс разработки.
То есть для того чтобы начать разрабатывать, необходимо установить и запустить Airflow локально. А для того чтобы оттестировать, необходимо прописать все Connections и Variables. Количество первых может считаться десятками, а вторых – сотнями. Это не говоря о том, что для тестирования необходимо иметь development и/или staging окружение.
Альтернативы
Несмотря на лидирующие позиции Airflow как workflow менеджера, у него есть конкуренты. Все, что я хотел бы привести в пример, являются open-source проектами и имеют определенное сообщество вокруг себя.
Наиболее перспективным сейчас выглядит Dagster , на это намекающий раунд А инвестиций в компанию Elementl , занимающуюся разработкой и коммерциализацией этого инструмента. Еще совсем недавно основным конкурентом Airflow можно было считать инструмент Prefect , который тоже разрабатывается одноименной компанией. Стремительно теряет позиции Spotify Luigi , однако все еще используется во многих компаниях из-за легкости в использовании. Ну и в конце я хотел бы упомянуть Apache Oozie как представителя экосистемы Apache Hadoop. Этот проект очень медленно развивается, но имеет узкую нишу. Также я бы посоветовал обратить внимание на такие инструменты как Flyte , разрабатываемый компанией Lyft и Azkaban., в котором много контрибьют HashiCorp.
Наверное, к альтернативам можно было бы отнести и такие инструменты как dbt , Apache NiFi , а также AWS Glue, Azure Data Factory, Databricks Live Tables и множество подобных инструментов для построения ETL/ELT пайплайнов.
Перевод статьи Дмитри Казанжи.