SQL Queries
Important Capabilities
Capability | Status | Notes |
---|---|---|
Column-level Lineage | ✅ | Parsed from SQL queries |
Table-Level Lineage | ✅ | Parsed from SQL queries |
This source reads a newline-delimited JSON file containing SQL queries and parses them to generate lineage.
Query File Format
This file should contain one JSON object per line, with the following fields:
- query: string - The SQL query to parse.
- timestamp (optional): number - The timestamp of the query, in seconds since the epoch.
- user (optional): string - The user who ran the query. This user value will be directly converted into a DataHub user urn.
- operation_type (optional): string - Platform-specific operation type, used if the operation type can't be parsed.
- downstream_tables (optional): string[] - Fallback list of tables that the query writes to, used if the query can't be parsed.
- upstream_tables (optional): string[] - Fallback list of tables the query reads from, used if the query can't be parsed.
Example Queries File
{"query": "SELECT x FROM my_table", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": [], "upstream_tables": ["my_database.my_schema.my_table"]}
{"query": "INSERT INTO my_table VALUES (1, 'a')", "timestamp": 1689232737.669, "user": "user_b", "downstream_tables": ["my_database.my_schema.my_table"], "upstream_tables": []}
Note that this file does not represent a single JSON object, but instead newline-delimited JSON, in which each line is a separate JSON object.
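As a minimal sketch of the format described above, the following Python snippet writes and re-reads a queries file using only the standard library. The helper names `write_queries_file` and `read_queries_file` are hypothetical and are not part of DataHub; the field names come from the list above.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def write_queries_file(path, entries):
    """Write one JSON object per line (newline-delimited JSON)."""
    with open(path, "w", encoding="utf-8") as f:
        for entry in entries:
            if "query" not in entry:
                raise ValueError("each entry needs a 'query' field")
            # json.dumps escapes newlines inside strings, so each object stays on one line.
            f.write(json.dumps(entry) + "\n")


def read_queries_file(path):
    """Parse the file line by line; parsing the whole file as one JSON document would fail."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


entries = [
    {
        "query": "SELECT x FROM my_table",
        # Seconds since the epoch, as a float.
        "timestamp": datetime(2023, 7, 13, 7, 18, 58, tzinfo=timezone.utc).timestamp(),
        "user": "user_a",
        "downstream_tables": [],
        "upstream_tables": ["my_database.my_schema.my_table"],
    },
    # Only "query" is required; the other fields are optional.
    {"query": "INSERT INTO my_table VALUES (1, 'a')", "user": "user_b"},
]

# A temporary directory keeps the example free of side effects.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "queries.json")
    write_queries_file(path, entries)
    loaded = read_queries_file(path)

print(len(loaded), "queries round-tripped")
```

In practice the entries would typically come from a query log export rather than a hard-coded list.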
CLI based Ingestion
Install the Plugin
The sql-queries source works out of the box with acryl-datahub.
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
datahub_api: # Only necessary if using a non-DataHub sink, e.g. the file sink
  server: http://localhost:8080
  timeout_sec: 60
source:
  type: sql-queries
  config:
    platform: "snowflake"
    default_db: "SNOWFLAKE"
    query_file: "./queries.json"
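The same recipe can also be expressed as a Python dictionary and run programmatically. The sketch below assumes that acryl-datahub is installed and that its `Pipeline` class (`datahub.ingestion.run.pipeline.Pipeline`) accepts the recipe as a dict; `build_recipe` and `run_recipe` are hypothetical helper names, and the server URL is the placeholder from the recipe above.

```python
def build_recipe(query_file, platform="snowflake", default_db="SNOWFLAKE"):
    """Return the starter recipe as a plain dict with the same keys as the YAML."""
    return {
        "datahub_api": {"server": "http://localhost:8080", "timeout_sec": 60},
        "source": {
            "type": "sql-queries",
            "config": {
                "platform": platform,
                "default_db": default_db,
                "query_file": query_file,
            },
        },
    }


def run_recipe(recipe):
    """Run the recipe through DataHub's ingestion pipeline."""
    # Imported lazily so that build_recipe works without acryl-datahub installed.
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    # Raises if the source or sink reported failures.
    pipeline.raise_from_status()


recipe = build_recipe("./queries.json")
# run_recipe(recipe)  # Uncomment once a DataHub instance is reachable.
```

Running the YAML recipe through the CLI, as described in the main recipe guide, remains the simpler route; the programmatic form is mainly useful inside schedulers or tests.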
Config Details
Note that a dot (.) is used to denote nested fields in the YAML recipe.
Field | Description |
---|---|
platform ✅ string | The platform for which to generate data, e.g. snowflake |
query_file ✅ string | Path to file to ingest |
default_db string | The default database to use for unqualified table names |
default_dialect string | The SQL dialect to use when parsing queries. Overrides automatic dialect detection. |
default_schema string | The default schema to use for unqualified table names |
platform_instance string | The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details. |
env string | The environment that all assets produced by this connector belong to. Default: PROD |
usage BaseUsageConfig | The usage config to use when generating usage statistics. Default: {'bucket_duration': 'DAY', 'end_time': '2024-10-31... |
usage.bucket_duration Enum | Size of the time window to aggregate usage stats. Default: DAY |
usage.end_time string(date-time) | Latest date of lineage/usage to consider. Default: Current time in UTC |
usage.format_sql_queries boolean | Whether to format SQL queries. Default: False |
usage.include_operational_stats boolean | Whether to display operational stats. Default: True |
usage.include_read_operational_stats boolean | Whether to report read operational stats. Experimental. Default: False |
usage.include_top_n_queries boolean | Whether to ingest the top_n_queries. Default: True |
usage.start_time string(date-time) | Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration). You can also specify a time relative to end_time, such as '-7 days' or '-7d'. |
usage.top_n_queries integer | Number of top queries to save to each table. Default: 10 |
usage.user_email_pattern AllowDenyPattern | Regex patterns for user emails to filter in usage. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} |
usage.user_email_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True |
usage.user_email_pattern.allow array | List of regex patterns to include in ingestion. Default: ['.*'] |
usage.user_email_pattern.allow.string string | |
usage.user_email_pattern.deny array | List of regex patterns to exclude from ingestion. Default: [] |
usage.user_email_pattern.deny.string string | |
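To illustrate how the dotted field names in the table correspond to nesting in a recipe, here is a small sketch that expands dotted keys into a nested config dictionary. The `expand_dotted` helper is hypothetical and only illustrates the naming convention; it is not how DataHub itself loads a recipe, and the chosen values are arbitrary examples.

```python
def expand_dotted(flat):
    """Turn {'usage.top_n_queries': 5} into {'usage': {'top_n_queries': 5}}."""
    nested = {}
    for dotted_key, value in flat.items():
        node = nested
        *parents, leaf = dotted_key.split(".")
        for part in parents:
            # Create intermediate dicts on demand.
            node = node.setdefault(part, {})
        node[leaf] = value
    return nested


config = expand_dotted({
    "platform": "snowflake",
    "query_file": "./queries.json",
    "usage.bucket_duration": "HOUR",
    "usage.top_n_queries": 5,
    "usage.user_email_pattern.deny": [".*@example\\.com"],
})
print(config["usage"])
```

The resulting dictionary has the same shape as the `config` block of a YAML recipe: `usage` becomes a nested mapping, and `user_email_pattern` a mapping inside it.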
The JSONSchema for this configuration is inlined below.
{
  "title": "SqlQueriesSourceConfig",
  "description": "Any source that connects to a platform should inherit this class",
  "type": "object",
  "properties": {
    "env": {
      "title": "Env",
      "description": "The environment that all assets produced by this connector belong to",
      "default": "PROD",
      "type": "string"
    },
    "platform_instance": {
      "title": "Platform Instance",
      "description": "The instance of the platform that all assets produced by this recipe belong to. This should be unique within the platform. See https://datahubproject.io/docs/platform-instances/ for more details.",
      "type": "string"
    },
    "query_file": {
      "title": "Query File",
      "description": "Path to file to ingest",
      "type": "string"
    },
    "platform": {
      "title": "Platform",
      "description": "The platform for which to generate data, e.g. snowflake",
      "type": "string"
    },
    "usage": {
      "title": "Usage",
      "description": "The usage config to use when generating usage statistics",
      "default": {
        "bucket_duration": "DAY",
        "end_time": "2024-10-31T19:35:23.421631+00:00",
        "start_time": "2024-10-30T00:00:00+00:00",
        "queries_character_limit": 24000,
        "top_n_queries": 10,
        "user_email_pattern": {
          "allow": [
            ".*"
          ],
          "deny": [],
          "ignoreCase": true
        },
        "include_operational_stats": true,
        "include_read_operational_stats": false,
        "format_sql_queries": false,
        "include_top_n_queries": true
      },
      "allOf": [
        {
          "$ref": "#/definitions/BaseUsageConfig"
        }
      ]
    },
    "default_db": {
      "title": "Default Db",
      "description": "The default database to use for unqualified table names",
      "type": "string"
    },
    "default_schema": {
      "title": "Default Schema",
      "description": "The default schema to use for unqualified table names",
      "type": "string"
    },
    "default_dialect": {
      "title": "Default Dialect",
      "description": "The SQL dialect to use when parsing queries. Overrides automatic dialect detection.",
      "type": "string"
    }
  },
  "required": [
    "query_file",
    "platform"
  ],
  "additionalProperties": false,
  "definitions": {
    "BucketDuration": {
      "title": "BucketDuration",
      "description": "An enumeration.",
      "enum": [
        "DAY",
        "HOUR"
      ],
      "type": "string"
    },
    "AllowDenyPattern": {
      "title": "AllowDenyPattern",
      "description": "A class to store allow deny regexes",
      "type": "object",
      "properties": {
        "allow": {
          "title": "Allow",
          "description": "List of regex patterns to include in ingestion",
          "default": [
            ".*"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "deny": {
          "title": "Deny",
          "description": "List of regex patterns to exclude from ingestion.",
          "default": [],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "ignoreCase": {
          "title": "Ignorecase",
          "description": "Whether to ignore case sensitivity during pattern matching.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "BaseUsageConfig": {
      "title": "BaseUsageConfig",
      "type": "object",
      "properties": {
        "bucket_duration": {
          "description": "Size of the time window to aggregate usage stats.",
          "default": "DAY",
          "allOf": [
            {
              "$ref": "#/definitions/BucketDuration"
            }
          ]
        },
        "end_time": {
          "title": "End Time",
          "description": "Latest date of lineage/usage to consider. Default: Current time in UTC",
          "type": "string",
          "format": "date-time"
        },
        "start_time": {
          "title": "Start Time",
          "description": "Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.",
          "type": "string",
          "format": "date-time"
        },
        "top_n_queries": {
          "title": "Top N Queries",
          "description": "Number of top queries to save to each table.",
          "default": 10,
          "exclusiveMinimum": 0,
          "type": "integer"
        },
        "user_email_pattern": {
          "title": "User Email Pattern",
          "description": "regex patterns for user emails to filter in usage.",
          "default": {
            "allow": [
              ".*"
            ],
            "deny": [],
            "ignoreCase": true
          },
          "allOf": [
            {
              "$ref": "#/definitions/AllowDenyPattern"
            }
          ]
        },
        "include_operational_stats": {
          "title": "Include Operational Stats",
          "description": "Whether to display operational stats.",
          "default": true,
          "type": "boolean"
        },
        "include_read_operational_stats": {
          "title": "Include Read Operational Stats",
          "description": "Whether to report read operational stats. Experimental.",
          "default": false,
          "type": "boolean"
        },
        "format_sql_queries": {
          "title": "Format Sql Queries",
          "description": "Whether to format sql queries",
          "default": false,
          "type": "boolean"
        },
        "include_top_n_queries": {
          "title": "Include Top N Queries",
          "description": "Whether to ingest the top_n_queries.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    }
  }
}
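The schema above marks `query_file` and `platform` as required and sets `additionalProperties` to false at the top level. As a rough illustration of what those two constraints mean for a recipe's `config` block, the sketch below checks only top-level key names. The `check_config` helper is hypothetical; it is not a full JSON Schema validator and does not reflect how DataHub validates configs internally.

```python
# Top-level property names copied from the schema above.
REQUIRED = {"query_file", "platform"}
ALLOWED = REQUIRED | {
    "env",
    "platform_instance",
    "usage",
    "default_db",
    "default_schema",
    "default_dialect",
}


def check_config(config):
    """Return a list of problems; an empty list means the top-level keys look fine."""
    problems = []
    keys = set(config)
    for key in sorted(REQUIRED - keys):
        problems.append(f"missing required field: {key}")
    # additionalProperties: false means unknown keys are rejected.
    for key in sorted(keys - ALLOWED):
        problems.append(f"unknown field: {key}")
    return problems


print(check_config({"platform": "snowflake", "query_files": "./queries.json"}))
```

A misspelled key such as `query_files` therefore shows up twice: once as a missing required field and once as an unknown field.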
Code Coordinates
- Class Name: datahub.ingestion.source.sql_queries.SqlQueriesSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for SQL Queries, feel free to ping us on our Slack.