Python Pipeline API to MySQL to Snowflake

I decided that I like doing some of my coding in Python, even if I have to kick it off manually at the moment. I might go with the safe option of a $5-a-month PythonAnywhere package to run it on a schedule in the cloud. I could also deploy it as an AWS Lambda function or in Azure, but I don't want to accidentally rack up a bill, so I might wait until I am further into my training on them.

So in this code I have:

  • Used dotenv to store all parameters and passwords as environment variables, so I can post my scripts without modification and store them in git (with the .env file listed in .gitignore).
  • Retrieved the values above and called the weather API, flattened the JSON to get all the columns, and put the new rows into the table in MySQL.
  • Retrieved the table from MySQL and done a drop and replace into Snowflake. 
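
To show what the flattening step in the list above does, here is a minimal sketch using only the standard library. It is a stand-in for `flatten_json.flatten`, not that library's implementation; `flatten_dict` is a hypothetical helper, and the sample payload uses made-up values shaped loosely like a weather response. Nested keys are joined with an underscore, which is how a nested `location` / `name` pair ends up as a `location_name` column.

```python
def flatten_dict(nested, parent_key="", sep="_"):
    """Flatten nested dicts/lists into one level, joining keys with `sep`."""
    flat = {}
    # Lists are walked by index so each element gets its own key
    items = nested.items() if isinstance(nested, dict) else enumerate(nested)
    for key, value in items:
        new_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, (dict, list)):
            flat.update(flatten_dict(value, new_key, sep))
        else:
            flat[new_key] = value
    return flat


# Made-up sample values, only to illustrate the shape of the data
sample = {
    "location": {"name": "Exampletown", "lat": 50.8, "lon": 0.2},
    "current": {"temp_c": 14.0, "condition": {"text": "Cloudy"}},
}

row = flatten_dict(sample)
print(row)
```

Each flattened key becomes one column once the dict is turned into a single-row DataFrame, which is why the later SQL can select names such as `current_condition_text`.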

My Code: 

# -*- coding: utf-8 -*-
"""
Version Number ||| Mod By ||| Version Date
---------------------------------------------------------
1.00 ||| GM ||| May 17 17:39:45 2022
"""
import json
import os
import urllib.request

import pandas as pd
from dotenv import load_dotenv
from flatten_json import flatten
from snowflake.connector.pandas_tools import pd_writer
from sqlalchemy import create_engine

# Read the .env file into the environment before any keys are looked up
load_dotenv()

## set which functions to run
runWeather = True

# Load Keys
# weather
weatherkey = os.environ.get("WEATHER_API_KEY")
# mySQL
MySQLHost = os.environ.get("MYSQLHOST")
MySQLUser = os.environ.get("MYSQLSUER")  # spelling as in the original; it must match the key in .env
MySQLPwd = os.environ.get("MYSQLPWD")
MySQLDB = os.environ.get("MYSQLDB")
# Snowflake
SnowAcc = os.environ.get("SNOWACC")
SnowUser = os.environ.get("SNOWUSER")
SnowPwd = os.environ.get("SNOWPWD")
SnowDB = os.environ.get("SNOWDB")
SnowWH = os.environ.get("SNOWWH")
SnowSchema = os.environ.get("SNOWSCHEMA")

# setup database connection strings
mySQL_conn = create_engine(
    "mysql+mysqldb://" + MySQLUser + ":" + MySQLPwd + "@" + MySQLHost + "/" + MySQLDB
)
Snowengine = create_engine(
    "snowflake://{user}:{password}@{account}/{db}/{schema}?warehouse={warehouse}".format(
        user=SnowUser,
        password=SnowPwd,
        account=SnowAcc,
        warehouse=SnowWH,
        db=SnowDB,
        schema=SnowSchema,
    )
)


def runweatherPipe():
    ## load the json data
    jsonurl = urllib.request.urlopen(
        "http://api.weatherapi.com/v1/current.json?key=" + weatherkey + "&q=BN266NH&aqi=yes"
    )
    weatherdatajson = json.loads(jsonurl.read())

    # flatten the nested json so every value becomes its own column
    flattendata = flatten(weatherdatajson)
    normWeatherData = pd.json_normalize(flattendata)

    ## load new row into mySQL staging table
    normWeatherData.to_sql("stg_weather", con=mySQL_conn, if_exists="append", index=False)

    ## Extract required columns from staging table
    toSnow = pd.read_sql_query(
        """
        select distinct
            location_name          locationName,
            location_region        LocationRegion,
            location_country       LocationCountry,
            location_lat           Latitude,
            location_lon           Longitude,
            current_last_updated   currentLastUpdated,
            current_temp_c         currentTempC,
            current_condition_text currentCondText,
            current_wind_mph       windMph,
            current_precip_mm      rainMM,
            current_humidity       Humidity,
            current_cloud          Cloud,
            current_feelslike_c    FeelsLikeC
        from stg_weather
        """,
        mySQL_conn,
    )

    ## Drop and recreate table in Snowflake
    toSnow.to_sql("stg_weather", con=Snowengine, if_exists="replace", index=False, method=pd_writer)


if runWeather:
    runweatherPipe()
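
One thing the script above leaves open is what happens when a key is missing from the .env file, or when a password contains characters such as `@` or `/` that have a special meaning in a connection URL. The following is a minimal sketch of one way to handle both, assuming the same MySQL URL layout as above. `require_env` and `build_mysql_url` are hypothetical helper names, not part of dotenv or SQLAlchemy, and the credentials are made-up demo values.

```python
import os
from urllib.parse import quote_plus


def require_env(name):
    """Return the environment variable `name`, or raise if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing environment variable: {name}")
    return value


def build_mysql_url(user, password, host, database):
    """Build a MySQL connection URL with the user and password percent-encoded."""
    return (
        f"mysql+mysqldb://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}/{database}"
    )


# Made-up demo credentials; in the real script these would come from require_env(...)
url = build_mysql_url("demo_user", "p@ss/word", "localhost", "weather")
print(url)  # mysql+mysqldb://demo_user:p%40ss%2Fword@localhost/weather
```

Failing early with a named variable tends to be easier to debug than the `TypeError` that string concatenation raises when `os.environ.get` returns `None`.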

The table: 


