
Here’s a little bit of PySpark code that someone might find useful. I use it to check for schema changes between tables when running ingestion processes. For example, if I’ve landed some data in a table and want to merge the data from that table into a history or archive table (as you might do in a ‘bronze’ layer).
Typically, I would say it’s better in the landing zone to drop or overwrite the existing target table (if there is one), as opposed to merging the incoming data or file straight into a history or archive table. You could argue, “Why don’t you just merge the files right into the history table?”, but breaking it down like this gives you more control over each step of the process. Say, for example, the schema in the incoming file is different than usual. What do you do? Well, this way we’ve already loaded the data into a table in our ‘landing’ zone, so we can just examine the differences or run whatever other checks we want.
For this little example, we’re imagining that someone has changed a column name in the incoming data or file. How can we easily compare the 2 tables to see if anything has changed, and if so, what? First, we’ll need to write a little function to get the differences between 2 lists; we can then use this with another function, which gets the schemas of the 2 tables, later:-
def get_differences_between_lists(values, values_to_compare, values_to_ignore):
    # Remove any values we want to ignore
    values = [x for x in values if x not in values_to_ignore]
    values_to_compare = [x for x in values_to_compare if x not in values_to_ignore]
    # First check for values that are NOT in the 'compare' list
    values_not_in_values_to_compare = [x for x in values if x not in values_to_compare]
    # Now check for values from the 'compare' list that are NOT in the original list
    values_to_compare_not_in_values = [x for x in values_to_compare if x not in values]
    # Combine the 2 lists
    values_not_in_values_to_compare.extend(values_to_compare_not_in_values)
    # Finally, return the extended list ;-)
    return values_not_in_values_to_compare
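As a quick sanity check, here’s what the function returns for a couple of made-up column lists (the column names are mine, and the function is repeated so the snippet runs on its own):

```python
def get_differences_between_lists(values, values_to_compare, values_to_ignore):
    # Remove any values we want to ignore
    values = [x for x in values if x not in values_to_ignore]
    values_to_compare = [x for x in values_to_compare if x not in values_to_ignore]
    # Values in the first list but not the second, and vice versa, combined
    only_in_first = [x for x in values if x not in values_to_compare]
    only_in_second = [x for x in values_to_compare if x not in values]
    return only_in_first + only_in_second

# Hypothetical column names: someone renamed 'customer_name' upstream
old_columns = ["id", "customer_name", "created_at", "_load_date"]
new_columns = ["id", "customer_fullname", "created_at", "_load_date"]

# Ignore the technical audit column we don't care about
print(get_differences_between_lists(old_columns, new_columns, ["_load_date"]))
# ['customer_name', 'customer_fullname']
```

Both the old and the new name show up in the output, which is handy when you’re trying to spot a rename rather than a genuinely new column.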
Note, you can wrap this function inside another if you just want to know whether there are differences, but you’re not interested in what the differences are. We simply take the output of the function above, and if its length is greater than 0 then there are differences 😉
def are_different_lists(values, values_to_compare, values_to_ignore):
    return len(get_differences_between_lists(values, values_to_compare, values_to_ignore)) > 0
Anyway, back to the main problem. Let’s now get the schemas of the 2 tables and use our compare function(s) to see if there are any differences:-
def table_schemas_are_different(table1, table2, column_names_to_exclude):
    # Get the schema for the first table
    t1_df = spark.read.table(table1)
    t1_column_names = t1_df.schema.names
    # Get the schema for the other table
    t2_df = spark.read.table(table2)
    t2_column_names = t2_df.schema.names
    # Check for differences, returns True/False depending on outcome
    return are_different_lists(t1_column_names, t2_column_names, column_names_to_exclude)
And there we go…
Now, whether you want to throw an error or log the issue somewhere is completely up to you and the process you’re building. Taking it a step further, if you needed more detail, we could extend the functions to return 2 lists: one with the columns that are in the first table but not the second, and vice versa.
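That extension might look something like this (a quick sketch; the function name is my own invention):

```python
def get_list_differences_detailed(values, values_to_compare, values_to_ignore):
    # Drop the ignored values from both sides
    values = [x for x in values if x not in values_to_ignore]
    values_to_compare = [x for x in values_to_compare if x not in values_to_ignore]
    # Return a pair of lists instead of one combined list
    only_in_first = [x for x in values if x not in values_to_compare]
    only_in_second = [x for x in values_to_compare if x not in values]
    return only_in_first, only_in_second

# Hypothetical column names again
removed, added = get_list_differences_detailed(
    ["id", "customer_name", "created_at"],
    ["id", "customer_fullname", "created_at"],
    [],
)
print(removed)  # ['customer_name']
print(added)    # ['customer_fullname']
```

With the two lists separated, your process can treat a dropped column differently from a new one, e.g. fail hard on removals but only warn on additions.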
Anyway, hope someone finds this useful. Till next time 🙂