snowmobile



snowmobile bundles the SnowflakeConnection into an object model focused on configuration management and streamlined access to Snowflake within Python.

Its main features are:

Consolidated configuration: snowmobile.toml
Use one configuration file, tracked by snowmobile and accessible from any Python instance on a machine
Simplified execution of raw SQL
Query results into a DataFrame, SnowflakeCursor or DictCursor from the same object
Refined data loading implementation
DDL from DataFrame; compatibility checks at run time; if_exists in 'append', 'truncate', 'replace', 'fail'
sql scripts as Python objects
Work with subsets of scripts; clearly denote code, comments, and metadata; export to markdown

Note

snowmobile is a wrapper around the snowflake.connector, not a replacement for it; the SnowflakeConnection is intentionally stored as a public attribute so that the snowflake.connector and snowmobile APIs can be leveraged congruently.
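
For example, both APIs can be used against the same connection; a minimal sketch using standard snowflake.connector cursor methods on sn.con:

import snowmobile

sn = snowmobile.connect()

# snowmobile API
df = sn.query('select current_warehouse()')

# raw snowflake.connector API on the same connection
cur = sn.con.cursor()
cur.execute('select current_warehouse()')
print(cur.fetchone())
cur.close()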


Overview



Connecting

snowmobile.connect() returns a Snowmobile whose purpose is to:

  1. Locate, instantiate, and store snowmobile.toml as a Configuration object (sn.cfg)

  2. Establish a connection to Snowflake and store the SnowflakeConnection (sn.con)

  3. Serve as the primary entry point to the SnowflakeConnection and snowmobile APIs

The first time it’s invoked, Snowmobile will find snowmobile.toml and cache its location; this step isn’t repeated unless the file is moved, the cache is manually cleared, or a new version of snowmobile is installed.

With all arguments omitted, it will authenticate with the default credentials and connection arguments specified in snowmobile.toml.


Establishing a connection from configured defaults is done with:

import snowmobile

sn = snowmobile.connect()

sn is a Snowmobile with the following attributes:

print(sn)            #> snowmobile.Snowmobile(creds='creds1')
print(sn.cfg)        #> snowmobile.Configuration('snowmobile.toml')
print(type(sn.con))  #> <class 'snowflake.connector.connection.SnowflakeConnection'>

Specific connection arguments are accessed by pre-configured alias:

sn2 = snowmobile.connect(creds='sandbox')

print(sn.cfg.connection.current != sn2.cfg.connection.current) #> True
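
The aliases themselves map to credential blocks within snowmobile.toml; an illustrative sketch of how two sets might be organized (fields abbreviated here; the default snowmobile.toml generated on first use defines the exact structure):

[connection]
default-creds = 'creds1'

[connection.credentials.creds1]
user = '...'
password = '...'
account = '...'

[connection.credentials.sandbox]
user = '...'
password = '...'
account = '...'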

sn

The variable sn represents a generic instance of Snowmobile, roughly equivalent to the one created with the snippet below; it's referred to as sn throughout the documentation, and applicable examples use it as a fixture without explicitly re-instantiating it.

import snowmobile

sn = snowmobile.connect()



Query Execution

Snowmobile provides three convenience methods for executing raw SQL:

query() implements pandas.read_sql() for querying results into a DataFrame

ex() implements SnowflakeConnection.cursor().execute() for executing commands within a SnowflakeCursor

exd() implements SnowflakeConnection.cursor(DictCursor).execute() for executing commands within a DictCursor
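
Given those descriptions, rough raw-connector equivalents would be (a sketch using the sn fixture, not the methods' actual internals):

import pandas as pd
from snowflake.connector import DictCursor

df = pd.read_sql('select 1', con=sn.con)              # ~ sn.query('select 1')
cur = sn.con.cursor().execute('select 1')             # ~ sn.ex('select 1')
dcur = sn.con.cursor(DictCursor).execute('select 1')  # ~ sn.exd('select 1')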


Setup

Assume a pre-existing sample_table:

COL1  COL2
   1     1
   2     4
   3     9


Into a DataFrame:

sn.query('select * from sample_table')
   col1  col2
0     1     1
1     2     4
2     3     9

Into a SnowflakeCursor:

sn.ex('select * from sample_table').fetchall()
[(1, 1), (2, 4), (3, 9)]

Into a DictCursor:

sn.exd('select * from sample_table').fetchall()
[{'COL1': 1, 'COL2': 1}, {'COL1': 2, 'COL2': 4}, {'COL1': 3, 'COL2': 9}]

Or to get a single value:

sn.query('select count(*) from sample_table', as_scalar=True)
3

Information API

Snowmobile inherits everything from a SQL class that generates and executes raw SQL from inputs; its purpose is to provide a bare-bones Python API for querying metadata and executing administrative commands against Snowflake.


Check existence:

sn.exists('sample_table')  #> True

Select records:

sn.select('sample_table', n=1)
   col1  col2
0     1     1

Query metadata:

sn.count('sample_table')                 #> 3
sn.count('sample_table', dst_of='col1')  #> 3
sn.columns('sample_table')               #> ['COL1', 'COL2']

Verify dimensionality:

sn.is_distinct('sample_table', field='col1')  #> True

Submit basic administrative commands:

sn.clone(nm='sample_table', to='sample_table2')

Fetch DDL:

print(sn.ddl('sample_table'))
create or replace TABLE SAMPLE_TABLE (
	COL1 FLOAT,
	COL2 FLOAT
);

Drop objects:

for t in ['sample_table', 'sample_table2']:
    sn.drop(t, obj='table')

sn.exists('sample_table')   #> False
sn.exists('sample_table2')  #> False

Loading Data


Table is a loading solution that at minimum accepts a df (DataFrame) and a table name (str).

In the same way that Snowmobile handles its keyword arguments, Table will adhere to any arguments explicitly provided and defer to the values configured in snowmobile.toml otherwise.
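
For example, both of the following are valid; the first defers entirely to snowmobile.toml, while the second overrides a single configured option (a sketch using the as_is argument demonstrated below):

t = snowmobile.Table(df=df, table='sample_table')              # all options from snowmobile.toml
t = snowmobile.Table(df=df, table='sample_table', as_is=True)  # explicit as_is takes precedence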

The behavior outlined below reflects the settings in the default snowmobile.toml file, meaning that t1 will:

  1. Check if sample_table exists in the schema associated with sn.con

  2. If sample_table does exist, it will validate df against sample_table and throw an error if their dimensions are not identical

  3. If sample_table does not exist (as is the case here), it will generate DDL from df and execute it as part of the loading process

Setup

Assume sample_table does not yet exist and df was created with:

import pandas as pd

df = pd.DataFrame(
    data = {'COL1': [1, 2, 3], 'COL2': [1, 4, 9]}
)
print(df.shape)  #> (3, 2)

COL1  COL2
   1     1
   2     4
   3     9

Given df, instantiating the following Table, t1, with as_is=True will:

  1. Generate and execute DDL to create sample_table

  2. Load df into sample_table via the Bulk Loading from a Local File System standard

import snowmobile

t1 = snowmobile.Table(
    df=df,
    table='sample_table',
    as_is=True,
)

In the absence of an existing Snowmobile provided to snowmobile.Table(), an instance was created and stored as a public attribute (t1.sn); the creation and loading of sample_table can be verified with either of:

print(t1.sn.exists('sample_table'))  #> True
print(t1.loaded)                     #> True

When compatibility between the df and the table is unknown, as_is=True can be omitted and the returned Table inspected further before continuing with the loading process:

df2 = pd.concat([df, df], axis=1)
print(list(df2.columns))              #> ['COL1', 'COL2', 'COL1', 'COL2']
print(t1.sn.columns('sample_table'))  #> ['COL1', 'COL2']

t2 = snowmobile.Table(
    df=df2,
    table='sample_table',
)

Primary compatibility checks are centered around attributes like:

print(t2.exists)      #> True
print(t2.cols_match)  #> False

With snowmobile.toml defaults, calling Table.load() on t2 will throw an error:

from snowmobile.core.errors import ColumnMismatchError

try:
    t2.load()
except ColumnMismatchError as e:
    print(e)
"""
>>>
ColumnMismatchError: `SAMPLE_TABLE` columns do not equal those in the local DataFrame and 
if_exists='append' was specified.

Either provide if_exists='replace' to overwrite the existing table or see `table.col_diff()`
to inspect the mismatched columns.
"""

Table.col_diff() returns a dictionary of tuples containing the table and DataFrame columns by index position; providing mismatched=True limits the results to only those responsible for the ColumnMismatchError:

import json

print(json.dumps(t2.col_diff(mismatched=True), indent=4))
"""
>>>
{
  "3": [
    null,
    "col1_1"
  ],
  "4": [
    null,
    "col2_1"
  ]
}
"""

Keyword arguments take precedence over configurations in snowmobile.toml, so df2 can still be loaded with:

t2.load(if_exists='replace')

print(t2.loaded)                      #> True
print(t2.sn.columns('sample_table'))  #> ['COL1', 'COL2', 'COL1_1', 'COL2_1']

Note

With default behavior, the duplicate column names in df2 were automatically renamed by t2 before loading into sample_table:

print(list(df2.columns))    #> ['COL1', 'COL2', 'COL1', 'COL2']
print(list(t2.df.columns))  #> ['COL1', 'COL2', 'COL1_1', 'COL2_1']



Working with SQL Scripts


snowmobile.Script accepts a full path to a sql file and parses its contents based on patterns specified in snowmobile.toml.

At a minimum, the file is split into individual statements, each of which is checked for decorated information in the form of a string directly preceding it, wrapped in an opening (/*-) and closing (-*/) pattern; the simplest form is a single-line string that can be used as an accessor to the statement it precedes.

When no information is provided, Script generates a generic name for the statement based on the literal first SQL keyword it contains and its index position.

Line 27 within sample_table.sql represents the minimum markup required to associate a name with an individual statement; consistency in tag structure has obvious benefits, but this is a freeform string that can be anything.

Line 19 is an example of a special tag; the leading qa-empty tells Script to run an assertion that its results are empty (0 records) before continuing execution of the script.

The tags for statements beginning on lines 1, 6, and 17 were generated by Script based on their contents and relative positions within the script.

Setup

path is a full path (pathlib.Path or str) to a file, sample_table.sql, containing 5 standard sql statements:

 1  create or replace table sample_table (
 2    col1 number(18,0),
 3    col2 number(18,0)
 4  );
 5
 6  insert into sample_table with
 7  sample_data as (
 8    select
 9      uniform(1, 10, random(1)) as rand_int
10    from table(generator(rowcount => 3)) v
11  )
12    select
13      row_number() over (order by a.rand_int) as col1
14      ,(col1 * col1) as col2
15    from sample_data a;
16
17  select * from sample_table;
18
19  /*-qa-empty~verify 'sample_table' is distinct on 'col1'-*/
20  select
21    a.col1
22    ,count(*)
23  from sample_table a
24  group by 1
25  having count(*) <> 1;
26
27  /*-insert into~any_other_table-*/
28  insert into any_other_table (
29    select
30      a.*
31      ,tmstmp.tmstmp as insert_tmstmp
32    from sample_table a
33    cross join (select current_timestamp() as tmstmp) tmstmp
34  );

Given a path to sample_table.sql, a Script can be created with:

import snowmobile

script = snowmobile.Script(path=path, silence=True)

print(script)        #> snowmobile.Script('sample_table.sql')
print(script.depth)  #> 5
script.dtl()
sample_table.sql
================
1: Statement('create table~s1')
2: Statement('insert into~s2')
3: Statement('select data~s3')
4: Statement('qa-empty~verify 'sample_table' is distinct on 'col1'')
5: Statement('insert into~any_other_table')

Statements can be accessed by their index position or name (nm):

script(5)                             #> Statement('insert into~any_other_table')
script(-1)                            #> Statement('insert into~any_other_table')
script('insert into~any_other_table') #> Statement('insert into~any_other_table')

Each Statement has its own set of attributes:

s3 = script(3)  # store 3rd statement

print(s3.index)  #> 3
print(s3.sql)    #> select * from sample_table
print(s3.kw)     #> select
print(s3.desc)   #> sample_table
print(s3.nm)     #> select~sample_table

Based on statement attributes, script can be filtered and used within that context:

with script.filter(
    excl_desc=['.*any_other_table'],  # throwing out s5; pattern is regex not glob
    excl_kw=['select'],               # throwing out s3
) as s:
    s.run()
sample_table.sql
================
<1 of 3> create table~s1 (1s)......................................... <completed>
<2 of 3> insert into~s2 (0s).......................................... <completed>
<3 of 3> qa-empty~verify 'sample_table' is distinct on 'col1' (0s).... <passed>   

Spans of statements are directly executable by index boundaries:

script.run((1, 3))
sample_table.sql
================
<1 of 3> create table~s1 (0s)............................... <completed>
<2 of 3> insert into~s2 (0s)................................ <completed>
<3 of 3> select data~s3 (0s)................................ <completed>

And their results are accessible retroactively:

script(3).results.head()
   col1  col2
0     1     1
1     2     4
2     3     9