2023.05.26 (Fri) Study Notes
#Airflow #Standalone #backfill
1. Installing Airflow standalone
- Installation
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.6.1
# Extract the version of Python you have installed. If you're currently using Python 3.11 you may want to set this manually as noted above, Python 3.11 is not yet supported.
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example this would install 2.6.1 with python 3.7: https://raw.githubusercontent.com/apache/airflow/constraints-2.6.1/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
- Running
export AIRFLOW_HOME=~/airflow # repeat in every new shell session
airflow standalone
- Access the Airflow UI at localhost:8080

After connecting, find the password in the startup log and log in:
standalone | Airflow is ready
standalone | Login with username: admin password: ************
standalone | Airflow Standalone is for development purposes only. Do not use this in production!
Alternatively, check the password in the standalone_admin_password.txt file:
cd airflow
vi standalone_admin_password.txt
- Configuration (airflow.cfg)
cd airflow
vi airflow.cfg
# airflow.cfg
# Change the dags folder path
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /home/sub/dags
# Hide the example dags so only user-defined dags are shown
# Whether to load the DAG examples that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
# environment
load_examples = False
2. Airflow Test
2-1. Running a single task instance
Test by invoking an actual instance of one task at a time:
airflow dags list # list registered dags
# run your first task instance
airflow tasks test [dag id] [task id] [execution date]
2-2. backfill
Used when you want to process data from a point earlier than the DAG's current processing window
- i.e., to fill in gaps in historical data
# run a backfill over 2 days
airflow dags backfill [dag id] --start-date [start date] --end-date [end date]
2-3. backfill example
- DAG description
1. Data collection interval: every day at 05:00 (KST)
2. Write code that partitions the data and transfers it to the paths below
- s3://pd24/<your_english_name>/yyyyMMdd/SUM.log
- s3://pd24/<your_english_name>/dd/MM/yyyy/RAW.log
- s3://pd24/<your_english_name>/DONE/yyyyMMdd/_DONE
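The three partitioned key formats above can be sketched with plain `strftime` before wiring them into a DAG. This is an illustrative helper, not part of the assignment code; the `sub` prefix mirrors the example code below, and `partition_keys` is a hypothetical name:

```python
from datetime import datetime

def partition_keys(kst_date: datetime, name: str = "sub") -> dict:
    """Build the three S3 keys from the spec for a given KST date."""
    ymd = kst_date.strftime("%Y%m%d")    # yyyyMMdd
    dmy = kst_date.strftime("%d/%m/%Y")  # dd/MM/yyyy
    return {
        "SUM": f"s3://pd24/{name}/{ymd}/SUM.log",
        "RAW": f"s3://pd24/{name}/{dmy}/RAW.log",
        "DONE": f"s3://pd24/{name}/DONE/{ymd}/_DONE",
    }

print(partition_keys(datetime(2023, 5, 26)))
```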

- Example code
(Note: the original transfer tasks had the RAW/SUM partition formats swapped relative to the spec; they are corrected here.)
from airflow import DAG
from datetime import datetime
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
import pendulum

MY_PATH = '/home/sub/airflow/data'
AWS_PATH = 's3://mypath'
local_tz = pendulum.timezone("Asia/Seoul")

# execution_date is in UTC; add 9 hours to get the KST date used for partitioning
exe_kr_nodash = '{{ execution_date.add(hours=9).strftime("%Y%m%d") }}'
year = '{{ execution_date.add(hours=9).strftime("%Y") }}'
month = '{{ execution_date.add(hours=9).strftime("%m") }}'
day = '{{ execution_date.add(hours=9).strftime("%d") }}'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 30, tzinfo=local_tz),
    'retries': 0,
}

test_dag = DAG(
    'mario-sub-partitioning_ver2',
    default_args=default_args,
    schedule_interval="0 5 * * *"  # daily at 05:00 KST
)

# Download the day's web log from S3
load_web_log = BashOperator(
    task_id='load_web_log',
    bash_command=f"""aws s3 cp {AWS_PATH}/web.log {MY_PATH}/web.log
    echo "{exe_kr_nodash} {year} {month} {day}"
    """,
    dag=test_dag
)

# Extract the item ids into RAW.log
extract_raw = BashOperator(
    task_id='extract_RAW_log',
    bash_command=f"grep 'item=' {MY_PATH}/web.log | cut -d'=' -f 2 | cut -d',' -f 1 > {MY_PATH}/RAW.log",
    dag=test_dag
)

# Count occurrences per item into SUM.log
extract_sum = BashOperator(
    task_id='extract_SUM_log',
    bash_command=f"grep 'item=' {MY_PATH}/web.log | cut -d'=' -f 2 | cut -d',' -f 1 | sort -n | uniq -c > {MY_PATH}/SUM.log",
    dag=test_dag
)

touch_done = BashOperator(
    task_id='DONE',
    bash_command=f"touch {MY_PATH}/DONE",
    dag=test_dag
)

# Per the spec, RAW.log is partitioned as dd/MM/yyyy
transfer_raw = BashOperator(
    task_id='transfer_RAW_log',
    bash_command=f"""
    aws s3 cp {MY_PATH}/RAW.log {AWS_PATH}/sub/{day}/{month}/{year}/RAW.log
    sleep 1
    """,
    dag=test_dag
)

# Per the spec, SUM.log is partitioned as yyyyMMdd
transfer_sum = BashOperator(
    task_id='transfer_SUM_log',
    bash_command=f"""
    aws s3 cp {MY_PATH}/SUM.log {AWS_PATH}/sub/{exe_kr_nodash}/SUM.log
    sleep 1
    """,
    dag=test_dag
)

transfer_done = BashOperator(
    task_id='transfer_DONE',
    bash_command=f"""
    aws s3 cp {MY_PATH}/DONE {AWS_PATH}/sub/DONE/{exe_kr_nodash}/_DONE
    sleep 1
    """,
    dag=test_dag
)

start_task = EmptyOperator(task_id='start', dag=test_dag)
end_task = EmptyOperator(task_id='end', dag=test_dag)

start_task >> load_web_log >> [extract_raw, extract_sum] >> touch_done >> [transfer_raw, transfer_sum, transfer_done] >> end_task
- Running backfill and the results
airflow dags backfill mario-sub-partitioning_ver2 --start-date 2023-05-26 --end-date 2023-05-29
$ aws s3 ls mypath/sub/
# data for 2023-05-26 ~ 2023-05-29
PRE 20230526/
PRE 20230527/
PRE 20230528/
PRE 20230529/
PRE 26/
PRE 27/
PRE 28/
PRE 29/
PRE DONE/