Delta Lake MERGE INTO Bug: Full Table Scan With DELETE Action
Hey guys! Today, we're diving deep into a fascinating issue we uncovered while working with Delta Lake: a bug in the MERGE INTO query when it involves a DELETE action. This bug can cause a full table scan, which, as you can imagine, can significantly slow things down and make your operations more expensive. So let's break it down, figure out what's happening, and see how we can tackle it.
Understanding the Problem: Full Table Scans with MERGE INTO and DELETE
When using Delta Lake, the MERGE INTO command is super powerful for updating tables based on changes. However, we found that when you include a DELETE action in your MERGE INTO query, it can trigger a full table scan. This means that instead of reading just the metadata and key fields needed to resolve the merge, Delta Lake scans the entire table, including all the data fields. Imagine trying to find a needle in a haystack, but you have to sift through every single straw – that's what this full table scan feels like!
Why is This a Problem?
The main issue here is performance. Scanning the entire table is incredibly inefficient, especially when you're dealing with large datasets. It increases the time it takes to complete the merge operation and consumes a lot of resources. Think about it – if you're running this on a production system, it can lead to delays and higher costs. We observed this firsthand and realized we needed to dig deeper to understand what was causing it and how to fix it.
The Culprit: Unnecessary Data Access
What's happening under the hood is that the query is accessing data fields it doesn't actually need. For a DELETE operation, you only need to know which rows to delete based on the merge condition; you shouldn't need to read all the data in those rows. This unnecessary data access is what leads to the full table scan. We expected the scan to be limited to the merge key _id, the __delta_internal_is_row_deleted marker, and the other file-related metadata columns, but instead it was pulling in the entire data field.
Reproducing the Issue
To give you a clearer picture, here's how you can reproduce the issue. Run a MERGE INTO query like this:
MERGE INTO snapshot_table t
USING change_events c
ON t._id = c._id
WHEN MATCHED AND c.operation='DELETE' THEN DELETE
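If you want to try this end to end, here is a minimal Scala sketch that builds two small Delta tables with these names and runs the same merge. The schemas, data, and session configuration are assumptions for illustration only, not the exact setup from the report:
// Minimal, hypothetical reproduction. Table schemas and Spark session settings
// are assumptions; adjust them to match your environment.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("merge-delete-scan-repro")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

// Target table: a key column plus a payload column the delete should not need to read.
Seq((1L, "payload-1"), (2L, "payload-2"), (3L, "payload-3"))
  .toDF("_id", "data")
  .write.format("delta").saveAsTable("snapshot_table")

// Change events: only keys and an operation flag.
Seq((2L, "DELETE"), (3L, "UPDATE"))
  .toDF("_id", "operation")
  .write.format("delta").saveAsTable("change_events")

// Run the delete-only merge; the scan of snapshot_table and its output columns
// show up under the merge's query in the SQL tab of the Spark UI.
spark.sql(
  """MERGE INTO snapshot_table t
    |USING change_events c
    |ON t._id = c._id
    |WHEN MATCHED AND c.operation = 'DELETE' THEN DELETE""".stripMargin)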
When you run this, you’ll likely see a table scan that looks like this:
Scan snapshot_table
Output : [_id, **data**, __delta_internal_is_row_deleted, _tmp_metadata_row_index, file_path, file_name, file_size, file_block_start, file_block_length, file_modification_time]
Notice the data field in the output? That's the culprit. We expected the scan to look like this instead:
Scan snapshot_table
Output : [_id, __delta_internal_is_row_deleted, _tmp_metadata_row_index, file_path, file_name, file_size, file_block_start, file_block_length, file_modification_time]
See the difference? No data field. This is the optimized scan we were hoping for.
Diving into the Code: Our Investigation and Fix
So, we rolled up our sleeves and started digging into the Delta Lake code to figure out what was going on. We traced the issue back to the MergeOutputGeneration.scala file. Specifically, we found a snippet that seemed to be the root cause.
The Problematic Snippet
In MergeOutputGeneration.scala, there's a section where the expressions for the DELETE action are generated. Here's the original code:
val deleteSourceRowExprs =
  (targetWriteCols ++
    rowIdColumnExpressionOpt.map(_ => Literal(null)) ++
    rowCommitVersionColumnExpressionOpt.map(_ => Literal(null)) ++
    Seq(Literal(true))) ++
    (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq())
This code constructs the output column expressions for a row that is being deleted. The targetWriteCols part is where the problem lies: it includes every column from the target table, including the data field, and referencing those columns forces the scan to read them, which leads to the full table scan.
Our Solution: Selective Column Access
To fix this, we modified the code to selectively access columns. We introduced a check to prune the target columns in the DELETE case. Here's the modified code:
// Replace the target columns with typed NULL literals when they are not needed,
// i.e. when CDC is disabled and pruning is allowed, so the scan can skip them.
val deletedColsForUnmatchedTarget =
  if (cdcEnabled || !pruneTargetColsInDelete) targetWriteCols
  else targetWriteCols.map(e => Cast(Literal(null), e.dataType))

val deleteSourceRowExprs =
  (deletedColsForUnmatchedTarget ++
    rowIdColumnExpressionOpt.map(_ => Literal(null)) ++
    rowCommitVersionColumnExpressionOpt.map(_ => Literal(null)) ++
    Seq(Literal(true))) ++
    (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq())
With this change, we only access the columns the DELETE case actually needs, avoiding the full table scan. The new deletedColsForUnmatchedTarget keeps the full targetWriteCols when CDC is enabled or pruneTargetColsInDelete is false; otherwise it replaces each target column with a typed NULL literal of the same data type, so the scan no longer has to read the data field.
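The principle behind the fix can also be seen at the DataFrame level. Here is a hedged sketch, continuing from the reproduction session above and not the actual Delta code path, showing that an expression referencing a column forces the scan to read it, while a typed null literal with the same output schema lets column pruning drop it from the scan's ReadSchema:
import org.apache.spark.sql.functions.{col, lit}

val snapshot = spark.table("snapshot_table")

// References the data column, so the file scan must read it.
snapshot.select(col("_id"), col("data")).explain()

// Same output schema, but data is produced from a typed null literal,
// so the scan's ReadSchema no longer needs to include it.
snapshot.select(col("_id"), lit(null).cast("string").as("data")).explain()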
More Tweaks Needed: Handling Other Cases
But wait, there's more! We found that, depending on the MERGE INTO statement, there were other places in the code that could cause similar issues. For example, the generateAllActionExprs function also had the potential to reference all columns in the target table. Here's the original code:
case _: DeltaMergeIntoMatchedDeleteClause =>
  val incrCountExpr = {
    if (shouldCountDeletedRows) {
      incrementMetricsAndReturnBool(
        names = Seq("numTargetRowsDeleted", "numTargetRowsMatchedDeleted"),
        valueToReturn = true)
    } else {
      Literal.TrueLiteral
    }
  }
  // Generate expressions to set the ROW_DROPPED_COL = true and mark as a DELETE
  targetWriteCols ++
    rowIdColumnExpressionOpt ++
    rowCommitVersionColumnOpt ++
    Seq(incrCountExpr) ++
    (if (cdcEnabled) Some(CDC_TYPE_DELETE) else None)
We needed to make sure we addressed these cases as well to ensure consistent performance improvements.
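For illustration, here is one way the same null-literal pruning might look in this matched-delete branch, gating only on cdcEnabled for simplicity. This is a hypothetical sketch of the idea, not the actual patch:
// Hypothetical sketch only: applying the same pruning idea in generateAllActionExprs.
case _: DeltaMergeIntoMatchedDeleteClause =>
  val incrCountExpr = {
    if (shouldCountDeletedRows) {
      incrementMetricsAndReturnBool(
        names = Seq("numTargetRowsDeleted", "numTargetRowsMatchedDeleted"),
        valueToReturn = true)
    } else {
      Literal.TrueLiteral
    }
  }
  // Only CDC needs the pre-image of the deleted row; otherwise emit typed nulls
  // so the scan does not have to read the data columns.
  val deleteOutputCols =
    if (cdcEnabled) targetWriteCols
    else targetWriteCols.map(e => Cast(Literal(null), e.dataType))
  deleteOutputCols ++
    rowIdColumnExpressionOpt ++
    rowCommitVersionColumnOpt ++
    Seq(incrCountExpr) ++
    (if (cdcEnabled) Some(CDC_TYPE_DELETE) else None)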
CDC and Tombstones: A Curious Observation
While diving into the code, we also stumbled upon an interesting comment:
* @param targetWriteCols List of output column expressions from the target table. Used to
* generate CDC data for DELETE.
This comment suggested that the full row was being propagated intentionally into the target table with a tombstone (ROW_DROPPED_COL = true) set for Change Data Capture (CDC). However, we noticed that these rows were actually being dropped in the Spark logical plan.
The Spark Plan Reveal
When we looked at the Spark plan for rewriting data, we saw the following projection and filter:
(13) Project [codegen id : 2]
Output [1]: [true AS _row_dropped_#5803]
Input [2]: [_id#857, _maglev_id#4784]
(14) Filter [codegen id : 2]
Input [1]: [_row_dropped_#5803]
Condition : NOT _row_dropped_#5803
This showed us that after the inner join, all rows were being filtered out. This led us to another realization: if the MERGE INTO is only deleting data, the rewriting data stage might not even be necessary.
Optimization Opportunity: Skipping the Rewriting Stage
This brings us to a potential optimization. Currently, MERGE INTO runs in two main stages: rewriting data and writing out delete vectors. But if we're only deleting data, the rewriting stage seems redundant. Neither the implementation nor the Spark optimizer is yet smart enough to recognize this and skip the rewriting stage, even though doing so could be a significant performance boost.
A Second Bug/Optimization Opportunity: Unnecessary Rewriting Stage
Let's delve deeper into this second potential issue. As mentioned earlier, the MERGE INTO command in Delta Lake appears to have two primary stages: rewriting data and writing out delete vectors. However, our analysis suggests that when the operation solely involves deleting data, the rewriting data stage might be unnecessary. This is a crucial observation that could lead to significant performance improvements.
Why is the Rewriting Stage Redundant for Deletes?
The rewriting stage typically involves updating the data files to reflect the changes made by the MERGE INTO command. However, when we are only deleting rows, the actual data content doesn't need to be rewritten. Instead, the system only needs to record the deletions, which is handled by writing out delete vectors. Delete vectors are a more efficient way to manage deletions, as they simply mark rows as deleted without modifying the underlying data files directly.
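For deletes to take the delete-vector path at all, the feature has to be enabled on the table. The exact version requirements depend on your Delta Lake release, so treat this as a hedged sketch rather than a definitive recipe:
// Enable deletion vectors on the target table (requires a Delta Lake release
// that supports them for the operations you run).
spark.sql(
  """ALTER TABLE snapshot_table
    |SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')""".stripMargin)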
The Spark Plan Confirms the Redundancy
Our examination of the Spark plan revealed that after the inner join, all rows are deterministically filtered out when the MERGE INTO operation is focused solely on deletes. This observation underscores the redundancy of the rewriting stage in such scenarios. The system performs computations and operations that ultimately lead to a no-op, which is a clear indication of an optimization opportunity.
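One way to double-check that a given merge really was delete-only is to look at the operation metrics Delta records for the MERGE commit. The metric names below are the ones commonly reported for merges; treat this as a sketch, continuing from the session above:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col

// Find the most recent MERGE commit on the target table.
val lastMerge = DeltaTable.forName(spark, "snapshot_table")
  .history(10)
  .filter("operation = 'MERGE'")
  .orderBy(col("version").desc)
  .select("version", "operationMetrics")
  .head()

// A delete-only merge should show zero updated and inserted rows.
val metrics = lastMerge.getMap[String, String](1)
Seq("numTargetRowsDeleted", "numTargetRowsUpdated", "numTargetRowsInserted")
  .foreach(name => println(s"$name = ${metrics.getOrElse(name, "n/a")}"))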
The Potential for Performance Improvement
If the Delta Lake implementation or the Spark optimizer were capable of recognizing that the rewriting data stage is not required for delete-only operations, it could lead to a substantial reduction in processing time and resource consumption. Imagine the gains in efficiency if the system could bypass a whole stage of processing when it's not needed! This is the kind of optimization that can make a real difference in large-scale data operations.
The Challenge: Making the System Smarter
The challenge lies in making the system smart enough to detect that a MERGE INTO only deletes rows and, in that case, skip the rewriting stage entirely while still writing out the delete vectors.