Control Flow Restartability in Azure Data Factory

I presented at SQL Saturday Pittsburgh this past weekend about populating your data warehouse with a metadata-driven, pattern-based approach. One of the benefits I mentioned is that it’s easy to employ this pattern for restartability.

For instance, let’s say I am loading data from 30 tables and 5 files into the staging area of my data mart or data warehouse, and one of the table loads fails. I don’t want to reload the other tables I just loaded. I only want to load the ones that have not been recently loaded. Or let’s say I have 5 dimensions and 4 facts, and I had a failure loading a fact table. I don’t want to reload my dimensions, and I only want to reload the failed facts. How do we accomplish this?

The Execution Pattern

First, let’s talk about my control and logging tables, as they facilitate the restartability.

I keep some synthetic metadata in my ETL schema to tell ADF what to execute or copy. This data lives in my ETLControl table.

CREATE TABLE [ETL].[ETLControl](
    [ControlId]               [int] IDENTITY(1,1) NOT NULL,
    [SourceDataSourceID]      [varchar](50) NOT NULL,
    [SequenceOrder]           [int] NULL,
    [IsIncremental]           [bit] NOT NULL,
    [DestinationTable]        [varchar](255) NULL,
    [Command]                 [varchar](4000) NULL,
    [IsLoadEnabled]           [bit] NOT NULL,
    [PipelineName]            [varchar](50) NOT NULL,
    [LastUpdated]             [datetime] NOT NULL,
    [DestinationDataSourceID] [varchar](50) NULL,
    CONSTRAINT [PK_ETLControl] PRIMARY KEY CLUSTERED
    (
        [ControlId] ASC
    )
)

I have a row in ETLControl for each table to load, stored procedure to execute, or semantic model to refresh.

Results of querying the ETLControl table. Staging tables have a query used as the source of a copy activity. Reporting tables have a stored procedure used to transform and land data in the target table.
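To make the shape of those rows concrete, here is a minimal sketch of two control rows: one staging copy and one reporting procedure. Every value is a made-up placeholder (the data source IDs, table names, the query, the procedure name Reporting.spLoadDimCustomer, and the pipeline name PL_Exec_StoredProc), except PL_Copy_DBToDB, which is the pipeline name used later in this post.

```sql
-- Illustrative rows only; all values are assumptions, not real metadata.
INSERT INTO ETL.ETLControl
    (SourceDataSourceID, SequenceOrder, IsIncremental, DestinationTable,
     Command, IsLoadEnabled, PipelineName, LastUpdated, DestinationDataSourceID)
VALUES
    -- Staging table: Command holds the source query for the copy activity.
    ('1', 1, 0, 'Customer',
     'SELECT CustomerID, CustomerName FROM dbo.Customer',
     1, 'PL_Copy_DBToDB', CURRENT_TIMESTAMP, '2'),
    -- Reporting table: Command holds the stored procedure call that
    -- transforms the data and lands it in the target table.
    ('2', 2, 0, 'DimCustomer',
     'EXEC Reporting.spLoadDimCustomer',
     1, 'PL_Exec_StoredProc', CURRENT_TIMESTAMP, '2');
```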

Each pipeline I run logs information to a table I call ETLLog.

CREATE TABLE [ETL].[EtlLog](
    [ETLLogID]             [bigint] IDENTITY(1,1) NOT NULL,
    [TriggerPipelineRunID] [varchar](40) NULL,
    [PipelineRunID]        [varchar](40) NULL,
    [PipelineName]         [varchar](100) NULL,
    [ParentPipelineName]   [varchar](100) NULL,
    [ExecStartDateTime]    [datetime] NOT NULL,
    [ExecEndDateTime]      [datetime] NULL,
    [ExecStatus]           [varchar](20) NOT NULL,
    [Tablename]            [varchar](128) NOT NULL,
    [SourceSystem]         [nvarchar](40) NULL,
    [InsertRowCount]       [bigint] NULL,
    [UpdateRowCount]       [bigint] NULL,
    [DeleteRowCount]       [bigint] NULL,
    [CTStartVersion]       [bigint] NULL,
    [CTEndVersion]         [bigint] NULL,
    CONSTRAINT [PK_ETLLog] PRIMARY KEY CLUSTERED
    (
        [ETLLogID] ASC
    )
)

This table tells me what pipeline was executed, what table was loaded or command executed, the start and end times, and the number of rows inserted, updated, or deleted as a result.

The ETLLog table allows me to easily query the average execution times for each table. It also allows me to see how often a single step in my process fails.
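A query along these lines could answer both questions at once. It is only a sketch: the 30-day window is arbitrary, and it assumes failed rows carry an ExecStatus of 'Failed', a value this post does not spell out.

```sql
-- Average duration and failure count per pipeline and table, last 30 days.
-- Assumption: failures are logged with ExecStatus = 'Failed'.
SELECT   EL.PipelineName,
         EL.Tablename,
         COUNT(*) AS Executions,
         -- Rows still running have a NULL end time and are ignored by AVG.
         AVG(1.0 * DATEDIFF(SECOND, EL.ExecStartDateTime, EL.ExecEndDateTime))
             AS AvgDurationSeconds,
         SUM(CASE WHEN EL.ExecStatus = 'Failed' THEN 1 ELSE 0 END)
             AS FailedExecutions
FROM     ETL.EtlLog AS EL
WHERE    EL.ExecStartDateTime >= DATEADD(DAY, -30, CURRENT_TIMESTAMP)
GROUP BY EL.PipelineName,
         EL.Tablename
ORDER BY FailedExecutions DESC,
         AvgDurationSeconds DESC;
```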

Data Factory Executor Pipeline

In my data factory, I have pipelines that serve 4 main roles: orchestration, execution, work, and utility. The orchestration pipelines mostly contain Execute Pipeline and other control flow activities. The execution pipelines usually have a Lookup activity followed by a ForEach activity. They call the worker pipelines, which actually do the work of copying data or calling a stored procedure to transform data. The utility pipelines are reusable and might be called in many different executions. For me, this usually includes parameterized pipelines that start/stop virtual machines, scale databases up/down, and refresh Power BI semantic models.

An executor pipeline that I use for calling the worker pipeline that copies data from source databases to the staging schema in my data mart is shown below.

A data factory pipeline with 5 activities. 1. Log the beginning of the pipeline using a stored procedure. 2. A lookup activity to get the queries to be executed. 3. A ForEach activity that executes a child pipeline. 4. Log the end of the stored procedure as successful. OR 5. Log the end of the stored procedure as failed.
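The logging procedures themselves are not shown in this post. As a rough sketch of what the start and end logging steps might do against ETLLog, the two procedures below insert an 'In Progress' row and later close it out. The procedure names, parameters, and the way the row is matched are all assumptions for illustration.

```sql
-- Hypothetical logging procedures; names and parameters are assumptions.
-- GO is the batch separator used by SSMS and sqlcmd, needed here because
-- CREATE PROCEDURE must be the first statement in its batch.
CREATE PROCEDURE [ETL].[spLogStart]
(@PipelineRunID VARCHAR(40),
 @PipelineName  VARCHAR(100),
 @Tablename     VARCHAR(128),
 @SourceSystem  NVARCHAR(40) = NULL)
AS
BEGIN
    SET NOCOUNT ON;

    -- Open the log row with the status the restart logic checks for.
    INSERT INTO ETL.EtlLog
        (PipelineRunID, PipelineName, ExecStartDateTime,
         ExecStatus, Tablename, SourceSystem)
    VALUES
        (@PipelineRunID, @PipelineName, CURRENT_TIMESTAMP,
         'In Progress', @Tablename, @SourceSystem);
END
GO

CREATE PROCEDURE [ETL].[spLogEnd]
(@PipelineRunID  VARCHAR(40),
 @Tablename      VARCHAR(128),
 @ExecStatus     VARCHAR(20),      -- e.g. 'Success' or 'Failed'
 @InsertRowCount BIGINT = NULL)
AS
BEGIN
    SET NOCOUNT ON;

    -- Close only the row that this run opened and that is still open.
    UPDATE ETL.EtlLog
    SET    ExecEndDateTime = CURRENT_TIMESTAMP,
           ExecStatus      = @ExecStatus,
           InsertRowCount  = @InsertRowCount
    WHERE  PipelineRunID = @PipelineRunID
           AND Tablename = @Tablename
           AND ExecStatus = 'In Progress';
END
GO
```

In ADF, the success and failure paths would each call the end procedure with a different @ExecStatus value.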

The Lookup activity calls a stored procedure that reads the ETLControl table and returns the queries to run against the source database.

The Stored Procedure

This stored procedure is where we implement the logic for restartability. This is done by querying the ETLLog table to see if the command has been executed in the last [n] minutes.

CREATE PROCEDURE [ETL].[spGetDBStageQueries]
(@SourceSystem         VARCHAR(30),
 @RefreshThresholdMins INT = NULL)
AS
BEGIN
    SET NOCOUNT ON;

    -- No threshold supplied: return every enabled query for the source system.
    IF ( @RefreshThresholdMins IS NULL
         OR @RefreshThresholdMins = 0 )
    BEGIN
        SELECT EC.DestinationTable,
               'Stage'            AS TargetSchema,
               EC.Command,
               SED.DataSourceName AS SourceSystem,
               SED.DatabaseServer AS SDBServer,
               SED.DatabaseName   AS SDBName,
               DED.DataSourceName AS TargetSystem,
               DED.DatabaseServer AS DBDServer,
               DED.DatabaseName   AS DBDName
        FROM   ETL.ETLControl EC
               INNER JOIN ETL.DataSource SED
                       ON EC.SourceDataSourceID = SED.DataSourceID
               INNER JOIN ETL.DataSource DED
                       ON EC.DestinationDataSourceID = DED.DataSourceID
        WHERE  EC.IsLoadEnabled = 1
               AND EC.PipelineName = 'PL_Copy_DBToDB'
               AND SED.DataSourceName = @SourceSystem;
    END
    ELSE
    BEGIN
        -- Threshold supplied: skip tables loaded recently or currently loading.
        SELECT EC.DestinationTable,
               'Stage'            AS TargetSchema,
               EC.Command,
               SED.DataSourceName AS SourceSystem,
               SED.DatabaseServer AS SDBServer,
               SED.DatabaseName   AS SDBName,
               DED.DataSourceName AS TargetSystem,
               DED.DatabaseServer AS DBDServer,
               DED.DatabaseName   AS DBDName
        FROM   ETL.ETLControl EC
               INNER JOIN ETL.DataSource SED
                       ON EC.SourceDataSourceID = SED.DataSourceID
               INNER JOIN ETL.DataSource DED
                       ON EC.DestinationDataSourceID = DED.DataSourceID
        WHERE  EC.IsLoadEnabled = 1
               AND EC.PipelineName = 'PL_Copy_DBToDB'
               AND SED.DataSourceName = @SourceSystem
               AND NOT EXISTS (
                       SELECT 1
                       FROM   ETL.EtlLog EL
                       WHERE  EL.PipelineName = EC.PipelineName
                              AND EL.Tablename = EC.DestinationTable
                              AND ( ( EL.ExecStatus = 'Success'
                                      AND EL.ExecStartDateTime >
                                          DATEADD(MINUTE, -1 * @RefreshThresholdMins,
                                                  CURRENT_TIMESTAMP) )
                                    OR EL.ExecStatus = 'In Progress' )
                   );
    END
END

The logic I used is:

  • If the start time of the pipeline execution that loaded the table is within @RefreshThresholdMins minutes of the current time and the execution was successful, don’t run the pipeline again.
  • If the pipeline is currently in progress, don’t run it again.
  • Otherwise, as long as the load is enabled, and the row matches the provided values for @SourceSystem and PipelineName, execute it.
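As a quick usage sketch, the two calls below show how the threshold changes what the procedure returns. 'SalesDB' is a placeholder source system name, and 120 minutes is an arbitrary threshold.

```sql
-- Full run: no threshold, so every enabled table for the source is returned.
EXEC ETL.spGetDBStageQueries
     @SourceSystem = 'SalesDB';

-- Restart after a failure: tables that loaded successfully in the last
-- 120 minutes, or that are still in progress, are left out of the result.
EXEC ETL.spGetDBStageQueries
     @SourceSystem         = 'SalesDB',
     @RefreshThresholdMins = 120;
```

The ForEach activity then iterates over only the rows that come back, so a restart picks up where the failed run left off.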

You may want to implement slightly different logic. I have seen some people filter based upon end time instead of start time. But the start time tells me how “fresh” the data from the source system is, so that is what I care about most.

Also, I can rely on my execution status of “In Progress” only meaning that the pipeline is actually in progress. If the pipeline fails, I have error handling that marks that row in the ETLLog table as failed. If your pipeline can fail and leave the status set to “In Progress”, you’ll need to write different logic.
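If stale “In Progress” rows are a possibility in your environment, one option (a sketch, not something from this post) is to expire them before the lookup runs. The 240-minute cutoff and the 'Failed' status value are assumptions; the cutoff should comfortably exceed your longest legitimate load.

```sql
-- Mark log rows stuck in 'In Progress' for too long as failed so the
-- restart logic will pick those tables up again.
-- Assumptions: 240 minutes is longer than any real load, and 'Failed'
-- is the status value your error handling uses.
DECLARE @StaleAfterMins INT = 240;

UPDATE EL
SET    EL.ExecStatus      = 'Failed',
       EL.ExecEndDateTime = CURRENT_TIMESTAMP
FROM   ETL.EtlLog AS EL
WHERE  EL.ExecStatus = 'In Progress'
       AND EL.ExecStartDateTime <
           DATEADD(MINUTE, -1 * @StaleAfterMins, CURRENT_TIMESTAMP);
```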

Control Flow vs Data Flow

The logic I have explained above is all or nothing at the worker pipeline level. This is within the control flow of my executor pipeline in ADF. I populate my dimensions and facts by using stored procedures that perform inserts, updates, and deletes. If the stored procedure successfully executed the updates and failed on the inserts, and ADF re-executes the procedure, it will run the entire stored procedure again.

I have change detection built into my stored procedures to ensure rows are only inserted/updated/deleted once for each change detected. If I run the procedure multiple times, the rows that were “new” in the first run are no longer identified as new in subsequent runs. I also use TRY…CATCH blocks in my procedures, so if a failure occurs, I can roll back my transaction. This ensures my procedure is not partially executed in the event of a failure.
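The script below is a minimal sketch of that all-or-nothing pattern, using temp tables with made-up names and data so it can run on its own. A real procedure would target the actual staging and dimension tables and would likely compare more columns.

```sql
-- Hypothetical tables and data, for illustration only.
CREATE TABLE #StageCustomer (CustomerKey INT PRIMARY KEY,
                             CustomerName VARCHAR(100) NOT NULL);
CREATE TABLE #DimCustomer   (CustomerKey INT PRIMARY KEY,
                             CustomerName VARCHAR(100) NOT NULL);

INSERT INTO #StageCustomer (CustomerKey, CustomerName)
VALUES (1, 'Contoso'), (2, 'Fabrikam');
INSERT INTO #DimCustomer (CustomerKey, CustomerName)
VALUES (1, 'Contoso Ltd');

BEGIN TRY
    BEGIN TRANSACTION;

    -- Change detection: update only rows whose values actually differ,
    -- so a second run finds nothing left to update.
    UPDATE D
    SET    D.CustomerName = S.CustomerName
    FROM   #DimCustomer AS D
           INNER JOIN #StageCustomer AS S
                   ON S.CustomerKey = D.CustomerKey
    WHERE  D.CustomerName <> S.CustomerName;

    -- Insert only rows that are not already in the dimension.
    INSERT INTO #DimCustomer (CustomerKey, CustomerName)
    SELECT S.CustomerKey,
           S.CustomerName
    FROM   #StageCustomer AS S
    WHERE  NOT EXISTS (SELECT 1
                       FROM   #DimCustomer AS D
                       WHERE  D.CustomerKey = S.CustomerKey);

    COMMIT TRANSACTION;
END TRY
BEGIN CATCH
    -- Undo the updates too if the inserts failed, then re-raise the error
    -- so the calling ADF activity is marked as failed.
    IF @@TRANCOUNT > 0
        ROLLBACK TRANSACTION;
    THROW;
END CATCH;
```

Because both statements only touch rows that still differ from the source, re-running the script after a rollback, or after a success, does not duplicate work.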

How you handle failures within the stored procedure that transforms the data is a business process decision that might change from project to project. Would you rather get the updates and have partially updated data that is missing the rows to be inserted? This ensures that you have the most current data possible up until the failure occurred. If you would rather be sure that the updates and inserts both occur in the same run, then you can roll them back in the event of a failure. This gives assurance that the data in the dimension table is complete as of the last run until the current run is fully completed. Your choice here is mostly about supportability and making sure stakeholders understand the state of the data at any point.

Alternatives

You can go back and re-run a parameterized pipeline in ADF. So if your executor failed because a child worker pipeline failed, you can go back and execute the child pipelines until they succeed. But it takes additional effort to determine what failed (unless you write some code to do this). Then you would need to re-execute your orchestration pipeline to continue the process, or run each subsequent executor pipeline until you have finished the full process.
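If you do have a log table like ETLLog, the “what failed” question can be answered with a query. The sketch below returns each pipeline and table whose most recent execution ended in failure; it assumes failed rows are marked with an ExecStatus of 'Failed'.

```sql
-- Most recent log row per pipeline and table, keeping only failures.
-- Assumption: failures are logged with ExecStatus = 'Failed'.
WITH LatestRun AS
(
    SELECT EL.PipelineName,
           EL.Tablename,
           EL.ExecStatus,
           EL.ExecStartDateTime,
           ROW_NUMBER() OVER (PARTITION BY EL.PipelineName, EL.Tablename
                              ORDER BY EL.ExecStartDateTime DESC,
                                       EL.ETLLogID DESC) AS RowNum
    FROM   ETL.EtlLog AS EL
)
SELECT   LR.PipelineName,
         LR.Tablename,
         LR.ExecStartDateTime
FROM     LatestRun AS LR
WHERE    LR.RowNum = 1
         AND LR.ExecStatus = 'Failed'
ORDER BY LR.ExecStartDateTime;
```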

There are other ADF patterns that achieve control flow restartability by storing a value in a variable in ADF. This is useful if you don’t have a control table and log table to reference. I appreciate the extra reporting I get from my ETLLog table, so it’s generally included on the projects I work on.
