DAG: Airkorea_stn_raw_ETL_bashoperator

schedule: 5 * * * *


Airkorea_stn_raw_ETL_bashoperator

Toggle wrap
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import requests
from bs4 import BeautifulSoup
import pandas as pd
import pandas.io.sql as psql
import numpy as np
import re
import psycopg2
import smtplib
from email.message import EmailMessage
from glob import glob
import redis
import json
from icecream import ic

from airflow import DAG
from airflow.models.xcom import XCom
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta

import sys
sys.setrecursionlimit(2000000)

import pendulum
kst = pendulum.timezone("Asia/Seoul")

from haversine import haversine
def get_distances(station_df, stn_nm):
    """Return the haversine distance from station ``stn_nm`` to every station.

    The reference point is the first row of ``station_df`` whose ``stn_nm``
    column equals ``stn_nm``. The result is a list holding one distance per
    row of ``station_df``, in row order (so it includes the distance from the
    station to itself).

    Because the reference row is fetched with ``.iloc[0]``, a name that
    matches no row raises instead of returning an empty result.
    """
    matching_rows = station_df[station_df.stn_nm == stn_nm]
    first_match = matching_rows.iloc[0]
    station_lo, station_la = first_match[['stn_lo', 'stn_la']]
    # Both arguments handed to haversine are ordered (stn_la, stn_lo).
    origin = (station_la, station_lo)
    coordinates = station_df[['stn_la', 'stn_lo']].values
    return [haversine(point, origin) for point in coordinates]

def make_distance_df(station):
    """Build a square table of station-to-station distances.

    ``station`` must provide a ``stn_nm`` column plus whatever columns
    ``get_distances`` reads. The returned DataFrame uses the station names as
    both its index and its columns; row ``a`` holds the list produced by
    ``get_distances(station, a)``.
    """
    names = station.stn_nm.values
    # One call per station; each call yields a full row of the table.
    distance_rows = [get_distances(station, name) for name in names]
    return pd.DataFrame(distance_rows, index=names, columns=names)

# Take "now" in KST. The original used a naive datetime.now() (server-local
# clock) and then stamped its year/month/day with tzinfo=kst, so on a host
# whose local zone is not KST the computed day could be off by one.
now = datetime.now(tz=kst)

# DAG start date: midnight (KST) at the start of the previous KST day.
# NOTE(review): this value is recomputed every time the file is parsed, so
# the start date moves forward each day. Airflow's documentation recommends a
# fixed start_date -- confirm whether the rolling window is intentional.
start_date = datetime(now.year, now.month, now.day, tzinfo=kst) - timedelta(days=1)

# Parse-time timestamp and two string renderings of it.
run_time = now
run_time_str_long = str(run_time)[:16]  # first 16 chars: date plus hour:minute
run_time_str_short = run_time.strftime('%y%m%d%H')

# Column names for the station table; not referenced elsewhere in this file.
columns = ["stn_nm", "stn_addr", "year", "mang_nm", "item", "stn_lo", "stn_la"]

# Arguments applied by default to every task attached to the DAG.
default_args = dict(owner='dilab')


# DAG definition. The cron expression fires at minute 5 of every hour.
# NOTE(review): the schedule string carries a trailing space; it is kept
# exactly as written.
dag = DAG(
    dag_id='Airkorea_stn_raw_ETL_bashoperator',
    schedule_interval="5 * * * * ",
    start_date=start_date,
    default_args=default_args,
    tags=['Airkorea_stn_raw_ETL_bashoperator'],
)

# Shell command executed by the task: runs the ETL script with `python`.
Airkorea_stn_raw_ETL_command = "python /usr/local/airflow/dags/Airkorea_stn_raw_ETL.py"

# The DAG's only task; it hands the command above to bash.
Airkorea_stn_raw_ETL = BashOperator(
    dag=dag,
    task_id='Airkorea_stn_raw_ETL_bashoperator',
    bash_command=Airkorea_stn_raw_ETL_command,
)

# When the file is executed directly, expose this DAG's command-line interface.
if __name__ == "__main__":
    dag.cli()