2023.05.25 (Thu) Study Notes
#Linux-if-statement #chmod #TriggerRule
1. Linux
1-1. if statements
- Basic syntax
if [ condition1 -a condition2 ]
then
    commands
elif [ condition ]
then
    commands
else
    commands
fi
- Logical operators
- and : -a
- or : -o
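A quick sketch of the two operators, using hypothetical values x and y just for illustration:

```shell
# Hypothetical values for illustration
x=5
y=-1

# -a : both conditions must hold (AND)
if [ $x -gt 0 -a $y -gt 0 ]
then
    echo "both positive"
else
    echo "not both positive"   # printed here, since y is negative
fi

# -o : at least one condition must hold (OR)
if [ $x -gt 0 -o $y -gt 0 ]
then
    echo "at least one positive"   # printed, since x is positive
fi
```

Note that `-a` and `-o` work inside a single `[ ]` test; with the `[[ ]]` form you would use `&&` and `||` instead.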
- Handling input arguments
- $n : the n-th argument
- $# : the number of arguments
📌 if statement example
if [[ $# != 3 ]]  # if the number of arguments is not 3
then
echo "Oops!!! argument required."
exit 1
fi
echo $1
echo $2
echo $3
$ bash a.sh 1 2
Oops!!! argument required.
$ bash a.sh 1 2 3
1
2
3
1-2. chmod +x
Applies execute permission to a directory or file.
chmod +x filename
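For example, a newly created script file cannot be run directly until the execute bit is added (hello.sh is a hypothetical file name):

```shell
# Create a hypothetical script (file name is just for illustration)
printf '#!/bin/bash\necho "hello"\n' > hello.sh

# At this point ./hello.sh fails with "Permission denied":
# the file has no execute bit yet (typically -rw-r--r--)

chmod +x hello.sh   # add execute permission (typically -rwxr-xr-x now)
./hello.sh          # prints: hello

rm hello.sh         # clean up the demo file
```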
2. Airflow Trigger Rule
A trigger rule defines a task's dependency on its upstream tasks: whether the task runs depends on the success or failure state of the tasks before it.
| Trigger Rule | Fires when |
|---|---|
| all_success | all upstream tasks succeeded |
| all_failed | all upstream tasks failed |
| all_done | all upstream tasks finished, regardless of success or failure |
| one_failed | at least one upstream task failed; does not wait for all upstream tasks to finish |
| one_success | at least one upstream task succeeded; does not wait for all upstream tasks to finish |
| none_failed | no upstream task failed or is upstream_failed (all upstream tasks succeeded or were skipped) |
| none_failed_min_one_success | no upstream task failed or is upstream_failed, and at least one succeeded |
| none_skipped | no upstream task was skipped (all upstream tasks succeeded, failed, or are upstream_failed) |
| always | runs unconditionally |
- Setting a trigger rule
test_bash = BashOperator(
    task_id='test',
    bash_command="echo '1'",
    dag=test_dag,
    trigger_rule="all_success",  # trigger_rule attribute
)
- Using trigger rules
check.sh checks whether the file exists at the given path, and downstream tasks run differently depending on the result.
📌 Trigger rule example code
#check.sh
if [[ $# != 2 ]]
then
echo "Oops!!! s3_path and done_flag_name arguments required."
exit 1
fi
S3_PATH=$1
DONE_FLAG_NAME=$2
aws s3 ls ${S3_PATH}/${DONE_FLAG_NAME}
if [ $? -eq 0 ]
then
echo "confirmed : zero size ${DONE_FLAG_NAME}"
exit 0
else
echo "${DONE_FLAG_NAME} is missing or non-zero size"
exit 1
fi
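check.sh branches on `$?`, the exit status of the most recent command (0 means success). The same pattern sketched with `ls` on a local path standing in for `aws s3 ls`:

```shell
# 'ls' here stands in for any command whose exit status we care about
ls /tmp > /dev/null 2>&1

if [ $? -eq 0 ]   # $? holds the exit status of the last command
then
    echo "path exists"
else
    echo "path is missing"
fi
```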
chmod +x [path/filename.sh]  # grant execute permission
#dags.py
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
MY_PATH = '/opt/airflow/dags/sub'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 5, 1),
'retries': 0,
}
test_dag = DAG(
'sub_test',
default_args=default_args,
schedule_interval=timedelta(days=1)
)
# Define the BashOperator task
check_task = BashOperator(
task_id='Check_Done',
bash_command=f"""
{MY_PATH}/check.sh s3://sub DONE
""",
dag=test_dag
)
done_exit = BashOperator(
    task_id='Done_Exit',
    bash_command="echo 'confirmed : zero size _DONE'",
    dag=test_dag,
    trigger_rule="all_success",  # runs only when check_task succeeded
)
done_not_exit = BashOperator(
    task_id='Done_not_Exit',
    bash_command="echo 'DONE is missing or non-zero size'",
    dag=test_dag,
    trigger_rule="all_failed",  # runs only when check_task failed
)
check_task >> [done_exit, done_not_exit]