
Welcome to part 2! This time we’ll take a look at how we’re going to orchestrate the import process. Below is a rough diagram of how this might work and where the data will flow. Note the warehouse ‘ControlWarehouse’, which is where we’ll store our metadata tables for data sources and imports…

Within Fabric we’ll organise all the components above in a single workspace and use some folders to keep it nice and tidy too. Note that I’ve linked this workspace up to Azure DevOps for version/source control. The workspace looks something like this…

I must point out a couple of things before we go any further, and before I forget… 🤔

Firstly… I’m using PySpark to download the files from the websites instead of data factory and the ‘Copy’ activity. Why? Because currently you can’t parameterise Fabric data connections like you could with the old Azure Data Factory linked services. The feature is apparently coming, but it’s still a work in progress at the time of writing this (May 2024). This means we’d need to hard-wire a separate connection for every different website we’d want to download from! The same applies to SQL and other servers if you were going to connect to several different ones, say on-prem. So, for this example I’m just using web data downloads; I might update this guide if/when the feature is rolled out. There is a workaround (or rather an alternative) for this, which is to use a dataflow, but it’s a little over-complicated for what is essentially just a simple copy. So, for now (until they implement parameterised connections) we’ll just stick with this. The architecture isn’t difficult to modify to pull data from an on-prem server, but unless you’ve only got a single on-prem SQL server, for example, I’d wait until they implement the connection parameterisation feature.

Secondly… As I mentioned above, I’m using a warehouse to store all the metadata tables which will contain the data source details. You could argue that this is maybe like using a sledgehammer to crack a nut and that it could (or should) be done using an Azure SQL database instead (which could be cheaper). However, just for example purposes here I’ve chosen to keep all this within Fabric (for now). Alternatively, we could even move these tables/views into the ‘DataWarehouse’ warehouse, I suppose?


The key part of this architecture is of course the metadata control table(s). Instead of creating multiple pipelines to import different data from different sources, we can parameterise a pipeline to be flexible enough that it can load many different types of data from different places (see the limitations on parameterised connections in Fabric that I talked about earlier, though). Essentially, if we can list our data sources in some tables, with a few key bits of information such as source location, business key or identifier fields, names and so on, then we can loop through these in order to run the various imports. Thus, we should be able to keep the number of data factory pipelines to a minimum and drive almost everything from our ‘control’ tables – as opposed to the administrative nightmare of hundreds of pipelines all doing different things.

I’ve created a warehouse called ‘ControlWarehouse’ in Fabric and made a small table in there to do such a thing for us; I’ve called it ‘DataSources’. For this example I’m only using a URL as the location of the source data, but this table (and process) could easily be expanded upon to give details of, say, a database connection, maybe with fields like ‘ServerName’, ‘DatabaseName’ and ‘CustomQuery’ and so on (see my comments earlier regarding parameterised connections though). But, since I’m only working with web file downloads for this example, I’ve omitted those particular fields for now, but hopefully you get the idea.
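To make that a bit more concrete, here’s a rough sketch of what the ‘DataSources’ table could look like. The column names are the ones the view further down refers to; the data types (and which columns allow NULLs) are just my guesses, so adjust to taste:-

CREATE TABLE [dbo].[DataSources]
(
	[DataSourceKey] INT NOT NULL -- A unique identifier to refer to this data source
	,[IsEnabled] BIT NOT NULL -- Lets you switch a source off without deleting its row
	,[IngestionMechanism] VARCHAR(50) NULL -- e.g. 'Web' for the URL downloads in this example

	,[Dataset] VARCHAR(100) NOT NULL -- e.g. 'Referral To Treatment'
	,[Subset] VARCHAR(100) NULL -- Optional subset of the dataset

	,[Notebook] VARCHAR(100) NULL -- Name of a custom notebook to execute (not yet implemented)
	,[Url] VARCHAR(500) NULL -- URL of the data source
	,[DataFormat] VARCHAR(20) NULL -- e.g. 'CSV'
	,[HasHeaderRow] BIT NULL -- Does the file have a header row?
	,[JoinColumns] VARCHAR(500) NULL -- Comma separated business key columns used when merging

	-- Flags to specify which steps to process
	,[ProcessIntoLanding] BIT NOT NULL
	,[ProcessIntoStaging] BIT NOT NULL
	,[ProcessIntoWarehouse] BIT NOT NULL

	-- Landing step tracking, populated by the pipeline
	,[LandingStartedDateTime] DATETIME2(6) NULL
	,[LandingCompletedDateTime] DATETIME2(6) NULL
	,[LandingErrorMessage] VARCHAR(4000) NULL

	-- Staging step tracking, populated by the pipeline
	,[StagingStartedDateTime] DATETIME2(6) NULL
	,[StagingCompletedDateTime] DATETIME2(6) NULL
	,[StagingErrorMessage] VARCHAR(4000) NULL

	-- Warehouse step tracking, populated by the pipeline
	,[WarehouseStartedDateTime] DATETIME2(6) NULL
	,[WarehouseCompletedDateTime] DATETIME2(6) NULL
	,[WarehouseErrorMessage] VARCHAR(4000) NULL

	-- Optional custom warehouse procedure, otherwise the naming convention decides
	,[WarehouseProcedureSchema] VARCHAR(100) NULL
	,[WarehouseProcedureName] VARCHAR(100) NULL
	,[WarehouseProcedureParameter] VARCHAR(500) NULL
);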

So, let’s have a quick look at this table – also, I’ve listed a few helper views below that sit on top of this table:-

Hopefully most of the fields should be self-explanatory, such as the datetime fields which get populated by the pipeline as the relevant parts are executed. But for some key ones:-

  • Dataset / Subset – these are the human-readable names of the dataset and, where applicable, the subset of that dataset; for our example the dataset name would be ‘Referral To Treatment‘. The system will use this name (and subset name) as a convention for saving files in a similarly named sub folder and for writing to named tables (there’s a small worked example just after this list). Conventions like this should be used where possible as they prevent different developers from coming up with their own unique naming conventions and processes; since the process defines them, any devs must stick to them in order to get their imports working.
  • JoinColumns – a comma separated list of ‘business key’ columns, i.e. the fields that you’ll need to join on when performing a merge operation.
  • ProcessInto**** – these are just a few flag fields that can be set if you want to just run a particular step, or re-run a particular step. The pipeline will read the value of these and then skip or run the relevant steps. This can help with debugging as well, when you don’t want to run other steps.
  • WarehouseProcedure**** – these are left NULL, unless you want to run a custom procedure after processing. If left as NULL then the dataset naming convention dictates the name of the stored procedure that will be run in the warehouse during the warehouse ‘step’.
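Just to make the naming convention in the ‘Dataset / Subset’ bullet a little more concrete, here’s a quick worked example using a made-up subset name (‘Incomplete’ is purely for illustration). It uses the same string manipulation you’ll see in the view further down:-

-- Hypothetical example: Dataset = 'Referral To Treatment', Subset = 'Incomplete'
SELECT
-- Lakehouse names are lowercased with underscores: 'referral_to_treatment_incomplete'
[LakehouseTableName] = REPLACE(LOWER('Referral To Treatment' + ' ' + 'Incomplete'), ' ', '_')

-- The warehouse schema just has the spaces removed: 'ReferralToTreatment'
,[WarehouseSchemaName] = REPLACE('Referral To Treatment', ' ', '')

-- Default warehouse proc to run: 'ReferralToTreatment.uspProcessIncomplete'
,[WarehouseProcedure] = REPLACE('Referral To Treatment', ' ', '') + '.uspProcess' + 'Incomplete'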

Lastly, the views are what the data factory pipeline actually looks at. Again, hopefully the naming conventions are fairly obvious for what does what… The ‘vwDataSourcesToProcess’ is a list of data sources which are yet to be processed. It filters out any data sources that have been completely processed successfully (i.e. it has a simple WHERE clause in it). I’m assuming you can figure out what the ‘vwDataSourcesWithErrors’ shows? 😊
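I won’t paste the full definitions of those two helper views, but as a rough sketch they’re little more than thin filters over ‘vwDataSources’, something along these lines (the exact filters, such as only including enabled sources, are my own assumptions):-

-- Data sources that are enabled, haven't failed and haven't completed every step yet
CREATE VIEW [dbo].[vwDataSourcesToProcess] AS
SELECT *
FROM [dbo].[vwDataSources]
WHERE [IsEnabled] = 1
AND [HasFailed] = 0
AND (
	[LandingCompletedDateTime] IS NULL
	OR [StagingCompletedDateTime] IS NULL
	OR [WarehouseCompletedDateTime] IS NULL
);
GO

-- Data sources where any step has recorded an error message
CREATE VIEW [dbo].[vwDataSourcesWithErrors] AS
SELECT *
FROM [dbo].[vwDataSources]
WHERE [HasFailed] = 1;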

The code for the main ‘vwDataSources’ view is below:-

CREATE VIEW [dbo].[vwDataSources] AS

SELECT
[DataSourceKey] -- A unique identifier to refer to this data source

-- A simple flag to indicate if any step of the import is in progress
,[IsInProgress] = CAST(CASE
	WHEN (
		[LandingStartedDateTime] IS NOT NULL -- If landing has started...
		AND [LandingCompletedDateTime] IS NULL -- ...and has not completed yet
	) OR (
		[StagingStartedDateTime] IS NOT NULL -- If staging has started...
		AND [StagingCompletedDateTime] IS NULL -- ...and has not completed yet
	) OR (
		[WarehouseStartedDateTime] IS NOT NULL -- If warehouse has started...
		AND [WarehouseCompletedDateTime] IS NULL -- ...and has not completed yet
	)
	THEN 1 -- Then this data source load is in process

	ELSE 0 -- Otherwise it isn't
END AS BIT)

-- Name of the current step in progress, if nothing is in progress then this will be NULL
,[CurrentStep] = CASE
	-- If landing has started but not completed then we're still running landing step...
	WHEN [LandingStartedDateTime] IS NOT NULL
	AND [LandingCompletedDateTime] IS NULL
	THEN 'Landing'

	-- If staging has started but not completed then we're still running staging step...
	WHEN [StagingStartedDateTime] IS NOT NULL
	AND [StagingCompletedDateTime] IS NULL
	THEN 'Staging'

	-- If warehouse has started but not completed then we're still running warehouse step...
	WHEN [WarehouseStartedDateTime] IS NOT NULL
	AND [WarehouseCompletedDateTime] IS NULL
	THEN 'Warehouse'

	ELSE NULL
END

-- Adds up each step's duration into an hours/minutes format
,[TotalDuration] = CASE
	WHEN [WarehouseStartedDateTime] IS NOT NULL
	THEN CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [LandingStartedDateTime], ISNULL([WarehouseCompletedDateTime], GETDATE())), 0), 
		114
	)

	WHEN [StagingStartedDateTime] IS NOT NULL
	THEN CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [LandingStartedDateTime], ISNULL([StagingCompletedDateTime], GETDATE())), 0), 
		114
	)

	WHEN [LandingStartedDateTime] IS NOT NULL
	THEN CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [LandingStartedDateTime], GETDATE()), 0), 
		114
	)

	ELSE NULL
END

-- If any step has an error message then obviously this import has failed ;-)
,[HasFailed] = CAST(CASE
	WHEN [LandingErrorMessage] IS NOT NULL
	OR [StagingErrorMessage] IS NOT NULL
	OR [WarehouseErrorMessage] IS NOT NULL
	THEN 1

	ELSE 0
END AS BIT)

,[IsEnabled]
,[IngestionMechanism]

,[Dataset]
,[Subset]

-- Fabric lakehouse names need to be lowercase with underscores and no spaces, so we do a little text adjustment
,[LakehouseDatasetName] = REPLACE(LOWER([Dataset]), ' ', '_')
,[LakehouseSubsetName] = REPLACE(LOWER([Subset]), ' ', '_')
,[LakehouseTableName] = REPLACE(LOWER([Dataset] + CASE WHEN [Subset] IS NULL THEN '' ELSE ' ' + [Subset] END), ' ', '_')

-- We don't want spaces in our warehouse schema names!
,[WarehouseSchemaName] = REPLACE([Dataset], ' ', '')

,[Notebook] -- Name of the custom notebook to execute (not yet implemented)
,[Url] -- URL of the data source
,[DataFormat] = UPPER([DataFormat]) -- Format, e.g. CSV, JSON... currently only loads CSV data
,[HasHeaderRow] -- A bit field to indicate if the file has a header row or not
,[JoinColumns] -- A comma separated list of unique fields on which to join when doing a merge or update

-- Some flags to specify which steps to process or not
,[ProcessIntoLanding]
,[ProcessIntoStaging]
,[ProcessIntoWarehouse]

-- Landing step stuff
,[LandingStartedDateTime]
,[LandingCompletedDateTime]
,[LandingErrorMessage]

,[LandingDuration] = CASE
	WHEN [LandingStartedDateTime] IS NOT NULL
	AND [LandingCompletedDateTime] IS NULL
	THEN CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [LandingStartedDateTime], GETDATE()), 0), 
		114
	)

	ELSE CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [LandingStartedDateTime], [LandingCompletedDateTime]), 0), 
		114
	) 
END

,[LandingIsInProgress] = CAST(CASE
	WHEN [LandingStartedDateTime] IS NOT NULL
	AND [LandingCompletedDateTime] IS NULL
	THEN 1

	ELSE 0
END AS BIT)

-- Staging step stuff
,[StagingStartedDateTime]
,[StagingCompletedDateTime]
,[StagingErrorMessage]

,[StagingDuration] = CASE
	WHEN [StagingStartedDateTime] IS NOT NULL
	AND [StagingCompletedDateTime] IS NULL
	THEN CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [StagingStartedDateTime], GETDATE()), 0), 
		114
	)

	ELSE CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [StagingStartedDateTime], [StagingCompletedDateTime]), 0), 
		114
	)
END

,[StagingIsInProgress] = CAST(CASE
	WHEN [StagingStartedDateTime] IS NOT NULL
	AND [StagingCompletedDateTime] IS NULL
	THEN 1

	ELSE 0
END AS BIT)

-- Warehouse step stuff
,[WarehouseStartedDateTime]
,[WarehouseCompletedDateTime]
,[WarehouseErrorMessage]

,[WarehouseDuration] = CASE
	WHEN [WarehouseStartedDateTime] IS NOT NULL
	AND [WarehouseCompletedDateTime] IS NULL
	THEN CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [WarehouseStartedDateTime], GETDATE()), 0), 
		114
	)

	ELSE CONVERT(
		VARCHAR(5), 
		DATEADD(MINUTE, DATEDIFF(MINUTE, [WarehouseStartedDateTime], [WarehouseCompletedDateTime]), 0), 
		114
	)
END

,[WarehouseIsInProgress] = CAST(CASE
	WHEN [WarehouseStartedDateTime] IS NOT NULL
	AND [WarehouseCompletedDateTime] IS NULL
	THEN 1

	ELSE 0
END AS BIT)

-- Name of the warehouse proc to run after staging completes
,[WarehouseProcedure] = CASE
	-- Custom procedure supplied?
	WHEN [WarehouseProcedureSchema] IS NOT NULL
	AND [WarehouseProcedureName] IS NOT NULL
	THEN [WarehouseProcedureSchema] + '.' + [WarehouseProcedureName]

	-- No, use default naming convention for procedure name to run
	ELSE REPLACE([Dataset], ' ', '') + '.uspProcess' + REPLACE(ISNULL([Subset], ''), ' ', '')
END

,[WarehouseProcedureParameter] -- Optional parameter value to pass to the warehouse proc

FROM
[dbo].[DataSources]

Ok, hopefully that makes sense. Onto the data factory pipeline that will read from these control tables/views… It’s reasonably simple: we’ve got a ‘LookUp’ where we first read the metadata tables (i.e. the list of imports we want to run), then a ‘ForEach’ loops through each data source and processes it appropriately.
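For reference, the ‘LookUp’ activity doesn’t need to do anything clever; its query can just be a simple select over the helper view, something along these lines:-

-- Example query for the Lookup activity against the 'ControlWarehouse'
SELECT *
FROM [dbo].[vwDataSourcesToProcess];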

Next, drilling into the ‘ForEach’ loop, we break the process down into 3 further steps. Note that each stage is enclosed in an ‘If Condition’. We’re using the flags from the control table discussed before: we check their values and only run each step if the relevant flag is set:-

  • Landing – downloads data files from the web, landing the files in OneLake in their original unadulterated format, then loads the data into a fresh/empty table in the landing lakehouse.
  • Staging – reads from the table in landing, checks for potential schema differences between the new ‘landed’ data and what we’ve got already, then merges the landing data into a history table in the staging lakehouse.
  • Warehouse – reads from the history table in the staging lakehouse, gets the latest data we’ve not already got in the warehouse, then builds the dimension, fact and any bridge tables. It may also create additional tables, such as aggregated or snapshot tables.

Landing (see below) – this writes the current datetime to the ‘LandingStartedDateTime’ field for the row we’re processing so we can see when the landing step started, runs the ‘download into landing’ notebook, then on completion it writes the current datetime to ‘LandingCompletedDateTime’. This way we can track duration; if any errors occur, it writes those to the ‘LandingErrorMessage’ field.
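The ‘write the datetime’ parts are nothing fancy; the pipeline just runs small update statements against the control table, roughly like the sketch below. The @DataSourceKey and @ErrorMessage names here are only placeholders for whatever pipeline parameters/expressions you pass in:-

-- Before running the landing notebook: mark the step as started and clear any previous state
UPDATE [dbo].[DataSources]
SET [LandingStartedDateTime] = GETDATE()
	,[LandingCompletedDateTime] = NULL
	,[LandingErrorMessage] = NULL
WHERE [DataSourceKey] = @DataSourceKey;

-- After the notebook succeeds: mark the step as completed
UPDATE [dbo].[DataSources]
SET [LandingCompletedDateTime] = GETDATE()
WHERE [DataSourceKey] = @DataSourceKey;

-- If the notebook fails: record the error message instead
UPDATE [dbo].[DataSources]
SET [LandingErrorMessage] = @ErrorMessage
WHERE [DataSourceKey] = @DataSourceKey;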

Staging (see below) – similar to the landing step, this first writes the current datetime to the ‘StagingStartedDateTime’ field for the row we’re processing, runs the ‘merge landing into staging’ notebook, then on completion it writes the current datetime to ‘StagingCompletedDateTime’. This way we can track duration; if any errors occur, it writes those to the ‘StagingErrorMessage’ field.

Warehouse (see below) – again, similar to the landing and staging steps, this first writes the current datetime to the ‘WarehouseStartedDateTime’ field for the row we’re processing, then runs the ‘process staging into warehouse’ stored procedure in the ‘DataWarehouse’ warehouse. On completion it writes the current datetime to ‘WarehouseCompletedDateTime’. This way we can track duration; if any errors occur, it writes those to the ‘WarehouseErrorMessage’ field.
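As a reminder of the naming convention, if no custom procedure is supplied then the procedure name comes from the ‘WarehouseProcedure’ column of the view. So for our ‘Referral To Treatment’ example (again using a made-up ‘Incomplete’ subset), the warehouse step would end up executing something like this, with the optional ‘WarehouseProcedureParameter’ value passed in if you’ve set one:-

-- Hypothetical example: Dataset = 'Referral To Treatment', Subset = 'Incomplete', no custom proc supplied
-- The convention gives us schema 'ReferralToTreatment' and procedure 'uspProcessIncomplete'
EXEC [ReferralToTreatment].[uspProcessIncomplete];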

So that’s the overview of how this process will be orchestrated using data factory and the metadata tables. In the next part of this series of posts we’ll look at what’s going on in the landing step in more depth.

See you next time… 🫡
