
Prefect ETL tool in Python

Having spent a lot of my time playing with Keboola and dbt to load and transform my data, I wanted to have a look at just doing everything in pure Python. I previously built the full ETL pipeline for a company in Python but haven't had a need to touch it in over four years. Most of the work I did before was just pandas with a few connectors to various databases, producing reports in Excel using xlwings. It wasn't pretty, but it was effective and everyone was happy with the job it did.
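
For context, here is a minimal sketch of the sort of pandas-to-Excel reporting I mean, assuming xlwings is installed and Excel is available. The connection string, query, workbook, and sheet names are all hypothetical, purely for illustration:

import pandas as pd
import xlwings as xw
from sqlalchemy import create_engine

# Hypothetical database and query, for illustration only
engine = create_engine("mysql+mysqldb://user:pwd@host/db")
report = pd.read_sql_query("select * from sales_summary", engine)

# Push the DataFrame straight into an Excel workbook
wb = xw.Book("monthly_report.xlsx")  # hypothetical workbook
wb.sheets["Summary"]["A1"].value = report
wb.save()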

Instead I ended up using the Prefect library. Well, I built it all and then integrated it into Prefect once I found it. I found it OK and it has some useful features, but it is not brilliant, though that could be down to lack of use. It does allow you to produce DAGs and lots of other useful functionality. Script below.



# -*- coding: utf-8 -*-
"""
Created on Sat May 21 20:25:39 2022
@author: garym
"""
import json
import os
import urllib.request
from datetime import timedelta

import pandas as pd
from dotenv import load_dotenv
from flatten_json import flatten
from prefect import task, Flow
from snowflake.connector.pandas_tools import pd_writer
from sqlalchemy import create_engine

# Load credentials from the .env file
load_dotenv()

# Weather API
weatherkey = os.environ.get("WEATHER_API_KEY")
# MySQL
MySQLHost = os.environ.get("MYSQLHOST")
MySQLUser = os.environ.get("MYSQLUSER")
MySQLPwd = os.environ.get("MYSQLPWD")
MySQLDB = os.environ.get("MYSQLDB")
# Snowflake
SnowAcc = os.environ.get("SNOWACC")
SnowUser = os.environ.get("SNOWUSER")
SnowPwd = os.environ.get("SNOWPWD")
SnowDB = os.environ.get("SNOWDB")
SnowWH = os.environ.get("SNOWWH")
SnowSchema = os.environ.get("SNOWSCHEMA")

# Set up database connection engines
mySQL_conn = create_engine(
    f"mysql+mysqldb://{MySQLUser}:{MySQLPwd}@{MySQLHost}/{MySQLDB}")
Snowengine = create_engine(
    "snowflake://{user}:{password}@{account}/{db}/{schema}?warehouse={warehouse}".format(
        user=SnowUser,
        password=SnowPwd,
        account=SnowAcc,
        warehouse=SnowWH,
        db=SnowDB,
        schema=SnowSchema))


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract_api_data():
    """Pull the current weather for the postcode as JSON."""
    url = ("http://api.weatherapi.com/v1/current.json?key="
           + weatherkey + "&q=BN266NH&aqi=yes")
    with urllib.request.urlopen(url) as jsonurl:
        weatherdatajson = json.loads(jsonurl.read())
    return weatherdatajson


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def transform(weatherdatajson):
    """Flatten the nested JSON into a single-row DataFrame."""
    flattendata = flatten(weatherdatajson)
    normWeatherData = pd.json_normalize(flattendata)
    return normWeatherData


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def load_mySQL_data(normWeatherData):
    """Append the normalised row to the MySQL staging table."""
    normWeatherData.to_sql('stg_weather', con=mySQL_conn,
                           if_exists='append', index=False)
    return 'Y'


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract_mySQL_data(loaded_ind):
    """Read the renamed columns back out of MySQL, ready for Snowflake."""
    if loaded_ind == 'Y':
        toSnow = pd.read_sql_query(
            '''select distinct
                   location_name locationName,
                   location_region LocationRegion,
                   location_country LocationCountry,
                   location_lat Latitude,
                   location_lon Longitude,
                   current_last_updated currentLastUpdated,
                   current_temp_c currentTempC,
                   current_condition_text currentCondText,
                   current_wind_mph windMph,
                   current_precip_mm rainMM,
                   current_humidity Humidity,
                   current_cloud Cloud,
                   current_feelslike_c FeelsLikeC
               from stg_weather''', mySQL_conn)
        return toSnow


@task(max_retries=3, retry_delay=timedelta(seconds=10))
def load_live_data(toSnow):
    """Replace the Snowflake staging table with the latest data."""
    toSnow.to_sql('stg_weather', con=Snowengine, if_exists='replace',
                  index=False, method=pd_writer)


# Wire the tasks into a DAG; Prefect infers the dependencies
# from the data passed between the tasks.
with Flow("Weather-ETL") as flow:
    weatherdatajson = extract_api_data()
    normWeatherData = transform(weatherdatajson)
    loadedmySQL = load_mySQL_data(normWeatherData)
    frommySQL = extract_mySQL_data(loadedmySQL)
    end = load_live_data(frommySQL)

flow.run()
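
As written, flow.run() executes the flow once. A sketch of how the same flow could be put on a schedule using Prefect 1.x's IntervalSchedule; the hourly interval is just a placeholder, and flow.visualize() needs graphviz installed to render the DAG:

from datetime import timedelta
from prefect.schedules import IntervalSchedule

# Placeholder hourly interval; attach the schedule when constructing the flow
schedule = IntervalSchedule(interval=timedelta(hours=1))
with Flow("Weather-ETL", schedule=schedule) as flow:
    ...  # same tasks as above

flow.visualize()  # render the DAG (requires graphviz)
flow.run()        # now runs on the schedule instead of once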

