2023.05.24 (WED) Study Notes
#Pyenv #Airflow #AWS #S3
1. pyenv
Each project or pipeline environment may require a different Python version, and a data engineer has to install several Python versions in the development environment for development and testing. pyenv makes it possible to keep each directory isolated on whatever version it needs (see the sketch after the commands below).
- Install pyenv
curl https://pyenv.run | bash
- Configure zshrc: add the following to ~/.zshrc (vi ~/.zshrc)
export PYENV_ROOT="$HOME/.pyenv"
command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"
eval "$(pyenv init -)"
eval "$(pyenv virtualenv-init -)"
- Verify the installation
source ~/.zshrc # reload the shell configuration
pyenv versions
- List the Python versions available to install
pyenv install --list
pyenv install --list | grep "3.7.1" # show only versions matching 3.7.1
- Install a specific Python version
pyenv install 3.7.16
- Change the default Python version
pyenv global 3.7.16
- Create a Python virtual environment
pyenv virtualenv 3.7.16 pd24air # create the environment
pyenv global pd24air # use the new environment as the default
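The per-directory isolation mentioned above works through pyenv local, which pins a directory to a version or virtualenv; a minimal sketch, using a hypothetical project directory:
cd ~/projects/my-pipeline # hypothetical project directory
pyenv local pd24air # writes a .python-version file into this directory
python --version # resolves to the pd24air environment (3.7.16) while inside this directory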
2. AWS S3
Amazon Simple Storage Service (S3) is an internet storage service.
2-1. Basic
* Work in the pd24air environment created above
- Install awscli
pip install awscli
aws configure # configure access credentials
- Basic aws s3 commands
aws s3 ls # list buckets and objects
aws s3 rm s3://mybucket/sub.log # delete an object
aws s3 mv sub.log s3://mybucket # move local → S3
aws s3 cp sub.log s3://mybucket/sub/ # copy local → S3; if the folder does not exist, it is created before copying
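The same operations can be scripted from Python with boto3 (the AWS SDK); a minimal sketch, assuming boto3 is installed (pip install boto3) and credentials were already saved by aws configure — the bucket and key names are hypothetical:
import boto3

s3 = boto3.client('s3')  # picks up the credentials saved by aws configure

# upload a local file (equivalent of: aws s3 cp sub.log s3://mybucket/sub/)
s3.upload_file('sub.log', 'mybucket', 'sub/sub.log')

# list the objects in the bucket (equivalent of: aws s3 ls s3://mybucket)
for obj in s3.list_objects_v2(Bucket='mybucket').get('Contents', []):
    print(obj['Key'])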
2-2. Transferring files to AWS with Airflow
- awscli must be installed in the Airflow container
# check the status of the running containers
docker ps
docker exec -it [worker container ID] bash
# install awscli inside the container
pip install awscli
aws configure # configure access credentials
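Note that credentials entered with aws configure live only inside that container and disappear if it is recreated. As an alternative (an assumption about the setup, not something this exercise requires), awscli also reads the standard AWS environment variables, which can be exported in the container or set in docker-compose:
export AWS_ACCESS_KEY_ID=<your-access-key-id>
export AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
export AWS_DEFAULT_REGION=ap-northeast-2 # example region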
- Write the DAG under airflow/dags

📎 'Transfer to AWS' DAG Code
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

MY_PATH = '/opt/airflow/dags/data'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 1),
    'retries': 0,
}

test_dag = DAG(
    'mario-sub-aws',
    default_args=default_args,
    schedule_interval=timedelta(days=1)
)

# Download the raw web log from S3
load_web_log = BashOperator(
    task_id='load_web_log',
    bash_command=f"aws s3 cp s3://log/web.log {MY_PATH}/web.log",
    dag=test_dag
)

# Extract the item values from the raw log
bash_extract_raw = BashOperator(
    task_id='extract_RAW_log',
    bash_command=f"cat {MY_PATH}/web.log | grep 'item=' | cut -d'=' -f 2 | cut -d',' -f 1 > {MY_PATH}/RAW.log",
    dag=test_dag
)

# Count occurrences of each item
bash_extract_sum = BashOperator(
    task_id='extract_SUM_log',
    bash_command=f"cat {MY_PATH}/RAW.log | sort -n | uniq -c > {MY_PATH}/SUM.log",
    dag=test_dag
)

# Create a marker file signaling that extraction is complete
bash_task_done = BashOperator(
    task_id='DONE',
    bash_command=f"touch {MY_PATH}/DONE",
    dag=test_dag
)

# Upload the results back to S3
bash_task_aws = BashOperator(
    task_id='transfer_to_aws',
    bash_command=f"""
    aws s3 cp {MY_PATH}/RAW.log s3://log/sub/RAW.log
    sleep 1
    aws s3 cp {MY_PATH}/SUM.log s3://log/sub/SUM.log
    sleep 1
    aws s3 cp {MY_PATH}/DONE s3://log/sub/DONE
    echo "END"
    """,
    dag=test_dag
)

start_task = EmptyOperator(task_id='start', dag=test_dag)
end_task = EmptyOperator(task_id='end', dag=test_dag)

start_task >> load_web_log >> bash_extract_raw >> bash_extract_sum >> bash_task_done
bash_task_done >> bash_task_aws >> end_task
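For reference, the Amazon provider package ships a transfer operator, so the upload step would not depend on awscli being installed in the worker. A hedged sketch, assuming apache-airflow-providers-amazon is installed and an 'aws_default' connection is configured in Airflow (argument names may vary across provider versions):
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

upload_raw = LocalFilesystemToS3Operator(
    task_id='upload_RAW_log',
    filename=f'{MY_PATH}/RAW.log',  # local file produced by extract_RAW_log
    dest_bucket='log',              # same bucket as the awscli version above
    dest_key='sub/RAW.log',
    replace=True,
    aws_conn_id='aws_default',
    dag=test_dag,
)
Either way, a single task can be exercised without waiting for the schedule, e.g. airflow tasks test mario-sub-aws transfer_to_aws 2023-05-01.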