1. Anaconda 가상환경 만들기
$ conda create -n {가상환경 이름} python={파이썬 버전}
가상환경이 잘 생성 되었는지 확인하고 가상환경을 활성화 해준다
$ conda info -envs
$ source ~/anaconda3/bin/activate {가상환경 이름}
2. Airflow 설치하기
가상환경을 활성화한 상태에서 Airflow를 설치해준다
shell scrip를 작성해서 한번에 설치되도록 해 주었다
#!/bin/bash
# Apache Airflow 버전 및 Python 버전 설정
AIRFLOW_VERSION=2.7.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# 제약 사항(constraints) 파일 URL 생성
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# Apache Airflow 설치
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# 설치 확인
airflow version
설치가 다 되었으면 airflow를 실행시켜 보자
(airflfow) $ airflow webserver
인터넷 주소창에 localhost:8080을 쳐서 airflow에 접속해본다
처음 서버를 실행시키면 로그인 창이 뜨게 된다
airflow server을 종료하고 airflow DB 초기화를 실행해주자
(airflow) $ airflow db init
이후 airflow에 접속하기 위해 계정을 생성해보자
$ airflow users create \
-u {id} \
-f {이름} \
-l {성} \
-r {역할} \
-p {password} \
-e {email}
$ airflow users create \
--username {id} \
--firstname {이름} \
--lastname {성} \
--role {역할} \
--password {password} \
--email {email}
처음 airflow에 접속을 하면 DAGs(Directed Acyclic Graph)에 여러가지 sample scheduler들이 있는 것을 확인할 수 있다
sample dag들이 보이지 않도록 지워주도록 하자
(airflow) $ ~/airflow/aiflow.cfg
airflow에서 sample dag들이 지워졌는지 확인해보자
3. DAGs 생성하기
airflow_test.py 파일을 생성해서 airflow에 넣어보자
from datetime import datetime, timedelta
from textwrap import dedent
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
"airflow_test",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"email": ["winter0pear@gmail.com"],
"email_on_failure": True,
"email_on_retry": True,
"retries": 3,
"retry_delay": timedelta(minutes=1),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'trigger_rule': 'all_success'
},
description="my first airflow task",
schedule=timedelta(seconds=60),
start_date=datetime(2023, 8, 1),
catchup=False,
tags=["test"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = dedent(
"""
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
DAG는 airflow 폴더 안의 dags에서 불러오는데 나 같은 경우엔 dags 폴더가 없어서 새로 만들어 주었다
dags 폴더안에 방금 작성한 airflow_test.py 파일을 집어넣고 airflow scheduler을 터미널에 입력해 주고 airflow webpage를 새로고침 해주면 airflow_test.py가 DAGs에 잘 들어간 것을 확인할 수 있다
(airflow) $ airflow scheduler
4. airflow_test.py가 잘 작동되는지 확인
몇가지 명령어를 통해 airflow_test.py가 잘 작동되는지 확인 할 수 있다
airflow tasks list airflow_test
airflow tasks list airflow_test --tree
airflow dags list
5. 참고
Apache 에어플로우(Airflow) 시작하기 - Airflow란?, Airflow 설치 및 기본 예제
포스팅 개요 본 포스팅은 Apache Airflow(에어플로우)에 대해서 정리하는 Airflow 시리즈 포스팅입니다. Airflow 포스팅에서는 아래와 같은 순서로 Airflow에 대해서 정리해보려고 합니다. Airflow란 무엇인
lsjsj92.tistory.com
Tutorial — Airflow Documentation
airflow.apache.org
'DE > Airflow' 카테고리의 다른 글
Airflow 구조 이해하기 (0) | 2024.06.20 |
---|---|
Airflow code editor 플러그인 설치하기 (0) | 2023.09.07 |
EC2 에서 Airflow 이용해서 scheduling하기 (0) | 2023.09.04 |