Having spent a lot of my time playing with Keboola and dbt to load and transform my data, I wanted to have a look at doing the same thing in pure Python. I previously built the full ETL pipeline for a company in Python, but haven't really had a need to touch it in over four years. Most of the work I did back then just used pandas with a few connectors to various databases, producing reports in Excel using xlwings. It wasn't pretty, but it was effective and everyone was happy with the job that it did.
Instead, I ended up using the Prefect library — or rather, I built it all first and then integrated it into Prefect once I found it. I found it OK: it has some useful features, but it is not brilliant, though that could be down to my lack of use. It does allow you to produce DAGs and offers lots of other useful functionality. The script is below.
# -*- coding: utf-8 -*-
"""
Created on Sat May 21 20:25:39 2022
@author: garym
"""
import json
import os
import urllib.parse
import urllib.request
from datetime import timedelta

import pandas as pd
import prefect
from dotenv import load_dotenv
from flatten_json import flatten
from prefect import Flow, task
from snowflake.connector.pandas_tools import pd_writer
from sqlalchemy import create_engine

# Populate os.environ from a local .env file before any settings are read below.
load_dotenv()
# Load Keys
# weather
weatherkey = os.environ.get("WEATHER_API_KEY")
# mySQL
MySQLHost = os.environ.get("MYSQLHOST")
# This used to read only "MYSQLSUER" (a typo for "MYSQLUSER"). The correct name
# is tried first; the old name is kept as a fallback so existing .env files work.
MySQLUser = os.environ.get("MYSQLUSER") or os.environ.get("MYSQLSUER")
MySQLPwd = os.environ.get("MYSQLPWD")
MySQLDB = os.environ.get("MYSQLDB")
# Snowflake
SnowAcc = os.environ.get("SNOWACC")
SnowUser = os.environ.get("SNOWUSER")
SnowPwd = os.environ.get("SNOWPWD")
SnowDB = os.environ.get("SNOWDB")
SnowWH = os.environ.get("SNOWWH")
SnowSchema = os.environ.get("SNOWSCHEMA")

# Fail fast with a readable message. Otherwise a missing MySQL setting surfaces
# as an opaque TypeError from string concatenation, and a missing Snowflake
# setting is silently formatted into the URL as the literal text "None".
_missing_env = [
    name
    for name, value in (
        ("MYSQLHOST", MySQLHost),
        ("MYSQLUSER", MySQLUser),
        ("MYSQLPWD", MySQLPwd),
        ("MYSQLDB", MySQLDB),
        ("SNOWACC", SnowAcc),
        ("SNOWUSER", SnowUser),
        ("SNOWPWD", SnowPwd),
        ("SNOWDB", SnowDB),
        ("SNOWWH", SnowWH),
        ("SNOWSCHEMA", SnowSchema),
    )
    if value is None
]
if _missing_env:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_env)
    )

# setup database connection strings
# User names and passwords are percent-encoded (safe="" also encodes '/') so
# that characters such as '@', ':' or '/' in a credential are not parsed as
# URL delimiters. For plain alphanumeric credentials the URL is unchanged.
mySQL_conn = create_engine(
    "mysql+mysqldb://{user}:{password}@{host}/{db}".format(
        user=urllib.parse.quote(MySQLUser, safe=""),
        password=urllib.parse.quote(MySQLPwd, safe=""),
        host=MySQLHost,
        db=MySQLDB,
    )
)
Snowengine = create_engine(
    "snowflake://{user}:{password}@{account}/{db}/{schema}?warehouse={warehouse}".format(
        user=urllib.parse.quote(SnowUser, safe=""),
        password=urllib.parse.quote(SnowPwd, safe=""),
        account=SnowAcc,
        warehouse=SnowWH,
        db=SnowDB,
        schema=SnowSchema,
    )
)
@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract_api_data(location="BN266NH", timeout=30):
    """Fetch current weather (with air-quality data) from weatherapi.com.

    Args:
        location: Value sent as the API's ``q`` query parameter. Defaults to
            the location this flow was originally written for.
        timeout: Seconds to wait for the HTTP response. Without a timeout a
            hung connection would block forever and never reach Prefect's
            retry logic.

    Returns:
        The decoded JSON response body.

    Raises:
        RuntimeError: if the WEATHER_API_KEY environment variable is not set.
    """
    if not weatherkey:
        raise RuntimeError("WEATHER_API_KEY environment variable is not set")
    # urlencode escapes each value, instead of splicing raw text into the URL.
    query = urllib.parse.urlencode({"key": weatherkey, "q": location, "aqi": "yes"})
    # HTTPS so the API key in the query string is not sent in clear text.
    url = "https://api.weatherapi.com/v1/current.json?" + query
    # The context manager closes the HTTP response (the original leaked it).
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read())
@task(max_retries=3, retry_delay=timedelta(seconds=10))
def transform(weatherdatajson):
    """Collapse the nested API payload into a one-row DataFrame.

    ``flatten`` joins nested keys into single column names using
    flatten_json's default separator. For example, ``{"current": {"temp_c": ...}}``
    becomes ``current_temp_c``, which is the naming the MySQL query
    downstream relies on.
    """
    return pd.json_normalize(flatten(weatherdatajson))
@task(max_retries=3, retry_delay=timedelta(seconds=10))
def load_mySQL_data(normWeatherData):
    """Append the weather rows to the MySQL staging table ``stg_weather``.

    Returns the flag ``'Y'`` once ``to_sql`` has completed. The next task
    takes this flag as input, which makes Prefect run it after the load.
    """
    normWeatherData.to_sql(
        "stg_weather",
        con=mySQL_conn,
        if_exists="append",
        index=False,
    )
    return "Y"
@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract_mySQL_data(loaded_ind):
    """Read the de-duplicated staging rows back out of MySQL.

    Args:
        loaded_ind: Flag returned by ``load_mySQL_data``; ``'Y'`` means the
            staging load completed.

    Returns:
        A DataFrame of the distinct rows in ``stg_weather``, with the columns
        renamed to the names written to Snowflake.

    Raises:
        ValueError: if ``loaded_ind`` is not ``'Y'``.
    """
    # Guard clause: without it, a non-'Y' flag reached `return` with the
    # result variable unbound and raised UnboundLocalError.
    if loaded_ind != 'Y':
        raise ValueError(
            "MySQL staging load did not report success "
            f"(loaded_ind={loaded_ind!r}); not reading stg_weather"
        )
    # stg_weather is appended to on every run, so DISTINCT drops repeated rows.
    query = """
        select distinct
            location_name as locationName,
            location_region as LocationRegion,
            location_country as LocationCountry,
            location_lat as Latitude,
            location_lon as Longitude,
            current_last_updated as currentLastUpdated,
            current_temp_c as currentTempC,
            current_condition_text as currentCondText,
            current_wind_mph as windMph,
            current_precip_mm as rainMM,
            current_humidity as Humidity,
            current_cloud as Cloud,
            current_feelslike_c as FeelsLikeC
        from stg_weather
    """
    return pd.read_sql_query(query, mySQL_conn)
@task(max_retries=3, retry_delay=timedelta(seconds=10))
def load_live_data(toSnow):
    """Write *toSnow* to the Snowflake table ``stg_weather``.

    ``if_exists='replace'`` makes pandas drop any existing table before
    inserting. The Snowflake connector's ``pd_writer`` is passed as the
    ``to_sql`` insertion method.
    """
    write_options = dict(
        con=Snowengine,
        if_exists="replace",
        index=False,
        method=pd_writer,
    )
    toSnow.to_sql("stg_weather", **write_options)
# Calling tasks inside the Flow context wires them into the DAG. Each task's
# return value is passed to the next, and that data dependency fixes the order:
# API -> transform -> MySQL staging -> read back -> Snowflake.
with Flow("Weather-ETL") as flow:
    weatherdatajson = extract_api_data()
    normWeatherData = transform(weatherdatajson)
    loadedmySQL = load_mySQL_data(normWeatherData)
    frommySQL = extract_mySQL_data(loadedmySQL)
    end = load_live_data(frommySQL)

# Only execute the flow when this file is run as a script, so importing the
# module (e.g. to register or inspect `flow`) does not trigger a full ETL run.
if __name__ == "__main__":
    flow.run()
Comments
Post a Comment