
[Airflow] Adding DAGs

K_Hyul 2024. 2. 7. 14:27
cd ~/workspace

pip install virtualenv
python3 -m virtualenv venv

cd venv/bin
ls -al | grep activate

# source the activate script to enter the virtual environment
. activate

cd
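
For later sessions there's no need to cd into the bin directory; sourcing the script by its path does the same thing:

source ~/workspace/venv/bin/activate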

# inside the virtual environment
export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.8.1

PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
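
Once the constrained install finishes, a quick sanity check with the standard CLI:

# confirm the CLI is on PATH and reports the pinned version
airflow version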


# initialize (or upgrade) the Airflow metadata database
airflow db migrate

## the username can be changed
airflow users create \
    --username hyul \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org
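
To confirm the account exists, list the registered users (standard Airflow CLI):

airflow users list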

# check which airflow binary is in use (should point into the venv)
which airflow

# start the webserver and scheduler in the background, detached from the terminal
nohup airflow webserver --port 8080 &
nohup airflow scheduler &

# confirm both processes are running (either form works)
ps aux | grep airflow
ps -ef | grep airflow


# Hit an error where the webserver wouldn't start because port 8080 was already in use.
# show the ports currently in use, with their PIDs
sudo netstat -ntpl

# kill a process by PID
sudo kill -9 [pid]
# find a process by name
sudo ps -ef | grep [program name]

# kill every process whose name matches
sudo pkill -f [name]
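
An equivalent check with lsof, assuming it is installed:

# show which process is listening on port 8080
sudo lsof -i :8080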


# stop all running Docker containers
docker stop $(docker ps -a -q)


# Airflow home directory
cd ~/airflow

ls -al

# locate one of the bundled example DAG files
find / -name "dataset_consumes_1.py" 2>/dev/null
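
The folder Airflow actually scans for DAG files is set in airflow.cfg and can be printed directly:

# print the configured dags folder (defaults to ~/airflow/dags)
airflow config get-value core dags_folder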


# create the dags folder if it doesn't exist yet, then add the first DAG file
mkdir -p ~/airflow/dags
cd ~/airflow/dags
vim encore_01.py


import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="listing_4_01",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@hourly",
)

# Download the Wikipedia pageviews dump for the run's logical hour.
# The {{ ... }} parts are Jinja templates that Airflow renders at runtime.
get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/"
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-{{ execution_date.year }}"
        "{{ '{:02}'.format(execution_date.month) }}"
        "{{ '{:02}'.format(execution_date.day) }}-"
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz"
    ),
    dag=dag,
)
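
To see what the template renders to, and to run the task once without the scheduler, the standard CLI helpers work (the date below is just an example):

# render the templated bash_command for one logical date
airflow tasks render listing_4_01 get_data 2024-02-07T10:00:00

# execute the single task for that date, outside the scheduler
airflow tasks test listing_4_01 get_data 2024-02-07T10:00:00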


vim encore_02.py

from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_18",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
    max_active_runs=1,
)


def _get_data(year, month, day, hour, output_path, **_):
    # Build the dump URL from the templated date parts and download the file.
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    print(url)
    request.urlretrieve(url, output_path)


get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    # every op_kwargs value is a Jinja template, rendered to a string at runtime
    op_kwargs={
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
        "hour": "{{ execution_date.hour }}",
        "output_path": "/tmp/wikipageviews.gz",
    },
    dag=dag,
)


extract_gz = BashOperator(
    task_id="extract_gz", bash_command="gunzip --force /tmp/wikipageviews.gz", dag=dag
)


def _fetch_pageviews(pagenames, execution_date, **_):
    result = dict.fromkeys(pagenames, 0)
    # Each line of the dump reads: domain_code page_title view_counts total_response_size
    with open("/tmp/wikipageviews", "r") as f:
        for line in f:
            domain_code, page_title, view_counts, _ = line.split(" ")
            if domain_code == "en" and page_title in pagenames:
                result[page_title] = view_counts

    # Write one INSERT statement per page for a downstream task to run against Postgres.
    with open("/tmp/postgres_query.sql", "w") as f:
        for pagename, pageviewcount in result.items():
            f.write(
                "INSERT INTO pageview_counts VALUES ("
                f"'{pagename}', {pageviewcount}, '{execution_date}'"
                ");\n"
            )


fetch_pageviews = PythonOperator(
    task_id="fetch_pageviews",
    python_callable=_fetch_pageviews,
    op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
    dag=dag,
)

get_data >> extract_gz >> fetch_pageviews
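
As with the first DAG, each task can be exercised one at a time without waiting for the scheduler (the date is illustrative):

airflow tasks test listing_4_18 get_data 2024-02-07T01:00:00
airflow tasks test listing_4_18 extract_gz 2024-02-07T01:00:00
airflow tasks test listing_4_18 fetch_pageviews 2024-02-07T01:00:00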
