Skip to main content

Python Pipeline API to MySQL to Snowflake

I decided that I liked doing some of my coding in python, even if I have to manually kick it off at the moment I might go with the safe option of a $5 a month python anywhere package to run on a schedule in the cloud or I could put in as an AWS lambda function or in Azure but I don't want to accidentally rack up a bill so might wait until I am further into training on them. 

So in this code I have:

  • Used dotenv to store all parameters and passwords as environment variables so I can post my scrips without modification and store them in git. (with the env file set to gitignore). 
  • Retrieved the values above and called the weather API. Flattened the json to get all the columns. Put the new rows into the table in MySQL. 
  • Retrieved the table from MySQL and done a drop and replace into Snowflake. 

My Code: 

# -*- coding: utf-8 -*-
"""
Version Number ||| Mod By ||| Version Date
---------------------------------------------------------
1.00 ||| GM ||| May 17 17:39:45 2022
"""
## set which functions to run
runWeather = True
from dotenv import load_dotenv
load_dotenv()
import urllib.request, json
import pandas as pd
from flatten_json import flatten
from sqlalchemy import create_engine
from snowflake.connector.pandas_tools import pd_writer
import os
# Load Keys
#weather
weatherkey = os.environ.get("WEATHER_API_KEY")
#mySQL
MySQLHost = os.environ.get("MYSQLHOST")
MySQLUser = os.environ.get("MYSQLSUER")
MySQLPwd = os.environ.get("MYSQLPWD")
MySQLDB = os.environ.get("MYSQLDB")
#Snowflake
SnowAcc = os.environ.get("SNOWACC")
SnowUser = os.environ.get("SNOWUSER")
SnowPwd = os.environ.get("SNOWPWD")
SnowDB = os.environ.get("SNOWDB")
SnowWH = os.environ.get("SNOWWH")
SnowSchema = os.environ.get("SNOWSCHEMA")
# setup database connection strings
mySQL_conn = create_engine("mysql+mysqldb://"+MySQLUser+":"+MySQLPwd+'@'+MySQLHost+"/"+MySQLDB);
Snowengine = create_engine(
'snowflake://{user}:{password}@{account}/{db}/{schema}?warehouse={warehouse}'.format(
user= SnowUser,
password= SnowPwd,
account= SnowAcc,
warehouse= SnowWH,
db= SnowDB,
schema=SnowSchema ))
def runweatherPipe():
## load the json data
jsonurl = urllib.request.urlopen("http://api.weatherapi.com/v1/current.json?key="+weatherkey+"&q=BN266NH&aqi=yes")
weatherdatajson = json.loads(jsonurl.read())
#flatten and extract the json data
flattendata = flatten(weatherdatajson)
normWeatherData = pd.json_normalize(flattendata)
## load new row into mySQL staging table
normWeatherData.to_sql('stg_weather', con=mySQL_conn, if_exists='append',index=False);
## Extract required columns from staging table
toSnow = pd.read_sql_query('select distinct location_name locationName, location_region LocationRegion , location_country LocationCountry , location_lat Latitude , location_lon Longitude , current_last_updated currentLastUpdated , current_temp_c currentTempC , current_condition_text currentCondText, current_wind_mph windMph , current_precip_mm rainMM , current_humidity Humidity , current_cloud Cloud , current_feelslike_c FeelsLikeC from stg_weather',mySQL_conn);
## Drop and recreate table in Snowflake
toSnow.to_sql('stg_weather', con=Snowengine, if_exists='replace',index=False, method=pd_writer);
if runWeather:
runweatherPipe()
view raw APIMySQLSnow.py hosted with ❤ by GitHub

The table: 



Comments

Popular posts from this blog

Gen AI news 29-04-2024

Here are some recent updates and insights related to Generative AI (gen AI) : Enterprise Hits and Misses - Robotics and Gen AI Converge : This article discusses the convergence of robotics and generative AI. It explores breakthroughs needed in the field, the FTC’s policy change regarding non-competes, and the impact on AI model sizes for enterprises 1 . Read more All You Need To Know About The Upcoming AI-Powered OLED iPad Pro : This piece provides a summary of rumors surrounding the next-gen AI-fused OLED iPad Pro, powered by the new Apple M4 chip 2 . Read more Delivering on the Promise of Gen AI : New Electronics reflects on NVIDIA GTC and key announcements that contribute to delivering on the promises made for generative AI 3 . Read more The Future of Generative AI - An Early View in 15 Charts (McKinsey): Since the release of ChatGPT in November 2022, generative AI has been making headlines. McKinsey research estimates that gen AI features could add up to $4.4 trillion to the globa...

Keboola Flows

Really finding Keboola was the thing that kickstarted this project otherwise I would be trying to build custom code on a python cloud server and building everything from scratch.  In Keboola you build you data sources and destinations using connection details which is fairly simple and something I will likely cover in another post, same goes for transformations etc. Here though I am going to discuss Flows, this is where you bring everything together. On my free account there are some limitations.  My easiest flow is very basic:  Pull parkrun results e-mail from Gmail to Google Sheets (actually done by Zap not Keboola).  Keboola will, as often as I like, in this case once a week, pull the data from the sheet into its storage.  It will then transfer this to the target database. Currently I have this setup to be MySQL database but I can and might expand that to the Snowflake instance within Keboola.  I then, outside of Keboola, connect to the MySQL database f...

Snowflake Scripting - SQL Cursors

Snowflake scripting in SQL seems to be in preview and I have decided to have a play with it. Given how new it is there is limited documentation so I am using a combination of what I can find on the Snowflake site and the odd blog that has been written about it. There appear to be a few quirks, at least when compared to Oracle PL/SQL (though that has been round for years). How many of these are intentional and how many are things to be ironed out I don't know. You can see the procedure I have created it:  Accepts an id as a parameter  Creates a result set selecting from a table, using the parameter as a filter Loads the results set into a cursor.  Loops through the cursor loading the id in the cursor into variable Calls procedure passing in the variable as the parameter.  Then as a proof of concept I tried the Snowflake feature of allowing declaration of variables within the main start and end block.