DE/Airflow

가상환경에서 에어플로우 구동하기

winter0pear 2023. 9. 3. 01:57

1. Anaconda 가상환경 만들기

$ conda create -n {가상환경 이름} python={파이썬 버전}

가상환경이 잘 생성 되었는지 확인하고 가상환경을 활성화 해준다

$ conda info -envs
$ source ~/anaconda3/bin/activate {가상환경 이름}

가상환경이 생성되면 앞에 가상환경 이름이 붙는다

 

2. Airflow 설치하기

가상환경을 활성화한 상태에서 Airflow를 설치해준다

shell scrip를 작성해서 한번에 설치되도록 해 주었다

#!/bin/bash

# Apache Airflow 버전 및 Python 버전 설정
AIRFLOW_VERSION=2.7.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

# 제약 사항(constraints) 파일 URL 생성
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

# Apache Airflow 설치
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# 설치 확인
airflow version

설치가 다 되었으면 airflow를 실행시켜 보자

(airflfow) $ airflow webserver

인터넷 주소창에 localhost:8080을 쳐서 airflow에 접속해본다

처음 서버를 실행시키면 로그인 창이 뜨게 된다

airflow server을 종료하고 airflow DB 초기화를 실행해주자

(airflow) $ airflow db init

이후 airflow에 접속하기 위해 계정을 생성해보자

airflow users create를 치면 계정생성 할때 필요한 옵션을 볼 수 있다

$ airflow users create \
-u {id} \
-f {이름} \
-l {성} \
-r {역할} \
-p {password} \
-e {email}

$ airflow users create \
--username {id} \
--firstname {이름} \
--lastname {성} \
--role {역할} \
--password {password} \
--email {email}

성과 이름은 나중에 airflow에서 이름을 볼때 이쁘게 나오게 하기 위해 반대로 적었다

처음 airflow에 접속을 하면 DAGs(Directed Acyclic Graph)에 여러가지 sample scheduler들이 있는 것을 확인할 수 있다

sample dag들이 보이지 않도록 지워주도록 하자

(airflow) $ ~/airflow/aiflow.cfg

load_example을 False로 설정해주자

 airflow에서 sample dag들이 지워졌는지 확인해보자

 

3. DAGs 생성하기

airflow_test.py 파일을 생성해서  airflow에 넣어보자

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

with DAG(
    "airflow_test",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
    "depends_on_past": False,
    "email": ["winter0pear@gmail.com"],
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'trigger_rule': 'all_success'
    },
    description="my first airflow task",
    schedule=timedelta(seconds=60),
    start_date=datetime(2023, 8, 1),
    catchup=False,
    tags=["test"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = dedent(
        """
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

DAG는 airflow 폴더 안의 dags에서 불러오는데 나 같은 경우엔 dags 폴더가 없어서 새로 만들어 주었다

dags 폴더안에 방금 작성한 airflow_test.py 파일을 집어넣고 airflow scheduler을 터미널에 입력해 주고 airflow webpage를 새로고침 해주면 airflow_test.py가 DAGs에 잘 들어간 것을 확인할 수 있다

(airflow) $ airflow scheduler

 

4. airflow_test.py가 잘 작동되는지 확인

몇가지 명령어를 통해 airflow_test.py가 잘 작동되는지 확인 할 수 있다

airflow tasks list airflow_test
airflow tasks list airflow_test --tree
airflow dags list

 

5. 참고

 

Apache 에어플로우(Airflow) 시작하기 - Airflow란?, Airflow 설치 및 기본 예제

포스팅 개요 본 포스팅은 Apache Airflow(에어플로우)에 대해서 정리하는 Airflow 시리즈 포스팅입니다. Airflow 포스팅에서는 아래와 같은 순서로 Airflow에 대해서 정리해보려고 합니다. Airflow란 무엇인

lsjsj92.tistory.com

 

 

Tutorial — Airflow Documentation

 

airflow.apache.org

 

'DE > Airflow' 카테고리의 다른 글

Airflow 구조 이해하기  (0) 2024.06.20
Airflow code editor 플러그인 설치하기  (0) 2023.09.07
EC2 에서 Airflow 이용해서 scheduling하기  (0) 2023.09.04