반응형
Apache Airflow DAG 생성부터 실행까지 완벽 가이드
개요
데이터 엔지니어링 환경에서 복잡한 워크플로우를 자동화하고 관리하는 것은 필수적입니다. 매일 정해진 시간에 실행되어야 하는 배치 작업, 작업 간 의존성 관리, 실패 시 재시도 및 알림 등 수많은 요소를 효율적으로 처리해야 합니다.
Apache Airflow는 이러한 데이터 파이프라인을 코드로 정의하고, 스케줄링하며, 모니터링할 수 있는 강력한 오픈소스 플랫폼입니다. Python 코드로 워크플로우를 작성하고 시각적으로 모니터링하며, REST API를 통해 외부 시스템과 연동할 수 있습니다.
이 글에서는 DAG 생성 방법부터 Airflow 설정, 자동/수동 실행, 그리고 다른 백엔드 서버에서 REST API를 통해 배치 작업을 호출하는 방법까지 실무에 필요한 모든 내용을 상세히 다룹니다.
Apache Airflow는 이러한 데이터 파이프라인을 코드로 정의하고, 스케줄링하며, 모니터링할 수 있는 강력한 오픈소스 플랫폼입니다. Python 코드로 워크플로우를 작성하고 시각적으로 모니터링하며, REST API를 통해 외부 시스템과 연동할 수 있습니다.
이 글에서는 DAG 생성 방법부터 Airflow 설정, 자동/수동 실행, 그리고 다른 백엔드 서버에서 REST API를 통해 배치 작업을 호출하는 방법까지 실무에 필요한 모든 내용을 상세히 다룹니다.
목차
1. Airflow DAG의 이해와 기본 구조
2. DAG 생성 및 Airflow 업로드 방법
3. Airflow 설치 및 설정 방법
4. DAG 자동 실행과 수동 실행 방법
5. 백엔드 서버에서 REST API로 배치 작업 실행하기
1. Airflow DAG의 이해와 기본 구조
1) DAG(Directed Acyclic Graph)란?
DAG는 방향성 비순환 그래프(Directed Acyclic Graph)의 약자로, Airflow에서 워크플로우를 표현하는 핵심 개념입니다. DAG는 작업들과 그 의존성을 정의한 Python 파일로 구성됩니다.
(1) DAG의 핵심 특징
① Directed(방향성) - 작업들 사이에 명확한 방향이 있습니다. Task A에서 Task B로 흐름이 진행됩니다.
② Acyclic(비순환) - 순환 구조가 허용되지 않습니다. A → B → C → A와 같은 순환은 불가능하며, 이를 통해 무한 루프를 방지하고 워크플로우의 시작과 끝을 명확하게 합니다.
③ Graph(그래프) - 작업들을 노드로, 의존성을 간선으로 표현한 그래프 구조입니다.
(2) DAG를 사용하는 이유
전통적인 cron 스케줄러와 달리 DAG는 작업 간 의존성을 명확하게 정의할 수 있습니다. 예를 들어, 데이터 추출 작업이 완료된 후에만 데이터 변환 작업이 실행되도록 보장할 수 있으며, 실패 시 자동 재시도, 알림 전송 등의 기능을 쉽게 구현할 수 있습니다.
. . . . .
2) DAG의 주요 구성 요소
(1) Task와 Operator
Task는 DAG 내에서 실행되는 개별 작업 단위이며, Operator는 Task가 실제로 수행할 작업의 유형을 정의합니다.
① PythonOperator - Python 함수를 실행합니다. 데이터 처리 로직을 Python으로 작성할 때 사용합니다.
② BashOperator - Bash 명령을 실행합니다. 셸 스크립트나 시스템 명령을 실행할 때 유용합니다.
③ SQLOperator - SQL 쿼리를 실행합니다. 데이터베이스 작업에 활용됩니다.
④ SensorOperator - 특정 조건이 충족될 때까지 대기합니다. 파일이 생성될 때까지 기다리는 등의 작업에 사용됩니다.
(2) Task 의존성
Airflow에서는 >> 연산자를 사용하여 Task 간의 의존성을 정의합니다. 예를 들어, task_a >> task_b는 task_a가 완료된 후 task_b가 실행됨을 의미합니다.
# Task 의존성 정의 예시
task_extract >> task_transform >> task_load # 순차 실행
# 병렬 실행 후 하나의 Task로 수렴
task_a >> [task_b, task_c] >> task_d
task_extract >> task_transform >> task_load # 순차 실행
# 병렬 실행 후 하나의 Task로 수렴
task_a >> [task_b, task_c] >> task_d
. . . . .
3) DAG 파라미터
DAG를 생성할 때 필수적으로 설정해야 하는 주요 파라미터들이 있습니다.
(1) 필수 파라미터
① dag_id - DAG의 고유 식별자입니다. 모든 DAG에서 유일해야 합니다.
② start_date - DAG가 스케줄링을 시작할 날짜와 시간입니다. 과거 날짜를 지정할 수 있으며, catchup 옵션과 함께 사용됩니다.
③ schedule_interval - DAG 실행 주기를 정의합니다. cron 표현식 또는 preset을 사용합니다.
(2) 주요 선택 파라미터
① catchup - False로 설정 시 과거 실행 건을 건너뛰고 현재부터 시작합니다.
② default_args - 모든 Task에 적용될 기본 인자를 딕셔너리로 정의합니다.
③ tags - DAG를 분류하고 필터링하기 위한 태그 목록입니다.
2. DAG 생성 및 Airflow 업로드 방법
1) 기본 DAG 파일 작성
DAG 파일은 Python 스크립트로 작성되며, dags 디렉토리에 저장됩니다. 다음은 간단한 ETL(Extract, Transform, Load) 워크플로우의 예시입니다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# 기본 인자 정의
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['admin@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
with DAG(
dag_id='etl_pipeline',
default_args=default_args,
description='데이터 ETL 파이프라인',
schedule_interval='0 9 * * *', # 매일 오전 9시
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['etl', 'production'],
) as dag:
# Task 함수 정의
def extract_data():
print("데이터 추출 중...")
return {'records': 1000}
def transform_data(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='extract')
print(f"데이터 변환 중... {data}")
return {'processed': data['records']}
def load_data(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='transform')
print(f"데이터 적재 중... {data}")
# Task 정의
task_extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
)
task_transform = PythonOperator(
task_id='transform',
python_callable=transform_data,
)
task_load = PythonOperator(
task_id='load',
python_callable=load_data,
)
task_notify = BashOperator(
task_id='notify',
bash_command='echo "ETL 작업 완료"',
)
# Task 의존성 정의
task_extract >> task_transform >> task_load >> task_notify
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# 기본 인자 정의
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['admin@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
with DAG(
dag_id='etl_pipeline',
default_args=default_args,
description='데이터 ETL 파이프라인',
schedule_interval='0 9 * * *', # 매일 오전 9시
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['etl', 'production'],
) as dag:
# Task 함수 정의
def extract_data():
print("데이터 추출 중...")
return {'records': 1000}
def transform_data(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='extract')
print(f"데이터 변환 중... {data}")
return {'processed': data['records']}
def load_data(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='transform')
print(f"데이터 적재 중... {data}")
# Task 정의
task_extract = PythonOperator(
task_id='extract',
python_callable=extract_data,
)
task_transform = PythonOperator(
task_id='transform',
python_callable=transform_data,
)
task_load = PythonOperator(
task_id='load',
python_callable=load_data,
)
task_notify = BashOperator(
task_id='notify',
bash_command='echo "ETL 작업 완료"',
)
# Task 의존성 정의
task_extract >> task_transform >> task_load >> task_notify
. . . . .
2) DAG 파일 업로드 방법
(1) dags 디렉토리에 직접 배치
작성한 DAG 파일을 Airflow의 dags 디렉토리에 저장하면 Scheduler가 자동으로 파일을 감지합니다. 기본 경로는 $AIRFLOW_HOME/dags이며, 일반적으로 ~/airflow/dags입니다.
# DAG 파일을 dags 디렉토리로 복사
cp etl_pipeline.py ~/airflow/dags/
# 또는 Docker 환경에서
docker cp etl_pipeline.py airflow-scheduler:/opt/airflow/dags/
cp etl_pipeline.py ~/airflow/dags/
# 또는 Docker 환경에서
docker cp etl_pipeline.py airflow-scheduler:/opt/airflow/dags/
(2) Git을 활용한 배포
프로덕션 환경에서는 Git 저장소를 통한 버전 관리가 권장됩니다. DAG 파일을 Git에 커밋하고, CI/CD 파이프라인을 통해 자동으로 배포할 수 있습니다.
# Git 저장소 클론
cd ~/airflow/dags
git clone https://github.com/your-company/airflow-dags.git .
# DAG 파일 추가 및 커밋
git add etl_pipeline.py
git commit -m "Add ETL pipeline DAG"
git push origin main
cd ~/airflow/dags
git clone https://github.com/your-company/airflow-dags.git .
# DAG 파일 추가 및 커밋
git add etl_pipeline.py
git commit -m "Add ETL pipeline DAG"
git push origin main
(3) DAG 파일 검증
DAG 파일을 배포하기 전에 구문 오류가 없는지 확인하는 것이 중요합니다. Airflow CLI를 사용하여 DAG를 테스트할 수 있습니다.
# Python 구문 검사
python ~/airflow/dags/etl_pipeline.py
# DAG 목록 확인
airflow dags list
# 특정 DAG 상세 정보
airflow dags show etl_pipeline
# DAG 파싱 오류 확인
airflow dags list-import-errors
# DAG 테스트 실행
airflow dags test etl_pipeline 2024-01-01
python ~/airflow/dags/etl_pipeline.py
# DAG 목록 확인
airflow dags list
# 특정 DAG 상세 정보
airflow dags show etl_pipeline
# DAG 파싱 오류 확인
airflow dags list-import-errors
# DAG 테스트 실행
airflow dags test etl_pipeline 2024-01-01
. . . . .
3) DAG 작성 시 베스트 프랙티스
(1) 멱등성(Idempotency) 보장
같은 입력으로 여러 번 실행해도 같은 결과가 나오도록 작성해야 합니다. 이를 통해 실패 시 안전하게 재실행할 수 있습니다.
# 나쁜 예: 멱등성 없음
INSERT INTO daily_summary VALUES (...)
# 좋은 예: 멱등성 보장
DELETE FROM daily_summary WHERE date = '2024-01-01';
INSERT INTO daily_summary VALUES (...);
# 또는 UPSERT 사용
INSERT INTO daily_summary VALUES (...)
ON CONFLICT (date) DO UPDATE SET ...;
INSERT INTO daily_summary VALUES (...)
# 좋은 예: 멱등성 보장
DELETE FROM daily_summary WHERE date = '2024-01-01';
INSERT INTO daily_summary VALUES (...);
# 또는 UPSERT 사용
INSERT INTO daily_summary VALUES (...)
ON CONFLICT (date) DO UPDATE SET ...;
(2) Task 분리 원칙
각 Task는 하나의 명확한 책임만 가져야 합니다. 큰 작업을 여러 작은 Task로 나누면 병렬 실행이 가능하고, 실패 시 어느 단계에서 문제가 발생했는지 명확히 파악할 수 있습니다.
(3) DAG 파일을 가볍게 유지
DAG 파일은 Scheduler에 의해 주기적으로 파싱되므로, 무거운 연산이나 외부 API 호출은 피해야 합니다. 이러한 로직은 Task 함수 내부에 작성하여 실제 실행 시에만 수행되도록 합니다.
# 나쁜 예: DAG 파일에서 직접 데이터 로드
import pandas as pd
df = pd.read_csv('large_file.csv') # 파싱할 때마다 실행됨!
# 좋은 예: Task 함수 내부에서 실행
def process_data():
import pandas as pd
df = pd.read_csv('large_file.csv') # Task 실행 시에만 실행됨
# 데이터 처리 로직...
import pandas as pd
df = pd.read_csv('large_file.csv') # 파싱할 때마다 실행됨!
# 좋은 예: Task 함수 내부에서 실행
def process_data():
import pandas as pd
df = pd.read_csv('large_file.csv') # Task 실행 시에만 실행됨
# 데이터 처리 로직...
3. Airflow 설치 및 설정 방법
1) Docker Compose를 이용한 설치 (권장)
가장 빠르고 안정적인 설치 방법은 Docker Compose를 이용하는 것입니다. 공식 docker-compose.yaml 파일을 사용하면 필요한 모든 서비스를 한 번에 설정할 수 있습니다.
(1) Docker 환경 준비
# Docker와 Docker Compose 설치 확인
docker --version
docker-compose --version
# 공식 docker-compose.yaml 다운로드
curl -LfO \
'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# Airflow 디렉토리 구조 생성
mkdir -p ./dags ./logs ./plugins ./config
# 환경 변수 설정
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker --version
docker-compose --version
# 공식 docker-compose.yaml 다운로드
curl -LfO \
'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# Airflow 디렉토리 구조 생성
mkdir -p ./dags ./logs ./plugins ./config
# 환경 변수 설정
echo -e "AIRFLOW_UID=$(id -u)" > .env
(2) Airflow 초기화 및 실행
# 데이터베이스 초기화
docker-compose up airflow-init
# Airflow 서비스 시작
docker-compose up -d
# 서비스 상태 확인
docker-compose ps
# 로그 확인
docker-compose logs -f airflow-webserver
docker-compose logs -f airflow-scheduler
docker-compose up airflow-init
# Airflow 서비스 시작
docker-compose up -d
# 서비스 상태 확인
docker-compose ps
# 로그 확인
docker-compose logs -f airflow-webserver
docker-compose logs -f airflow-scheduler
웹 브라우저에서 http://localhost:8080에 접속하여 Airflow UI를 확인할 수 있습니다. 기본 계정은 airflow / airflow입니다.
. . . . .
2) pip를 이용한 로컬 설치
로컬 환경에 직접 설치하는 방법도 있습니다. Python 3.8 이상이 필요합니다.
(1) 가상환경 생성 및 설치
# Python 가상환경 생성
python -m venv airflow-venv
# 가상환경 활성화
source airflow-venv/bin/activate # Windows: airflow-venv\Scripts\activate
# Airflow 홈 디렉토리 설정
export AIRFLOW_HOME=~/airflow
# Airflow 설치
AIRFLOW_VERSION=2.8.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/\
constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" \
--constraint "${CONSTRAINT_URL}"
# 데이터베이스 초기화
airflow db init
# 관리자 계정 생성
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin
python -m venv airflow-venv
# 가상환경 활성화
source airflow-venv/bin/activate # Windows: airflow-venv\Scripts\activate
# Airflow 홈 디렉토리 설정
export AIRFLOW_HOME=~/airflow
# Airflow 설치
AIRFLOW_VERSION=2.8.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/\
constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" \
--constraint "${CONSTRAINT_URL}"
# 데이터베이스 초기화
airflow db init
# 관리자 계정 생성
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin
(2) Airflow 서비스 실행
# Web Server 시작 (터미널 1)
airflow webserver --port 8080
# Scheduler 시작 (터미널 2)
airflow scheduler
airflow webserver --port 8080
# Scheduler 시작 (터미널 2)
airflow scheduler
. . . . .
3) Airflow 주요 설정 (airflow.cfg)
Airflow의 동작은 airflow.cfg 파일을 통해 제어됩니다. 이 파일은 $AIRFLOW_HOME/airflow.cfg 경로에 위치합니다.
(1) 핵심 설정 항목
| 섹션 | 설정 항목 | 설명 | 권장 값 |
|---|---|---|---|
| [core] | dags_folder | DAG 파일이 위치한 디렉토리 | /opt/airflow/dags |
| [core] | dags_are_paused_at_creation | DAG 생성 시 일시정지 상태로 시작 | True |
| [core] | load_examples | 예제 DAG 로드 여부 (프로덕션에서는 False 권장) | False |
| [core] | executor | Executor 선택 (SequentialExecutor, LocalExecutor, CeleryExecutor 등) | LocalExecutor |
| [core] | parallelism | 전체 병렬 실행 Task 수 | 32 |
| [core] | dag_concurrency | DAG당 병렬 실행 Task 수 | 16 |
(2) REST API 활성화 설정
REST API를 통해 외부에서 DAG를 제어하려면 인증 백엔드를 설정해야 합니다.
# [api] 섹션
[api]
# REST API 인증 활성화
auth_backends = airflow.api.auth.backend.basic_auth
# 또는 세션 기반 인증
# auth_backends = airflow.api.auth.backend.session
[api]
# REST API 인증 활성화
auth_backends = airflow.api.auth.backend.basic_auth
# 또는 세션 기반 인증
# auth_backends = airflow.api.auth.backend.session
(3) 이메일 알림 설정
# [smtp] 섹션
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_port = 587
smtp_mail_from = airflow@example.com
smtp_user = your-email@gmail.com
smtp_password = your-app-password
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = True
smtp_ssl = False
smtp_port = 587
smtp_mail_from = airflow@example.com
smtp_user = your-email@gmail.com
smtp_password = your-app-password
4. DAG 자동 실행과 수동 실행 방법
1) DAG 자동 실행 (스케줄링)
(1) schedule_interval을 이용한 자동 실행
schedule_interval 파라미터를 설정하면 DAG가 정해진 시간에 자동으로 실행됩니다. cron 표현식 또는 preset을 사용할 수 있습니다.
from datetime import datetime
from airflow import DAG
# Cron 표현식 사용 예시
dag1 = DAG(
dag_id='daily_job',
schedule_interval='0 9 * * *', # 매일 오전 9시
start_date=datetime(2024, 1, 1),
catchup=False
)
# Preset 사용 예시
dag2 = DAG(
dag_id='hourly_job',
schedule_interval='@hourly', # 매시간
start_date=datetime(2024, 1, 1),
catchup=False
)
dag3 = DAG(
dag_id='weekly_job',
schedule_interval='@weekly', # 매주 일요일 자정
start_date=datetime(2024, 1, 1),
catchup=False
)
dag4 = DAG(
dag_id='monthly_job',
schedule_interval='@monthly', # 매월 1일 자정
start_date=datetime(2024, 1, 1),
catchup=False
)
from airflow import DAG
# Cron 표현식 사용 예시
dag1 = DAG(
dag_id='daily_job',
schedule_interval='0 9 * * *', # 매일 오전 9시
start_date=datetime(2024, 1, 1),
catchup=False
)
# Preset 사용 예시
dag2 = DAG(
dag_id='hourly_job',
schedule_interval='@hourly', # 매시간
start_date=datetime(2024, 1, 1),
catchup=False
)
dag3 = DAG(
dag_id='weekly_job',
schedule_interval='@weekly', # 매주 일요일 자정
start_date=datetime(2024, 1, 1),
catchup=False
)
dag4 = DAG(
dag_id='monthly_job',
schedule_interval='@monthly', # 매월 1일 자정
start_date=datetime(2024, 1, 1),
catchup=False
)
(2) 주요 Cron 표현식 예시
① '0 9 * * *' - 매일 오전 9시
② '0 */2 * * *' - 2시간마다
③ '0 9 * * 1-5' - 평일 오전 9시
④ '0 0 1 * *' - 매월 1일 자정
⑤ '*/15 * * * *' - 15분마다
(3) DAG 활성화/비활성화
DAG가 스케줄에 따라 자동 실행되려면 활성화(Unpause) 상태여야 합니다. Web UI 또는 CLI를 통해 제어할 수 있습니다.
# CLI로 DAG 활성화
airflow dags unpause etl_pipeline
# CLI로 DAG 비활성화
airflow dags pause etl_pipeline
# DAG 상태 확인
airflow dags list
airflow dags unpause etl_pipeline
# CLI로 DAG 비활성화
airflow dags pause etl_pipeline
# DAG 상태 확인
airflow dags list
. . . . .
2) DAG 수동 실행
(1) Web UI를 통한 수동 실행
Airflow Web UI에서 DAG 오른쪽의 재생 버튼(▶)을 클릭하면 즉시 DAG를 실행할 수 있습니다. 이 방법은 테스트나 즉시 실행이 필요할 때 유용합니다.
① Airflow UI (http://localhost:8080)에 접속합니다.
② DAG 목록에서 실행할 DAG를 찾습니다.
③ DAG 이름 오른쪽의 재생 버튼(▶)을 클릭합니다.
④ "Trigger DAG" 다이얼로그에서 실행 날짜와 config를 설정할 수 있습니다.
⑤ Trigger 버튼을 클릭하면 즉시 실행됩니다.
(2) CLI를 통한 수동 실행
# DAG 트리거 (현재 시간으로 실행)
airflow dags trigger etl_pipeline
# 특정 날짜로 DAG 실행
airflow dags trigger etl_pipeline --exec-date 2024-01-01
# Config 파라미터와 함께 실행
airflow dags trigger etl_pipeline --conf '{"key": "value"}'
# 특정 Task만 실행
airflow tasks run etl_pipeline extract 2024-01-01
# DAG 실행 상태 확인
airflow dags list-runs -d etl_pipeline
airflow dags trigger etl_pipeline
# 특정 날짜로 DAG 실행
airflow dags trigger etl_pipeline --exec-date 2024-01-01
# Config 파라미터와 함께 실행
airflow dags trigger etl_pipeline --conf '{"key": "value"}'
# 특정 Task만 실행
airflow tasks run etl_pipeline extract 2024-01-01
# DAG 실행 상태 확인
airflow dags list-runs -d etl_pipeline
(3) Backfill로 과거 데이터 재처리
Backfill은 과거의 특정 기간 동안 DAG를 실행하는 기능입니다. 데이터 재처리나 누락된 작업을 보완할 때 유용합니다.
# 특정 기간 동안 DAG 실행
airflow dags backfill etl_pipeline \
--start-date 2024-01-01 \
--end-date 2024-01-31
# 특정 Task만 backfill
airflow tasks backfill etl_pipeline \
--task-regex 'extract|transform' \
--start-date 2024-01-01 \
--end-date 2024-01-31
# Dry run (실제 실행하지 않고 시뮬레이션)
airflow dags backfill etl_pipeline \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--dry-run
airflow dags backfill etl_pipeline \
--start-date 2024-01-01 \
--end-date 2024-01-31
# 특정 Task만 backfill
airflow tasks backfill etl_pipeline \
--task-regex 'extract|transform' \
--start-date 2024-01-01 \
--end-date 2024-01-31
# Dry run (실제 실행하지 않고 시뮬레이션)
airflow dags backfill etl_pipeline \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--dry-run
. . . . .
3) DAG 실행 모니터링
(1) Web UI 모니터링
Airflow UI는 DAG 실행 상태를 실시간으로 모니터링할 수 있는 다양한 뷰를 제공합니다.
① Grid View - DAG Run과 Task 상태를 격자 형태로 표시합니다.
② Graph View - Task 간 의존성과 실행 상태를 그래프로 시각화합니다.
③ Gantt Chart - Task 실행 시간과 병목 지점을 분석할 수 있습니다.
④ Task Duration - 과거 실행 기록을 바탕으로 Task 실행 시간을 비교합니다.
(2) Task 로그 확인
각 Task의 실행 로그는 Web UI에서 Task를 클릭하여 확인할 수 있습니다. 실패 원인을 파악하고 디버깅할 때 매우 유용합니다.
# CLI로 Task 로그 확인
airflow tasks logs etl_pipeline extract 2024-01-01
# 로그 파일 직접 확인
cat $AIRFLOW_HOME/logs/etl_pipeline/extract/2024-01-01T00:00:00+00:00/1.log
airflow tasks logs etl_pipeline extract 2024-01-01
# 로그 파일 직접 확인
cat $AIRFLOW_HOME/logs/etl_pipeline/extract/2024-01-01T00:00:00+00:00/1.log
5. 백엔드 서버에서 REST API로 배치 작업 실행하기
1) Airflow REST API 개요
Airflow는 Stable REST API를 제공하여 외부 시스템에서 프로그래밍 방식으로 DAG를 제어할 수 있습니다. Spring Boot, Django, Flask 등 다양한 백엔드 애플리케이션에서 HTTP 요청을 통해 배치 작업을 실행할 수 있습니다.
(1) REST API의 주요 기능
① DAG 트리거 - 외부에서 DAG를 즉시 실행합니다.
② DAG 상태 조회 - DAG 실행 이력과 현재 상태를 확인합니다.
③ Task 상태 조회 - 개별 Task의 실행 상태를 조회합니다.
④ Variable 관리 - Airflow Variable을 CRUD 할 수 있습니다.
⑤ Connection 관리 - 외부 시스템 연결 정보를 관리합니다.
(2) API 엔드포인트
Airflow REST API의 기본 URL은 http://<airflow-host>:8080/api/v1/입니다. 주요 엔드포인트는 다음과 같습니다.
| HTTP 메서드 | 엔드포인트 | 기능 |
|---|---|---|
| POST | /api/v1/dags/{dag_id}/dagRuns | DAG 실행 |
| GET | /api/v1/dags/{dag_id}/dagRuns | DAG 실행 목록 조회 |
| GET | /api/v1/dags/{dag_id}/dagRuns/{dag_run_id} | 특정 DAG Run 조회 |
| GET | /api/v1/dags | 모든 DAG 목록 조회 |
| PATCH | /api/v1/dags/{dag_id} | DAG 활성화/비활성화 |
. . . . .
2) REST API 인증 설정
(1) Basic Authentication 설정
REST API를 사용하려면 airflow.cfg에서 인증을 활성화해야 합니다.
# airflow.cfg 파일 수정
[api]
auth_backends = airflow.api.auth.backend.basic_auth
# Airflow 재시작
docker-compose restart
# 또는 로컬 설치의 경우
# kill {webserver-pid}
# airflow webserver --port 8080
[api]
auth_backends = airflow.api.auth.backend.basic_auth
# Airflow 재시작
docker-compose restart
# 또는 로컬 설치의 경우
# kill {webserver-pid}
# airflow webserver --port 8080
(2) API 사용자 생성
REST API 전용 사용자를 생성하는 것이 보안상 권장됩니다.
# API 전용 사용자 생성
airflow users create \
--username api_user \
--firstname API \
--lastname User \
--role Admin \
--email api@example.com \
--password api_password
# 사용자 목록 확인
airflow users list
airflow users create \
--username api_user \
--firstname API \
--lastname User \
--role Admin \
--email api@example.com \
--password api_password
# 사용자 목록 확인
airflow users list
. . . . .
3) cURL을 이용한 API 호출
(1) DAG 트리거
# 기본 DAG 트리거
curl -X POST 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{}'
# Config 파라미터와 함께 트리거
curl -X POST 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{
"conf": {
"source": "mysql",
"target": "postgres",
"date": "2024-01-01"
}
}'
# 특정 logical_date로 트리거
curl -X POST 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{
"logical_date": "2024-01-01T00:00:00Z",
"conf": {}
}'
# Base64로 인증 정보 인코딩하여 사용
# echo -n "api_user:api_password" | base64 → YXBpX3VzZXI6YXBpX3Bhc3N3b3Jk
curl -X POST 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic YXBpX3VzZXI6YXBpX3Bhc3N3b3Jk' \
-d '{}'
curl -X POST 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{}'
# Config 파라미터와 함께 트리거
curl -X POST 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{
"conf": {
"source": "mysql",
"target": "postgres",
"date": "2024-01-01"
}
}'
# 특정 logical_date로 트리거
curl -X POST 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{
"logical_date": "2024-01-01T00:00:00Z",
"conf": {}
}'
# Base64로 인증 정보 인코딩하여 사용
# echo -n "api_user:api_password" | base64 → YXBpX3VzZXI6YXBpX3Bhc3N3b3Jk
curl -X POST 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
-H 'Authorization: Basic YXBpX3VzZXI6YXBpX3Bhc3N3b3Jk' \
-d '{}'
(2) DAG 상태 조회
# 모든 DAG 목록 조회
curl -X GET 'http://localhost:8080/api/v1/dags' \
-H 'Content-Type: application/json' \
--user "api_user:api_password"
# 특정 DAG의 실행 목록 조회
curl -X GET 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
--user "api_user:api_password"
# 특정 DAG Run 상세 조회
curl -X GET 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns/manual__2024-01-01T00:00:00+00:00' \
-H 'Content-Type: application/json' \
--user "api_user:api_password"
curl -X GET 'http://localhost:8080/api/v1/dags' \
-H 'Content-Type: application/json' \
--user "api_user:api_password"
# 특정 DAG의 실행 목록 조회
curl -X GET 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns' \
-H 'Content-Type: application/json' \
--user "api_user:api_password"
# 특정 DAG Run 상세 조회
curl -X GET 'http://localhost:8080/api/v1/dags/etl_pipeline/dagRuns/manual__2024-01-01T00:00:00+00:00' \
-H 'Content-Type: application/json' \
--user "api_user:api_password"
(3) DAG 활성화/비활성화
# DAG 활성화 (Unpause)
curl -X PATCH 'http://localhost:8080/api/v1/dags/etl_pipeline' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{"is_paused": false}'
# DAG 비활성화 (Pause)
curl -X PATCH 'http://localhost:8080/api/v1/dags/etl_pipeline' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{"is_paused": true}'
curl -X PATCH 'http://localhost:8080/api/v1/dags/etl_pipeline' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{"is_paused": false}'
# DAG 비활성화 (Pause)
curl -X PATCH 'http://localhost:8080/api/v1/dags/etl_pipeline' \
-H 'Content-Type: application/json' \
--user "api_user:api_password" \
-d '{"is_paused": true}'
. . . . .
4) Python으로 REST API 호출
(1) requests 라이브러리 사용
import requests
from requests.auth import HTTPBasicAuth
import json
# Airflow 설정
AIRFLOW_URL = "http://localhost:8080"
USERNAME = "api_user"
PASSWORD = "api_password"
def trigger_dag(dag_id, conf=None):
"""DAG 트리거 함수"""
url = f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns"
headers = {
"Content-Type": "application/json"
}
payload = {
"conf": conf or {}
}
response = requests.post(
url,
headers=headers,
auth=HTTPBasicAuth(USERNAME, PASSWORD),
data=json.dumps(payload)
)
if response.status_code == 200:
print(f"DAG {dag_id} 실행 성공!")
return response.json()
else:
print(f"오류: {response.status_code} - {response.text}")
return None
def get_dag_runs(dag_id, limit=10):
"""DAG 실행 목록 조회"""
url = f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns"
params = {
"limit": limit,
"order_by": "-execution_date"
}
response = requests.get(
url,
params=params,
auth=HTTPBasicAuth(USERNAME, PASSWORD)
)
if response.status_code == 200:
return response.json()
else:
print(f"오류: {response.status_code} - {response.text}")
return None
def get_dag_run_status(dag_id, dag_run_id):
"""특정 DAG Run 상태 조회"""
url = f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"
response = requests.get(
url,
auth=HTTPBasicAuth(USERNAME, PASSWORD)
)
if response.status_code == 200:
data = response.json()
return data['state'] # 'running', 'success', 'failed' 등
else:
return None
# 사용 예시
if __name__ == "__main__":
# DAG 트리거
result = trigger_dag(
dag_id="etl_pipeline",
conf={
"source": "mysql",
"target": "s3",
"date": "2024-01-01"
}
)
if result:
dag_run_id = result['dag_run_id']
print(f"DAG Run ID: {dag_run_id}")
# 상태 확인
status = get_dag_run_status("etl_pipeline", dag_run_id)
print(f"현재 상태: {status}")
# DAG 실행 이력 조회
runs = get_dag_runs("etl_pipeline", limit=5)
if runs:
print(f"\n최근 실행 이력: {len(runs['dag_runs'])}개")
for run in runs['dag_runs']:
print(f"- {run['dag_run_id']}: {run['state']}")
from requests.auth import HTTPBasicAuth
import json
# Airflow 설정
AIRFLOW_URL = "http://localhost:8080"
USERNAME = "api_user"
PASSWORD = "api_password"
def trigger_dag(dag_id, conf=None):
"""DAG 트리거 함수"""
url = f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns"
headers = {
"Content-Type": "application/json"
}
payload = {
"conf": conf or {}
}
response = requests.post(
url,
headers=headers,
auth=HTTPBasicAuth(USERNAME, PASSWORD),
data=json.dumps(payload)
)
if response.status_code == 200:
print(f"DAG {dag_id} 실행 성공!")
return response.json()
else:
print(f"오류: {response.status_code} - {response.text}")
return None
def get_dag_runs(dag_id, limit=10):
"""DAG 실행 목록 조회"""
url = f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns"
params = {
"limit": limit,
"order_by": "-execution_date"
}
response = requests.get(
url,
params=params,
auth=HTTPBasicAuth(USERNAME, PASSWORD)
)
if response.status_code == 200:
return response.json()
else:
print(f"오류: {response.status_code} - {response.text}")
return None
def get_dag_run_status(dag_id, dag_run_id):
"""특정 DAG Run 상태 조회"""
url = f"{AIRFLOW_URL}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"
response = requests.get(
url,
auth=HTTPBasicAuth(USERNAME, PASSWORD)
)
if response.status_code == 200:
data = response.json()
return data['state'] # 'running', 'success', 'failed' 등
else:
return None
# 사용 예시
if __name__ == "__main__":
# DAG 트리거
result = trigger_dag(
dag_id="etl_pipeline",
conf={
"source": "mysql",
"target": "s3",
"date": "2024-01-01"
}
)
if result:
dag_run_id = result['dag_run_id']
print(f"DAG Run ID: {dag_run_id}")
# 상태 확인
status = get_dag_run_status("etl_pipeline", dag_run_id)
print(f"현재 상태: {status}")
# DAG 실행 이력 조회
runs = get_dag_runs("etl_pipeline", limit=5)
if runs:
print(f"\n최근 실행 이력: {len(runs['dag_runs'])}개")
for run in runs['dag_runs']:
print(f"- {run['dag_run_id']}: {run['state']}")
. . . . .
5) Java (Spring Boot)에서 REST API 호출
(1) RestTemplate 사용
import org.springframework.http.*;
import org.springframework.web.client.RestTemplate;
import org.springframework.stereotype.Service;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
@Service
public class AirflowService {
private static final String AIRFLOW_URL = "http://localhost:8080";
private static final String USERNAME = "api_user";
private static final String PASSWORD = "api_password";
private final RestTemplate restTemplate = new RestTemplate();
/**
* DAG 트리거 메서드
*/
public String triggerDag(String dagId, Map<String, Object> conf) {
String url = AIRFLOW_URL + "/api/v1/dags/" + dagId + "/dagRuns";
// 요청 헤더 설정
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Authorization", getBasicAuthHeader());
// 요청 바디 설정
Map<String, Object> body = new HashMap<>();
body.put("conf", conf != null ? conf : new HashMap<>());
HttpEntity<Map<String, Object>> request = new HttpEntity<>(body, headers);
// API 호출
try {
ResponseEntity<String> response = restTemplate.exchange(
url,
HttpMethod.POST,
request,
String.class
);
if (response.getStatusCode() == HttpStatus.OK) {
System.out.println("DAG " + dagId + " 실행 성공!");
return response.getBody();
} else {
System.out.println("오류: " + response.getStatusCode());
return null;
}
} catch (Exception e) {
System.out.println("DAG 트리거 실패: " + e.getMessage());
return null;
}
}
/**
* DAG 실행 상태 조회
*/
public String getDagRunStatus(String dagId, String dagRunId) {
String url = AIRFLOW_URL + "/api/v1/dags/" + dagId + "/dagRuns/" + dagRunId;
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", getBasicAuthHeader());
HttpEntity<String> request = new HttpEntity<>(headers);
try {
ResponseEntity<String> response = restTemplate.exchange(
url,
HttpMethod.GET,
request,
String.class
);
return response.getBody();
} catch (Exception e) {
System.out.println("상태 조회 실패: " + e.getMessage());
return null;
}
}
/**
* Basic Auth 헤더 생성
*/
private String getBasicAuthHeader() {
String auth = USERNAME + ":" + PASSWORD;
byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
return "Basic " + new String(encodedAuth);
}
/**
* 사용 예시
*/
public static void main(String[] args) {
AirflowService service = new AirflowService();
// Config 파라미터 준비
Map<String, Object> conf = new HashMap<>();
conf.put("source", "mysql");
conf.put("target", "s3");
conf.put("date", "2024-01-01");
// DAG 트리거
String result = service.triggerDag("etl_pipeline", conf);
System.out.println("결과: " + result);
}
}
import org.springframework.web.client.RestTemplate;
import org.springframework.stereotype.Service;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
@Service
public class AirflowService {
private static final String AIRFLOW_URL = "http://localhost:8080";
private static final String USERNAME = "api_user";
private static final String PASSWORD = "api_password";
private final RestTemplate restTemplate = new RestTemplate();
/**
* DAG 트리거 메서드
*/
public String triggerDag(String dagId, Map<String, Object> conf) {
String url = AIRFLOW_URL + "/api/v1/dags/" + dagId + "/dagRuns";
// 요청 헤더 설정
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Authorization", getBasicAuthHeader());
// 요청 바디 설정
Map<String, Object> body = new HashMap<>();
body.put("conf", conf != null ? conf : new HashMap<>());
HttpEntity<Map<String, Object>> request = new HttpEntity<>(body, headers);
// API 호출
try {
ResponseEntity<String> response = restTemplate.exchange(
url,
HttpMethod.POST,
request,
String.class
);
if (response.getStatusCode() == HttpStatus.OK) {
System.out.println("DAG " + dagId + " 실행 성공!");
return response.getBody();
} else {
System.out.println("오류: " + response.getStatusCode());
return null;
}
} catch (Exception e) {
System.out.println("DAG 트리거 실패: " + e.getMessage());
return null;
}
}
/**
* DAG 실행 상태 조회
*/
public String getDagRunStatus(String dagId, String dagRunId) {
String url = AIRFLOW_URL + "/api/v1/dags/" + dagId + "/dagRuns/" + dagRunId;
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", getBasicAuthHeader());
HttpEntity<String> request = new HttpEntity<>(headers);
try {
ResponseEntity<String> response = restTemplate.exchange(
url,
HttpMethod.GET,
request,
String.class
);
return response.getBody();
} catch (Exception e) {
System.out.println("상태 조회 실패: " + e.getMessage());
return null;
}
}
/**
* Basic Auth 헤더 생성
*/
private String getBasicAuthHeader() {
String auth = USERNAME + ":" + PASSWORD;
byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
return "Basic " + new String(encodedAuth);
}
/**
* 사용 예시
*/
public static void main(String[] args) {
AirflowService service = new AirflowService();
// Config 파라미터 준비
Map<String, Object> conf = new HashMap<>();
conf.put("source", "mysql");
conf.put("target", "s3");
conf.put("date", "2024-01-01");
// DAG 트리거
String result = service.triggerDag("etl_pipeline", conf);
System.out.println("결과: " + result);
}
}
. . . . .
6) 실무 활용 시나리오
(1) 웹 애플리케이션에서 배치 작업 트리거
사용자가 웹 UI에서 "데이터 동기화" 버튼을 클릭하면, 백엔드 서버가 Airflow REST API를 호출하여 즉시 ETL 작업을 실행합니다.
① 사용자가 웹 페이지에서 "데이터 동기화" 버튼 클릭
② 프론트엔드가 백엔드 API 호출 (예: POST /api/sync)
③ 백엔드 서버가 Airflow REST API로 DAG 트리거
④ Airflow가 배치 작업 실행
⑤ 백엔드가 주기적으로 실행 상태 확인 후 사용자에게 알림
(2) 스케줄러에서 특정 조건에 따라 실행
백엔드 서버의 스케줄러가 특정 조건을 감지하면 (예: 파일 업로드, 데이터베이스 변경), Airflow DAG를 자동으로 트리거합니다.
(3) 마이크로서비스 간 통신
마이크로서비스 A가 데이터 처리를 완료하면, Airflow DAG를 호출하여 후속 작업을 시작합니다. 이를 통해 서비스 간 결합도를 낮추면서도 복잡한 워크플로우를 관리할 수 있습니다.
(4) CI/CD 파이프라인 통합
배포가 완료된 후 자동으로 데이터 마이그레이션 DAG를 실행하여 데이터베이스 스키마 변경을 적용합니다.
마무리
이 글에서는 Apache Airflow의 DAG 생성부터 실행, REST API를 통한 외부 연동까지 실무에 필요한 모든 내용을 다루었습니다.
핵심 내용 정리
① DAG 구조 - Airflow는 방향성 비순환 그래프로 워크플로우를 표현하며, Task와 Operator를 통해 작업을 정의합니다.
② DAG 생성과 배포 - Python으로 DAG를 작성하고 dags 디렉토리에 배치하면 Scheduler가 자동으로 감지합니다. Git을 통한 버전 관리가 권장됩니다.
③ 설치와 설정 - Docker Compose를 이용한 설치가 가장 편리하며, airflow.cfg를 통해 세부 설정을 제어할 수 있습니다.
④ 실행 방법 - schedule_interval로 자동 실행을 설정하거나, Web UI/CLI를 통해 수동으로 실행할 수 있습니다.
⑤ REST API 연동 - 외부 백엔드 서버에서 HTTP 요청을 통해 DAG를 트리거하고 상태를 조회할 수 있습니다. Python, Java 등 다양한 언어로 구현 가능합니다.
Airflow는 데이터 엔지니어링의 필수 도구로 자리잡았습니다. 복잡한 배치 작업을 효율적으로 관리하고, REST API를 통해 다른 시스템과 유연하게 통합할 수 있습니다. 이 가이드를 참고하여 여러분의 데이터 파이프라인을 성공적으로 구축하시기 바랍니다.
출처 명시:
이 글은 Apache Airflow 공식 문서를 참고하여 작성되었으며, 모든 코드 예시와 설명은 공식 문서와 실무 경험을 바탕으로 구성되었습니다.
이 글은 Apache Airflow 공식 문서를 참고하여 작성되었으며, 모든 코드 예시와 설명은 공식 문서와 실무 경험을 바탕으로 구성되었습니다.
긴 글 읽어주셔서 감사합니다.
끝.
끝.
반응형
'Development > Web' 카테고리의 다른 글
| [Web] Apache Airflow 워크플로우 자동화 방법 (0) | 2025.10.30 |
|---|---|
| [Web] Apache Kafka 완벽 가이드 (0) | 2025.10.24 |
| [Web] Spring Boot 3.4 + AWS 환경 Tomcat vs JBoss WildFly 성능 비교 분석 (3) | 2025.04.07 |
| [Web] JEUS DB Connection Leak 완벽 해결 방법 (0) | 2024.07.24 |
| [Web] Spring Dispatcher Servlet 정의와 동작 원리 (0) | 2022.09.24 |