Data Lineage on Snowflake
This post describes how to generate data lineage from query history in Snowflake using the data-lineage Python package.
data-lineage generates a DAG by parsing the SQL statements in query history. It looks specifically for DML statements such as COPY, INSERT, UPDATE and DELETE, and builds a graph in which tables are the vertices and each edge denotes data flowing from one table to another through a DML statement.
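To make the idea of tables as vertices and DML statements as edges concrete, here is a minimal sketch. It is not how data-lineage is implemented: it assumes a toy regular expression that only recognises the simple `INSERT INTO <target> ... FROM <source>` form, and the function name `build_lineage` and the table names are made up for illustration. A real parser has to handle joins, subqueries, COPY, UPDATE and DELETE.

```python
import re
from collections import defaultdict

# Toy pattern (assumption): only matches "INSERT INTO <target> ... FROM <source>".
INSERT_SELECT = re.compile(
    r"insert\s+into\s+([\w.]+).*?\bfrom\s+([\w.]+)",
    re.IGNORECASE | re.DOTALL,
)


def build_lineage(statements):
    """Return {source_table: {target_table, ...}} for simple INSERT ... SELECT statements."""
    edges = defaultdict(set)
    for sql in statements:
        match = INSERT_SELECT.search(sql)
        if match:
            target, source = match.group(1), match.group(2)
            # An edge source -> target means data flows from source into target.
            edges[source.lower()].add(target.lower())
    return dict(edges)


# Hypothetical statements, standing in for rows of query history.
statements = [
    "INSERT INTO staging.orders SELECT * FROM raw.orders",
    "insert into analytics.daily_orders select order_date, count(*) from staging.orders group by 1",
    "SELECT * FROM analytics.daily_orders",  # plain SELECT, contributes no edge
]
lineage = build_lineage(statements)
```

In this sketch `lineage` maps `raw.orders` to `staging.orders` and `staging.orders` to `analytics.daily_orders`, which is the chain of data flow implied by the two INSERT statements.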
The first step is to store query history in a table in Snowflake. Snowflake has a rich information schema that provides information on all aspects of the database. Three of its table functions capture data transformation and transfer history: QUERY_HISTORY, COPY_HISTORY and DATA_TRANSFER_HISTORY.
Snowflake retains data in these table functions for 7 or 14 days, depending on the function. If you need longer retention, the history has to be copied into tables that you own.
The snowflake_spend package can be used to store query history for data lineage as well. The package documentation has instructions to set it up and run it with dbt to copy query history. The instructions that worked with data-lineage are provided below:
    dbt init  # Start a new project

    # Specify package dependency on snowflake_spend in packages.yml
    packages:
      - git: https://gitlab.com/gitlab-data/snowflake_spend.git
        revision: v1.1.0

    # Fill in models section to use snowflake_spend in dbt_project.yml
    models:
      snowflake_spend:
        enabled: true
        materialized: table
        xf:
          schema: analytics

    # Set up a profile to connect to Snowflake
    # Ensure the user has access to information schema and table functions

    dbt deps     # Install the packages listed in packages.yml
    dbt compile
    dbt run
dbt can be scheduled to run regularly and store query histories.
The query history stored in user tables can be retrieved with the Snowflake Python connector.
    import snowflake.connector

    query = """
    select query_text
    from table(information_schema.query_history(
        end_time_range_start=>to_timestamp_ltz('2017-12-4 12:00:00.000 -0700'),
        end_time_range_end=>to_timestamp_ltz('2017-12-4 12:30:00.000 -0700')));
    """

    # Fill in the connection parameters for your account
    connection = snowflake.connector.connect(...)

    queries = []
    with connection.cursor() as cursor:
        cursor.execute(query)
        row = cursor.fetchone()
        while row is not None:
            # Each row is a one-column tuple holding query_text
            queries.append(row[0])
            row = cursor.fetchone()
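Query history contains every statement that ran, while lineage only depends on DML. As an optional pre-processing step, the retrieved list can be narrowed to statements that start with one of the DML keywords mentioned earlier. This is a minimal sketch under stated assumptions: `keep_dml` is a hypothetical helper, not part of data-lineage or the Snowflake connector, and it only inspects the first word, so statements that begin with a comment or a WITH clause are dropped.

```python
# DML keywords named in this post; extend the tuple if you need others.
DML_KEYWORDS = ("copy", "insert", "update", "delete")


def keep_dml(queries):
    """Keep only statements whose first word is one of DML_KEYWORDS."""
    kept = []
    for text in queries:
        stripped = text.strip()
        # Empty strings have no first word, so they are skipped.
        first_word = stripped.split(None, 1)[0].lower() if stripped else ""
        if first_word in DML_KEYWORDS:
            kept.append(stripped)
    return kept


# Hypothetical sample standing in for the list returned by the connector.
sample = [
    "select current_date",
    "  INSERT INTO staging.orders SELECT * FROM raw.orders",
    "copy into raw.orders from @orders_stage",
    "",
]
dml_queries = keep_dml(sample)
```

Filtering up front keeps the input small; whether it is needed depends on how much non-DML traffic the history contains.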
The list of queries can then be passed to the data-lineage module to generate a DAG of lineage.
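Once a lineage DAG exists, a typical question is which tables feed a given table. The sketch below illustrates that kind of processing on a plain dictionary of edges. The dictionary format, the function `upstream_tables` and the table names are assumptions made for illustration; they are not the data structure or API that data-lineage returns.

```python
def upstream_tables(edges, table):
    """Return every table that feeds into `table`, directly or indirectly.

    `edges` maps a source table to the set of tables it writes into.
    """
    # Invert source -> targets into target -> sources.
    parents = {}
    for source, targets in edges.items():
        for target in targets:
            parents.setdefault(target, set()).add(source)

    # Depth-first walk up the inverted graph.
    seen = set()
    stack = [table]
    while stack:
        current = stack.pop()
        for parent in parents.get(current, ()):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen


# Hypothetical lineage edges.
edges = {
    "raw.orders": {"staging.orders"},
    "raw.customers": {"staging.customers"},
    "staging.orders": {"analytics.daily_orders"},
}
ancestors = upstream_tables(edges, "analytics.daily_orders")
```

Here `ancestors` contains `staging.orders` and `raw.orders`, while `raw.customers` is excluded because it never flows into `analytics.daily_orders`.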
This blog post explained how to store and retrieve query history in Snowflake. data-lineage can use that query history to build a data lineage graph that can be visualized or processed programmatically. If you need data lineage from Snowflake, check out snowflake_spend and data-lineage. Do get in touch if you have any questions or feedback.