AI & GPU
چگونه با Apache Airflow شروع کنیم

معرفی Apache Airflow

چیست Apache Airflow؟

تعریف و هدف

Apache Airflow یک پلتفرم متن‌باز برای برنامه‌نویسی،排程 و پایش جریان‌های کاری است. این ابزار برای هماهنگ‌سازی جریان‌های محاسباتی پیچیده و خط‌های لوله پردازش داده طراحی شده است و به کاربران اجازه می‌دهد تا وظایف و وابستگی‌ها را به صورت کد تعریف کنند، اجرای آن‌ها را برنامه‌ریزی کنند و پیشرفت آن‌ها را از طریق یک رابط کاربری وب پایش کنند.

تاریخچه و توسعه

Apache Airflow توسط Maxime Beauchemin در Airbnb در سال 2014 ایجاد شد تا به چالش‌های مدیریت و برنامه‌ریزی جریان‌های کاری پیچیده داده پاسخ دهد. این ابزار در سال 2015 به صورت متن‌باز منتشر شد و در سال 2016 به پروژه‌ی Apache Incubator پیوست. از آن زمان، Airflow مورد استقبال گسترده‌ای قرار گرفته و به یک انتخاب محبوب برای هماهنگ‌سازی داده در صنایع مختلف تبدیل شده است.

مفاهیم اساسی

DAG‌ها (گراف‌های جهت‌دار بدون چرخه)

در Airflow، جریان‌های کاری به صورت گراف‌های جهت‌دار بدون چرخه (DAG) تعریف می‌شوند. یک DAG مجموعه‌ای از وظایف است که به نحوی سازمان‌دهی شده‌اند که وابستگی‌ها و روابط آن‌ها را منعکس می‌کند. هر DAG یک جریان کاری کامل را نمایش می‌دهد و در یک اسکریپت Python تعریف می‌شود.

اینجا یک مثال ساده از تعریف یک DAG آورده شده است:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
 
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
 
dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval=timedelta(days=1),
)
 
start_task = DummyOperator(task_id='start', dag=dag)
end_task = DummyOperator(task_id='end', dag=dag)
 
start_task >> end_task

وظایف و عملگرها

وظایف واحدهای اجرایی اصلی در Airflow هستند. آن‌ها یک واحد کاری مانند اجرای یک دستور را نمایش می‌دهند.اینجا ترجمه فارسی فایل مارک‌داون است:

گ یک تابع پایتون، اجرای یک پرس و جوی SQL، یا ارسال ایمیل. وظایف با استفاده از عملگرها تعریف می‌شوند، که قالب‌های از پیش تعریف شده برای وظایف رایج هستند.

Airflow طیف گسترده‌ای از عملگرهای داخلی را ارائه می‌دهد، از جمله:

  • BashOperator: یک فرمان Bash را اجرا می‌کند
  • PythonOperator: یک تابع پایتون را اجرا می‌کند
  • EmailOperator: ایمیلی را ارسال می‌کند
  • HTTPOperator: یک درخواست HTTP انجام می‌دهد
  • SqlOperator: یک پرس و جوی SQL را اجرا می‌کند
  • و بسیاری موارد دیگر...

اینجا مثالی از تعریف یک وظیفه با استفاده از PythonOperator آمده است:

from airflow.operators.python_operator import PythonOperator
 
def print_hello():
    print("Hello, Airflow!")
 
hello_task = PythonOperator(
    task_id='hello_task',
    python_callable=print_hello,
    dag=dag,
)

برنامه‌ریزی و فواصل

Airflow به شما امکان برنامه‌ریزی اجرای DAG‌ها در فواصل منظم را می‌دهد. می‌توانید برنامه را با استفاده از عبارات cron یا اشیاء timedelta تعریف کنید. پارامتر schedule_interval در تعریف DAG، فرکانس اجرا را مشخص می‌کند.

به عنوان مثال، برای اجرای یک DAG روزانه در نیمه‌شب، می‌توانید schedule_interval را به صورت زیر تنظیم کنید:

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval='0 0 * * *',  # روزانه در نیمه‌شب
)

اجراکننده‌ها

اجراکننده‌ها مسئول اجرای واقعی وظایف تعریف شده در یک DAG هستند. Airflow از چندین نوع اجراکننده پشتیبانی می‌کند، که به شما امکان مقیاس‌پذیری و توزیع اجرای وظایف در میان چندین کارگر را می‌دهد.

اجراکننده‌های موجود شامل:

  • SequentialExecutor: وظایف را به صورت متوالی در یک فرآیند واحد اجرا می‌کند
  • LocalExecutor: وظایف را به صورت موازی در همان ماشین اجرا می‌کند
  • CeleryExecutor: وظایف را به یک خوشه Celery برای اجرای موازی ارسال می‌کند
  • KubernetesExecutor: وظایف را در یک خوشه Kubernetes اجرا می‌کند

اتصالات و قالب‌ها

اتصالات در Airflow نحوه اتصال به سیستم‌های خارجی مانند پایگاه‌های داده، API‌ها یا خدمات ابری را تعریف می‌کنند. آنها اطلاعات مورد نیاز (مانند میزبان، پورت، اطلاعات احراز هویت) را ذخیره می‌کنند.به فارسی ترجمه شده است:

هوک‌ها راهی برای تعامل با سیستم‌های خارجی تعریف شده در اتصالات فراهم می‌کنند. آن‌ها منطق اتصال و ارتباط با سیستم خاص را کپسوله می‌کنند، که انجام عملیات‌های رایج را آسان‌تر می‌کند.

Airflow دارای هوک‌های داخلی برای سیستم‌های مختلف است، مانند:

  • PostgresHook: برای تعامل با پایگاه‌های داده PostgreSQL
  • S3Hook: برای تعامل با Amazon S3
  • HttpHook: برای انجام درخواست‌های HTTP
  • و بسیاری موارد دیگر...

این یک مثال از استفاده از هوک برای بازیابی داده‌ها از یک پایگاه داده PostgreSQL است:

from airflow.hooks.postgres_hook import PostgresHook
 
def fetch_data(**context):
    # از هوک PostgreSQL برای اتصال به پایگاه داده استفاده می‌کنیم
    hook = PostgresHook(postgres_conn_id='my_postgres_conn')
    # داده‌ها را از جدول 'my_table' بازیابی می‌کنیم
    result = hook.get_records(sql="SELECT * FROM my_table")
    print(result)
 
fetch_data_task = PythonOperator(
    task_id='fetch_data_task',
    python_callable=fetch_data,
    dag=dag,
)

ویژگی‌های کلیدی Apache Airflow

مقیاس‌پذیری و انعطاف‌پذیری

اجرای توزیع‌شده وظایف

Airflow امکان مقیاس‌پذیری افقی اجرای وظایف را از طریق توزیع آن‌ها در میان چندین کارگر فراهم می‌کند. این امر پردازش موازی را ممکن ساخته و به مدیریت کارآمد جریان‌های کاری بزرگ کمک می‌کند. با پیکربندی مناسب اجرا‌کننده، Airflow می‌تواند از قدرت محاسبات توزیع‌شده برای اجرای همزمان وظایف استفاده کند.

پشتیبانی از اجرا‌کننده‌های مختلف

Airflow از انواع مختلفی از اجرا‌کننده‌ها پشتیبانی می‌کند، که انعطاف‌پذیری در نحوه اجرای وظایف را فراهم می‌کند. انتخاب اجرا‌کننده بستگی به نیازهای خاص و تنظیمات زیرساخت دارد. به عنوان مثال:

  • SequentialExecutor برای جریان‌های کاری کوچک یا اهداف آزمایشی مناسب است، زیرا وظایف را به صورت متوالی در یک فرآیند اجرا می‌کند.
  • LocalExecutor امکان اجرای موازی وظایف در همان ماشین را با استفاده از چندین فرآیند فراهم می‌کند.
  • CeleryExecutor وظایف را در یک خوشه Celery توزیع می‌کند، که به مقیاس‌پذیری افقی در چندین گره کمک می‌کند.
  • KubernetesExecutor وظایف را در یک خوشه Kubernetes اجرا می‌کند، که امکان استفاده از منابع پویا را فراهم می‌کند.## قابلیت گسترش

پلاگین‌ها و عملگرهای سفارشی

Airflow دارای معماری قابل گسترش است که به شما امکان می‌دهد تا پلاگین‌ها و عملگرهای سفارشی ایجاد کنید تا عملکرد آن را گسترش دهید. پلاگین‌ها می‌توانند برای افزودن ویژگی‌های جدید، یکپارچه‌سازی با سیستم‌های خارجی یا تغییر رفتار اجزای موجود استفاده شوند.

عملگرهای سفارشی به شما امکان می‌دهند تا انواع جدیدی از وظایف را که مخصوص مورد استفاده شما هستند، تعریف کنید. با ایجاد عملگرهای سفارشی، می‌توانید منطق پیچیده را کپسوله‌بندی کنید، با سیستم‌های مالکیتی تعامل داشته باشید یا محاسبات تخصصی را انجام دهید.

در اینجا مثالی از یک عملگر سفارشی که یک وظیفه خاص را انجام می‌دهد:

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
 
class MyCustomOperator(BaseOperator):
    @apply_defaults
    def __init__(self, my_param, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_param = my_param
 
    def execute(self, context):
        # منطق وظیفه سفارشی در اینجا قرار می‌گیرد
        print(f"اجرای MyCustomOperator با پارامتر: {self.my_param}")

یکپارچه‌سازی با انواع مختلف منابع داده و سیستم‌ها

Airflow به طور روان با طیف گسترده‌ای از منابع داده و سیستم‌ها یکپارچه می‌شود، به آن قابلیت چندمنظوره‌ای برای ارکسترسیون داده می‌بخشد. این ابزار دارای هوک‌ها و عملگرهای داخلی برای پایگاه‌های داده محبوب (مانند PostgreSQL، MySQL، Hive)، پلتفرم‌های ابری (مانند AWS، GCP، Azure) و چارچوب‌های پردازش داده (مانند Apache Spark، Apache Hadoop) است.

این قابلیت یکپارچه‌سازی به شما امکان می‌دهد تا خط‌های لوله داده‌ای بسازید که چندین سیستم را در بر می‌گیرند، به طوری که وظایف بتوانند از منابع داده مختلف بخوانند و به آنها بنویسند، فرآیندهای خارجی را فعال کنند و جریان داده را در اجزای مختلف تسهیل کنند.

رابط کاربری و پایش

رابط کاربری مبتنی بر وب برای مدیریت و پایش DAG‌ها

Airflow یک رابط کاربری (UI) مبتنی بر وب کاربر‌پسند برای مدیریت و پایش DAG‌ها ارائه می‌دهد. این UI به شما امکان می‌دهد تا ساختار و وابستگی‌های DAG‌های خود را مشاهده کنید، اجراهای دستی را فعال کنید، ...فارسی:

پیشرفت کار پایش کنید و سوابق را مشاهده کنید.

رابط کاربری Airflow یک نمای متمرکز از جریان‌های کاری شما را فراهم می‌کند، که امکان ردیابی وضعیت وظایف، شناسایی گلوگاه‌ها و رفع مشکلات را آسان می‌سازد. این رابط کاربری دارای ناوبری آسان، امکان جستجو و فیلترهای متنوع است که به شما در مدیریت و پایش موثر DAG‌های خود کمک می‌کند.

ردیابی وضعیت وظایف و مدیریت خطاها

Airflow وضعیت اجرای هر وظیفه را ردیابی می‌کند و به شما دید کاملی از پیشرفت و سلامت جریان‌های کاری شما ارائه می‌دهد. رابط کاربری وضعیت وظایف را در زمان واقعی نمایش می‌دهد و مشخص می‌کند که آیا در حال اجرا هستند، موفق شده‌اند، شکست خورده‌اند یا در هر وضعیت دیگری قرار دارند.

هنگامی که یک وظیفه با خطا مواجه شود یا شکست بخورد، Airflow استثنا را ثبت کرده و پیام‌های خطای تفصیلی و ردیابی استک را ارائه می‌دهد. این اطلاعات در رابط کاربری در دسترس است، به شما امکان بررسی و رفع سریع مشکلات را می‌دهد. Airflow همچنین از مکانیزم‌های قابل پیکربندی برای تلاش مجدد پشتیبانی می‌کند، که به شما امکان تعریف سیاست‌های تلاش مجدد برای وظایف شکست خورده را می‌دهد.

قابلیت‌های ثبت و اشکال‌زدایی

Airflow سوابق جامعی را برای هر اجرای وظیفه ایجاد می‌کند، که اطلاعات مهمی مانند پارامترهای وظیفه، جزئیات زمان اجرا و هرگونه خروجی یا خطا را ثبت می‌کند. این سوابق از طریق رابط کاربری Airflow قابل دسترسی هستند و بینش‌های ارزشمندی را برای اشکال‌زدایی و رفع مشکلات فراهم می‌کنند.

علاوه بر رابط کاربری، Airflow به شما امکان پیکربندی تنظیمات ثبت متنوعی را می‌دهد، مانند سطوح ثبت، قالب‌های ثبت و مقصدهای ثبت. می‌توانید سوابق را به سیستم‌های ذخیره‌سازی مختلف (مانند فایل‌های محلی، ذخیره‌سازی از راه دور) هدایت کرده یا با راه‌حل‌های ثبت و پایش خارجی برای مدیریت متمرکز سوابق ادغام کنید.

امنیت و احراز هویت

کنترل دسترسی مبتنی بر نقش (RBAC)

Airflow از کنترل دسترسی مبتنی بر نقش (RBAC) پشتیبانی می‌کند تا مجوزهای کاربران و دسترسی به DAG‌ها و وظایف را مدیریت کند. RBAC به شما امکان تعریف نقش‌هایی با امتیازات خاص و اختصاص آن نقش‌ها به کاربران را می‌دهد. این امر اطمینان حاصل می‌کند که کاربران سطح مناسبی از دسترسی را بر اساس مسئولیت‌های خود داشته باشند و از تغییرات غیرمجاز در جریان‌های کاری جلوگیری می‌کند.فایل مارک‌داون زیر را به فارسی ترجمه کرده‌ام. برای کد، فقط نظرات را ترجمه کرده‌ام و هیچ نظر اضافی‌ای در ابتدای فایل اضافه نکرده‌ام.

کنترل دسترسی مبتنی بر نقش (RBAC)

با استفاده از RBAC، می‌توانید کنترل کنید که چه کسی می‌تواند DAG‌ها را مشاهده، ویرایش یا اجرا کند و دسترسی به اطلاعات حساس یا وظایف حیاتی را محدود کنید. Airflow یک مدل مجوز انعطاف‌پذیر ارائه می‌دهد که به شما امکان تعریف نقش‌ها و مجوزهای سفارشی بر اساس الزامات امنیتی سازمان خود را می‌دهد.

مکانیزم‌های احراز هویت و مجوز

Airflow انواع مختلفی از مکانیزم‌های احراز هویت و مجوز را برای تأمین امنیت دسترسی به رابط کاربری وب و API ارائه می‌دهد. این سیستم از چندین بک‌اند احراز هویت پشتیبانی می‌کند، از جمله:

  • احراز هویت مبتنی بر رمزعبور: کاربران می‌توانند با استفاده از نام کاربری و رمزعبور وارد سیستم شوند.
  • OAuth/OpenID Connect: Airflow می‌تواند با ارائه‌دهندگان هویت خارجی برای ورود واحد (SSO) و مدیریت متمرکز کاربران ادغام شود.
  • احراز هویت Kerberos: Airflow از احراز هویت Kerberos برای دسترسی امن در محیط‌های سازمانی پشتیبانی می‌کند.

علاوه بر احراز هویت، Airflow کنترل‌های مجوز را برای محدود کردن دسترسی به ویژگی‌های خاص، نماها و اقدامات بر اساس نقش‌ها و مجوزهای کاربر ارائه می‌دهد. این اطمینان می‌دهد که کاربران فقط می‌توانند اقداماتی را انجام دهند که توسط نقش‌های تخصیص‌یافته به آنها مجاز است.

اتصالات امن و مدیریت داده

Airflow امنیت اتصالات و مدیریت داده را اولویت می‌دهد. به شما امکان می‌دهد اطلاعات حساس مانند اطلاعات اعتبار پایگاه داده و کلیدهای API را به طور امن با استفاده از اشیاء اتصال ذخیره کنید. این اشیاء اتصال می‌توانند رمزگذاری شده و در یک بک‌اند امن مانند Hashicorp Vault یا AWS Secrets Manager ذخیره شوند.

هنگام تعامل با سیستم‌های خارجی، Airflow از پروتکل‌های ارتباطی امن مانند SSL/TLS پشتیبانی می‌کند تا داده‌ها را در حین انتقال رمزگذاری کند. همچنین مکانیزم‌هایی را برای مدیریت و پنهان‌سازی داده‌های حساس مانند اطلاعات شناسایی فردی (PII) یا اطلاعات محرمانه کسب‌وکار فراهم می‌کند تا از نمایش آنها در سیاهه‌ها یا رابط‌های کاربری جلوگیری شود.

معماری Apache Airflow

اجزای اصلی

زمان‌بند

زمان‌بند یک جزء اصلی Airflow است که مسئول زمان‌بندی و فراخوانی اجرای وظایف است. این مؤلفه به طور مداوم DAG‌ها و وظایف مرتبط با آنها را پایش می‌کند.اینجا ترجمه فارسی فایل مارک‌داون است:

زمان‌بندی کننده

زمان‌بندی کننده مسئول اجرای وظایف مرتبط است و با بررسی برنامه‌ها و وابستگی‌های آنها، زمان اجرای آنها را تعیین می‌کند.

زمان‌بندی کننده تعاریف DAG را از دایرکتوری تنظیم‌شده DAG می‌خواند و برای هر DAG فعال بر اساس برنامه‌ریزی آن، یک اجرای DAG ایجاد می‌کند. سپس وظایف را به اجراکننده‌های موجود برای اجرا تخصیص می‌دهد و عواملی مانند وابستگی‌های وظیفه، اولویت و در دسترس بودن منابع را در نظر می‌گیرد.

وب‌سرور

وب‌سرور مؤلفه‌ای است که رابط کاربری وب Airflow را ارائه می‌دهد. این رابط کاربری دوست‌داشتنی برای مدیریت و پایش DAG‌ها، وظایف و اجرای آنها است. وب‌سرور با زمان‌بندی کننده و پایگاه داده متا‌داده ارتباط برقرار می‌کند تا اطلاعات مربوطه را بازیابی و نمایش دهد.

وب‌سرور مدیریت احراز هویت و مجوز کاربران را انجام می‌دهد و به کاربران امکان ورود و دسترسی به رابط کاربری را بر اساس نقش و مجوزهای تخصیص‌یافته به آنها می‌دهد. همچنین API‌هایی را برای تعامل برنامه‌ای با Airflow ارائه می‌کند که امکان یکپارچه‌سازی با سیستم‌ها و ابزارهای خارجی را فراهم می‌کند.

اجراکننده

اجراکننده مسئول اجرای واقعی وظایف تعریف‌شده در یک DAG است. Airflow انواع مختلفی از اجراکننده‌ها را پشتیبانی می‌کند که هر کدام ویژگی‌ها و موارد استفاده خاص خود را دارند. اجراکننده وظایف را از زمان‌بندی کننده دریافت می‌کند و آنها را اجرا می‌کند.

یکپارچه‌سازی با ابزارها و سیستم‌های دیگر

پردازش داده و ETL

یکپارچه‌سازی با Apache Spark

Apache Airflow به طور روان با Apache Spark، یک چارچوب قدرتمند پردازش داده توزیع‌شده، یکپارچه می‌شود. Airflow عملگرها و قالب‌های داخلی برای تعامل با Spark را ارائه می‌دهد که به شما امکان ارسال کارهای Spark، پایش پیشرفت آنها و بازیابی نتایج را می‌دهد.

SparkSubmitOperator به شما امکان ارسال برنامه‌های Spark به یک خوشه Spark را مستقیماً از DAG‌های Airflow خود می‌دهد. می‌توانید پارامترهای برنامه Spark مانند کلاس اصلی، آرگومان‌های برنامه و خصوصیات پیکربندی را مشخص کنید.

اینجا مثالی از استفاده از SparkSubmitOperator برای ارسال یک کار Spark است:

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
 
spark_submit_task = Spar.
 
kSubmitOperator(
    task_id='spark_submit_task',
    application='/path/to/your/spark/app.jar',
    name='your_spark_job',
    conn_id='spark_default',
    conf={
        'spark.executor.cores': '2',
        'spark.executor.memory': '4g',
    },
    dag=dag,
)

ادغام با Apache Hadoop و HDFS

Airflow با Apache Hadoop و HDFS (سیستم فایل توزیع شده Hadoop) ادغام می‌شود تا پردازش و ذخیره‌سازی داده در محیط Hadoop را امکان‌پذیر کند. Airflow اپراتورها و قالب‌های (hooks) را برای تعامل با HDFS فراهم می‌کند، که به شما امکان انجام عملیات فایل، اجرای کارهای Hadoop و مدیریت داده در HDFS را می‌دهد.

HdfsSensor به شما امکان می‌دهد تا قبل از انجام وظایف بعدی، منتظر حضور یک فایل یا دایرکتوری در HDFS بمانید. HdfsHook روش‌هایی را برای تعامل برنامه‌ای با HDFS فراهم می‌کند، مانند بارگذاری فایل‌ها، لیست کردن دایرکتوری‌ها و حذف داده‌ها.

اینجا مثالی از استفاده از HdfsHook برای بارگذاری یک فایل به HDFS آورده شده است:

from airflow.hooks.hdfs_hook import HdfsHook
 
def upload_to_hdfs(**context):
    hdfs_hook = HdfsHook(hdfs_conn_id='hdfs_default')
    local_file = '/path/to/local/file.txt'
    hdfs_path = '/path/to/hdfs/destination/'
    hdfs_hook.upload_file(local_file, hdfs_path)
 
upload_task = PythonOperator(
    task_id='upload_to_hdfs',
    python_callable=upload_to_hdfs,
    dag=dag,
)

ادغام با چارچوب‌های پردازش داده

Airflow با چارچوب‌های مختلف پردازش داده مانند Pandas و Hive ادغام می‌شود تا پردازش و تحلیل داده را در جریان‌های کاری تسهیل کند.

به عنوان مثال، می‌توانید از PandasOperator برای اجرای کد Pandas در یک وظیفه Airflow استفاده کنید. این به شما امکان می‌دهد تا از قدرت Pandas برای تمیزکاری، تبدیل و تحلیل داده در وظایف خود بهره ببرید.

همچنین، Airflow اپراتورها و قالب‌های (hooks) را برای تعامل با Hive فراهم می‌کند، مانند HiveOperator برای اجرای پرس‌وجوهای Hive و HiveServer2Hook برای اتصال به سرور Hive.

پلتفرم‌ها و خدمات ابری

ادغام با AWS

Airflow با انواع خدمات AWS ادغام می‌شود.اینجا ترجمه فارسی فایل مارک‌داون است:

آمازون وب سرویس‌ها (AWS) برای فعال‌سازی پردازش داده، ذخیره‌سازی و استقرار در محیط ابری AWS.

  • آمازون S3: Airflow ابزارهای S3Hook و S3Operator را برای تعامل با ذخیره‌سازی آمازون S3 فراهم می‌کند. می‌توانید از این ابزارها برای بارگذاری فایل‌ها به S3، دانلود فایل‌ها از S3 و انجام سایر عملیات‌های S3 در جریان‌های کاری خود استفاده کنید.

  • آمازون EC2: Airflow می‌تواند با استفاده از EC2Operator نمونه‌های آمازون EC2 را راه‌اندازی و مدیریت کند. این امکان را فراهم می‌کند که منابع محاسباتی را به صورت پویا برای وظایف خود تأمین کرده و جریان‌های کاری را بر اساس تقاضا مقیاس‌بندی کنید.

  • آمازون Redshift: Airflow با آمازون Redshift، یک سرویس انبار داده ابری، یکپارچه است. می‌توانید از RedshiftHook و RedshiftOperator برای اجرای پرس‌وجو، بارگذاری داده در جداول Redshift و انجام تحول داده استفاده کنید.

یکپارچه‌سازی با GCP

Airflow با سرویس‌های پلتفرم ابری گوگل (GCP) یکپارچه است تا از قابلیت‌های اکوسیستم GCP بهره ببرد.

  • Google Cloud Storage (GCS): Airflow ابزارهای GCSHook و GCSOperator را برای تعامل با Google Cloud Storage فراهم می‌کند. می‌توانید از این ابزارها برای بارگذاری فایل‌ها به GCS، دانلود فایل‌ها از GCS و انجام سایر عملیات‌های GCS در جریان‌های کاری خود استفاده کنید.

  • BigQuery: Airflow با BigQuery، سرویس انبار داده کاملاً مدیریت‌شده گوگل، یکپارچه است. می‌توانید از BigQueryHook و BigQueryOperator برای اجرای پرس‌وجو، بارگذاری داده در جداول BigQuery و انجام وظایف تحلیل داده استفاده کنید.

  • Dataflow: Airflow می‌تواند با استفاده از DataflowCreateJavaJobOperator و DataflowCreatePythonJobOperator کارهای Google Cloud Dataflow را ارکسترا کند. این امکان را فراهم می‌کند که خطوط لوله پردازش داده موازی را اجرا کرده و از مقیاس‌پذیری Dataflow در جریان‌های کاری Airflow خود بهره ببرید.

یکپارچه‌سازی با Azure

Airflow با سرویس‌های مایکروسافت Azure یکپارچه است تا پردازش داده و ذخیره‌سازی را در محیط ابری Azure فعال کند.

  • Azure Blob Storage: Airflow ابزارهای AzureBlobStorageHook و AzureBlobStorageOperator را برای تعامل با Azure Blob Storage فراهم می‌کند. می‌توانید از این ابزارها برای بارگذاری.فایل پرشین:

  • توابع Azure: Airflow می‌تواند با استفاده از AzureFunctionOperator توابع Azure را فراخوانی کند. این امکان را فراهم می‌کند که توابع بدون سرور را به عنوان بخشی از جریان‌های کاری Airflow اجرا کنید و معماری‌های رویدادمحور و بدون سرور را فعال سازید.

سایر ادغام‌ها

ادغام با ابزارهای تصویرسازی داده

Airflow می‌تواند با ابزارهای تصویرسازی داده مانند Tableau و Grafana ادغام شود تا تصویرسازی داده و گزارش‌گیری را در جریان‌های کاری فعال کند.

به عنوان مثال، می‌توانید از TableauOperator برای تازه‌سازی استخراج‌های Tableau یا انتشار کتاب‌کارها در سرور Tableau استفاده کنید. همچنین، Airflow می‌تواند به‌روزرسانی داشبورد Grafana را فراخوانی کند یا داده را به Grafana برای پایش و تصویرسازی آنی ارسال کند.

ادغام با چارچوب‌های یادگیری ماشینی

Airflow با چارچوب‌های محبوب یادگیری ماشینی مانند TensorFlow و PyTorch ادغام می‌شود، به شما امکان می‌دهد تا وظایف یادگیری ماشینی را در جریان‌های کاری خود ادغام کنید.

می‌توانید از Airflow برای ارکسترسازی آموزش، ارزیابی و استقرار مدل‌های یادگیری ماشینی استفاده کنید. به عنوان مثال، می‌توانید از PythonOperator برای اجرای کد TensorFlow یا PyTorch برای آموزش مدل استفاده کنید و سپس از سایر عملگرها برای استقرار مدل‌های آموزش‌دیده یا انجام وظایف استنتاج استفاده کنید.

ادغام با سیستم‌های کنترل نسخه

Airflow می‌تواند با سیستم‌های کنترل نسخه مانند Git ادغام شود تا کنترل نسخه و همکاری برای DAG‌ها و جریان‌های کاری شما را فعال کند.

می‌توانید DAG‌های Airflow و فایل‌های مرتبط را در یک مخزن Git ذخیره کنید، که به شما امکان می‌دهد تغییرات را ردیابی کنید، با اعضای تیم همکاری کنید و نسخه‌های مختلف جریان‌های کاری خود را مدیریت کنید. Airflow می‌تواند برای بارگیری DAG‌ها از یک مخزن Git پیکربندی شود، که ادغام آن با سیستم کنترل نسخه شما را تسهیل می‌کند.

موارد استفاده و مثال‌های واقعی

خط‌های لوله داده و ETL

ساخت خط‌های لوله استخراج و تبدیل داده

Airflow به طور معمول برای ساخت خط‌های لوله استخراج و تبدیل داده استفاده می‌شود.اینجا ترجمه فارسی فایل مارک‌داون است:

شما می‌توانید DAG هایی ایجاد کنید که مراحل استخراج داده از منابع مختلف، اعمال تغییرات و بارگذاری داده در سیستم‌های هدف را تعریف می‌کنند.

به عنوان مثال، می‌توانید از Airflow برای موارد زیر استفاده کنید:

  • استخراج داده از پایگاه‌های داده، API ها یا سیستم‌های فایلی.
  • انجام عملیات پاکسازی، فیلتر کردن و تجمیع داده.
  • اعمال منطق کسب و کار پیچیده و تغییرات داده.
  • بارگذاری داده تبدیل شده در انبارهای داده یا پلتفرم‌های تحلیلی.

برنامه‌ریزی و هماهنگ‌سازی جریان‌های کاری ETL

Airflow در برنامه‌ریزی و هماهنگ‌سازی جریان‌های کاری ETL (استخراج، تبدیل، بارگذاری) تبحر دارد. می‌توانید وابستگی‌های بین وظایف را تعریف کنید، برنامه‌های زمانی را تنظیم کنید و اجرای خطوط لوله ETL را پایش کنید.

با Airflow می‌توانید:

  • برنامه‌ریزی کارهای ETL برای اجرا در فواصل زمانی مشخص (مانند ساعتی، روزانه، هفتگی).
  • تعریف وابستگی‌های وظایف برای اطمینان از ترتیب صحیح اجرا.
  • مدیریت خطاها و تلاش‌های مجدد در وظایف ETL.
  • پایش پیشرفت و وضعیت جریان‌های کاری ETL.

یادگیری ماشینی و علوم داده

خودکارسازی آموزش و استقرار مدل

Airflow می‌تواند فرآیند آموزش و استقرار مدل‌های یادگیری ماشینی را خودکار کند. می‌توانید DAG هایی ایجاد کنید که مراحل آماده‌سازی داده، آموزش مدل، ارزیابی و استقرار را در بر می‌گیرند.

به عنوان مثال، می‌توانید از Airflow برای موارد زیر استفاده کنید:

  • پیش‌پردازش و مهندسی ویژگی داده‌های آموزشی.
  • آموزش مدل‌های یادگیری ماشینی با استفاده از کتابخانه‌هایی مانند scikit-learn، TensorFlow یا PyTorch.
  • ارزیابی عملکرد مدل و انتخاب بهترین مدل.
  • استقرار مدل آموزش‌دیده در محیط تولید.
  • برنامه‌ریزی آموزش مجدد و به‌روزرسانی مدل به طور منظم.

هماهنگ‌سازی وظایف پیش‌پردازش داده و مهندسی ویژگی

Airflow می‌تواند وظایف پیش‌پردازش داده و مهندسی ویژگی را در جریان‌های کاری یادگیری ماشینی هماهنگ کند. می‌توانید وظایفی تعریف کنید که عملیات پاکسازی داده، نرمال‌سازی، انتخاب ویژگی و تبدیل ویژگی را انجام می‌دهند.

با Airflow می‌توانید:

  • اجرای وظایف پیش‌پردازش داده با استفاده از کتابخانه‌هایی مانند Pandas یا PySpark.
  • اعمال تکنیک‌های مهندسی ویژگی.ترجمه فارسی:

استفاده از Airflow برای ایجاد ویژگی‌های اطلاعاتی

  • مدیریت وابستگی‌های داده و اطمینان از یکپارچگی داده‌ها.
  • ادغام وظایف پیش‌پردازش داده با آموزش و ارزیابی مدل.

DevOps و CI/CD

ادغام Airflow با خطوط لوله CI/CD

Airflow می‌تواند در خطوط لوله CI/CD (Continuous Integration/Continuous Deployment) ادغام شود تا فرآیند استقرار و آزمایش جریان‌های کاری را خودکار کند. می‌توانید از Airflow برای هماهنگی فرآیند استقرار و اطمینان از انتقال روان جریان‌های کاری از توسعه به تولید استفاده کنید.

به عنوان مثال، می‌توانید از Airflow برای:

  • راه‌اندازی استقرار جریان‌های کاری بر اساس تغییرات کد یا رویدادهای Git.
  • اجرای آزمون‌ها و بررسی‌های کیفیت بر روی جریان‌های کاری قبل از استقرار.
  • هماهنگی استقرار جریان‌های کاری در محیط‌های مختلف (مانند مرحله آزمایشی، تولید).
  • پایش و بازگرداندن استقرارها در صورت لزوم.

خودکارسازی وظایف استقرار و تأمین زیرساخت

Airflow می‌تواند وظایف استقرار و تأمین زیرساخت را خودکار کند، که این امر مدیریت و مقیاس‌پذیری جریان‌های کاری را آسان‌تر می‌کند. می‌توانید وظایفی تعریف کنید که منابع ابری را تأمین و پیکربندی می‌کنند، محیط‌ها را پیکربندی می‌کنند و برنامه‌ها را مستقر می‌کنند.

با Airflow می‌توانید:

  • منابع ابری را با استفاده از ارائه‌دهندگانی مانند AWS، GCP یا Azure تأمین و پیکربندی کنید.
  • وظایف زیرساخت‌به‌عنوان‌کد را با استفاده از ابزارهایی مانند Terraform یا CloudFormation اجرا کنید.
  • برنامه‌ها و خدمات را مستقر و پیکربندی کنید.
  • چرخه عمر منابع را مدیریت و وظایف پاکسازی را انجام دهید.

بهترین شیوه‌ها و نکات

طراحی و سازماندهی DAG

ساختاردهی DAG‌ها برای قابلیت نگهداری و خوانایی

هنگام طراحی DAG‌های Airflow، مهم است که آن‌ها را به شکلی ساختاردهی کنید که قابلیت نگهداری و خوانایی را ارتقا دهد. اینجا چند نکته وجود دارد:

  • از نام‌های معنادار و توصیفی برای DAG‌ها و وظایف استفاده کنید.

  • وظایف را در گروه‌ها یا بخش‌های منطقی در DAG سازمان‌دهی کنید.

  • از وابستگی‌های وظیفه برای تعریف جریان و ترتیب اجرا استفاده کنید.

  • DAG‌ها را مختصر و متمرکز بر یک جریان کاری یا هدف خاص نگه دارید.

  • از نظرات و docstring‌ها برای ارائه توضیحات استفاده کنید.### مدول‌سازی وظایف و استفاده از اجزای قابل استفاده مجدد برای بهبود قابلیت استفاده مجدد و نگهداری کد، در نظر بگیرید که وظایف را مدول‌سازی کرده و از اجزای قابل استفاده مجدد در DAG های Airflow خود استفاده کنید.

  • عملکرد مشترک را در توابع یا کلاس‌های جداگانه Python استخراج کنید.

  • از SubDagOperator Airflow برای کپسول‌بندی زیرمجموعه‌های قابل استفاده مجدد وظایف استفاده کنید.

  • از BaseOperator Airflow برای ایجاد عملگرهای سفارشی و قابل استفاده مجدد استفاده کنید.

  • از PythonOperator Airflow با توابع قابل فراخوانی برای منطق خاص وظیفه استفاده کنید.

بهینه‌سازی عملکرد

تنظیم پیکربندی Airflow برای عملکرد بهینه

برای بهینه‌سازی عملکرد پیاده‌سازی Airflow خود، در نظر بگیرید که پیکربندی‌های زیر را تنظیم کنید:

  • تنظیمات اجرا کننده: اجرا کننده مناسب (مانند LocalExecutor، CeleryExecutor، KubernetesExecutor) را بر اساس نیازهای مقیاس‌پذیری و همزمانی خود انتخاب کنید.
  • موازی‌سازی: پارامتر parallelism را تنظیم کنید تا تعداد حداکثر وظایفی که می‌توانند همزمان اجرا شوند را کنترل کنید.
  • همزمانی: پارامترهای dag_concurrency و max_active_runs_per_dag را تنظیم کنید تا تعداد اجراهای همزمان DAG و وظایف را محدود کنید.
  • منابع کارگر: بر اساس بار کاری و نیازهای وظیفه، منابع کافی (مانند CPU، حافظه) را به کارگران Airflow اختصاص دهید.

بهینه‌سازی اجرای وظیفه و استفاده از منابع

برای بهینه‌سازی اجرای وظیفه و استفاده از منابع، به این روش‌ها توجه کنید:

  • از عملگرها و قالب‌های مناسب برای اجرای کارآمد وظایف استفاده کنید.
  • از وظایف گران قیمت یا طولانی مدت در DAG ها به حداقل استفاده کنید.
  • از پول‌های وظیفه برای محدود کردن تعداد وظایف همزمان و مدیریت استفاده از منابع استفاده کنید.
  • از ویژگی XCom Airflow برای به اشتراک‌گذاری داده‌های سبک بین وظایف استفاده کنید.
  • عملکرد وظیفه را پایش و پروفایل کنید تا گلوگاه‌ها را شناسایی و بهینه‌سازی کنید.

آزمایش و اشکال‌زدایی

نوشتن آزمون‌های واحد برای DAG ها و وظایف

برای اطمینان از قابلیت اعتماد و صحت جریان‌های کاری Airflow خود، مهم است که آزمون‌های واحد برای DAG ها و وظایف خود بنویسید. اینجا چند مورد است.فارسی:

  • از ماژول unittest Airflow برای ایجاد موارد آزمون برای DAG ها و وظایف خود استفاده کنید.
  • وابستگی های خارجی و خدمات را مسخره کنید تا محدوده آزمایش را جدا کنید.
  • وظایف فردی و رفتار مورد انتظار آنها را آزمایش کنید.
  • صحت وابستگی های وظیفه و ساختار DAG را تأیید کنید.
  • موارد لبه و سناریوهای خطا را آزمایش کنید تا اطمینان حاصل شود که به درستی مدیریت می شوند.

تکنیک های اشکال زدایی و عیب یابی

هنگام اشکال زدایی و عیب یابی جریان های کاری Airflow، به این تکنیک ها توجه کنید:

  • از رابط کاربری وب Airflow برای پایش وضعیت وظیفه و DAG، گزارش ها و پیام های خطا استفاده کنید.
  • ثبت گزارش مفصل را فعال کنید تا اطلاعات مفصلی درباره اجرای وظیفه ثبت شود.
  • از دستورات print Airflow یا ماژول logging Python برای افزودن گزارش های ثبت سفارشی استفاده کنید.
  • از عملگر PDB (Python Debugger) Airflow برای تنظیم نقاط توقف و اشکال زدایی تعاملی وظایف استفاده کنید.
  • گزارش های وظیفه و ردیابی های انباشته را تجزیه و تحلیل کنید تا ریشه مشکل را شناسایی کنید.
  • از دستور airflow test Airflow برای آزمایش وظایف فردی در انزوا استفاده کنید.

مقیاس پذیری و پایش

راهبردهای مقیاس پذیری مستقرسازی های Airflow

همانطور که جریان های کاری Airflow شما از نظر پیچیدگی و مقیاس رشد می کنند، راهبردهای زیر را برای مقیاس پذیری مستقرسازی Airflow خود در نظر بگیرید:

  • کارگران Airflow را به صورت افقی مقیاس دهید با افزودن گره های کارگر بیشتر برای مدیریت همزمانی وظیفه افزایش یافته.
  • اجزای Airflow (مانند排程器، وب سرور) را به صورت عمودی مقیاس دهید با اختصاص دادن منابع بیشتر (CPU، حافظه) برای مدیریت بارهای بالاتر.
  • از یک اجرا کننده توزیع شده (مانند CeleryExecutor، KubernetesExecutor) برای توزیع وظایف در میان چندین گره کارگر استفاده کنید.
  • از CeleryExecutor Airflow با یک صف پیام (مانند RabbitMQ، Redis) برای مقیاس پذیری و تحمل خطای بهبود یافته استفاده کنید.
  • مکانیزم های مقیاس پذیری خودکار را پیاده سازی کنید تا به طور پویا تعداد کارگران را بر اساس تقاضای بار کاری تنظیم کنند.

پایش شاخص های Airflow و عملکرد

برای اطمینان از سلامت و عملکرد مستقرسازی Airflow خود، پایش شاخص های کلیدی و شاخص های عملکرد ضروری است. به این موارد توجه کنید.اینجا ترجمه فارسی فایل مارک‌داون است:

  • از رابط کاربری وب Airflow برای پایش وضعیت DAG و وظایف، زمان اجرا و نرخ موفقیت استفاده کنید.
  • Airflow را با ابزارهای پایش مانند Prometheus، Grafana یا Datadog ادغام کنید تا معیارها را جمع‌آوری و نمایش دهید.
  • معیارهای سطح سیستم مانند استفاده از CPU، مصرف حافظه و ورودی/خروجی دیسک اجزای Airflow را پایش کنید.
  • هشدارها و اعلان‌های مربوط به رویدادهای بحرانی مانند شکست وظایف یا استفاده بالای منابع را تنظیم کنید.
  • به طور منظم سیاهه‌های Airflow را بررسی و تحلیل کنید تا گلوگاه‌های عملکرد را شناسایی و جریان‌های کاری را بهینه سازی کنید.

نتیجه‌گیری

در این مقاله، Apache Airflow را که یک پلتفرم قدرتمند برای نگارش، زمان‌بندی و پایش جریان‌های کاری به صورت برنامه‌ای است، بررسی کردیم. مفاهیم کلیدی، معماری و ویژگی‌های Airflow از جمله DAG‌ها، وظایف، اپراتورها و اجراکننده‌ها را پوشش دادیم.

ادغام‌های مختلف موجود در Airflow را که امکان اتصال روان با چارچوب‌های پردازش داده، پلتفرم‌های ابری و ابزارهای خارجی را فراهم می‌کند، بحث کردیم. همچنین مورد استفاده واقعی Airflow در خط‌های لوله داده، جریان‌های کاری یادگیری ماشین و فرآیندهای CI/CD را بررسی کردیم.

علاوه بر این، به بهترین شیوه‌ها و نکات برای طراحی و سازماندهی DAG‌ها، بهینه‌سازی عملکرد، آزمایش و اشکال‌زدایی جریان‌های کاری و مقیاس‌پذیری مستقرسازی Airflow پرداختیم. با پیروی از این راهنماها، می‌توانید جریان‌های کاری مقاوم، قابل نگهداری و کارآمد با استفاده از Airflow بسازید.

خلاصه نکات کلیدی

  • Airflow یک پلتفرم متن‌باز برای نگارش، زمان‌بندی و پایش جریان‌های کاری به صورت برنامه‌ای است.

  • از DAG‌ها برای تعریف جریان‌های کاری به صورت کد با وظایف به عنوان واحدهای کاری استفاده می‌کند.

  • Airflow مجموعه غنی از اپراتورها و قلاب‌ها برای ادغام با سیستم‌ها و خدمات مختلف را ارائه می‌دهد.

  • انواع مختلف اجراکننده را برای مقیاس‌پذیری و توزیع اجرای وظایف پشتیبانی می‌کند.

  • Airflow امکان جریان‌های کاری پردازش داده، یادگیری ماشین و CI/CD را از طریق ادغام‌های گسترده خود فراهم می‌کند.

  • بهترین شیوه‌ها شامل ساختاردهی DAG‌ها برای قابلیت نگهداری، ...مدول سازی وظایف، بهینه سازی عملکرد و آزمایش و اشکال زدایی جریان کار

  • مقیاس دهی Airflow شامل راهبردهایی مانند مقیاس دهی افقی و عمودی، اجراکنندگان توزیع شده و مقیاس پذیری خودکار است.

  • پایش شاخص‌های Airflow و عملکرد آن برای اطمینان از سلامت و کارایی جریان کار بسیار مهم است.

تحولات آینده و نقشه راه Apache Airflow

Apache Airflow به طور فعال توسعه داده می‌شود و جامعه فعالی در حال مشارکت در رشد آن است. برخی از تحولات آینده و موارد نقشه راه شامل موارد زیر است:

  • بهبود رابط کاربری و تجربه کاربری رابط وب Airflow.
  • بهبود مقیاس پذیری و عملکرد Airflow، به ویژه برای استقرارهای بزرگ مقیاس.
  • گسترش اکوسیستم افزونه‌ها و ادغام‌های Airflow برای پشتیبانی از سیستم‌ها و خدمات بیشتر.
  • ساده‌سازی استقرار و مدیریت Airflow با استفاده از فناوری‌های کانتینرسازی و ارکسترسازی.
  • ادغام ویژگی‌های پیشرفته مانند تولید پویای وظایف و بازپخش خودکار وظایف.
  • بهبود مکانیزم‌های امنیتی و احراز هویت در Airflow.

همانطور که جامعه Airflow همچنان در حال رشد و تکامل است، می‌توان انتظار بهبودها و نوآوری‌های بیشتری در این پلتفرم را داشت که آن را برای مدیریت جریان کار قدرتمندتر و کاربر پسندتر می‌کند.

منابع برای یادگیری و اکتشاف بیشتر

برای کاوش و یادگیری بیشتر در مورد Apache Airflow، به منابع زیر توجه کنید:

منابع مرجع: https://airflow.apache.org/community/meetups/ (opens in a new tab)

با استفاده از این منابع و مشارکت فعال در جامعه Airflow، می‌توانید درک عمیق‌تری از Airflow پیدا کنید، از متخصصان با تجربه یاد بگیرید و به رشد و بهبود این پلتفرم کمک کنید.

Apache Airflow به عنوان یک پلتفرم پیشرو و متن‌باز برای مدیریت جریان کار ظهور کرده است و به مهندسان داده، دانشمندان داده و تیم‌های DevOps امکان می‌دهد تا جریان‌های کاری پیچیده را به راحتی ایجاد و هماهنگ کنند. ویژگی‌ها، ادغام‌ها و انعطاف‌پذیری گسترده آن، آن را به ابزاری ارزشمند در اکوسیستم داده تبدیل کرده است.

هنگامی که به سفر خود با Apache Airflow شروع می‌کنید، به یاد داشته باشید که از کوچک شروع کنید، با ویژگی‌ها و ادغام‌های مختلف آزمایش کنید و به طور مداوم جریان‌های کاری خود را بهبود و اصلاح کنید. با قدرت Airflow در دسترس شما، می‌توانید خط‌های لوله داده خود را ساده‌سازی کنید، جریان‌های کاری یادگیری ماشینی خود را خودکارسازی کنید و برنامه‌های کاربردی قدرتمند و مقیاس‌پذیر محور داده بسازید.