cd ~/workspace
pip install virtualenv
python3 -m virtualenv venv
cd venv
cd bin
ls -al | grep activate
. ./activate
cd
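If activation worked, the shell now resolves python from the virtualenv rather than the system install; a quick sanity check:
which python     # should point into ~/workspace/venv/bin
python --version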
# inside the virtual environment
export AIRFLOW_HOME=~/airflow
AIRFLOW_VERSION=2.8.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
airflow db migrate
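Before creating a user, it is worth confirming that the pinned version actually installed and that the metadata database is reachable; both are standard Airflow CLI calls:
airflow version   # should print 2.8.1
airflow db check  # verifies the metadata database connection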
## the username below can be changed to whatever you like
airflow users create \
--username hyul \
--firstname Peter \
--lastname Parker \
--role Admin \
--email spiderman@superhero.org
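Since no --password flag is given, the CLI prompts for one interactively. The new account can then be confirmed with:
airflow users list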
which airflow
nohup airflow webserver --port 8080 &
nohup airflow scheduler &
ps aux | grep airflow
ps -ef | grep airflow
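Besides grepping the process table, the webserver exposes a health endpoint that also reports the scheduler's latest heartbeat, so one call checks both daemons:
curl http://localhost:8080/health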
# error: the webserver would not start because port 8080 was already in use
# list ports currently in use along with their PIDs
sudo netstat -ntpl
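On machines without netstat (it has been superseded by ss), the same listening-port information is available through ss or lsof:
sudo ss -ntpl        # listening TCP ports with PIDs, same flags as netstat
sudo lsof -i :8080   # only the processes holding port 8080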
# kill the process by PID
sudo kill -9 [pid]
# find a process by name
sudo ps -ef | grep [program-name]
# kill every process whose command line contains the name
sudo pkill -f [name]
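Because pkill -f matches the full command line, it can catch more processes than intended; previewing the matches with pgrep first is safer:
pgrep -af airflow    # lists exactly what "pkill -f airflow" would kill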
# stop all running Docker containers
docker stop $(docker ps -a -q)
# airflow 홈
cd ~/airflow
ls -al
# find where a given DAG file lives on disk
find / -name "dataset_consumes_1.py" 2>/dev/null
cd ~/airflow/dags
vim encore_01.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="listing_4_01",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@hourly",
)

# download the hourly Wikipedia pageviews dump for the run's execution date;
# the Jinja templates zero-pad month/day/hour to two digits
get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/"
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-{{ execution_date.year }}"
        "{{ '{:02}'.format(execution_date.month) }}"
        "{{ '{:02}'.format(execution_date.day) }}-"
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz"
    ),
    dag=dag,
)
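Before waiting on the scheduler, the file can be checked for syntax errors and the templated curl command exercised for a single logical date; airflow tasks test runs one task without recording any state, and the timestamp below is only an example:
python encore_01.py                 # any import/syntax error shows up here
airflow dags list | grep listing_4_01
airflow tasks test listing_4_01 get_data 2024-03-01T02:00:00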
vim encore_02.py
from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_18",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
    max_active_runs=1,
)

def _get_data(year, month, day, hour, output_path, **_):
    # {value:0>2} left-pads with zeros, so "3" becomes "03"
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    print(url)
    request.urlretrieve(url, output_path)

get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    # each value is rendered by Jinja before the callable runs
    op_kwargs={
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
        "hour": "{{ execution_date.hour }}",
        "output_path": "/tmp/wikipageviews.gz",
    },
    dag=dag,
)

extract_gz = BashOperator(
    task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)

def _fetch_pageviews(pagenames, execution_date, **_):
    result = dict.fromkeys(pagenames, 0)
    with open("/tmp/wikipageviews", "r") as f:
        for line in f:
            domain_code, page_title, view_counts, _ = line.split(" ")
            if domain_code == "en" and page_title in pagenames:
                result[page_title] = view_counts
    # emit one INSERT per page; a downstream task can run this SQL file
    with open("/tmp/postgres_query.sql", "w") as f:
        for pagename, pageviewcount in result.items():
            f.write(
                "INSERT INTO pageview_counts VALUES ("
                f"'{pagename}', {pageviewcount}, '{execution_date}'"
                ");\n"
            )

fetch_pageviews = PythonOperator(
    task_id="fetch_pageviews",
    python_callable=_fetch_pageviews,
    op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
    dag=dag,
)

get_data >> extract_gz >> fetch_pageviews
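The three tasks can be run one at a time the same way; since airflow tasks test records nothing in the database, it is safe to rerun until the pipeline behaves (the timestamp is again only an example):
airflow tasks test listing_4_18 get_data 2024-03-01T02:00:00
airflow tasks test listing_4_18 extract_gz 2024-03-01T02:00:00
airflow tasks test listing_4_18 fetch_pageviews 2024-03-01T02:00:00
cat /tmp/postgres_query.sql   # the generated INSERT statements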