Airflow это: ликбез инженеру данных про ETL в Big Data

Airflow — инструмент, чтобы удобно и быстро разрабатывать и поддерживать batch-процессы обработки данных / Хабр

Привет, Хабр! В этой статье я хочу рассказать об одном замечательном инструменте для разработки batch-процессов обработки данных, например, в инфраструктуре корпоративного DWH или вашего DataLake. Речь пойдет об Apache Airflow (далее Airflow). Он несправедливо обделен вниманием на Хабре, и в основной части я попытаюсь убедить вас в том, что как минимум на Airflow стоит смотреть при выборе планировщика для ваших ETL/ELT-процессов.

Ранее я писал серию статей на тему DWH, когда работал в Тинькофф Банке. Теперь я стал частью команды Mail.Ru Group и занимаюсь развитием платформы для анализа данных на игровом направлении. Собственно, по мере появления новостей и интересных решений мы с командой будем рассказывать тут о нашей платформе для аналитики данных.


Пролог

Итак, начнем. Что такое Airflow? Это библиотека (ну или набор библиотек) для разработки, планирования и мониторинга рабочих процессов. Основная особенность Airflow: для описания (разработки) процессов используется код на языке Python. Отсюда вытекает масса преимуществ для организации вашего проекта и разработки: по сути, ваш (например) ETL-проект — это просто Python-проект, и вы можете его организовывать как вам удобно, учитывая особенности инфраструктуры, размер команды и другие требования. Инструментально всё просто. Используйте, например, PyCharm + Git. Это прекрасно и очень удобно!

Теперь рассмотрим основные сущности Airflow. Поняв их суть и назначение, вы оптимально организуете архитектуру процессов. Пожалуй, основная сущность — это Directed Acyclic Graph (далее DAG).


DAG

DAG — это некоторое смысловое объединение ваших задач, которые вы хотите выполнить в строго определенной последовательности по определенному расписанию. Airflow представляет удобный web-интерфейс для работы с DAG’ами и другими сущностями:

DAG может выглядеть таким образом:

Разработчик, проектируя DAG, закладывает набор операторов, на которых будут построены задачи внутри DAG’а. Тут мы приходим еще к одной важной сущности: Airflow Operator.


Операторы

Оператор — это сущность, на основании которой создаются экземпляры заданий, где описывается, что будет происходить во время исполнения экземпляра задания. Релизы Airflow с GitHub уже содержат набор операторов, готовых к использованию. Примеры:


  • BashOperator — оператор для выполнения bash-команды.
  • PythonOperator — оператор для вызова Python-кода.
  • EmailOperator — оператор для отправки email’а.
  • HTTPOperator — оператор для работы с http-запросами.
  • SqlOperator — оператор для выполнения SQL-кода.
  • Sensor — оператор ожидания события (наступления нужного времени, появления требуемого файла, строки в базе БД, ответа из API — и т. д., и т. п.).

Есть более специфические операторы: DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator.

Вы также можете разрабатывать операторы, ориентируясь на свои особенности, и использовать их в проекте.

Например, мы создали MongoDBToHiveViaHdfsTransfer, оператор экспорта документов из MongoDB в Hive, и несколько операторов для работы с ClickHouse: CHLoadFromHiveOperator и CHTableLoaderOperator. По сути, как только в проекте возникает часто используемый код, построенный на базовых операторах, можно задуматься о том, чтобы собрать его в новый оператор. Это упростит дальнейшую разработку, и вы пополните свою библиотеку операторов в проекте.

Далее все эти экземпляры задачек нужно выполнять, и теперь речь пойдет о планировщике.


Планировщик

Планировщик задач в Airflow построен на Celery. Celery — это Python-библиотека, позволяющая организовать очередь плюс асинхронное и распределенное исполнение задач. Со стороны Airflow все задачи делятся на пулы. Пулы создаются вручную. Как правило, их цель — ограничить нагрузку на работу с источником или типизировать задачи внутри DWH. Пулами можно управлять через web-интерфейс:

Каждый пул имеет ограничение по количеству слотов. При создании DAG’а ему задается пул:

ALERT_MAILS =  Variable.get("gv_mail_admin_dwh")
DAG_NAME = 'dma_load'
OWNER = 'Vasya Pupkin'
DEPENDS_ON_PAST = True
EMAIL_ON_FAILURE = True
EMAIL_ON_RETRY = True
RETRIES = int(Variable.get('gv_dag_retries'))
POOL = 'dma_pool'
PRIORITY_WEIGHT = 10
start_dt = datetime.today() - timedelta(1)
start_dt = datetime(start_dt.year, start_dt.month, start_dt.day)
default_args = {
    'owner': OWNER,
    'depends_on_past': DEPENDS_ON_PAST,
    'start_date': start_dt,
    'email': ALERT_MAILS,
    'email_on_failure': EMAIL_ON_FAILURE,
    'email_on_retry': EMAIL_ON_RETRY,
    'retries': RETRIES,
    'pool': POOL,
    'priority_weight': PRIORITY_WEIGHT
}
dag = DAG(DAG_NAME, default_args=default_args)
dag.doc_md = __doc__

Пул, заданный на уровне DAG’а, можно переопределить на уровне задачи.

За планировку всех задач в Airflow отвечает отдельный процесс — Scheduler. Собственно, Scheduler занимается всей механикой постановки задачек на исполнение. Задача, прежде чем попасть на исполнение, проходит несколько этапов:


  1. В DAG’е выполнены предыдущие задачи, новую можно поставить в очередь.
  2. Очередь сортируется в зависимости от приоритета задач (приоритетами тоже можно управлять), и, если в пуле есть свободный слот, задачу можно взять в работу.
  3. Если есть свободный worker celery, задача направляется в него; начинается работа, которую вы запрограммировали в задачке, используя тот или иной оператор.

Достаточно просто.

Scheduler работает на множестве всех DAG’ов и всех задач внутри DAG’ов.

Чтобы Scheduler начал работу с DAG’ом, DAG’у нужно задать расписание:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='@hourly')

Есть набор готовых preset’ов: @once, @hourly, @daily, @weekly, @monthly, @yearly.

Также можно использовать cron-выражения:

dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='*/10 * * * *')

Execution Date

Чтобы разобраться в том, как работает Airflow, важно понимать, что такое Execution Date для DAG’а. В Airflow DAG имеет измерение Execution Date, т. е. в зависимости от расписания работы DAG’а создаются экземпляры задачек на каждую Execution Date. И за каждую Execution Date задачи можно выполнить повторно — или, например, DAG может работать одновременно в нескольких Execution Date. Это наглядно отображено здесь:

К сожалению (а может быть, и к счастью: зависит от ситуации), если правится реализация задачки в DAG’е, то выполнение в предыдущих Execution Date пойдет уже с учетом корректировок. Это хорошо, если нужно пересчитать данные в прошлых периодах новым алгоритмом, но плохо, потому что теряется воспроизводимость результата (конечно, никто не мешает вернуть из Git’а нужную версию исходника и разово посчитать то, что нужно, так, как нужно).


Генерация задач

Реализация DAG’а — код на Python, поэтому у нас есть очень удобный способ сократить объем кода при работе, например, с шардированными источниками. Пускай у вас в качестве источника три шарда MySQL, вам нужно слазить в каждый и забрать какие-то данные. Причем независимо и параллельно. Код на Python в DAG’е может выглядеть так:

connection_list = lv.get('connection_list')
export_profiles_sql = '''
SELECT
  id,
  user_id,
  nickname,
  gender,
  {{params.shard_id}} as shard_id
FROM profiles
'''
for conn_id in connection_list:
    export_profiles = SqlToHiveViaHdfsTransfer(
        task_id='export_profiles_from_' + conn_id,
        sql=export_profiles_sql,
        hive_table='stg.profiles',
        overwrite=False,
        tmpdir='/data/tmp',
        conn_id=conn_id,
        params={'shard_id': conn_id[-1:], },
        compress=None,
        dag=dag
    )
    export_profiles.set_upstream(exec_truncate_stg)
    export_profiles.set_downstream(load_profiles)

DAG получается таким:

При этом можно добавить или убрать шард, просто скорректировав настройку и обновив DAG. Удобно!

Можно использовать и более сложную генерацию кода, например работать с источниками в виде БД или описывать табличную структуру, алгоритм работы с таблицей и с учетом особенностей инфраструктуры DWH генерировать процесс загрузки N таблиц к вам в хранилище.

Или же, например, работу с API, которое не поддерживает работу с параметром в виде списка, вы можете сгенерировать по этому списку N задач в DAG’е, ограничить параллельность запросов в API пулом и выгрести из API необходимые данные. Гибко!


Репозиторий

В Airflow есть свой бекенд-репозиторий, БД (может быть MySQL или Postgres, у нас Postgres), в которой хранятся состояния задач, DAG’ов, настройки соединений, глобальные переменные и т. д., и т. п. Здесь хотелось бы сказать, что репозиторий в Airflow очень простой (около 20 таблиц) и удобный, если вы хотите построить какой-либо свой процесс над ним. Вспоминается 100500 таблиц в репозитории Informatica, которые нужно было долго вкуривать, прежде чем понять, как построить запрос.


Мониторинг

Учитывая простоту репозитория, вы можете сами построить удобный для вас процесс мониторинга задачек. Мы используем блокнот в Zeppelin, где смотрим состояние задач:

Это может быть и web-интерфейс самого Airflow:

Код Airflow открыт, поэтому мы у себя добавили алертинг в Telegram. Каждый работающий инстанс задачи, если происходит ошибка, спамит в группу в Telegram, где состоит вся команда разработки и поддержки.

Получаем через Telegram оперативное реагирование (если такое требуется), через Zeppelin — общую картину по задачам в Airflow.


Итого

Airflow в первую очередь open source, и не нужно ждать от него чудес. Будьте готовы потратить время и силы на то, чтобы выстроить работающее решение. Цель из разряда достижимых, поверьте, оно того стоит. Скорость разработки, гибкость, простота добавления новых процессов — вам понравится. Конечно, нужно уделять много внимания организации проекта, стабильности работы самого Airflow: чудес не бывает.

Сейчас у нас Airflow ежедневно отрабатывает около 6,5 тысячи задач. По характеру они достаточно разные. Есть задачи загрузки данных в основное DWH из множества разных и очень специфических источников, есть задачи расчета витрин внутри основного DWH, есть задачи публикации данных в быстрое DWH, есть много-много разных задач — и Airflow все их пережевывает день за днем. Если же говорить цифрами, то это 2,3 тысячи ELT задач различной сложности внутри DWH (Hadoop), около 2,5 сотен баз данных источников, это команда из 4-ёх ETL разработчиков, которые делятся на ETL процессинг данных в DWH и на ELT процессинг данных внутри DWH и конечно ещё одного админа, который занимается инфраструктурой сервиса.


Планы на будущее

Количество процессов неизбежно растет, и основное, чем мы будем заниматься в части инфраструктуры Airflow, — это масштабирование. Мы хотим построить кластер Airflow, выделить пару ног для worker’ов Celery и сделать дублирующую себя голову с процессами планировки заданий и репозиторием.


Эпилог

Это, конечно, далеко не всё, что хотелось бы рассказать об Airflow, но основные моменты я постарался осветить. Аппетит приходит во время еды, попробуйте — и вам понравится 🙂

Что такое AirFlow и как работает технология

Основные сущности AirFlow

Архитектура AirFlow и принципы его работы

Плюсы и минусы AirFlow

AirFlow и Cloud-Native подход

Кому подойдет AirFlow

Популярное

Ликбез

Что такое озера данных и почему в них дешевле хранить big data

Тренды

Эволюция квантовых вычислений: от гипотез до реальных компьютеров

Разработка

Три уровня автомасштабирования в Kubernetes: как их эффективно использовать

Обработка данных в информационных системах чаще всего проводится в три этапа: извлечение, трансформация и загрузка (Extract Transform Load, ETL).

В решениях, использующих Big Data, именно с помощью ETL исходные («сырые») данные преобразуются в информацию, пригодную для бизнес-анализа.

Однако с увеличением данных и усложнением аналитических задач увеличивается и количество ETL-процессов, которые необходимо планировать, отслеживать и перезапускать в случае сбоев — возникает необходимость в оркестраторе.

В статье расскажем об эффективном Open-Source инструменте Apache Airflow, который помогает в управлении сложными ETL-процессами и отлично сочетается с принципами Cloud-Native приложений.

Основные сущности AirFlow

Процессы обработки данных, или пайплайны, в Airflow описываются при помощи DAG (Directed Acyclic Graph). Это смысловое объединение задач, которые необходимо выполнить в строго определенной последовательности согласно указанному расписанию. Визуально DAG выглядит как направленный ациклический граф, то есть граф, не имеющий циклических зависимостей.

В качестве узлов DAG выступают задачи (Task). Это непосредственно операции, применяемые к данным, например: загрузка данных из различных источников, их агрегирование, индексирование, очистка от дубликатов, сохранение полученных результатов и прочие ETL-процессы. На уровне кода задачи могут представлять собой Python-функции или Bash-скрипты.

За реализацию задач чаще всего отвечают операторы (Operator). Если задачи описывают, какие действия выполнять с данными, то операторы — как эти действия выполнять. По сути, это шаблон для выполнения задач.

Особую группу операторов составляют сенсоры (Sensor), которые позволяют прописывать реакцию на определенное событие. В качестве триггера может выступать наступление конкретного времени, получение некоторого файла или строки с данными, другой DAG/Task и так далее.

В AirFlow богатый выбор встроенных операторов. Кроме этого, доступно множество специальных операторов — путем установки пакетов поставщиков, поддерживаемых сообществом. Также возможно добавление пользовательских операторов — за счет расширения базового класса BaseOperator. Когда в проекте возникает часто используемый код, построенный на стандартных операторах, рекомендуется его преобразование в собственный оператор.

Примеры операторов приведены ниже.

ОператорНазначение
PythonOperatorИсполнение Python-кода
BranchPythonOperatorЗапуск задач в зависимости от выполнения заданного условия
BashOperatorЗапуск Bash-скриптов
SimpleHttpOperatorОтправка HTTP-запросов
MySqlOperatorОтправка запросов к базе данных MySQL
PostgresOperatorОтправка запросов к базе данных PostgreSQL
S3FileTransformOperatorЗагрузка данных из S3 во временную директорию в локальной файловой системе, преобразование согласно указанному сценарию и сохранение результатов обработки в S3
DockerOperatorЗапуск Docker-контейнера под выполнение задачи
KubernetesPodOperatorСоздание отдельного Pod под выполнение задачи. Используется совместно с K8s
SqlSensorПроверка выполнения SQL-запроса
SlackAPIOperatorОтправка сообщений в Slack
EmailOperatorОтправка электронных писем
DummyOperator«Пустой» оператор, который можно использовать для группировки задач

Примечание

Наряду с операторами в последних версиях AirFlow появилась возможность оформления задач в виде TaskFlow — путем объединения задач в цепочки для передачи выходных данных вышестоящим задачам и операторам.

Рассмотрим различия между DAG, Task и Operator на простом примере. Предположим, есть база данных MySQL и необходимо отслеживать появление некоторых данных в одной из ее таблиц. При появлении этих данных нужно выполнить их агрегацию и сохранение в хранилище Apache Hive, после чего отправить почтовое уведомление определенным адресатам.

DAG для этого примера может состоять из трех узлов:

Каждому из них будет соответствовать задача, а за ее выполнение, в свою очередь, будет отвечать оператор. Перечень задач и возможных операторов для их реализации отражает таблица ниже.

Имя (task_id)Задача (Task)Возможный оператор (Operator)
check_mysql_for_rawPython-функция для проверки данных в БД MySQLSqlSensor
load_to_hivePython-функция для выполнения агрегации данных и их сохранения в БД Apache HiveHiveOperator
send_emailPython-функция для отправки электронного письмаEmailOperator

Еще одна важная концепция, лежащая в основе AirFlow — это хранение информации о каждом запуске DAG в соответствии с указанным расписанием. Так, если в нашем примере указать, что DAG должен запускаться начиная с 07. 05.2021 00:00:00 раз в сутки — AirFlow будет хранить информацию о запуске экземпляров DAG для следующих временных отметок: 07.05.2021 00:00:00, 08.05.2021 00:00:00, 09.05.2021 00:00:00 и так далее. Временные отметки при этом называются execution_date, соответствующие им экземпляры DAG — DAG Run, а связанные с конкретными DAG Run экземпляры задач — Task Instance.

Хранение в базе метаданных AirFlow статуса выполнения DAG Run и его Task Instance за каждую execution_date

Концепция execution_date очень важна для соблюдения идемпотентности: запуск или перезапуск задачи за какую-то дату в прошлом никак не зависит от времени фактического выполнения. Это позволяет точно воспроизводить результаты, полученные ранее. Кроме этого, возможен одновременный запуск задач одного DAG за различные временные отметки (нескольких Dag Run).

Архитектура AirFlow и принципы его работы

Основу архитектуры AirFlow составляют следующие компоненты:

Тип ExecutorНазначение
SequentialExecutorПоследовательно запускает поступающие задачи и на время их выполнения приостанавливает планировщик, в связи с чем рекомендован исключительно для тестирования — для продуктивной среды он не подходит.
LocalExecutorНа каждую задачу запускает новый дочерний процесс, позволяя обрабатывать несколько задач параллельно. Отлично имитирует продуктивную среду в тестовом окружении, но для реального использования не рекомендуется ввиду низкой отказоустойчивости: в случае сбоя на машине, где запущен Executor, задача не может быть передана другим узлам.
CeleryExecutorОснован на Celery, позволяет иметь несколько Worker, работающих на разных машинах. Требует дополнительной настройки брокера сообщений, например, Redis либо RabbitMQ. Обладает высокой масштабируемостью и отказоустойчивостью: при увеличении нагрузки достаточно подключить дополнительный Worker, а в случае падения любого Worker его работа будет передана остальным узлам.
DaskExecutorВо многом похож на CeleryExecutor, но вместо Celery использует Dask – библиотеку для параллельных вычислений.
KubernetesExecutorНа каждый экземпляр задачи Task Instance запускает новый Worker на отдельном поде в Kubernetes.
CeleryKubernetesExecutorПозволяет одновременно запускать CeleryExecutor и KubernetesExecutor. Конкретный тип будет выбираться в зависимости от состояния очереди задач.
DebugExecutorСоздан для запуска и отладки пайплайнов из IDE.

Взаимодействие компонентов AirFlow в общем случае можно описать следующей схемой. В зависимости от типа, выбранного Executor на схеме, могут использоваться дополнительные компоненты, например, очередь сообщений для CeleryExecutor.

Архитектура AirFlow в упрощенном виде

Плюсы и минусы AirFlow

Чаще всего выделяют следующие преимущества AirFlow:

  1. Открытый исходный код. AirFlow активно поддерживается сообществом и имеет хорошо описанную документацию.
  2. На основе Python. Python считается относительно простым языком для освоения и общепризнанным стандартом для специалистов в области Big Data и Data Science. Когда ETL-процессы определены как код, они становятся более удобными для разработки, тестирования и сопровождения.
    Также устраняется необходимость использовать JSON- или XML-конфигурационные файлы для описания пайплайнов.
  3. Богатый инструментарий и дружественный UI. Работа с AirFlow возможна при помощи CLI, REST API и веб-интерфейса, построенного на основе Python-фреймворка Flask.
  4. Интеграция со множеством источников данных и сервисов. AirFlow поддерживает множество баз данных и Big Data-хранилищ: MySQL, PostgreSQL, MongoDB, Redis, Apache Hive, Apache Spark, Apache Hadoop, объектное хранилище S3 и другие.
  5. Кастомизация. Есть возможность настройки собственных операторов.
  6. Масштабируемость. Допускается неограниченное число DAG за счет модульной архитектуры и очереди сообщений. Worker могут масштабироваться при использовании Celery или Kubernetes.
  7. Мониторинг и алертинг. Поддерживается интеграция с Statsd и FluentD — для сбора и отправки метрик и логов. Также доступен Airflow-exporter для интеграции с Prometheus.
  8. Возможность настройки ролевого доступа. По умолчанию AirFlow предоставляет 5 ролей с различными уровнями доступа: Admin, Public, Viewer, Op, User. Также допускается создание собственных ролей с доступом к ограниченному числу DAG. Дополнительно возможна интеграция с Active Directory и гибкая настройка доступов с помощью RBAC (Role-Based Access Control).
  9. Поддержка тестирования.
    Можно добавить базовые Unit-тесты, которые будут проверять как пайплайны в целом, так и конкретные задачи в них.

Разумеется, есть и недостатки, но связаны они по большей части с довольно высоким порогом входа и необходимостью учитывать различные нюансы при работе с AirFlow:

  1. При проектировании задач важно соблюдать идемпотентность: задачи должны быть написаны так, чтобы независимо от количества их запусков, для одних и тех же входных параметров возвращался одинаковый результат.
  2. Необходимо разобраться в механизмах обработки execution_date. Важно понимать, что корректировки кода задач будут отражаться на всех их запусках за предыдущее время. Это исключает воспроизводимость результатов, но, с другой стороны, позволяет получить результаты работы новых алгоритмов за прошлые периоды.
  3. Нет возможности спроектировать DAG в графическом виде, как это, например, доступно в Apache NiFi. Многие видят в этом, напротив, плюс, так как ревью кода проводится легче, чем ревью схем.
  4. Некоторые пользователи отмечают незначительные временные задержки в запуске задач из-за нюансов работы планировщика, связанных с накладными расходами на постановку задач в очередь и их приоритезацию. Однако в версии Airflow 2 подобные задержки были сведены к минимуму, а также появилась возможность запуска нескольких планировщиков для достижения максимальной производительности.

AirFlow и Cloud-Native подход

Использование локальной инфраструктуры для работы с Big Data часто оказывается дорогим и неэффективным: под задачи, занимающие всего несколько часов в неделю, требуются огромные вычислительные мощности, которые необходимо оплачивать, настраивать и поддерживать. Поэтому многие компании переносят обработку больших данных в облако, где за считаные минуты можно получить полностью настроенный и оптимизированный кластер обработки данных с посекундной оплатой — за фактически используемые ресурсы.

Еще одна причина, по которой работать с BigData предпочтительнее в облаке — возможность использования Kubernetes aaS. Главные преимущества работы с Big Data в Kubernetes — это гибкое масштабирование и изоляция сред. Первое позволяет автоматически изменять выделяемые в облаке ресурсы в зависимости от меняющихся нагрузок, второе — обеспечивает совместимость различных версий библиотек и приложений в кластере за счет контейнеризации.

Если вы хотите разобраться в работе с большими данными в Kubernetes aaS, вам будут интересны эти статьи на Хабре:

  • Как и зачем разворачивать приложение на Apache Spark в Kubernetes
  • MLOps без боли в облаке: как развернуть Kubeflow в продакшен-кластере Kubernetes

Так как AirFlow предназначен для оркестровки ETL-процессов в области Big Data и Data Science — его запуск возможен и даже рекомендован в облаке. Также AirFlow отлично сочетается с Kubernetes. Способы запуска Airflow в Kubernetes кратко упоминались выше — опишем их подробнее:

  • С помощью оператора KubernetesPodOperator — в этом случае в Kubernetes выносятся только некоторые Airflow-задачи, которым сопоставлен соответствующий оператор. На каждую задачу внутри Kubernetes будет создан отдельный под. В качестве Executor при этом может использоваться стандартный CeleryExecutor.
  • С помощью исполнителя Kubernetes Executor — в этом случае на каждую Airflow-задачу будет создан отдельный Worker внутри Kubernetes, который при необходимости будет создавать новые поды. Если одновременно использовать KubernetesPodOperator и Kubernetes Executor, то сначала будет создан первый под — Worker, а затем он создаст следующий под и запустит на нем Airflow-задачу.

    Метод хорош тем, что создает поды только по требованию, тем самым экономя ресурсы и давая масштабироваться по мере необходимости. Однако нужно учитывать задержки в создании новых подов. Поэтому при большом количестве задач, работающих всего несколько минут, лучше использовать CeleryExecutor, а наиболее ресурсоемкие задачи выносить в Kubernetes с помощью KubernetesPodOperator.

  • С помощью исполнителя CeleryKubernetes Executor — в этом случае могут совместно использоваться CeleryExecutor и KubernetesExecutor. Метод рекомендуется использовать в трех случаях:
    1. Есть много небольших задач, которые могут быть выполнены в CeleryExecutor, но также есть ресурсоемкие задачи, требующие KubernetesExecutor.
    2. Относительно малое количество задач требует изолированной среды.
    3. Предполагаемые пиковые нагрузки значительно превышают возможности Kubernetes-кластера.

Подробнее о Kubernetes aaS на платформе VK Cloud (бывш. MCS) в этой статье на хабре.

Кому подойдет AirFlow

Конечно, AirFlow — далеко не единственное подобное решение на IT-рынке. Существует множество других инструментов для планирования и мониторинга ETL-процессов — как платных, так и Open-Source. В самых простых случаях можно и вовсе обойтись стандартным планировщиком Cron, настраивая рабочие процессы через Crontab. Назовем ряд типовых сценариев, когда AirFlow может стать наилучшим выбором:

  • Для планирования задач недостаточно возможностей Cron — требуется автоматизация.
  • У команды уже есть достаточная экспертиза в программировании на Python.
  • На проекте используется пакетная обработка данных (Batch), а не потоковая (Stream). AirFlow предназначен для Batch-заданий, для потоковой обработки данных лучше использовать Apache NiFi.
  • Для задач, используемых на проекте, возможно определить зависимости, представив их в виде графа DAG.
  • Планируется или уже осуществлен переход в облако и необходим надежный оркестратор, поддерживающий все принципы Cloud-Native.

Как устроен Kubernetes as a Service на платформе VK Cloud (бывш. MCS)

VK Cloud (бывш. MCS) запускает облачный сервис для обработки большого потока запросов

Не терять клиентов и быстрее обрабатывать запросы пользователей: зачем нужны очереди сообщений в ритейле, видео- и фотохостингах, онлайн-школах и банках

AirFlow big data облака облачные вычисления kubernetes Open-Source-технологии

Что такое воздушный поток? — Документация по воздушному потоку

Apache Airflow — это платформа с открытым исходным кодом для разработки, планирования, и мониторинг пакетно-ориентированных рабочих процессов. Расширяемая среда Python Airflow позволяет создавать рабочие процессы связь практически с любой техникой. Веб-интерфейс помогает управлять состоянием ваших рабочих процессов. Воздушный поток могут быть развернуты разными способами: от одного процесса на вашем ноутбуке до распределенной установки для поддержки даже самые большие рабочие процессы.

Основная характеристика рабочих процессов Airflow заключается в том, что все рабочие процессы определены в коде Python. «Рабочие процессы как code» служит нескольким целям:

  • Dynamic : Конвейеры Airflow настроены как код Python, что позволяет создавать динамические конвейеры.

  • Расширяемый : Платформа Airflow содержит операторов для подключения к многочисленным технологиям. Все компоненты Airflow расширяемы, что позволяет легко адаптировать их к вашей среде.

  • Гибкий : Встроенная параметризация рабочего процесса использует механизм шаблонов Jinja.

Взгляните на следующий фрагмент кода:

 from datetime import datetime
от импорта воздушного потока DAG
из задачи импорта airflow.decorators
из airflow.operators.bash импортировать BashOperator
# DAG представляет рабочий процесс, набор задач
с DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") в качестве dag:
    # Задачи представлены в виде операторов
    привет = BashOperator(task_id="привет", bash_command="эхо привет")
    @задача()
    воздушный поток ():
        печать("поток воздуха")
    # Установить зависимости между задачами
    привет >> воздушный поток ()
 

Здесь вы видите:

  • DAG под названием «demo», начиная с 1 января 2022 года и работая один раз в день. DAG — это представление рабочего процесса в Airflow.

  • Две задачи: BashOperator, выполняющий сценарий Bash, и функция Python, определенная с помощью декоратора @task

  • >> между задачами определяет зависимость и управляет порядком выполнения задач

Airflow оценивает этот сценарий и выполняет задачи с заданным интервалом и в определенном порядке. Статус «демо» DAG виден в веб-интерфейсе:

В этом примере демонстрируется простой сценарий Bash и Python, но эти задачи могут выполнять любой произвольный код. Думать выполнения задания Spark, перемещения данных между двумя сегментами или отправки электронной почты. Такая же структура может быть замечено выполнение с течением времени:

Каждый столбец представляет один запуск DAG. Это два наиболее часто используемых представления в Airflow, но есть и несколько других. другие представления, позволяющие глубже погрузиться в состояние ваших рабочих процессов.

Airflow — это платформа для оркестровки пакетных рабочих процессов. Платформа Airflow содержит операторов для подключения многих технологий и легко расширяется для подключения к новой технологии. Если ваши рабочие процессы имеют четкую запускаться и заканчиваться, а также запускаться через равные промежутки времени, их можно запрограммировать как DAG Airflow.

Если вы предпочитаете кодирование кликам, Airflow — это инструмент для вас. Рабочие процессы определяются как код Python, который означает:

  • Рабочие процессы могут храниться в системе управления версиями, чтобы можно было вернуться к предыдущим версиям

  • Рабочие процессы могут разрабатываться несколькими людьми одновременно

  • Тесты могут быть написаны для проверки функциональности

  • Компоненты расширяемы, и вы можете использовать широкий набор существующих компонентов

Богатая семантика планирования и выполнения позволяет легко определять сложные конвейеры, работающие на регулярной основе. интервалы. Обратная засыпка позволяет вам (повторно) запускать конвейеры на исторических данных после внесения изменений в вашу логику. А возможность повторного запуска части конвейера после устранения ошибки помогает максимально повысить эффективность.

Пользовательский интерфейс Airflow обеспечивает как подробное представление конвейеров и отдельных задач, так и обзор трубопроводы с течением времени. Из интерфейса вы можете просматривать журналы и управлять задачами, например, повторяя задачу в случае неудачи.

Открытый исходный код Airflow гарантирует, что вы работаете с компонентами, разработанными, протестированными и используемыми многими другими компаний по всему миру. В активном сообщества вы можете найти множество полезных ресурсов в виде посты в блогах, статьи, конференции, книги и многое другое. Вы можете подключаться к другим пирам через несколько каналов такие как Slack и списки рассылки.

Воздушный поток как платформа имеет широкие возможности настройки. Используя общедоступный интерфейс Airflow, вы можете расширить и настроить почти каждый аспект Airflow.

Airflow был создан для ограниченных пакетных рабочих процессов. Хотя CLI и REST API позволяют запускать рабочие процессы, Airflow не был создан для бесконечно работающих рабочих процессов, основанных на событиях. Airflow не является потоковым решением. Однако потоковая система, такая как Apache Kafka, часто работает вместе с Apache Airflow. Кафка может использоваться для приема и обработки в режиме реального времени, данные о событиях записываются в место хранения, а Airflow периодически запускает рабочий процесс, обрабатывающий пакет данных.

Если вы предпочитаете щелкать, а не кодировать, Airflow, вероятно, не является правильным решением. Веб-интерфейс призван сделать максимально упростить управление рабочими процессами, а инфраструктура Airflow постоянно совершенствуется, чтобы сделать опыт разработчика как можно более плавным. Однако философия Airflow заключается в том, чтобы определить рабочие процессы как код. поэтому кодирование всегда будет требоваться.

Была ли эта запись полезной?

Установка — Документация по воздушному потоку

  • Использование выпущенных исходников

  • Использование PyPI

  • Использование производственных образов Docker

  • Использование официальной таблицы Airflow Helm

  • Использование служб управляемого воздушного потока

  • Использование сторонних изображений, диаграмм, развертываний

На этой странице описаны варианты установки, которые вы можете использовать при рассмотрении вопроса об установке Airflow. Airflow состоит из множества компонентов, часто распределенных между многими физическими или виртуальными машинами, поэтому установка Airflow может быть довольно сложной, в зависимости от выбранных вами параметров.

Вы также должны проверить предварительные условия, которые должны быть выполнены при установке Airflow а также Поддерживаемые версии, чтобы узнать, каковы политики поддержки Airflow, Python и Kubernetes.

Airflow требует установки дополнительных зависимостей — что можно сделать через дополнения и провайдеров.

При установке Airflow необходимо настроить базу данных, которая должна также обновляться при обновлении Airflow.

Предупреждение

По состоянию на июнь 2021 г. Airflow 1.10 устарела и не получит никаких исправлений, даже критических. исправления безопасности. Следите за обновлениями с 1.10 до 2, чтобы узнать как обновить устаревшую версию 1.10 до Airflow 2.

Подробнее: Установка из исходников

Когда этот вариант работает лучше всего

  • Этот вариант лучше всего подходит, если вы планируете собирать все свое программное обеспечение из исходников.

  • Apache Airflow — один из проектов, принадлежащих Apache Software Foundation. Для всех проектов ASF требуется, чтобы они могли быть установлены с использованием официальных исходных кодов, опубликованных через Official Apache Downloads.

  • Это лучший выбор, если вам необходимо проверить целостность и происхождение программного обеспечения

Предполагаемые пользователи

С чем вы собираетесь работать

  • Предполагается, что вы самостоятельно создадите и установите воздушный поток и его компоненты.

  • Вы должны разработать и выполнить развертывание всех компонентов Airflow.

  • Вы отвечаете за настройку базы данных, создание схемы базы данных и управление ею с помощью команд airflow db , автоматизированный запуск и восстановление, обслуживание, очистка и обновления Airflow и поставщиков Airflow.

Что Apache Airflow Community предоставляет для этого метода

  • У вас есть инструкции по сборке программного обеспечения, но из-за различных сред и инструменты, которые вы, возможно, захотите использовать, вы можете ожидать, что возникнут проблемы, характерные для вашего развертывания и среды. вам придется диагностировать и решать.

Куда обратиться за помощью

  • #development резервный канал для создания программного обеспечения.

  • Slack #troubleshooting — это канал для быстрых общих вопросов по устранению неполадок. Обсуждения GitHub, если вы ищете более продолжительное обсуждение и хотите поделиться дополнительной информацией.

  • Если вы можете предоставить описание воспроизводимой проблемы с программным обеспечением Airflow, вы можете открыть проблему на GitHub Issues

Подробнее: Установка из PyPI

Когда этот вариант работает лучше всего

  • Этот метод установки полезен, если вы не знакомы с контейнерами и Docker и хотите установить Apache Airflow на физических или виртуальных машинах, и вы привыкли устанавливать и запускать программное обеспечение с помощью механизм развертывания.

  • Единственный официально поддерживаемый механизм установки — через pip с использованием механизмов ограничений. Ограничение файлы управляются менеджерами выпусков Apache Airflow, чтобы убедиться, что вы можете повторно установить Airflow из PyPI со всеми поставщиками и требуемые зависимости.

  • В случае установки PyPI вы также можете проверить целостность и происхождение пакетов загружается из PyPI, как описано на странице установки, но программное обеспечение, которое вы загружаете из PyPI, предварительно собрано для вас, чтобы вы могли установить его без сборки, и вы не собирали программное обеспечение из исходников.

Предполагаемые пользователи

С чем вы собираетесь работать

  • Предполагается, что вы установите Airflow — все его компоненты — самостоятельно.

  • Вы должны разработать и выполнить развертывание всех компонентов Airflow.

  • Вы отвечаете за настройку базы данных, создание схемы базы данных и управление ею с помощью команд airflow db , автоматизированный запуск и восстановление, обслуживание, очистка и обновление Airflow и Airflow Providers.

Что Apache Airflow Community предоставляет для этого метода

  • У вас есть установка из PyPI о том, как установить программное обеспечение, но из-за различных сред и инструментов, которые вы, возможно, захотите использовать, вы можете ожидайте, что возникнут проблемы, характерные для вашего развертывания и среды, которые вам придется решать. диагностировать и решить.

  • У вас есть Quick Start, где вы можете увидеть пример Quick Start с запущенным Airflow локально, который вы можете использовать для быстрого запуска Airflow для локального тестирования и разработки. Впрочем, это только для вдохновения. Не ожидайте, что этот docker-compose готов к производственной установке, вам нужно создать собственное готовое к работе развертывание, если вы следуете этому подходу.

Куда обратиться за помощью

  • #устранение неполадок 9Канал 0042 на Airflow Slack для быстрого общего вопросы по устранению неполадок. Обсуждения на GitHub если вы ищете более продолжительное обсуждение и хотите поделиться дополнительной информацией.

  • Если вы можете предоставить описание воспроизводимой проблемы с программным обеспечением Airflow, вы можете открыть проблема на GitHub выпускает

Дополнительные сведения: Образ Docker для Apache Airflow

Когда этот вариант работает лучше всего

Этот метод установки полезен, если вы знакомы со стеком Container/Docker. Он обеспечивает возможность запуск компонентов Airflow изолированно от другого программного обеспечения, работающего на тех же физических или виртуальных машинах, с помощью простого поддержание зависимостей.

Образы создаются менеджерами выпусков Apache Airflow и используют официально выпущенные пакеты из PyPI. и официальные файлы ограничений — те же, что используются для установки Airflow из PyPI.

Предполагаемые пользователи

  • Пользователи, которые знакомы с контейнерами и стеком Docker и понимают, как создавать собственные образы контейнеров.

  • Пользователи, которые понимают, как устанавливать поставщиков и зависимости от PyPI с ограничениями, если они хотят расширить или настроить образ.

  • Пользователи, которые знают, как создавать развертывания с помощью Docker, связывая вместе несколько контейнеров Docker и поддерживая такие развертывания.

Что вы должны обрабатывать

  • Ожидается, что вы сможете настраивать или расширять образы Container/Docker, если хотите добавить дополнительные зависимости. Ожидается, что вы соберете развертывание, состоящее из нескольких контейнеров. (например, используя docker-compose) и убедиться, что они связаны друг с другом.

  • Вы отвечаете за настройку базы данных, создание схемы базы данных и управление ею с помощью команд airflow db , автоматизированный запуск и восстановление, обслуживание, очистка и обновления Airflow и поставщиков Airflow.

  • Вы несете ответственность за управление собственными настройками и расширениями для ваших пользовательских зависимостей. С официальными образами Airflow Docker обновления Airflow и Airflow Providers, которые являются частью эталонного изображения, обрабатываются сообществом — вам нужно обязательно подобрать эти изменения при выпуске путем обновления базового образа. Однако вы несете ответственность за создание конвейер создания ваших собственных пользовательских образов с вашими собственными добавленными зависимостями и провайдерами, и вам нужно повторите шаг настройки и создайте собственный образ, когда будет выпущена новая версия образа Airflow.

  • Следует выбрать правильный механизм развертывания. Существует ряд доступных вариантов развертывание контейнеров. Вы можете использовать свой собственный настраиваемый механизм, настраиваемые развертывания Kubernetes, настраиваемый Docker Compose, настраиваемые диаграммы Helm и т. д., и вам следует выбирать их в зависимости от вашего опыта. и ожидания.

Что Apache Airflow Community предоставляет для этого метода

  • У вас есть инструкции: Создание образа о том, как создать и настроить свой образ.

  • У вас есть работающий поток воздуха в Docker, где вы можете увидеть пример быстрого запуска, который вы можете использовать для быстрого запуска Airflow для локального тестирования и разработки. Впрочем, это только для вдохновения. Не рассчитывайте использовать этот файл docker-compose.yml для производственной установки, вам нужно ознакомиться с Docker Compose и его возможностями и создайте с его помощью собственное готовое к работе развертывание, если вы выбираете Docker Compose для своего развертывания.

  • Образ Docker управляется теми же людьми, которые создают Airflow, и они обязуются поддерживать он обновляется всякий раз, когда выпускаются новые функции и возможности Airflow.

Куда обратиться за помощью

  • Для быстрых вопросов об официальном образе Docker есть канал #production-docker-image в Airflow Slack.

  • Канал #troubleshooting в Airflow Slack для быстрого общего вопросы по устранению неполадок. Обсуждения на GitHub если вы ищете более продолжительное обсуждение и хотите поделиться дополнительной информацией.

  • Если вы можете предоставить описание воспроизводимой проблемы с программным обеспечением Airflow, вы можете открыть проблема на GitHub выпускает

Дополнительные сведения: Helm Chart для Apache Airflow

Когда этот вариант работает лучше всего

  • Этот метод установки полезен, если вы не только знакомы со стеком Container/Docker, но и использовать Kubernetes и хотите установить и поддерживать Airflow с помощью установки Kubernetes, управляемой сообществом механизм через диаграмму Хельма.

  • Он обеспечивает не только возможность запуска компонентов Airflow отдельно от другого программного обеспечения. работает на тех же физических или виртуальных машинах и управляет зависимостями, но также предоставляет возможности более простое обслуживание, настройка и обновление Airflow таким образом, который стандартизирован и будет поддерживаться сообществом.

  • The Chart использует официальные образы Docker Airflow Production для запуска Airflow.

Предполагаемые пользователи

  • Пользователи, которые знакомы с контейнерами и стеком Docker и понимают, как создавать собственные образы контейнеров.

  • Пользователи, которые понимают, как устанавливать поставщиков и зависимости от PyPI с ограничениями, если они хотят расширить или настроить образ.

  • Пользователи, которые управляют своей инфраструктурой с помощью Kubernetes и управляют своими приложениями в Kubernetes с помощью Helm Charts.

Что вы должны обрабатывать

  • Предполагается, что вы сможете настраивать или расширять образы Container/Docker, если хотите. добавить дополнительные зависимости. Ожидается, что вы соберете развертывание, состоящее из нескольких контейнеров. (например, с помощью Docker Compose) и убедиться, что они связаны друг с другом.

  • Вы отвечаете за настройку базы данных.

  • The Helm Chart управляет схемой вашей базы данных, автоматизирует запуск, восстановление и перезапуск компоненты приложения и связывание их вместе, поэтому вам не нужно об этом беспокоиться.

  • Вы несете ответственность за управление собственными настройками и расширениями для ваших пользовательских зависимостей. С официальными образами Airflow Docker обновления Airflow и Airflow Providers, которые являются частью эталонного изображения, обрабатываются сообществом — вам нужно обязательно подобрать эти изменения при выпуске путем обновления базового образа. Однако вы несете ответственность за создание конвейер создания ваших собственных пользовательских образов с вашими собственными добавленными зависимостями и провайдерами, и вам нужно повторите шаг настройки и создайте собственный образ, когда будет выпущена новая версия образа Airflow.

Что Apache Airflow Community предоставляет для этого метода

  • У вас есть инструкции: Создание образа о том, как создать и настроить свой образ.

  • У вас есть Helm Chart для Apache Airflow — полная документация по настройке и установке Helm Chart.

  • The Helm Chart управляется теми же людьми, которые создают Airflow, и они стремятся сохранить он обновляется всякий раз, когда выпускаются новые функции и возможности Airflow.

Куда обратиться за помощью

  • Для быстрых вопросов об официальном образе Docker есть канал #production-docker-image в Airflow Slack.

  • Для быстрых вопросов по официальной диаграмме Helm есть канал #helm-chart-official в Slack.

  • Канал #troubleshooting в Airflow Slack для быстрого общего вопросы по устранению неполадок. Обсуждения на GitHub если вы ищете более продолжительное обсуждение и хотите поделиться дополнительной информацией.

  • Если вы можете предоставить описание воспроизводимой проблемы с программным обеспечением Airflow, вы можете открыть проблема на GitHub выпускает

Перейдите на страницу экосистемы, чтобы найти все управляемые службы для воздушного потока.

Когда этот вариант работает лучше всего

Предполагаемые пользователи

Что вы должны обрабатывать

Что Apache Airflow Community предоставляет для этого метода 90 005

Куда обратиться за помощью

  • Первым выбором должна быть поддержка, предоставляемая управляемыми службами. Есть несколько каналы в Apache Airflow Slack, предназначенные для разных групп пользователей, и если у вас есть прийти к выводу, что вопрос больше связан с Airflow, чем с управляемой службой, вы можете использовать эти каналы.

Перейдите на страницу «Экосистема», чтобы найти все варианты развертывания сторонних производителей.

Когда этот вариант работает лучше всего

  • Эти методы установки полезны в случае, если ни один из официальных методов, упомянутых ранее, не работает для вас, или вы исторически использовали их.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *