import requests
from bs4 import BeautifulSoup
import pandas as pd
import pandas.io.sql as psql
import numpy as np
import re
import psycopg2
import smtplib
from email.message import EmailMessage
from glob import glob
import redis
import json
from icecream import ic
from airflow import DAG
from airflow.models.xcom import XCom
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
import sys
# Raise the interpreter's recursion limit to 2,000,000 frames.
# NOTE(review): no recursive code is visible in this file; confirm this is
# still required. A limit this far above the default lets runaway recursion
# exhaust the C stack and crash the process instead of raising RecursionError.
sys.setrecursionlimit(2000000)
import pendulum
# Asia/Seoul timezone object; used below for the DAG start date and the
# run-time stamps.
kst = pendulum.timezone("Asia/Seoul")
from haversine import haversine
def get_distances(station_df, stn_nm):
    """Return the haversine distance from one station to every station row.

    Args:
        station_df: DataFrame with at least the columns ``stn_nm``,
            ``stn_lo`` and ``stn_la`` (presumably name, longitude and
            latitude -- verify against the producer of this frame).
        stn_nm: Name of the reference station. The first row whose
            ``stn_nm`` equals this value supplies the reference point.

    Returns:
        A list with one ``haversine`` result per row of ``station_df``, in
        row order, each computed between that row's ``(stn_la, stn_lo)``
        pair and the reference station's ``(stn_la, stn_lo)`` pair.

    Raises:
        IndexError: If no row of ``station_df`` has ``stn_nm == stn_nm``.
    """
    # First matching row is the reference station.
    reference_row = station_df[station_df.stn_nm == stn_nm].iloc[0]
    station_lo, station_la = reference_row[['stn_lo', 'stn_la']]
    reference_point = (station_la, station_lo)

    # Each element of `.values` is one row's [stn_la, stn_lo] pair.
    all_points = station_df[['stn_la', 'stn_lo']].values
    return [haversine(point, reference_point) for point in all_points]
def make_distance_df(station):
    """Build a square station-to-station distance table.

    Args:
        station: DataFrame of stations; must provide a ``stn_nm`` column and
            whatever columns ``get_distances`` reads.

    Returns:
        A DataFrame whose index and columns are both the ``stn_nm`` values
        (in their original order). The row labelled with a station name
        holds the list returned by ``get_distances(station, name)``.
    """
    names = station.stn_nm.values
    # One row of distances per station, in the same order as `names`.
    distance_rows = [get_distances(station, name) for name in names]
    return pd.DataFrame(distance_rows, index=names, columns=names)
# Current wall-clock time, read directly in KST. Reading the clock with
# tz=kst (rather than a naive datetime.now()) ensures the year/month/day used
# for start_date below are the KST calendar date even when the host's local
# timezone is not Asia/Seoul.
now = datetime.now(tz=kst)

# DAG start date: midnight KST at the start of the previous day.
# NOTE(review): this value is recomputed every time the file is parsed, so it
# moves forward daily. Airflow's documentation recommends a fixed start_date;
# confirm whether a static date should be used here instead.
start_date = datetime(now.year, now.month, now.day, tzinfo=kst) - timedelta(days=1)

# Timestamps of this module load, in KST.
run_time = datetime.now(tz=kst)
# First 16 characters of str(run_time), i.e. "YYYY-MM-DD HH:MM".
run_time_str_long = str(run_time)[:16]
# Compact form: two-digit year, month, day and hour ("yymmddHH").
run_time_str_short = run_time.strftime('%y%m%d%H')

# Not referenced elsewhere in this section.
# NOTE(review): presumably the station table's column names -- verify usage.
columns = ["stn_nm", "stn_addr", "year", "mang_nm", "item", "stn_lo", "stn_la"]

default_args = {
    'owner': 'dilab',
}

# Cron expression "5 * * * *": run at minute 5 of every hour.
dag = DAG(
    'Airkorea_stn_raw_ETL_bashoperator',
    default_args=default_args,
    schedule_interval="5 * * * * ",
    start_date=start_date,
    tags=['Airkorea_stn_raw_ETL_bashoperator'],
)

# The DAG's only task: run the ETL script as a shell command.
Airkorea_stn_raw_ETL_command = "python /usr/local/airflow/dags/Airkorea_stn_raw_ETL.py"
Airkorea_stn_raw_ETL = BashOperator(
    task_id='Airkorea_stn_raw_ETL_bashoperator',
    bash_command=Airkorea_stn_raw_ETL_command,
    dag=dag,
)

# When executed directly as a script, expose this DAG's command-line interface.
if __name__ == "__main__":
    dag.cli()