Column-Level Lineage in Microsoft Fabric Spark Notebooks

I will probably do another post on how to put this to productive use.

Fabric's lineage view stops right where the interesting questions start. It shows you which notebook feeds which dataset and draws the arrows between workspace items, then leaves you there. Ask anything about what happens inside a table and Fabric nudges you toward Materialized Lake Views or dbt, an odd pivot to suggest when your platform already runs on Spark jobs and notebooks.

So you do the obvious thing: write functions that scan the DDL and rebuild table-level lineage yourself. It works. You get a map of which table feeds which. But the map goes quiet the moment a question lives inside a table, and that is where most of them live:

- A field in a gold metric looks wrong. Table-level lineage hands you the full chain of tables feeding it and wishes you luck. Column-level lineage points at the one upstream column wired to that metric.
- Someone asks where customer_email ends up. You tagged it as PII in bronze, but table-level lineage only confirms the bronze table is "used somewhere." Column-level lineage follows the column through every rename and concatenation downstream, so you know your actual masking surface.
- You want to drop a column nobody seems to use. Column-level lineage proves nothing downstream reads it, and the deprecation goes from risky to routine.

Notice the pattern: the unit of the question is the column. An agent can tease these answers out of your repo, but point one at a large platform and watch its context window fill with noise. Reliable column-level lineage isn't only a governance asset; it can be part of the context that makes agentic development on your platform more reliable.

In a recent project, I needed a solution and ran into OpenLineage. Does it make all of this disappear? Under the right conditions, mostly yes!

So what exactly is OpenLineage? OpenLineage is an open standard for emitting data lineage events from various data processing engines (https://openlineage.io/).

The Spark integration ships pre-installed in the Fabric Spark runtime (3.5.5.5.4.20260403.6 contains OpenLineage 1.26.0). Runtime 2.0 currently shows OpenLineage as pre-installed, and the package supports Spark 4.x natively. It works via a JVM listener installed on the Spark driver. With a property change at the Fabric Environment level, every Spark notebook starts emitting column-level lineage events in JSONL format on every table creation, append, merge or overwrite. The whole protocol is event-based: emitters send START and COMPLETE (or FAIL) events per run. The "lineage graph" is something a consumer constructs by linking many events together.

Before I continue: the bundling of OpenLineage with Fabric was already explored in depth by Raki Rahman, who did a lot of work on the topic. For example, there is this cool page that explains OpenLineage visually and lets you visualize JSONL files.

Now, without further ado, here is what I learned during my exploration. For every column written to storage, OpenLineage captures:

- which source columns it came from,
- what kind of transformation produced it (identity, calculation, aggregation),
- whether the transformation involved masking (PII signal), and
- which other columns influenced row filtering, joins, or grouping (called indirect lineage).

The JSON events have three top-level objects:

- Run: a unique execution instance. Has a UUID, start/complete timestamps, and a parent reference if launched by another run.
- Job: what was executed. A name, a namespace, and zero or more facets describing properties of the job.
- Dataset: what was read or written. Identified by namespace + name, with zero or more facets describing schema, statistics, etc.

Here is an example of the column-level part of an event:

    {
      "target_column_name": {
        "inputFields": [
          {
            "namespace": "abfss://...@onelake.dfs.fabric.microsoft.com",
            "name": "/3eb04867-.../Tables/data_source/application_entity",
            "field": "source_column_name",
            "transformations": [
              {"type": "DIRECT", "subtype": "IDENTITY", "masking": false}
            ]
          }
        ]
      }
    }

The type info in this can be quite rich:

- DIRECT/IDENTITY: the column was passed through unchanged (rename, projection).
- DIRECT/TRANSFORMATION: the value was computed from the source (cast, arithmetic, string ops).
- DIRECT/AGGREGATION: the column is sum, avg, count, etc. of the source.
- INDIRECT/FILTER: the source column was in a WHERE clause; it affected which rows ended up in the output, not their values.
- INDIRECT/JOIN: the source column was a join key.
- INDIRECT/GROUP_BY: the source column was a grouping key.
- INDIRECT/CONDITIONAL: the source column appeared in a CASE/IF predicate.
- masking: boolean. Set to true when the transformation is a known hashing/anonymization function. Maps directly to PII tracking.

Other keys worth knowing:

- schema: column names and Spark types. Sometimes populated, sometimes not. Don't rely on it.
- outputStatistics: row counts written. Useful for "did the job actually do anything" checks.
- storage: file format, partitioning. Mostly informational.
- spark_properties: every Spark config the application was launched with. Useful for capturing notebook name (spark.synapse.context.notebookname) and app ID.
- parent: points at a parent run if the job was launched by another OpenLineage-aware run.

The setup with a twist

The setup of OpenLineage itself is not particularly difficult. You create a Fabric Environment, change its Spark configuration settings and publish.

    spark.extraListeners = io.openlineage.spark.agent.OpenLineageSparkListener
    spark.openlineage.namespace = <your-workspace-or-env-name>
    spark.openlineage.columnLineage.datasetLineageEnabled = true
    spark.openlineage.facets.spark.logicalPlan.disabled = true
    spark.openlineage.facets.debug.disabled = true

And for the transport:

    spark.openlineage.transport.type = http
    spark.openlineage.transport.url = https://<function>.azurewebsites.net/api/lineage
    spark.openlineage.transport.auth.type = api_key
    spark.openlineage.transport.auth.apiKey = <key>

What comes next is a bit trickier, because you are responsible for parsing and storing the lineage events. The main difficulty is concurrency. If you have a large platform, multiple Spark apps might be running simultaneously and emitting events concurrently, so the events need to be buffered. Microsoft offers a solution accelerator for Azure Databricks that can serve as the blueprint for the Fabric setup.

    Spark notebook
        │  (OpenLineage HTTP transport)
        ▼
    Azure Function (HTTP trigger)       ← responds 200 in <100ms
        │  (publishes raw JSON message)
        ▼
    Event Hub                           ← durable buffer, ordered partition by run_id
        │  (batch of events per invocation)
        ▼
    Azure Function (Event Hub trigger)  ← parse, validate, dedupe, normalize
        │  (append to Delta files)
        ▼
    ADLS Gen2 storage                   ← Parquet/Delta files
        │  (OneLake shortcut)
        ▼
    Fabric Lakehouse table: lineage_events_raw

The idea is to send the lineage events to a receiver (an Azure Function), then to a buffer (Event Hub), and finally to a parser (a second Azure Function) before writing to OneLake.

The first Function is dumb: it validates that the payload is JSON, drops it on Event Hub, and returns 200. It should respond in <100ms so OpenLineage's HTTP transport doesn't time out and retry storms don't form. The Event Hub holds events durably. A consumer group lets multiple processors read in parallel without conflict.
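
To make the "dumb" receiver concrete, here is a minimal sketch of its core logic in plain Python, deliberately kept independent of the Azure Functions SDK. The function name handle_lineage_post and the publish callback are hypothetical; publish(partition_key, raw_json) stands in for whatever Event Hub producer call you end up using. The only event field it touches is run.runId from the OpenLineage event, used as the partition key so that events of one run stay ordered.

```python
import json
from typing import Callable, Tuple


def handle_lineage_post(body: bytes, publish: Callable[[str, str], None]) -> Tuple[int, str]:
    """Validate that the payload is a JSON object, hand it to the buffer, answer fast.

    `publish(partition_key, raw_json)` is a hypothetical stand-in for the
    Event Hub producer call; it is not part of any SDK.
    """
    try:
        text = body.decode("utf-8")
        event = json.loads(text)
    except ValueError:
        # Covers both invalid UTF-8 and invalid JSON.
        return 400, "payload is not valid JSON"
    if not isinstance(event, dict):
        return 400, "expected a JSON object"

    # Partition by run ID so START and COMPLETE of one run keep their order.
    run = event.get("run")
    run_id = run.get("runId") if isinstance(run, dict) else None
    publish(str(run_id or "unknown"), text)

    # No parsing or cleaning here: that is the second Function's job.
    return 200, "accepted"
```

In a real Function you would wrap this in the HTTP trigger and pass a callback that sends to Event Hub; keeping the logic separate makes it easy to test without any cloud resources.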

The second Function consumes from Event Hub in batches, does whatever cleaning you want (drop noise, normalize paths, extract column lineage), and appends to a Delta table, either directly in Fabric or in an ADLS Gen2 account that can be shortcutted into your Fabric Lakehouse.

Good things about this setup: no concurrency window, platform agnostic (you can switch the emitter, the plumbing is identical), near real-time observability, and an optional fan-out to a direct consumer. For instance, the second Function could push events to Marquez or even Purview.

The cons: it adds a small bill for the Event Hub and ADLS Gen2 storage (possibly around 20-50€ per month) and requires some engineering. What is easily forgotten: Bicep/Terraform for the resources, CI/CD for the Functions, monitoring - and more dependencies.

Given the cons, I would use this solution when there are actually highly concurrent Spark workloads. Concurrent here does not mean running at the same time, but emitting an event on a write at the same time, i.e., two Spark apps END in the same second. This can become frequent in very large platforms and under certain orchestration arrangements. It becomes even more critical when the lineage should be consumed in near real-time.

Raki Rahman has a post on dropping the external infrastructure entirely and keeping the whole buffer on the driver, which is worth reading.

But there is another way, with only a few environment settings.

Here is the plot twist

    spark.openlineage.transport.type = file

You can also set up OpenLineage to write lineage events directly into the default Lakehouse attached to your notebook - just change a single config.
This is one of those workarounds that do not look like a proper solution, because of the limited control you have over the written files:

- they have a fixed file name format (lineage_Y_minute_day_hour_month),
- they do not support concurrent append (events from apps closing in the same second are lost),
- they can crowd your Lakehouse.

But upon further evaluation, you might realize that this is an OK trade-off if you are after lineage as a governance asset rather than real-time data observability, and you are working on a mid-sized platform with manageable concurrency. With a functional harness that parses these files regularly and produces snapshot tables, you get valuable results at a fraction of the engineering cost.

I have created such a solution, which also includes a workflow that partially automates the tedious work of generating and maintaining data contracts. It generates the full column lineage from Bronze to Platinum (semantic models) as .yml contracts when you use Microsoft Fabric's Spark and Semantic Model workloads for consumption. I will cover this in detail in another post.

The limitations

OpenLineage's Spark integration reads the logical plan of SQL and DataFrame operations, so anything that isn't in that plan is dark to it. A df.withColumn("x", custom_python_udf("a", "b")) shows x deriving from a and b, but the subtype stays opaque (usually TRANSFORMATION with no description) because the listener can't see inside the UDF. RDD operations are darker still, and enrichment via REST calls is missed entirely.

Setting OpenLineage properties inside a notebook cell does nothing. The listener is already instantiated by the time the cell runs, so all configuration has to live at the Fabric Environment level and be applied before the Spark application starts. That rules out spark.openlineage.run.tags (we wanted to stamp runs with pipeline IDs from notebook code, and couldn't). It also means roughly 2 minutes of extra queue time per ETL job, from the pool cold start.

Delta MERGE events report empty top-level inputs. A MERGE INTO emits an event where inputs: [] is literally empty, and the real source paths sit nested inside columnLineage.fields[].inputFields[].name. Walk event.inputs for sources and you miss every MERGE.

When notebook A runs runMultiple([notebook_B, notebook_C]), all three share one Spark application and one driver, so every event from B and C carries notebook A's name in spark.synapse.context.notebookname. The listener has no notion of which child is currently executing. If you need per-notebook attribution, derive it from the output dataset paths, not the notebook name. The table name gets extracted from the path, so a sensible naming convention and a metadata table to match against are what let you identify which entity an event belongs to and stitch the events into a graph.

Is this cool? I think it is. There are lots of possibilities for consuming this type of data in Fabric, and this in turn generates entirely new data products useful in governance, agentic development and data quality. In a future post, I will give more details on how I implemented this in a data contract framework that can keep up with the increasing pace of engineering in a large platform.
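
One footnote on the MERGE limitation above, since it is easy to get wrong in a parser. The following is a minimal sketch, not the implementation from my project: it reads JSONL text as produced by the file transport, flattens the columnLineage facet of each output into column-to-column edges, and collects source datasets from both event.inputs and the nested inputFields, so MERGE events are not lost. The function names (iter_events, column_edges, source_datasets) are hypothetical, and the sketch assumes the event layout described in this post (outputs[].facets.columnLineage.fields).

```python
import json
from typing import Iterator, List, Set, Tuple

Edge = Tuple[str, str, str, str, str, str]


def iter_events(jsonl_text: str) -> Iterator[dict]:
    """Yield one event per non-empty JSONL line, skipping malformed lines."""
    for line in jsonl_text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # e.g. a truncated line; ignored in this sketch
        if isinstance(event, dict):
            yield event


def column_edges(event: dict) -> List[Edge]:
    """Flatten columnLineage into tuples of
    (source_dataset, source_column, target_dataset, target_column, type, subtype)."""
    edges = []
    for out in event.get("outputs") or []:
        facet = (out.get("facets") or {}).get("columnLineage") or {}
        for target_col, spec in (facet.get("fields") or {}).items():
            for src in spec.get("inputFields") or []:
                # Keep the edge even if no transformation info was emitted.
                for t in src.get("transformations") or [{}]:
                    edges.append((src.get("name"), src.get("field"),
                                  out.get("name"), target_col,
                                  t.get("type"), t.get("subtype")))
    return edges


def source_datasets(event: dict) -> Set[str]:
    """Union of top-level inputs and the sources nested in columnLineage.

    The second part is what recovers the sources of a MERGE, whose
    top-level inputs list is empty.
    """
    sources = {i["name"] for i in (event.get("inputs") or []) if "name" in i}
    sources |= {edge[0] for edge in column_edges(event) if edge[0]}
    return sources
```

Calling source_datasets on a MERGE event with inputs: [] returns the paths found under inputFields, which is the piece a naive walk over event.inputs would miss.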