
Register and scan a data warehouse

Overview

This example shows how to:

  1. Register a data warehouse or source.
  2. Scan the source to add schemata, tables, and columns.
  3. Add a job, a job execution, and column lineage information.

The last step is optional and can be skipped in subsequent recipes.
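To make the three steps concrete, here is a minimal in-memory sketch of the objects the recipe creates: a source, the columns found by a scan, and a lineage edge tied to a job execution. The classes (`Column`, `LineageEdge`, `ToyCatalog`) are hypothetical illustrations, not the data-lineage SDK. Their method names mirror the recipe's calls, but the signatures are simplified assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

# Hypothetical in-memory stand-ins for catalog objects; this is NOT the data-lineage SDK.
ColumnKey = Tuple[str, str, str, str]  # (source, schema, table, column)


@dataclass(frozen=True)
class Column:
    source: str
    schema: str
    table: str
    name: str


@dataclass(frozen=True)
class LineageEdge:
    source: Column
    target: Column
    job_execution_id: int


@dataclass
class ToyCatalog:
    sources: List[str] = field(default_factory=list)
    columns: Dict[ColumnKey, Column] = field(default_factory=dict)
    edges: List[LineageEdge] = field(default_factory=list)

    def add_source(self, name: str) -> str:
        # Step 1: register the data warehouse.
        self.sources.append(name)
        return name

    def scan_source(self, source: str, schemata: Dict[str, Dict[str, List[str]]]) -> None:
        # Step 2: record every schema -> table -> column reported by the source.
        if source not in self.sources:
            raise KeyError(f"unknown source {source!r}")
        for schema, tables in schemata.items():
            for table, names in tables.items():
                for name in names:
                    key = (source, schema, table, name)
                    self.columns[key] = Column(*key)

    def get_column(self, source: str, schema: str, table: str, name: str) -> Column:
        return self.columns[(source, schema, table, name)]

    def add_column_lineage(self, source: Column, target: Column, job_execution_id: int) -> LineageEdge:
        # Step 3: an edge records that a job execution moved data from source to target.
        edge = LineageEdge(source, target, job_execution_id)
        self.edges.append(edge)
        return edge


# Usage: only the two columns the recipe links are listed; job_execution_id=1 is a made-up value.
toy = ToyCatalog()
wikimedia = toy.add_source("wikimedia")
toy.scan_source(wikimedia, {"public": {"page": ["page_id"], "page_lookup_nonredirect": ["redirect_id"]}})
edge = toy.add_column_lineage(
    toy.get_column(wikimedia, "public", "page", "page_id"),
    toy.get_column(wikimedia, "public", "page_lookup_nonredirect", "redirect_id"),
    job_execution_id=1,
)
```

Keying columns by `(source, schema, table, column)` reflects how the recipe later looks up columns by those four names; a real catalog would also store types and IDs.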

Run the recipe on the wikimedia demo

You can run the recipe on a demo that contains a database with wikimedia clickstream tables.

Start the demo using the following instructions:

# in a new directory, run
wget -O docker-compose.yml https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/wikimedia-demo.yml
# or run
curl https://raw.githubusercontent.com/tokern/data-lineage/master/install-manifests/docker-compose/wikimedia-demo.yml -o docker-compose.yml

Run docker-compose

docker-compose up -d

Verify that the containers are running

docker container ls | grep tokern
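Even after the containers are listed, the API may need a few seconds before it accepts connections. As a sketch only (a plain stdlib helper, not part of data-lineage), you could poll the address the recipe uses for the API, `127.0.0.1` on port `8000`, before running the Python steps. The function name `wait_for_port` and its 30-second default are assumptions.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful TCP handshake means something is listening on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)
```

For example, `wait_for_port("127.0.0.1", 8000)` returns `True` once the API container is listening. An open port shows only that a process is accepting connections, not that the app has finished initializing.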
# Required configuration for API and wikimedia database network address
docker_address = "http://127.0.0.1:8000"
wikimedia_db = {
    "username": "etldev",
    "password": "3tld3v",
    "uri": "tokern-demo-wikimedia",
    "port": "5432",
    "database": "wikimedia",
}
# Set up a connection to the catalog using the SDK.
from data_lineage import Catalog

catalog = Catalog(docker_address)
# Register the wikimedia data warehouse with the data-lineage app.
source = catalog.add_source(name="wikimedia", source_type="postgresql", **wikimedia_db)
# Scan the wikimedia data warehouse and register all schemata, tables and columns.
catalog.scan_source(source)
# Create a job and job_execution that inserts data from page to page_lookup_nonredirect
job = catalog.add_job(
    "insert_into_page_lookup_nonredirect",
    {
        "sql": "insert into page_lookup_nonredirect(redirect_id) select page_id from page"
    },
)
import datetime

from dbcat.catalog.models import JobExecutionStatus

job_execution = catalog.add_job_execution(
    job=job,
    started_at=datetime.datetime.combine(
        datetime.date(2021, 4, 1), datetime.time(1, 0)
    ),
    ended_at=datetime.datetime.combine(
        datetime.date(2021, 4, 1), datetime.time(1, 15)
    ),
    status=JobExecutionStatus.SUCCESS,
)
# Add an edge between these two columns:
# ("wikimedia", "public", "page", "page_id") -> ("wikimedia", "public", "page_lookup_nonredirect", "redirect_id")
source_column = catalog.get_column(
    source_name="wikimedia",
    schema_name="public",
    table_name="page",
    column_name="page_id",
)
target_column = catalog.get_column(
    source_name="wikimedia",
    schema_name="public",
    table_name="page_lookup_nonredirect",
    column_name="redirect_id",
)
edge = catalog.add_column_lineage(
    source=source_column,
    target=target_column,
    job_execution_id=job_execution.id,
    context={},
)
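The column pair linked by this lineage edge is the one implied by the job's SQL, `insert into page_lookup_nonredirect(redirect_id) select page_id from page`: `INSERT ... SELECT` pairs target and selected columns by position. As an illustration only, a toy regex extractor that handles just this narrow statement shape could derive the pair. It is not how data-lineage analyzes queries, and it ignores joins, aliases, expressions, schemas, and `*`. The function name `column_edges` is hypothetical.

```python
import re
from typing import List, Tuple

# Matches only: insert into <target>(<cols>) select <cols> from <source> [;]
_INSERT_SELECT = re.compile(
    r"insert\s+into\s+(\w+)\s*\(([^)]*)\)\s*select\s+(.+?)\s+from\s+(\w+)\s*;?\s*$",
    re.IGNORECASE | re.DOTALL,
)

Edge = Tuple[Tuple[str, str], Tuple[str, str]]  # ((source_table, col), (target_table, col))


def column_edges(sql: str) -> List[Edge]:
    """Derive column-level lineage edges from a simple INSERT ... SELECT statement."""
    match = _INSERT_SELECT.match(sql.strip())
    if match is None:
        raise ValueError("unsupported statement shape")
    target, target_cols, select_cols, source = match.groups()
    targets = [c.strip() for c in target_cols.split(",")]
    sources = [c.strip() for c in select_cols.split(",")]
    if len(targets) != len(sources):
        raise ValueError("column count mismatch")
    # INSERT ... SELECT maps the i-th selected column to the i-th target column.
    return [((source, s), (target, t)) for s, t in zip(sources, targets)]
```

Applied to the job's SQL, it returns `[(("page", "page_id"), ("page_lookup_nonredirect", "redirect_id"))]`, the same column pair the recipe registers with `add_column_lineage`.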

Visit Kedro UI

One Task Graph