
Register and scan a data warehouse

Overview

This example shows how to:

  1. Register a data warehouse or source.
  2. Scan the source to add its schemata, tables and columns.
  3. Add a job, a job execution and column lineage information.

The last step is optional and can be skipped in subsequent recipes.

Run the recipe on the Wikimedia demo

You can run the recipe on a demo that contains a database with Wikimedia clickstream tables.

Start the demo by downloading its Docker Compose file and saving it as docker-compose.yml, the file name docker-compose looks for by default:

# in a new directory, run
wget https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/wikimedia-demo.yml -O docker-compose.yml
# or run
curl https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/wikimedia-demo.yml -o docker-compose.yml

Run docker-compose

docker-compose up -d

Verify containers are running

docker container ls | grep tokern
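A container can be listed as running before the service inside it accepts connections. If you script this recipe, a small standard-library helper can poll the API address used below (http://127.0.0.1:8000) until its port opens. The function name `wait_for_api` and its defaults are hypothetical; this is a sketch, not part of the data-lineage SDK.

```python
import socket
import time
from urllib.parse import urlparse


def wait_for_api(address: str, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Poll until a TCP connection to the host and port in `address` succeeds.

    Returns True as soon as the port accepts a connection, or False once
    `timeout` seconds have passed. It only checks that the port is open,
    not that the API behind it is healthy.
    """
    parsed = urlparse(address)
    host = parsed.hostname or "127.0.0.1"
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Open and immediately close a TCP connection.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For the demo, `wait_for_api("http://127.0.0.1:8000")` returns True once something is listening on port 8000, which is a reasonable signal to continue with the Python steps.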
# Required configuration for the API and the wikimedia database network address

docker_address = "http://127.0.0.1:8000"
wikimedia_db = {
    "username": "etldev",
    "password": "3tld3v",
    "uri": "tokern-demo-wikimedia",
    "port": "5432",
    "database": "wikimedia"
}
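In `wikimedia_db`, the `uri` key holds the database host name (here the demo container's name, which presumably resolves only on the demo's Docker network), not a full URI. To make the fields concrete, the hypothetical helper below assembles them into a standard PostgreSQL connection URI, percent-encoding the credentials. It is an illustration only and is not how the SDK consumes this dictionary.

```python
from urllib.parse import quote


def postgres_url(cfg: dict) -> str:
    """Build a postgresql:// URI from a dict shaped like wikimedia_db.

    Assumes the keys username, password, uri (the host name), port and
    database. Credentials are percent-encoded so characters such as
    '@' or ':' cannot break the URI.
    """
    user = quote(cfg["username"], safe="")
    password = quote(cfg["password"], safe="")
    return f"postgresql://{user}:{password}@{cfg['uri']}:{cfg['port']}/{cfg['database']}"


wikimedia_db = {
    "username": "etldev",
    "password": "3tld3v",
    "uri": "tokern-demo-wikimedia",
    "port": "5432",
    "database": "wikimedia",
}
print(postgres_url(wikimedia_db))
# postgresql://etldev:3tld3v@tokern-demo-wikimedia:5432/wikimedia
```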
# Set up a connection to the catalog using the SDK.
from data_lineage import Catalog

catalog = Catalog(docker_address)
# Register the wikimedia data warehouse with the data-lineage app.

source = catalog.add_source(name="wikimedia", source_type="postgresql", **wikimedia_db)

# Scan the wikimedia data warehouse and register all schemata, tables and columns.

catalog.scan_source(source)
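The scan records each schema, table and column it finds in the source. As a rough picture of the shape of that metadata, assume it comes from rows like those in PostgreSQL's `information_schema.columns` view (the scanner's actual queries may differ). The hypothetical `group_columns` function nests such rows into schema, then table, then column names. The sample rows use only the two columns this recipe touches.

```python
from collections import defaultdict


def group_columns(rows):
    """Nest (schema, table, column) rows into {schema: {table: [columns]}}.

    Column order within a table follows the order of the input rows.
    """
    nested = defaultdict(lambda: defaultdict(list))
    for schema, table, column in rows:
        nested[schema][table].append(column)
    # Convert to plain dicts so the result prints and compares cleanly.
    return {schema: dict(tables) for schema, tables in nested.items()}


# Illustrative rows of the form a query such as
#   SELECT table_schema, table_name, column_name FROM information_schema.columns
# could return; limited to the columns used in this recipe.
rows = [
    ("public", "page", "page_id"),
    ("public", "page_lookup_nonredirect", "redirect_id"),
]
print(group_columns(rows))
# {'public': {'page': ['page_id'], 'page_lookup_nonredirect': ['redirect_id']}}
```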
# Create a job (and, next, a job execution) for the query that inserts data from page into page_lookup_nonredirect

job = catalog.add_job(
    "insert_into_page_lookup_nonredirect",
    {
        "sql": "insert into page_lookup_nonredirect(redirect_id) select page_id from page"
    },
)
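The job's context stores the SQL text. In this recipe the matching column edge is added by hand in a later step, but the pairing it records follows from how INSERT ... SELECT works: target columns are filled by the selected columns in order. The hypothetical, regex-based `column_edges` below shows that pairing for the simple single-table form used here. It is a toy, not data-lineage's own query handling, and it ignores expressions, aliases, joins and subqueries.

```python
import re

# Matches only the simple form used in this recipe:
#   insert into <target>(<cols>) select <cols> from <source>
INSERT_SELECT = re.compile(
    r"insert\s+into\s+(\w+)\s*\(([^)]*)\)\s*select\s+(.+?)\s+from\s+(\w+)\s*;?\s*$",
    re.IGNORECASE | re.DOTALL,
)


def column_edges(sql: str):
    """Return [((source_table, source_col), (target_table, target_col)), ...].

    Target columns and select-list columns are paired by position.
    """
    match = INSERT_SELECT.match(sql.strip())
    if match is None:
        raise ValueError("unsupported statement")
    target, target_cols, select_cols, source = match.groups()
    targets = [c.strip() for c in target_cols.split(",")]
    sources = [c.strip() for c in select_cols.split(",")]
    if len(targets) != len(sources):
        raise ValueError("column count mismatch")
    return [((source, s), (target, t)) for s, t in zip(sources, targets)]


sql = "insert into page_lookup_nonredirect(redirect_id) select page_id from page"
print(column_edges(sql))
# [(('page', 'page_id'), ('page_lookup_nonredirect', 'redirect_id'))]
```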
import datetime
from dbcat.catalog.models import JobExecutionStatus

job_execution = catalog.add_job_execution(
    job=job,
    started_at=datetime.datetime.combine(
        datetime.date(2021, 4, 1), datetime.time(1, 0)
    ),
    ended_at=datetime.datetime.combine(
        datetime.date(2021, 4, 1), datetime.time(1, 15)
    ),
    status=JobExecutionStatus.SUCCESS,
)
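`datetime.datetime.combine` joins a date and a time into one naive (timezone-less) datetime. The same values can be written directly with the `datetime.datetime` constructor. This standard-library snippet, independent of the SDK, checks that equivalence and computes the 15-minute duration the job execution records.

```python
import datetime

started_at = datetime.datetime.combine(datetime.date(2021, 4, 1), datetime.time(1, 0))
ended_at = datetime.datetime.combine(datetime.date(2021, 4, 1), datetime.time(1, 15))

# The constructor form produces equal naive datetimes.
assert started_at == datetime.datetime(2021, 4, 1, 1, 0)
assert ended_at == datetime.datetime(2021, 4, 1, 1, 15)

duration = ended_at - started_at
print(duration)                  # 0:15:00
print(duration.total_seconds())  # 900.0
```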
# Add an edge between these two columns:
# ("wikimedia", "public", "page", "page_id") -> ("wikimedia", "public", "page_lookup_nonredirect", "redirect_id")

source_column = catalog.get_column(
    source_name="wikimedia",
    schema_name="public",
    table_name="page",
    column_name="page_id",
)
target_column = catalog.get_column(
    source_name="wikimedia",
    schema_name="public",
    table_name="page_lookup_nonredirect",
    column_name="redirect_id",
)

edge = catalog.add_column_lineage(
    source=source_column,
    target=target_column,
    job_execution_id=job_execution.id,
    context={},
)
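Each call to `add_column_lineage` records one directed edge from a source column to a target column, so the lineage in a catalog forms a graph. As a hypothetical illustration, independent of the SDK and of how the app stores edges, the `upstream` function below walks such edges backwards (breadth-first) to find every column a given column depends on, with columns identified by (source, schema, table, column) tuples.

```python
from collections import defaultdict, deque

# A column is identified as (source, schema, table, column).
Column = tuple[str, str, str, str]


def upstream(edges: list[tuple[Column, Column]], column: Column) -> set[Column]:
    """Return every column that `column` transitively depends on.

    `edges` holds (source_column, target_column) pairs: data flows from
    the first column into the second.
    """
    parents = defaultdict(list)
    for src, dst in edges:
        parents[dst].append(src)
    seen: set[Column] = set()
    queue = deque([column])
    while queue:
        for parent in parents[queue.popleft()]:
            if parent not in seen:  # also guards against cycles
                seen.add(parent)
                queue.append(parent)
    return seen


edges = [
    (("wikimedia", "public", "page", "page_id"),
     ("wikimedia", "public", "page_lookup_nonredirect", "redirect_id")),
]
print(upstream(edges, ("wikimedia", "public", "page_lookup_nonredirect", "redirect_id")))
# {('wikimedia', 'public', 'page', 'page_id')}
```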

Visit the Kedro UI to view the lineage graph.

One Task Graph