Something I have to write quite often, as most developers working with data will do too, is a merge statement. You can of course do an ‘upsert’ but I’d definitely say that the merge statement is a ‘nice to have’. I’ve done this in T-SQL more times than I can count, but in this post I’m going to look specifically at Microsoft Fabric and merging tables using notebooks and pyspark.

Say for example that you have some nightly changes (e.g. new rows, changed rows etc…) coming in from a source system and you’re ‘landing’ this data in one table (or even a file in the lake). You’d probably then want to ‘merge’ this data into another table which contains all the previously loaded data. In the medallion architecture (love it or hate it) proposed by the Databricks people, this table containing the previously loaded data would likely be referred to as the ‘bronze’ layer.

Writing a simple merge statement to get data from the landing table into the bronze table (or whatever you want to call it… staging/base/archive/etc.) is fairly easy. For example it might be something a bit like this:-

MERGE
BronzeTable as trg

USING
LandingTable as src
ON
trg.SomeKeyField = src.SomeKeyField -- Join on the unique field(s)

-- Insert new rows where there isn't a match
WHEN NOT MATCHED BY TARGET THEN INSERT (
    SomeKeyField
    ,OtherField
    ,YetAnotherField
) VALUES (
    src.SomeKeyField
    ,src.OtherField
    ,src.YetAnotherField
)

-- Update rows where there are differences, note we DO NOT update
-- the key field or check for differences as that is the field we
-- MATCHED on! ;-)
WHEN MATCHED AND (
    trg.OtherField <> src.OtherField
    OR trg.YetAnotherField <> src.YetAnotherField
)
THEN UPDATE SET
    trg.OtherField = src.OtherField
    ,trg.YetAnotherField = src.YetAnotherField
; -- A T-SQL MERGE statement must be terminated with a semicolon

Ok, simple… But what if you’ve got many different landing tables and bronze tables? You’re going to have to write a merge statement for each one, right? This of course is going to be quite tedious, even if you copy and paste. Maybe you could build a code generator to write the code for you… either way you’re going to end up with lots of code. The alternative is to write a single merge statement that could merge any of the landing tables into its corresponding ‘bronze’ table. In T-SQL you could maybe do it with some dynamic SQL using the good old sp_executesql. However, if your preference is developing using notebooks and python (pyspark) then what’s the story there? Well, a quick look at the Delta Lake docs on merging delta tables turns up some examples, most of which look like they merge two data frames. One example from the docs is pasted below:-

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Ok, so that’s pretty straightforward. However, despite the name targetDF, the merge method belongs to a DeltaTable object rather than a plain dataframe, so the target needs to be a delta table (although we still need a dataframe for the source). So with a small change we have this bit of pyspark code that we can use in our Fabric notebook (as in the previous example, sourceDF is a source data frame):-

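from delta.tables import DeltaTable

# Get the target table
target_table = DeltaTable.forName(spark, "SomeLakehouse.some_table")

# Run the merge, aliasing both sides so that the names used in the
# join condition ("source" and "target") can be resolved
(target_table.alias("target")
  .merge(sourceDF.alias("source"), "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)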

Now whether you’d want to include the whenNotMatchedBySourceDelete() part in the above example code comes down to your particular use case. If you’re merging only new/changed rows into the ‘bronze’ table then you would NOT want to delete rows in the target ‘bronze’ table that aren’t in the source. Only if your landing table contained a full set of rows from the entire source table would you want to use it… anyway, I digress. We’ll need to expand the code a little more, as ideally we don’t want to just update all rows if there’s a match, we need some conditions. So let’s change the code a bit more. Note that we’re going to generate the values for some of the variables below later, but for now we only need to know which parameters we’ll need:-

(target_table.alias(target_alias)
    .merge(source = source_df.alias(source_alias), condition = join_condition)
    .whenNotMatchedInsertAll()
    .whenMatchedUpdate(condition = update_condition, set = update_set)
    .execute())

Ok, looking at the code above means we’ll need to generate values for the following variables:-

  • join_condition – This is the join ‘on’ condition, it’s just a string… we need this since each table could have different and/or multiple fields you would need to join on. These ‘join’ fields would/should be referred to as the ‘business keys’, if we’re going back to data warehousing parlance.
  • update_condition – The update ‘when matched’ condition, also a string. We need this to know when we should update a row. It usually involves checking all fields for differences between source and target, except the business key field(s).
  • update_set – The update ‘set’ mapping, this part tells us which fields we update if there’s a match. Similar to the update condition, this is usually all fields except the business key field(s). This is a dictionary (or dict) type, not a string.

That sounds like hard work… and well, yes, it is a little bit of a faff to write, but think of it this way, once it’s written you can just stick it in a function and hey presto, many lines of code down to just one 🙂

I like breaking down my code into reusable little functions where I can. You could of course just write all this out as one big function, this is just my preference. So with that in mind, first we’ll write a little function to build the join ‘on’ condition, and since I also like my function names to be very clear and verbose, I’ll call it ‘build_join_condition’, this way the name tells you exactly what it does. In this function all we need to do is loop through each of the join columns (the ‘keys’) and build code like ‘target.column = source.column’, and if there are multiple ‘keys’, just join them together with an ‘AND’ between each one 😉

# Builds a join condition for two tables

def build_join_condition(source_alias, target_alias, join_columns):
    return " AND ".join([f"{target_alias}.{col} = {source_alias}.{col}" for col in join_columns])
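
As a quick sanity check, here’s a small sketch of what the function produces for a single key and for a composite key. The column names (OrderId, LineNumber) are made up purely for illustration, and the helper is repeated so the snippet runs on its own outside a notebook:

```python
# Same helper as above, repeated so the example is self-contained
def build_join_condition(source_alias, target_alias, join_columns):
    return " AND ".join([f"{target_alias}.{col} = {source_alias}.{col}" for col in join_columns])

# Hypothetical business keys for an imaginary orders table
single_key = build_join_condition("source", "target", ["OrderId"])
composite_key = build_join_condition("source", "target", ["OrderId", "LineNumber"])

print(single_key)     # target.OrderId = source.OrderId
print(composite_key)  # target.OrderId = source.OrderId AND target.LineNumber = source.LineNumber
```

Since this is plain string building, it can be tested without a Spark session, which is handy when you want to check the generated condition before running a merge.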

Now we’ll build another function to create the update condition, called ‘build_update_condition‘. Since we’ll need to remove the ‘business key’ field(s) from the condition we need something to find them and remove them from our list of columns, so I’ve written another little function called ‘remove_items_from_list‘ to do this, and this is called by our ‘build_update_condition‘ function:-

# This is just a little helper function to remove specific items from a list

def remove_items_from_list(items, items_to_remove):
    return [value for value in items if value not in items_to_remove]

# Now the real function, this builds the difference checking code so that we only update
# the rows where there are differences ;-)

def build_update_condition(source_alias, target_alias, join_columns, columns):
    # First remove the join columns, as we DO NOT update join columns!
    columns_to_update = remove_items_from_list(columns, join_columns)

    # Now build the update differences check condition
    return " OR ".join([f"{target_alias}.{col} <> {source_alias}.{col}" for col in columns_to_update])
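
One thing to watch out for: in Spark SQL a comparison like ‘a <> b’ evaluates to NULL (so not true) when either side is NULL, which means a change from a value to NULL, or the other way round, would not trigger an update. If that matters for your data, a possible variation is to use Spark SQL’s null-safe equality operator ‘<=>’ instead. The sketch below is an assumption about how you might want to handle this rather than part of the original approach, and the function name and example columns are made up:

```python
# Helper repeated so the example is self-contained
def remove_items_from_list(items, items_to_remove):
    return [value for value in items if value not in items_to_remove]

# Hypothetical variant of build_update_condition that treats NULLs as comparable values
def build_null_safe_update_condition(source_alias, target_alias, join_columns, columns):
    # As before, the join columns are never compared or updated
    columns_to_update = remove_items_from_list(columns, join_columns)

    # NOT (a <=> b) is true when the values differ, including NULL vs non-NULL
    return " OR ".join(
        f"NOT ({target_alias}.{col} <=> {source_alias}.{col})" for col in columns_to_update
    )

condition = build_null_safe_update_condition(
    "source", "target", ["OrderId"], ["OrderId", "Status", "Amount"]
)
print(condition)
# NOT (target.Status <=> source.Status) OR NOT (target.Amount <=> source.Amount)
```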

Almost there now, lastly we need to build a dictionary that maps each target field to the source field that will update it. We basically loop through the list of columns and build key/value pairs, which are returned as a dictionary that can be used in the ‘set’ part of the merge later on:-

# Returns a dictionary which can be used in a merge to specify which columns in source are used
# to update which columns in the target

def build_update_set(source_alias, target_alias, columns):
    return {f"{target_alias}.{col}": f"{source_alias}.{col}" for col in columns}
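
To make the shape of that dictionary a bit more concrete, here’s a small sketch using two made-up column names. The helper is repeated (written as a dictionary comprehension, which gives the same result) so the snippet runs on its own:

```python
# Equivalent helper, repeated so the example is self-contained
def build_update_set(source_alias, target_alias, columns):
    return {f"{target_alias}.{col}": f"{source_alias}.{col}" for col in columns}

# Hypothetical non-key columns
update_set = build_update_set("source", "target", ["Status", "Amount"])
print(update_set)
# {'target.Status': 'source.Status', 'target.Amount': 'source.Amount'}
```

Each key is a column in the target and each value is the source expression that will be written into it.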

So, putting all this together… we can now build a function to merge data from one table into another, as described at the start! Phew…

from delta.tables import DeltaTable

def merge_source_into_target(source_table_name, target_table_name, join_columns, columns_to_exclude=None):
    # Set the alias names for source and target
    source_alias = "source"
    target_alias = "target"

    # Get the source data into a data frame
    source_df = spark.read.table(source_table_name)
    
    # Get a list of column names from the source dataframe, dropping any in
    # columns_to_exclude (assumed here to mean columns that should be left out of
    # the difference check and the update), then remove the key fields
    column_names = remove_items_from_list(source_df.schema.names, columns_to_exclude or [])
    columns_to_update = remove_items_from_list(column_names, join_columns)

    # Build the join condition
    join_condition = build_join_condition(source_alias, target_alias, join_columns)
    print(f"MERGE - join condition = '{join_condition}'")
    
    # Build the update condition
    update_condition = build_update_condition(source_alias, target_alias, join_columns, column_names)
    print(f"MERGE - update condition = '{update_condition}'")

    # Build the update 'set' fields    
    update_set = build_update_set(source_alias, target_alias, columns_to_update)
    print(f"MERGE - update set = '{update_set}'")

    # Get the target table
    target_table = DeltaTable.forName(spark, target_table_name)

    # Run the merge
    (target_table.alias(target_alias)
        .merge(source = source_df.alias(source_alias), condition = join_condition)
        .whenNotMatchedInsertAll()
        .whenMatchedUpdate(condition = update_condition, set = update_set)
        .execute())
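
To show where the ‘many lines down to one’ part pays off, here’s a rough sketch of looping over several tables. The table names, keys and the merge_all function are all made up for illustration. The merge function is passed in as a parameter so that the loop itself can be tried out without a Spark session:

```python
# Hypothetical landing/bronze table pairs and their business keys
TABLES_TO_MERGE = [
    {"source": "landing_customers", "target": "bronze_customers", "keys": ["CustomerId"]},
    {"source": "landing_order_lines", "target": "bronze_order_lines", "keys": ["OrderId", "LineNumber"]},
]

def merge_all(tables, merge_function):
    """Call merge_function once per table definition and return the targets processed."""
    merged_targets = []
    for table in tables:
        # The last argument is the list of columns to exclude, empty in this sketch
        merge_function(table["source"], table["target"], table["keys"], [])
        merged_targets.append(table["target"])
    return merged_targets

# In a notebook this would be called with the real function, something like:
# merge_all(TABLES_TO_MERGE, merge_source_into_target)
```

In practice the list of tables might come from a config file or a metadata table rather than being hard coded, but the idea is the same.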

We could expand on this further maybe by adding in a datetime or timestamp field to record when a row was updated. Also, if we were instead performing a ‘full load’ merge then we could use the whenNotMatchedBySourceDelete() part of the merge. Or better still, something to just mark a row as deleted. I’ll leave that for another day, I’m tired now, so till next time…
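
As a starting point for the timestamp idea, one possible approach is to add an extra entry to the update ‘set’ dictionary before running the merge. This is only a sketch: the function name and the ‘UpdatedAt’ column are hypothetical, and it assumes that column already exists in the target table. It also only covers the update side, the insert side would need to populate the column too:

```python
def add_updated_timestamp(update_set, target_alias, timestamp_column="UpdatedAt"):
    """Return a copy of update_set that also stamps the (hypothetical) timestamp column."""
    stamped = dict(update_set)
    # current_timestamp() is a Spark SQL function, evaluated when the merge runs
    stamped[f"{target_alias}.{timestamp_column}"] = "current_timestamp()"
    return stamped

update_set = {"target.Status": "source.Status"}
stamped_set = add_updated_timestamp(update_set, "target")
print(stamped_set)
# {'target.Status': 'source.Status', 'target.UpdatedAt': 'current_timestamp()'}
```

The result could then be passed as the ‘set’ argument of whenMatchedUpdate in place of the original dictionary.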

Hope this comes in handy for someone somewhere 😉

2 comments

  1. I am not entirely sure, but I have observed that the method above, where we use a source data frame to merge into the target, is faster than running a SQL merge statement.

    1. Thanks for your comment, you might be right, although I’ve not compared the two methods side by side with a timer 😉 I’d normally write a SQL statement for a single merge though. The solution I’ve shown here is for when I need to loop several times over different tables, where I can effectively parameterise the table ‘names’ – which of course isn’t possible with a normal SQL statement/cell. Although saying that, it would be possible using spark.sql("merge…"), where you could build up a string for the query. I’ll have to try them both some time and see if there’s any significant difference, I guess it would come down to how they’re both ‘compiled down’ to run on the nodes and if there’s any difference in the final code generated… 😉
