Delta Lake MERGE INTO Bug: Full Table Scan with DELETE Action

by Sharif Sakr

Hey guys! Today, we're diving deep into a fascinating issue we uncovered while working with Delta Lake. We're talking about a bug in the MERGE INTO query when it involves a DELETE action. This bug can cause a full table scan, which, as you can imagine, can significantly slow things down and make your operations more expensive. So, let's break it down, figure out what's happening, and how we can tackle it.

Understanding the Problem: Full Table Scans with MERGE INTO and DELETE

When using Delta Lake, the MERGE INTO command is super powerful for updating tables based on changes. However, we found that when you include a DELETE action in your MERGE INTO query, it can trigger a full table scan. This means that instead of just looking at the necessary metadata and fields to figure out the merge, Delta Lake scans the entire table, including all the data fields. Imagine trying to find a needle in a haystack, but you have to sift through every single straw – that’s what this full table scan feels like!

Why is This a Problem?

The main issue here is performance. Scanning the entire table is incredibly inefficient, especially when you're dealing with large datasets. It increases the time it takes to complete the merge operation and consumes a lot of resources. Think about it – if you're running this on a production system, it can lead to delays and higher costs. We observed this firsthand and realized we needed to dig deeper to understand what was causing it and how to fix it.

The Culprit: Unnecessary Data Access

What’s happening under the hood is that the query accesses data fields it doesn’t actually need. For a DELETE action, you only need to know which rows to delete based on the merge condition; you shouldn't have to read all the data in those rows. This unnecessary data access is what leads to the full table scan. We expected the scan to be limited to the merge key (_id) plus metadata fields such as __delta_internal_is_row_deleted and the file-related columns, but instead it was pulling in the entire data field as well.

Reproducing the Issue

To give you a clearer picture, here’s how you can reproduce the issue. Run a MERGE INTO query like this:

MERGE INTO snapshot_table t
USING change_events c
ON t._id = c._id
WHEN MATCHED AND c.operation='DELETE' THEN DELETE

When you run this, you’ll likely see a table scan that looks like this:

Scan snapshot_table
Output : [_id, **data**, __delta_internal_is_row_deleted, _tmp_metadata_row_index, file_path, file_name, file_size, file_block_start, file_block_length, file_modification_time]

Notice the data field in the output? That’s the culprit. We expected the scan to look like this instead:

Scan snapshot_table
Output : [_id, __delta_internal_is_row_deleted, _tmp_metadata_row_index, file_path, file_name, file_size, file_block_start, file_block_length, file_modification_time]

See the difference? No data field. This is the optimized scan we were hoping for.
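
If you want to reproduce this end to end, here's a minimal sketch of a setup along these lines. The table names, schemas, and the operation column are hypothetical stand-ins, and the scan output above is the kind of detail you can read off the rewrite job's physical plan in the SQL tab of the Spark UI.

import org.apache.spark.sql.SparkSession

// A Spark session with the Delta extension and catalog registered.
val spark = SparkSession.builder()
  .appName("merge-delete-scan-repro")
  .master("local[*]")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()
import spark.implicits._

// Hypothetical target table: a key column plus a wide payload column.
Seq((1L, "payload-1"), (2L, "payload-2"), (3L, "payload-3"))
  .toDF("_id", "data")
  .write.format("delta").saveAsTable("snapshot_table")

// Hypothetical change feed listing the rows to delete.
Seq((2L, "DELETE"))
  .toDF("_id", "operation")
  .write.format("delta").saveAsTable("change_events")

// The delete-only merge from above; the scan of snapshot_table inside the
// resulting job is the one we expected to read only metadata columns.
spark.sql("""
  MERGE INTO snapshot_table t
  USING change_events c
  ON t._id = c._id
  WHEN MATCHED AND c.operation = 'DELETE' THEN DELETE
""")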

Diving into the Code: Our Investigation and Fix

So, we rolled up our sleeves and started digging into the Delta Lake code to figure out what was going on. We traced the issue back to the MergeOutputGeneration.scala file. Specifically, we found a snippet that seemed to be the root cause.

The Problematic Snippet

In MergeOutputGeneration.scala, there’s a section where the expressions for the DELETE action are generated. Here’s the original code:

val deleteSourceRowExprs =
  (targetWriteCols ++
    rowIdColumnExpressionOpt.map(_ => Literal(null)) ++
    rowCommitVersionColumnExpressionOpt.map(_ => Literal(null)) ++
    Seq(Literal(true))) ++
    (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq())

This code constructs the expressions for the output columns when a row is deleted. The targetWriteCols part is where the problem lies. It includes all the columns from the target table, including the data field, which leads to the full table scan.
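
To make that concrete, here's a small standalone sketch (illustrative attributes only, not the actual Delta internals) of why expressions of this shape keep the data column in the scan: Spark's column pruning can only drop columns that no output expression references, and targetWriteCols references every target column.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.types.{LongType, StringType}

// Illustrative stand-ins for the target table's output attributes.
val idCol   = AttributeReference("_id", LongType)()
val dataCol = AttributeReference("data", StringType)()

// Roughly the shape of targetWriteCols: one expression per target column,
// each referring back to the target table's attributes.
val targetWriteCols: Seq[Expression] = Seq(idCol, dataCol)

// Every column, including `data`, is referenced by some output expression,
// so column pruning cannot drop it and the scan must read it.
val referenced = targetWriteCols.flatMap(_.references).map(_.name)
// referenced == Seq("_id", "data")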

Our Solution: Selective Column Access

To fix this, we modified the code to selectively access columns. We introduced a check to prune the target columns in the DELETE operation. Here’s the modified code:

val deletedColsForUnmatchedTarget =
  if (cdcEnabled || !pruneTargetColsInDelete) targetWriteCols
  else targetWriteCols.map(e => Cast(Literal(null), e.dataType))
val deleteSourceRowExprs =
  (deletedColsForUnmatchedTarget ++
    rowIdColumnExpressionOpt.map(_ => Literal(null)) ++
    rowCommitVersionColumnExpressionOpt.map(_ => Literal(null)) ++
    Seq(Literal(true))) ++
    (if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq())

With this change, we only access the columns that the DELETE action actually needs, avoiding the full table scan. The new deletedColsForUnmatchedTarget keeps the full set of targetWriteCols when CDC is enabled or when pruneTargetColsInDelete is turned off; otherwise it replaces each target column with a NULL literal cast to that column's type, so the scan no longer has to read the data columns.
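
As a sanity check on the mechanics, here's the same standalone sketch extended with the pruning logic; the attributes and flag values are again illustrative stand-ins, not the real Delta internals. The key property is that a typed NULL literal references no attributes at all, which is what frees the optimizer to prune data out of the scan.

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression, Literal}
import org.apache.spark.sql.types.{LongType, StringType}

// Illustrative target columns and flags (stand-ins, not the real Delta values).
val targetWriteCols: Seq[Expression] =
  Seq(AttributeReference("_id", LongType)(), AttributeReference("data", StringType)())
val cdcEnabled = false
val pruneTargetColsInDelete = true

// The fix: for a plain DELETE with CDC disabled, emit typed NULLs instead of
// the original target columns.
val deletedColsForUnmatchedTarget: Seq[Expression] =
  if (cdcEnabled || !pruneTargetColsInDelete) targetWriteCols
  else targetWriteCols.map(e => Cast(Literal(null), e.dataType))

// A typed NULL references no attributes, so column pruning is now free to
// drop `data` (and every other payload column) from the scan.
val referenced = deletedColsForUnmatchedTarget.flatMap(_.references)
// referenced.isEmpty == true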

More Tweaks Needed: Handling Other Cases

But wait, there’s more! We found that depending on the MERGE INTO statement, there were other places in the code that could cause similar issues. For example, the generateAllActionExprs function also had the potential to reference all columns in the target table. Here’s the original code:

case _: DeltaMergeIntoMatchedDeleteClause =>
  val incrCountExpr = {
    if (shouldCountDeletedRows) {
      incrementMetricsAndReturnBool(
        names = Seq("numTargetRowsDeleted", "numTargetRowsMatchedDeleted"),
        valueToReturn = true)
    } else {
      Literal.TrueLiteral
    }
  }
  // Generate expressions to set the ROW_DROPPED_COL = true and mark as a DELETE
  targetWriteCols ++
    rowIdColumnExpressionOpt ++
    rowCommitVersionColumnOpt ++
    Seq(incrCountExpr) ++
    (if (cdcEnabled) Some(CDC_TYPE_DELETE) else None)

We needed to make sure we addressed these cases as well to ensure consistent performance improvements.
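
Purely as a hedged sketch of what that could look like, mirroring the pruneTargetColsInDelete idea from above (deleteOutputCols is a name we're making up here for illustration, not a confirmed upstream change), the matched-delete branch could apply the same substitution:

case _: DeltaMergeIntoMatchedDeleteClause =>
  val incrCountExpr = {
    if (shouldCountDeletedRows) {
      incrementMetricsAndReturnBool(
        names = Seq("numTargetRowsDeleted", "numTargetRowsMatchedDeleted"),
        valueToReturn = true)
    } else {
      Literal.TrueLiteral
    }
  }
  // Sketch: when CDC is off and pruning is enabled, replace the target columns
  // with typed NULLs so the scan does not have to read them.
  val deleteOutputCols =
    if (cdcEnabled || !pruneTargetColsInDelete) targetWriteCols
    else targetWriteCols.map(e => Cast(Literal(null), e.dataType))
  deleteOutputCols ++
    rowIdColumnExpressionOpt ++
    rowCommitVersionColumnOpt ++
    Seq(incrCountExpr) ++
    (if (cdcEnabled) Some(CDC_TYPE_DELETE) else None)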

CDC and Tombstones: A Curious Observation

While diving into the code, we also stumbled upon an interesting comment:

* @param targetWriteCols List of output column expressions from the target table. Used to
*                        generate CDC data for DELETE.

This comment suggested that the full row is intentionally propagated through the merge output with a tombstone (ROW_DROPPED_COL = true) so that Change Data Capture (CDC) can emit it. However, we noticed that these rows were actually being dropped in the Spark logical plan.
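
For context on why the CDC path genuinely needs the full row: when the change data feed is enabled, a delete emits the complete pre-image of each deleted row. Here's a hedged sketch that shows this with a throwaway table (cdf_demo is a hypothetical name, and we use a plain DELETE just to illustrate what gets recorded); delta.enableChangeDataFeed and table_changes are standard Delta features.

// A tiny, hypothetical table created with the change data feed on from the start.
spark.sql("""
  CREATE TABLE cdf_demo (_id BIGINT, data STRING)
  USING delta
  TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")
spark.sql("INSERT INTO cdf_demo VALUES (1, 'payload-1'), (2, 'payload-2')")
spark.sql("DELETE FROM cdf_demo WHERE _id = 2")

// The deleted row comes back with _change_type = 'delete' and its full
// pre-image -- the data column included -- which is what generating CDC data
// for a DELETE needs the target columns for.
spark.sql("SELECT * FROM table_changes('cdf_demo', 0)")
  .show(truncate = false)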

The Spark Plan Reveal

When we looked at the Spark plan for rewriting data, we saw the following projection and filter:

(13) Project [codegen id : 2]
Output [1]: [true AS _row_dropped_#5803]
Input [2]: [_id#857, _maglev_id#4784]

(14) Filter [codegen id : 2]
Input [1]: [_row_dropped_#5803]
Condition : NOT _row_dropped_#5803

This showed us that after the inner join, all rows were being filtered out. This led us to another realization: if the MERGE INTO is only deleting data, the rewriting data stage might not even be necessary.

Optimization Opportunity: Skipping the Rewriting Stage

This brings us to a potential optimization. Currently, MERGE INTO runs in two main stages: rewriting data and writing out delete vectors. But if we're only deleting data, the rewriting stage seems redundant. The implementation or Spark optimizer isn't yet smart enough to recognize this and skip the rewriting stage, which could be a significant performance boost.

A Second Bug/Optimization Opportunity: Unnecessary Rewriting Stage

Let's delve deeper into this second potential issue. As noted above, MERGE INTO in Delta Lake appears to run two primary stages: rewriting data and writing out delete vectors. When the operation solely involves deleting rows, our analysis suggests the rewriting stage is unnecessary, and skipping it could yield a significant performance improvement.

Why is the Rewriting Stage Redundant for Deletes?

The rewriting stage typically involves updating the data files to reflect the changes made by the MERGE INTO command. However, when we are only deleting rows, the actual data content doesn't need to be rewritten. Instead, the system only needs to record the deletions, which is handled by writing out delete vectors. Delete vectors are a more efficient way to manage deletions, as they simply mark rows as deleted without modifying the underlying data files directly.
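
To see the two pieces concretely, here's a hedged sketch that reuses the Spark session and hypothetical tables from the reproduction sketch: enable deletion vectors on the target, re-run the delete-only merge, and look at the operation metrics in the table history. The delta.enableDeletionVectors property and DESCRIBE HISTORY are standard Delta features; exactly what appears in operationMetrics depends on your Delta version.

// Deletion vectors must be enabled on the target table for deletes to be
// recorded as delete vectors instead of rewritten files.
spark.sql("""
  ALTER TABLE snapshot_table
  SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')
""")

// Re-run the delete-only merge from the reproduction sketch.
spark.sql("""
  MERGE INTO snapshot_table t
  USING change_events c
  ON t._id = c._id
  WHEN MATCHED AND c.operation = 'DELETE' THEN DELETE
""")

// DESCRIBE HISTORY exposes per-operation metrics; for a delete-only merge we
// would hope to see no rewritten data files, only delete vectors added.
spark.sql("DESCRIBE HISTORY snapshot_table")
  .select("version", "operation", "operationMetrics")
  .show(truncate = false)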

The Spark Plan Confirms the Redundancy

Our examination of the Spark plan revealed that after the inner join, all rows are deterministically filtered out when the MERGE INTO operation is focused solely on deletes. This observation underscores the redundancy of the rewriting stage in such scenarios. The system performs computations and operations that ultimately lead to a no-op, which is a clear indication of an optimization opportunity.

The Potential for Performance Improvement

If the Delta Lake implementation or the Spark optimizer were capable of recognizing that the rewriting data stage is not required for delete-only operations, it could lead to a substantial reduction in processing time and resource consumption. Imagine the gains in efficiency if the system could bypass a whole stage of processing when it's not needed! This is the kind of optimization that can make a real difference in large-scale data operations.

The Challenge: Making the System Smarter

The challenge lies in making the system smart enough to recognize a delete-only MERGE INTO up front, whether in the Delta Lake implementation or in the Spark optimizer, and to skip the rewriting stage entirely when it isn't needed.