snowmobile¶
snowmobile bundles the SnowflakeConnection into an object model focused on configuration management and streamlined access to Snowflake from Python.
Its main features are:
- Consolidated configuration: snowmobile.toml
  - Use one configuration file, tracked by snowmobile and accessible from any Python instance on a machine
- Simplified execution of raw SQL
  - Query results into a DataFrame, SnowflakeCursor, or DictCursor from the same object
- Refined data loading implementation
  - DDL from a DataFrame; compatibility checks at run time; if_exists in 'append', 'truncate', 'replace', 'fail'
- sql scripts as Python objects
  - Work with subsets of scripts; clearly denote code, comments, and metadata; export to markdown
Note
snowmobile is a wrapper around the snowflake.connector, not a replacement for it; the SnowflakeConnection is intentionally stored as a public attribute so that the snowflake.connector and snowmobile APIs can be leveraged congruently.
Overview¶
Connecting¶
Establishing a connection from configured defaults is done with:
import snowmobile
sn = snowmobile.connect()
sn is a Snowmobile with the following attributes:
print(sn) #> snowmobile.Snowmobile(creds='creds1')
print(sn.cfg) #> snowmobile.Configuration('snowmobile.toml')
print(type(sn.con)) #> <class 'snowflake.connector.connection.SnowflakeConnection'>
Specific connection arguments are accessed by pre-configured alias:
sn2 = snowmobile.connect(creds='sandbox')
print(sn.cfg.connection.current != sn2.cfg.connection.current) #> True
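The aliases refer to credential blocks stored in snowmobile.toml; as a rough sketch only (the section and field names below are illustrative assumptions, not the file's authoritative schema), such a file could look like:

```toml
[connection]
default-creds = 'creds1'          # alias used when connect() gets no creds argument

[connection.credentials.creds1]   # default credentials
user = ''
password = ''
account = ''

[connection.credentials.sandbox]  # accessed with connect(creds='sandbox')
user = ''
password = ''
account = ''
```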
sn
The variable sn represents a generic instance of Snowmobile, roughly equivalent to that created with the snippet below; it's referred to as sn throughout the documentation, and applicable examples make use of it as a fixture without explicitly re-instantiating it.
import snowmobile
sn = snowmobile.connect()
Query Execution¶
Setup
Assume a pre-existing sample_table:
COL1 | COL2
---|---
1 | 1
2 | 4
3 | 9
Into a DataFrame:
sn.query('select * from sample_table')
 | col1 | col2
---|---|---
0 | 1 | 1
1 | 2 | 4
2 | 3 | 9
Into a SnowflakeCursor:
sn.ex('select * from sample_table').fetchall()
[(1, 1), (2, 4), (3, 9)]
Into a DictCursor:
sn.exd('select * from sample_table').fetchall()
[{'COL1': 1, 'COL2': 1}, {'COL1': 2, 'COL2': 4}, {'COL1': 3, 'COL2': 9}]
Or to get a single value:
sn.query('select count(*) from sample_table', as_scalar=True)
3
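The three execution methods surface the same result set in different shapes. Purely as an illustration (no connection required), the cursor-style outputs above can be rebuilt into the DataFrame shape with pandas:

```python
import pandas as pd

# Rows as a SnowflakeCursor.fetchall() would return them (list of tuples)
rows = [(1, 1), (2, 4), (3, 9)]

# Rows as a DictCursor.fetchall() would return them (list of dicts)
dict_rows = [{'COL1': 1, 'COL2': 1}, {'COL1': 2, 'COL2': 4}, {'COL1': 3, 'COL2': 9}]

# Both collapse to the same tabular shape that sn.query() returns
df_from_tuples = pd.DataFrame(rows, columns=['COL1', 'COL2'])
df_from_dicts = pd.DataFrame(dict_rows)

print(df_from_tuples.equals(df_from_dicts))  #> True
```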
Information API¶
Check existence:
sn.exists('sample_table') #> True
Select records:
sn.select('sample_table', n=1)
 | col1 | col2
---|---|---
0 | 1 | 1
Query metadata:
sn.count('sample_table') #> 3
sn.count('sample_table', dst_of='col1') #> 3
sn.columns('sample_table') #> ['COL1', 'COL2']
Verify dimensionality:
sn.is_distinct('sample_table', field='col1') #> True
Submit basic administrative commands:
sn.clone(nm='sample_table', to='sample_table2')
Fetch DDL:
print(sn.ddl('sample_table'))
create or replace TABLE SAMPLE_TABLE (
COL1 FLOAT,
COL2 FLOAT
);
Drop objects:
for t in ['sample_table', 'sample_table2']:
sn.drop(t, obj='table')
sn.exists('sample_table') #> False
sn.exists('sample_table2') #> False
Loading Data¶
Setup
Assume sample_table does not yet exist and df was created with:
import pandas as pd
import numpy as np
df = pd.DataFrame(
data = {'COL1': [1, 2, 3], 'COL2': [1, 4, 9]}
)
print(df.shape) #> (3, 2)
COL1 | COL2 |
---|---|
1 | 1 |
2 | 4 |
3 | 9 |
Given df, instantiating the following Table, t1, with as_is=True will:
- Generate and execute DDL to create sample_table
- Load df into sample_table via the Bulk Loading from a Local File System standard
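As a rough illustration of the DDL-generation step, a statement can be derived from a DataFrame's dtypes; the mapping below is a simplified sketch, not snowmobile's actual implementation (the FLOAT mapping for numeric columns follows the ddl() output shown earlier):

```python
import pandas as pd

df = pd.DataFrame(data={'COL1': [1, 2, 3], 'COL2': [1, 4, 9]})

def sketch_ddl(df: pd.DataFrame, table: str) -> str:
    """Build a CREATE TABLE statement from dtypes (illustrative mapping only)."""
    type_map = {'int64': 'FLOAT', 'float64': 'FLOAT', 'object': 'VARCHAR'}
    cols = ',\n'.join(
        f"    {c} {type_map.get(str(t), 'VARCHAR')}" for c, t in df.dtypes.items()
    )
    return f"create or replace table {table} (\n{cols}\n);"

print(sketch_ddl(df, 'sample_table'))
```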
import snowmobile
t1 = snowmobile.Table(
df=df,
table='sample_table',
as_is=True,
)
In the absence of providing an existing Snowmobile (sn) to snowmobile.Table(), an instance was created and stored as a public attribute; the create and load for sample_table can be verified with either of:
print(t1.sn.exists('sample_table')) #> True
print(t1.loaded) #> True
When compatibility between the df and the table is unknown, as_is=True can be omitted and the Table that's returned inspected further before continuing with the loading process:
df2 = pd.concat([df, df], axis=1)
print(list(df2.columns)) #> ['COL1', 'COL2', 'COL1', 'COL2']
print(t1.sn.columns('sample_table')) #> ['COL1', 'COL2']
t2 = snowmobile.Table(
df=df2,
table='sample_table',
)
Primary compatibility checks include:
print(t2.exists) #> True
print(t2.cols_match) #> False
With snowmobile.toml defaults, calling Table.load() on t2 will throw an error:
from snowmobile.core.errors import ColumnMismatchError
try:
t2.load()
except ColumnMismatchError as e:
print(e)
"""
>>>
ColumnMismatchError: `SAMPLE_TABLE` columns do not equal those in the local DataFrame and
if_exists='append' was specified.
Either provide if_exists='replace' to overwrite the existing table or see `table.col_diff()`
to inspect the mismatched columns.
"""
Table.col_diff() returns a dictionary of tuples containing the table and DataFrame columns by index position; providing mismatched=True limits the results to only those responsible for the ColumnMismatchError:
import json
print(json.dumps(t2.col_diff(mismatched=True), indent=4))
"""
>>>
{
"3": [
null,
"col1_1"
],
"4": [
null,
"col2_1"
]
}
"""
Keyword arguments take precedence over configurations in snowmobile.toml, so df2 can still be loaded with:
t2.load(if_exists='replace')
print(t2.loaded) #> True
print(t2.sn.columns('sample_table')) #> ['COL1', 'COL2', 'COL1_1', 'COL2_1']
Note
With default behavior, the duplicate column names in df2 were automatically renamed by t2 before loading into sample_table:
print(list(df2.columns)) #> ['COL1', 'COL2', 'COL1', 'COL2']
print(list(t2.df.columns)) #> ['COL1', 'COL2', 'COL1_1', 'COL2_1']
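The renaming itself is straightforward to reproduce; the following is a minimal pandas sketch that approximates the behavior shown above (it is not snowmobile's internal code):

```python
import pandas as pd

# Duplicate-column DataFrame, as in the df2 example above
df2 = pd.concat(
    [pd.DataFrame({'COL1': [1, 2, 3], 'COL2': [1, 4, 9]})] * 2, axis=1
)

def dedupe_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Suffix repeated column names with _1, _2, ... in order of appearance."""
    seen: dict[str, int] = {}
    renamed = []
    for c in df.columns:
        if c in seen:
            seen[c] += 1
            renamed.append(f"{c}_{seen[c]}")
        else:
            seen[c] = 0
            renamed.append(c)
    out = df.copy()
    out.columns = renamed
    return out

print(list(dedupe_columns(df2).columns))  #> ['COL1', 'COL2', 'COL1_1', 'COL2_1']
```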
Working with SQL Scripts¶
Setup
path is a full path (pathlib.Path or str) to a file, sample_table.sql, containing 5 standard sql statements:
create or replace table sample_table (
col1 number(18,0),
col2 number(18,0)
);
insert into sample_table with
sample_data as (
select
uniform(1, 10, random(1)) as rand_int
from table(generator(rowcount => 3)) v
)
select
row_number() over (order by a.rand_int) as col1
,(col1 * col1) as col2
from sample_data a;
select * from sample_table;
/*-qa-empty~verify 'sample_table' is distinct on 'col1'-*/
select
a.col1
,count(*)
from sample_table a
group by 1
having count(*) <> 1;
/*-insert into~any_other_table-*/
insert into any_other_table (
select
a.*
,tmstmp.tmstmp as insert_tmstmp
from sample_table a
cross join (select current_timestamp() as tmstmp)tmstmp
);
Given a path to sample_table.sql, a Script can be created with:
import snowmobile
script = snowmobile.Script(path=path, silence=True)
print(script) #> snowmobile.Script('sample_table.sql')
print(script.depth) #> 5
script.dtl()
sample_table.sql
================
1: Statement('create table~s1')
2: Statement('insert into~s2')
3: Statement('select data~s3')
4: Statement('qa-empty~verify 'sample_table' is distinct on 'col1'')
5: Statement('insert into~any_other_table')
Statements can be accessed by their index position or name (nm):
script(5) #> Statement('insert into~any_other_table')
script(-1) #> Statement('insert into~any_other_table')
script('insert into~any_other_table') #> Statement('insert into~any_other_table')
Each Statement has its own set of attributes:
s3 = script(3) # store 3rd statement
print(s3.index) #> 3
print(s3.sql) #> select * from sample_table
print(s3.kw) #> select
print(s3.desc) #> sample_table
print(s3.nm) #> select~sample_table
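The wrapped tags in the script (e.g. /*-insert into~any_other_table-*/) follow the same nm = kw~desc convention; as a hedged sketch of how such a tag could be split (illustrative only, not the parser snowmobile actually uses):

```python
import re

tag = "/*-insert into~any_other_table-*/"

# Strip the /*- ... -*/ wrapper, then split nm into kw and desc on '~'
m = re.fullmatch(r"/\*-(?P<nm>.+)-\*/", tag)
kw, desc = m.group('nm').split('~', 1)

print(kw)    #> insert into
print(desc)  #> any_other_table
```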
Based on statement attributes, script can be filtered and used within that context:
with script.filter(
excl_desc=['.*any_other_table'], # throwing out s5; pattern is regex not glob
excl_kw=['select'], # throwing out s3
) as s:
s.run()
sample_table.sql
================
<1 of 3> create table~s1 (1s)......................................... <completed>
<2 of 3> insert into~s2 (0s).......................................... <completed>
<3 of 3> qa-empty~verify 'sample_table' is distinct on 'col1' (0s).... <passed>
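Because the exclusion patterns are regular expressions rather than globs, matching can be checked directly with re; for example, the excl_desc pattern used above against the statement descriptions in the script (whether snowmobile applies fullmatch or search internally is an assumption here):

```python
import re

# Statement descriptions from the script above
descs = ['s1', 's2', 's3', 'any_other_table']

pattern = '.*any_other_table'  # regex '.*', not the glob wildcard '*'
excluded = [d for d in descs if re.fullmatch(pattern, d)]
print(excluded)  #> ['any_other_table']
```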
Spans of statements are directly executable by index boundaries:
script.run((1, 3))
sample_table.sql
================
<1 of 6> create table~s1 (0s)............................... <completed>
<2 of 6> insert into~s2 (0s)................................ <completed>
<3 of 6> select data~s3 (0s)................................ <completed>
And their results are accessible retroactively:
script(3).results.head()
 | col1 | col2
---|---|---
0 | 1 | 1
1 | 2 | 4
2 | 3 | 9