This blog describes how to generate data lineage from query history in Snowflake using the data-lineage python package.
data-lineage builds a DAG by parsing the SQL statements in query history. It looks specifically for DML statements such as COPY, INSERT, UPDATE, and DELETE, and builds a network in which tables are vertices and edges denote data flowing from one table to another through those DML statements.
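To make the idea concrete, here is a deliberately simplified sketch of this kind of extraction (not data-lineage's actual parser, which handles far more statement forms): pull the target and source tables out of a basic INSERT ... SELECT with a regular expression and treat them as one edge of the graph.

```python
import re

# Simplified illustration only: match "INSERT INTO <target> ... FROM <source>".
# data-lineage's real parser covers many more DML shapes (COPY, UPDATE, joins).
DML_PATTERN = re.compile(
    r"insert\s+into\s+(?P<target>[\w.]+).*?\bfrom\s+(?P<source>[\w.]+)",
    re.IGNORECASE | re.DOTALL,
)

def extract_edge(sql):
    """Return a (source, target) edge for a simple INSERT ... SELECT."""
    match = DML_PATTERN.search(sql)
    if match is None:
        return None
    return (match.group("source"), match.group("target"))

edge = extract_edge("INSERT INTO analytics.daily_sales SELECT * FROM raw.sales")
print(edge)  # ('raw.sales', 'analytics.daily_sales')
```

Each such edge becomes one arrow in the lineage DAG; a query history is just many of these edges accumulated over time.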
The first step is to store query history in a table in Snowflake. Snowflake has a rich information schema that provides information on all aspects of the database. Three table functions that capture data transformation and transfer history are:
Snowflake retains data in these table functions for 7 or 14 days. If a longer retention is required, then the history has to be copied to tables owned by you.
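One way to keep a longer history is a small job that periodically snapshots the table function into a table you own. Below is a rough sketch of the SQL such a job could issue; the table name raw.query_history_archive is an illustrative assumption (not from the original post), and deduplication of overlapping windows (e.g. by query id) is left out.

```python
def archive_query_history_sql(target_table):
    """Build a statement that snapshots information_schema.query_history
    into a user-owned table. target_table is a placeholder name."""
    return (
        f"insert into {target_table} "
        "select * from table(information_schema.query_history());"
    )

# Hypothetical archive table; run the statement on a schedule shorter
# than the 7-day retention window so no history is lost.
sql = archive_query_history_sql("raw.query_history_archive")
print(sql)
```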
The Gitlab Data team uses query history to manage database spend on Snowflake. They have released a dbt package snowflake_spend to store query history in user tables.
The package can be used to store query history for data lineage as well. The package documentation has instructions to set up and run using dbt to copy query history. Instructions that worked with data-lineage are provided below:
dbt init  # Start a new project

# Specify a package dependency on snowflake_spend in packages.yml
packages:
  - git: https://gitlab.com/gitlab-data/snowflake_spend.git
    revision: v1.1.0

# Fill in the models section of dbt_project.yml to use snowflake_spend
models:
  snowflake_spend:
    enabled: true
    materialized: table
    xf:
      schema: analytics

# Set up a profile to connect to Snowflake.
# Ensure the user has access to the information schema and table functions.
dbt compile
dbt run
dbt can be scheduled to run regularly to keep the stored query history up to date.
The query history in user tables can be retrieved using the snowflake python connector.
import snowflake.connector

query = """
select query_text
from table(information_schema.query_history(
    end_time_range_start=>to_timestamp_ltz('2017-12-4 12:00:00.000 -0700'),
    end_time_range_end=>to_timestamp_ltz('2017-12-4 12:30:00.000 -0700')));
"""

connection = snowflake.connector.connect(...)

queries = []
with connection.cursor() as cursor:
    cursor.execute(query)
    row = cursor.fetchone()
    while row is not None:
        queries.append(row[0])
        row = cursor.fetchone()
The list of queries can be passed to the data-lineage module to generate a DAG of lineage.
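To picture what that DAG looks like, here is a toy version under the same simplifying assumption as before (regex parsing of plain INSERT ... SELECT statements only; data-lineage itself handles far more DML forms). It maps each table to its upstream tables and walks the graph in dependency order using the standard library's graphlib.

```python
import re
from graphlib import TopologicalSorter

# Simplified illustration only, not data-lineage's real parser.
EDGE = re.compile(
    r"insert\s+into\s+(?P<target>[\w.]+).*?\bfrom\s+(?P<source>[\w.]+)",
    re.IGNORECASE | re.DOTALL,
)

def build_dag(queries):
    """Map each target table to the set of tables it is loaded from."""
    deps = {}
    for sql in queries:
        m = EDGE.search(sql)
        if m:
            deps.setdefault(m.group("target"), set()).add(m.group("source"))
    return deps

queries = [
    "INSERT INTO staging.sales SELECT * FROM raw.sales",
    "INSERT INTO analytics.daily_sales SELECT * FROM staging.sales",
]
deps = build_dag(queries)

# Topological order: upstream tables come before the tables built from them.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['raw.sales', 'staging.sales', 'analytics.daily_sales']
```

The topological order is what makes the DAG useful in practice: it tells you which tables feed which, and in what sequence a refresh would have to run.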
This blog post explained how to store and retrieve query history in Snowflake. data-lineage can use that query history to build a lineage graph, which you can visualize or process programmatically. If you need data lineage from Snowflake, check out snowflake_spend and data-lineage. Do get in touch if you have any questions or feedback.
Tokern Data Lineage is an open source Python project to visualize and analyze data lineage.