API Example Notebook

Overview

This example showcases the API exposed by the data lineage package. The API can be used to build a lineage graph by adding nodes and edges that represent columns and transformations.

Note that the goal of the example is to explain the building blocks of the lineage graph. In practical scenarios, use a pack (e.g. query parser pack) to automate the process.

This example consists of the following sequence of operations:

  • Load a catalog from a JSON file. You can load a catalog from any database supported by dbcat.
  • Register nodes from columns in the catalog.
  • Register directed edges to represent that a column is the source of data for another column.
  • Visualize the graph.
# Read the demo catalog from a JSON file. This file-based scan is for
# demonstration only; for a real database or data warehouse, use dbcat
# (https://github.com/tokern/dbcat) to load the catalog instead.
from dbcat.scanners.json import File

json_scanner = File("test", "test/catalog.json")
catalog_objects = json_scanner.scan()
# Load the catalog. For more details, refer to https://github.com/tokern/dbcat
# Provide credentials of the postgres database where the catalog is stored.
# Note that you should have already created the database and user.
# NOTE: these are demo credentials; do not keep real passwords in a notebook.
from data_lineage import catalog_connection

# The connection settings have to be nested (indented) under the top-level
# `catalog` key. Without the indentation, YAML reads `catalog` as an empty key
# and each setting as an unrelated top-level entry.
# NOTE(review): presumably catalog_connection parses this string as YAML and
# reads the `catalog` mapping - confirm against the data_lineage docs.
catalog_conf = """
catalog:
  type: postgres
  user: catalog_user
  password: catal0g_passw0rd
  host: 127.0.0.1
  port: 5432
  database: tokern
"""
catalog = catalog_connection(catalog_conf)

# Persist the objects scanned from the JSON file into the catalog store.
catalog.save_catalog(catalog_objects)
# Link one pair of columns with a directed edge:
# ("test", "default", "page", "page_id") -> ("test", "default", "page_lookup_nonredirect", "redirect_id")
source_column = catalog.get_column(
    **{
        "database_name": "test",
        "schema_name": "default",
        "table_name": "page",
        "column_name": "page_id",
    }
)
target_column = catalog.get_column(
    **{
        "database_name": "test",
        "schema_name": "default",
        "table_name": "page_lookup_nonredirect",
        "column_name": "redirect_id",
    }
)

from data_lineage import add_edge

# add_edge returns a pair; the second item is presumably a flag telling
# whether the edge was newly created (verify against the data_lineage docs).
edge, created = add_edge(
    catalog=catalog,
    source=source_column,
    target=target_column,
)
# Build the lineage graph from the catalog and render it inline.
from data_lineage import load_graph

graph = load_graph(catalog)

import plotly

plotly.offline.iplot(
    graph.fig()
)

# All remaining edges of the example, as (source, target) pairs where each
# side is a (database, schema, table, column) tuple. Every column lives in
# database "test" and schema "default", so the pairs are expanded from a
# compact (source_table, source_column, target_table, target_column) listing.
_EDGE_SPEC = (
    # page -> page_lookup_nonredirect / page_lookup_redirect
    ("page", "page_id", "page_lookup_nonredirect", "page_id"),
    ("page", "page_id", "page_lookup_redirect", "redirect_id"),
    ("page", "page_id", "page_lookup_redirect", "page_id"),
    ("page", "page_title", "page_lookup_nonredirect", "redirect_title"),
    ("page", "page_title", "page_lookup_nonredirect", "true_title"),
    ("page", "page_title", "page_lookup_redirect", "redirect_title"),
    ("page", "page_title", "page_lookup_redirect", "true_title"),
    ("page", "page_latest", "page_lookup_nonredirect", "page_version"),
    ("page", "page_latest", "page_lookup_redirect", "page_version"),
    # page_lookup_redirect -> page_lookup
    ("page_lookup_redirect", "redirect_id", "page_lookup", "redirect_id"),
    ("page_lookup_redirect", "redirect_title", "page_lookup", "redirect_title"),
    ("page_lookup_redirect", "true_title", "page_lookup", "true_title"),
    ("page_lookup_redirect", "page_id", "page_lookup", "page_id"),
    ("page_lookup_redirect", "page_version", "page_lookup", "page_version"),
    # page_lookup -> normalized_pagecounts
    ("page_lookup", "true_title", "normalized_pagecounts", "page_title"),
    ("page_lookup", "page_id", "normalized_pagecounts", "page_id"),
    # filtered_pagecounts -> normalized_pagecounts
    ("filtered_pagecounts", "views", "normalized_pagecounts", "page_url"),
    ("filtered_pagecounts", "bytes_sent", "normalized_pagecounts", "views"),
)

all_edges = [
    (
        ("test", "default", from_table, from_column),
        ("test", "default", to_table, to_column),
    )
    for from_table, from_column, to_table, to_column in _EDGE_SPEC
]
# Register every (source, target) pair from `all_edges` in the catalog.
# Each side of a pair is a (database, schema, table, column) tuple, so it is
# unpacked into named parts instead of being indexed positionally. The loop
# variables are named so that they do not overwrite the `edge` object returned
# by the earlier single add_edge call.
for source_key, target_key in all_edges:
    source_db, source_schema, source_table, source_name = source_key
    target_db, target_schema, target_table, target_name = target_key

    source_column = catalog.get_column(
        database_name=source_db,
        schema_name=source_schema,
        table_name=source_table,
        column_name=source_name,
    )
    target_column = catalog.get_column(
        database_name=target_db,
        schema_name=target_schema,
        table_name=target_table,
        column_name=target_name,
    )
    edge_in_db, created = add_edge(
        catalog=catalog, source=source_column, target=target_column
    )
# Reload the graph so it includes the edges added since the last load, then
# render the complete lineage graph inline.
graph = load_graph(catalog)

import plotly

plotly.offline.iplot(
    graph.fig()
)
# Render only the sub graph that `graph.sub_graph` returns for one column:
# ("test", "default", "normalized_pagecounts", "page_title").
target = catalog.get_column(
    **{
        "database_name": "test",
        "schema_name": "default",
        "table_name": "normalized_pagecounts",
        "column_name": "page_title",
    }
)

sub_graph = graph.sub_graph(target)
sub_fig = sub_graph.fig()

plotly.offline.iplot(sub_fig)