
Ok, so we’ve got some data in our ‘landing’ lakehouse, now what? Well, next we need to create some tables to keep a history of all the data coming into landing, so we can quickly and easily query them to find what’s not yet in the warehouse and load it. We do this by merging the transient data in landing into our ‘staging’ area.

Words are all I have…

Just briefly going back to terminology from the first post in this series… some people may refer to this particular step as ‘base’, or in ‘medallion architecture’ terminology it might be part of the ‘bronze’ layer. I’ve always preferred ‘staging’ for this step as that’s essentially what we’re doing: we’re staging the data somewhere before the final load. This could simply mean storing history tables of data, but in some cases you may also need to do some fundamental transformations before the last step of getting the data into a true dimensional model. What we don’t really want to do at this stage, though, is any cleansing or other non-essential transformations. This is because we want to keep what amounts to a raw ‘offline’ copy of the source data, without any logic, transformations or other things done to it.

This makes it easier to trace problems, as opposed to running, say, a complex query with logic in it on the source directly. Your only way to trace problems in that case is to keep running test queries on the source system, which is something you probably want to keep to a minimum. We may also want to partly expose some of this data to analysts or data scientists for closer examination in certain situations. That obviously becomes a problem if we don’t have the ‘raw’ unadulterated data – we simply won’t be able to provide it to them, whether we want to or not. If you do need to do some fundamental transformations here, it’s probably a good idea to keep your ‘history’ tables as well as any later ‘adjusted’ versions.

Either way, for our example here we essentially need tables which match the source schema as closely as possible, so no major structural changes unless there’s no choice. If we manage this properly, we should then be able to tell what data is new, changed or even deleted in source, and determine the appropriate action to take on ‘our’ side here.
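As a toy illustration of that classification (plain Python, no Spark – the row shapes and keys here are entirely made up), comparing a source snapshot against our staged copy by key tells us exactly which rows are new, changed or deleted:

```python
def classify_rows(source: dict, staged: dict) -> dict:
    """Compare source rows against staged rows, both keyed by the join key,
    and classify each key as new, changed or deleted."""
    new = [k for k in source if k not in staged]
    deleted = [k for k in staged if k not in source]
    changed = [k for k in source if k in staged and source[k] != staged[k]]
    return {"new": new, "changed": changed, "deleted": deleted}

# Row 1 is unchanged, row 2 has changed, row 3 vanished from source, row 4 is new
source = {1: ("A", 10), 2: ("B", 99), 4: ("D", 40)}
staged = {1: ("A", 10), 2: ("B", 20), 3: ("C", 30)}
print(classify_rows(source, staged))  # → {'new': [4], 'changed': [2], 'deleted': [3]}
```

The merge we build later in this post is really just this idea done at scale against Delta tables.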

In the past at several organisations I’ve seen people transforming the data whilst its in transit. At one particular place a contractor had created possibly the most complex SSIS package I’ve ever seen to do this. As you could imagine it was an absolute nightmare to both understand and to maintain. He argued that some of the actions in SSIS were faster than doing it in SQL, which in some cases can be true but it’s mostly splitting heirs. The downside however, if you do this, you lose the ability to easily check and compare your data with the source during debugging, not to mention if you need to alter your structure then you could get stuck needing a complete reload of all data from source – which could be a serious performance and/or cost issue in certain cases. Lastly, in most cases the appropriate and fastest tool to do the transformations is the destination data store (e.g. SQL server, Spark, etc…) whilst the orchestration tool (e.g. data factory, SSIS etc…) is often the fastest way to move or copy data without transforming it at the same time.


Anyway, with that out of the way… As shown in the last part of this series, below is the list of lakehouses and warehouses we’ve got in our workspace. For want of a better name, I just created a folder called ‘Databases’ and put them in there to keep things nice and ordered. So, let’s have a look at the ‘StagingLakehouse’…

In this lakehouse (see below) we’ve created a table with the same structure as the initial table in landing. However, notice that we’ve got 3 additional datetime fields to indicate when the data was loaded, updated or deleted. These will come in useful, especially for source systems which ‘hard’ delete rows instead of flagging them as deleted…

On that subject, note that we’re not physically deleting records here though, we’re only flagging them with a ‘deleted’ datetime so we can keep track of what has been deleted and when. If we didn’t keep this information then we wouldn’t be able to tell the next step (the warehouse) what data has been deleted in source, because we’d have no record of it here! So remember, don’t physically delete anything at this step just to save some space or try to increase performance a little… otherwise you’ll have problems further down the line 🤔
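To make the ‘flag, don’t delete’ idea concrete, here’s a tiny plain-Python sketch (the helper, field and key names are invented for illustration – in the real pipeline this logic would live in the Spark merge against the staging table):

```python
from datetime import datetime

def flag_deleted(staged_rows: list, source_keys: set, deleted_field: str = "DatetimeDeleted") -> int:
    """Stamp staged rows that have vanished from source with a 'deleted' datetime,
    rather than removing them. Returns how many rows were flagged."""
    flagged = 0
    for row in staged_rows:
        if row["key"] not in source_keys and row.get(deleted_field) is None:
            row[deleted_field] = datetime.now()
            flagged += 1
    return flagged

staged = [
    {"key": 1, "DatetimeDeleted": None},
    {"key": 2, "DatetimeDeleted": None},
]
# Key 2 has disappeared from source, so it gets flagged – but both rows remain
flag_deleted(staged, source_keys={1})
```

Because the row is still there, the warehouse step can later see the timestamp and propagate the deletion downstream.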

Quickly referring back to the data factory pipeline, note that we’re using stored procedures in the ‘ControlWarehouse’ to record start/end times for this step (just like we did with landing, and will do with the warehouse step later too). This way we encapsulate all the code in the warehouse, as opposed to writing T-SQL in a script activity in the data factory. We want the data factory to concentrate on its primary role, orchestration, and this way we can leave the pipeline doing what it’s meant to do whilst any code changes happen ‘behind’ the stored procedures.

The main part of this is a parameterised notebook that dynamically merges the data from a source table into a destination table. Why a notebook? Well, firstly it’s easy to write some pyspark to do a dynamic merge, and secondly we can’t use a data flow because, at the time of writing, you cannot pass parameters to a data flow in data factory (hopefully that’s a feature that will be coming, like parameterised connections).

Maybe we could have written a dynamic merge using a stored procedure and some dynamic T-SQL, I think either way should be ok – I’ll leave it to your preferences how you’d do it. That’s one of the beauties of Fabric, you can pick your preferred way in many situations. If all your devs are SQL experts then go with warehouses using procedures and T-SQL. If they prefer python/pyspark then do it that way. Although obviously if you’ve got a warehouse you can’t run pyspark on it (well, not without some sneakiness 😉).
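For the curious, the dynamic T-SQL route might start from a string builder along these lines (just a sketch – the table and column names are hypothetical, and a real version would need to quote identifiers, handle the metadata timestamp columns and guard against SQL injection):

```python
def build_merge_sql(source_table: str, target_table: str, join_columns: list, columns: list) -> str:
    """Build a basic T-SQL MERGE statement from table and column names."""
    on_clause = " AND ".join(f"target.{c} = source.{c}" for c in join_columns)
    update_cols = [c for c in columns if c not in join_columns]
    update_clause = ", ".join(f"target.{c} = source.{c}" for c in update_cols)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"source.{c}" for c in columns)
    return (
        f"MERGE {target_table} AS target "
        f"USING {source_table} AS source ON {on_clause} "
        f"WHEN MATCHED THEN UPDATE SET {update_clause} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals});"
    )

print(build_merge_sql("landing.rtt", "staging.rtt", ["Period"], ["Period", "Total"]))
```

You’d then execute the generated statement via `sp_executesql` inside a stored procedure. Same idea, different dialect.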

In our notebook we need a parameter cell; for the ‘referral to treatment’ data it looks like this. We set the name of the ‘dataset’, and as there’s no subset we just set that to ‘None’. Then, obviously, ‘join_columns’ is a comma-separated list of the columns we’ll join the source/destination on.

# Name of the dataset and optional subset
dataset_name = "referral_to_treatment"
subset_name = None

# Comma separated string of columns used to join on
join_columns = "Period,ProviderOrgCode,CommissionerOrgCode,RTTPartType,TreatmentFunctionCode"

# Optional comma separated string of columns to exclude in the merge
excluded_columns = ""

Now just a little bit of code which either creates a table and loads the landing data in if there isn’t one already, or, if there is a ‘staging’ table already, uses our merge function to merge the landing data into it.

# Let's run the import process
table_name = dataset_name

# If there's a subset name we append this to the table name
if subset_name is not None and len(subset_name) > 0:
    table_name = table_name + "_" + subset_name

print(f"Table name: '{table_name}'")

# Set the names for lakehouse source/target tables
source_table_name = f"LandingLakehouse.{table_name}"
target_table_name = f"StagingLakehouse.{table_name}"
print(f"Table names for merge: '{source_table_name}' into '{target_table_name}'")

# Check if target table exists, if not then load straight into a new table
if not table_exists("StagingLakehouse", table_name):
    print("Staging table doesn't exist, copying data from landing and creating new staging table")

    # No target table yet, so load the source table into a dataframe
    source_df = spark.table(source_table_name)
    
    # Add the standard metadata fields to the data frame
    final_df = add_metadata_fields_to_data_frame(source_df)

    # Now just create a new table in staging lakehouse with the data from landing ;-)
    final_df.write.mode("overwrite").option("overwriteSchema", "true").format("delta").saveAsTable(target_table_name)

    print("Staging table created from landing table data")
else:
    print("Staging table exists, comparing landing and staging table schemas for any differences")

    # The staging table exists, so first we need to check for schema differences 
    # between the landing table and the staging table in case something has 
    # changed in the new landing table (e.g. column missing or renamed etc...). Just
    # make sure we exclude staging only columns when we compare the schemas!
    column_names_to_exclude = [datetime_loaded_field_name, datetime_updated_field_name, datetime_deleted_field_name]
    
    # Are there any differences?
    if table_schemas_are_different(source_table_name, target_table_name, column_names_to_exclude):        
        # Yes, there are differences, so lets get the columns involved
        differences = get_differences_between_table_schemas(source_table_name, target_table_name, column_names_to_exclude) 
        error_message = f"Schema differences detected between landing and staging tables: {differences}"
        print(error_message)

        # Log the error showing the different columns
        log_error(error_message)

        # Trigger an error!
        raise ValueError(error_message)    

    # If we're here, then all is ok so far. So let's transform the comma separated
    # string of join columns into a list for the merge function (stripping any
    # stray whitespace around the column names)...
    join_columns_list = [col.strip() for col in join_columns.split(",")]
    
    print("Staging merge starting")

    # Merge the landing data into staging lakehouse table
    merge_source_into_target_with_timestamps(source_table_name, target_table_name, join_columns_list, datetime_loaded_field_name, datetime_updated_field_name)

    print("Staging merge completed")

The above code uses a couple of functions to check for table schema differences. Remember, as noted in the first part of this series of posts, NHS data structures are often liable to change without notice! This way we can choose what to do if a schema difference is detected. At the moment we only raise/log an error, but this could be expanded upon to do whatever you like, maybe even dynamic mapping using column names in a table…

Here are the little functions which check for schema differences…

# Some table comparison helpers

def table_schemas_are_different(table1, table2, column_names_to_exclude):
    # Get the schema for the first table
    t1_df = spark.read.table(table1)
    t1_column_names = t1_df.schema.names
    
    # Get the schema for the other table
    t2_df = spark.read.table(table2)
    t2_column_names = t2_df.schema.names
    
    # Check for differences, returns True/False depending on outcome
    return are_different_lists(t1_column_names, t2_column_names, column_names_to_exclude)

def get_differences_between_table_schemas(table1, table2, column_names_to_exclude):
    # Get the schema for the first table
    t1_df = spark.read.table(table1)
    t1_column_names = t1_df.schema.names
    
    # Get the schema for the other table
    t2_df = spark.read.table(table2)
    t2_column_names = t2_df.schema.names
    
    # Return list of the differences
    return get_differences_between_lists(t1_column_names, t2_column_names, column_names_to_exclude)
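The little list-comparison helpers these call (`are_different_lists` and `get_differences_between_lists`) aren’t shown in the post, but a minimal sketch – using sets, so column order doesn’t matter – could look like this. The implementations are my assumption; only the names come from the code above:

```python
def get_differences_between_lists(list1, list2, items_to_exclude):
    """Return the items present in one list but not the other,
    ignoring any excluded items (e.g. staging-only metadata columns)."""
    set1 = set(list1) - set(items_to_exclude)
    set2 = set(list2) - set(items_to_exclude)
    return sorted(set1.symmetric_difference(set2))

def are_different_lists(list1, list2, items_to_exclude):
    # True if the two lists differ once the exclusions are removed
    return len(get_differences_between_lists(list1, list2, items_to_exclude)) > 0
```

Note this only detects added, removed or renamed columns – a column changing data type would slip through, which you may or may not care about.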

Now we need the code which actually performs the merge…

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

def merge_source_into_target_with_timestamps(source_table_name, target_table_name, join_columns, loaded_ts_column_name, updated_ts_column_name):
    # Set some standard stuff
    source_alias = "source"
    target_alias = "target"

    # Get the source data
    source_df = spark.read.table(source_table_name)
    
    # Sort out lists of columns
    column_names = source_df.schema.names
    
    # Build the join condition
    join_condition = build_join_condition(source_alias, target_alias, join_columns)
    print(f"MERGE - join condition = '{join_condition}'")
    
    # Build the update condition
    update_condition = build_update_condition(source_alias, target_alias, join_columns, column_names)
    print(f"MERGE - update condition = '{update_condition}'")

    # Build the update fields    
    columns_to_update = remove_items_from_list(column_names, join_columns)
    update_set = build_update_set(source_alias, target_alias, columns_to_update)
    update_set[updated_ts_column_name] = current_timestamp()
    print(f"MERGE - update set = '{update_set}'")

    # Build the insert fields    
    insert_values = build_insert_values(source_alias, target_alias, column_names)
    insert_values[loaded_ts_column_name] = current_timestamp()
    print(f"MERGE - insert values = '{insert_values}'")

    # Get the target
    target_table = DeltaTable.forName(spark, target_table_name)

    # Run the merge
    target_table.alias(target_alias)\
        .merge(source = source_df.alias(source_alias), condition = join_condition)\
        .whenNotMatchedInsert(values = insert_values)\
        .whenMatchedUpdate(condition = update_condition, set = update_set)\
        .execute()

Finally, these are some helper functions which the merge function uses to build the final merge command. Basically some simple ‘list’ functions to help build the strings or dicts we need to pass to the merge command.

# Merge table helpers

def build_join_condition(source_alias, target_alias, join_columns):
    return " AND ".join([f"{target_alias}.{col} = {source_alias}.{col}" for col in join_columns])
    
def build_update_condition(source_alias, target_alias, join_columns, columns):
    # First remove the join columns, as we don't update join columns!
    columns_to_update = remove_items_from_list(columns, join_columns)

    # Now build the update differences check condition, using Spark's null-safe
    # equality operator (<=>) so changes to or from NULL are also detected
    return " OR ".join([f"NOT ({target_alias}.{col} <=> {source_alias}.{col})" for col in columns_to_update])

def build_update_set(source_alias: str, target_alias: str, columns: list) -> dict:
    # Map each target column to its source counterpart, e.g. "target.x": "source.x"
    return {f"{target_alias}.{col}": f"{source_alias}.{col}" for col in columns}

def build_insert_values(source_alias: str, target_alias: str, columns: list) -> dict:
    # Identical mapping to build_update_set, kept separate in case the
    # insert and update column lists ever need to diverge
    return {f"{target_alias}.{col}": f"{source_alias}.{col}" for col in columns}

Phew, and there we go… With all that out of the way, we should now have what we need to merge the landing data into some ‘history’ tables in the staging area.

Next time, we’ll look at the final part of the puzzle, getting all this into a dimensional model in a warehouse. T-SQL and stored procedures, here we come 🫡
