SQL 2008 MERGE using Dynamic SQL over a Linked Server
Yet another reason to upgrade to 2008: MERGE. I wrote this for a BI project I was working on recently. This SP is self-explanatory: it uses dynamic SQL to build a MERGE statement over a linked server, with some additional parameters (e.g. Oracle vs. SQL Server as the source, limiting the source data, etc.).
I'm using this in combination with SSIS as part of an ETL task: a data flow task iterates through sys.tables for a given schema and puts the table names into an ADO recordset object, and this SP is then executed using an expression inside a Foreach Loop container.
Enjoy! (just give me some credit if you use it :)-)
-- Remove any existing copy of the procedure so that it can be re-created.
IF OBJECT_ID('dbo.usp_dynamic_merge') IS NOT NULL
BEGIN
    DROP PROCEDURE dbo.usp_dynamic_merge;
END
go
-- MERGE was introduced in SQL Server 2008 (major version 10).
-- Compare the numeric major version instead of pattern-matching @@VERSION:
-- the pattern '%10.0.%' rejected 2008 R2 (10.50) and every later release, and
-- could also match unrelated text (such as an OS version) in @@VERSION.
-- NOTE: RAISERROR only reports the problem; it does not by itself stop any
-- later batches of this script from running.
DECLARE @product_version nvarchar(128)
SET @product_version = CAST(SERVERPROPERTY('ProductVersion') AS nvarchar(128))
IF CAST(LEFT(@product_version, CHARINDEX('.', @product_version) - 1) AS int) < 10
    RAISERROR ('MERGE is only supported on SQL 2008 and above.', 16, 1)
go
CREATE PROCEDURE dbo.usp_dynamic_merge
@local_schema_name sysname,
@remote_server_name sysname,
@remote_database_name sysname,
@remote_schema_name sysname,
@local_table_name sysname,
@remote_table_name sysname,
@merge_last_modified_enabled bit,
@merge_last_modified_column sysname,
@merge_last_modified_dt datetime,
@merge_delete_enabled bit,
@is_oracle bit,
@debug_enabled bit,
@output_enabled bit
WITH ENCRYPTION
AS
/*
Name: dbo.usp_dynamic_merge
Contact: Tommy Bollhofer (tbollhofer2@gmail.com)
Purpose: Uses dynamic SQL to build and execute a MERGE statement across a linked server.
Last Modified: 03/24/2009

Parameters:
@local_schema_name, @local_table_name
    Local table that is the MERGE target. Its column list is read from sys.all_columns and its
    primary key (any number of columns) from sp_pkeys; the key columns form the ON clause.
@remote_server_name
    Linked server name handed to OPENQUERY.
@remote_database_name
    Optional. When not NULL it is prefixed to the remote schema/table to form a (3) part name.
@remote_schema_name, @remote_table_name
    Remote table that is the MERGE source. Remote identifiers are embedded unquoted because
    quoting rules differ between remote products; the remote table is queried using the LOCAL
    column names.
@merge_last_modified_enabled, @merge_last_modified_column, @merge_last_modified_dt
    When enabled = 1 the remote query is limited to rows whose column is >= the given datetime
    (truncated to whole seconds). A NULL datetime defaults to midnight (UTC) three days ago.
@merge_delete_enabled
    1 adds WHEN NOT MATCHED BY SOURCE THEN DELETE. It cannot be combined with
    @merge_last_modified_enabled = 1: the filtered source would make every target row outside
    the filter window look unmatched and delete it.
@is_oracle
    1 wraps the last modified literal in TO_DATE(); anything else (including NULL) emits a plain
    string literal.
@debug_enabled
    0 executes the generated statement; anything else (including NULL) only PRINTs it.
@output_enabled
    1 returns one row per MERGE action: table_name, operation, total.

Returns: 0 on success, 1 when argument/metadata validation fails (an error is raised as well).

Usage Examples:
This example will execute the procedure using a remote ORACLE RDBMS as the source, limit the number of rows returned using
'MODIFY_DT', merging on the primary/composite key, delete disabled, output enabled, and with debug enabled (i.e. printing the
dynamic sql statement as opposed to executing it):
EXEC dbo.usp_dynamic_merge
@local_schema_name = 'dbo',
@remote_server_name = 'ORACLE_10G',
@remote_database_name = NULL,
@remote_schema_name = 'FOO',
@local_table_name = 'TEST',
@remote_table_name = 'TEST',
@merge_last_modified_enabled = 1,
@merge_last_modified_column = 'MODIFY_DT',
@merge_last_modified_dt = NULL,
@merge_delete_enabled = 0,
@is_oracle = 1,
@debug_enabled = 1,
@output_enabled = 1
This example will execute the procedure using a remote MSSQL RDBMS as the source, passing the (3) part name, without limits on the
number of rows returned at the source merging on the primary/composite key, delete disabled, output enabled, and with debug enabled:
EXEC dbo.usp_dynamic_merge
@local_schema_name = 'Person',
@remote_server_name = 'SQL_2008',
@remote_database_name = 'AdventureWorks2008',
@remote_schema_name = 'Person',
@local_table_name = 'Person',
@remote_table_name = 'Person',
@merge_last_modified_enabled = 0,
@merge_last_modified_column = NULL,
@merge_last_modified_dt = NULL,
@merge_delete_enabled = 0,
@is_oracle = 0,
@debug_enabled = 1,
@output_enabled = 1
*/
SET NOCOUNT ON

DECLARE @local_object_id int,
        @target_name nvarchar(517),
        @target_alias nvarchar(258),
        @source_alias nvarchar(258),
        @remote_object nvarchar(max),
        @remote_query nvarchar(max),
        @remote_select_list nvarchar(max),
        @insert_column_list nvarchar(max),
        @insert_value_list nvarchar(max),
        @update_set_list nvarchar(max),
        @merge_join_clause nvarchar(max),
        @merge_dynamic_sql nvarchar(max),
        @modified_since char(19),
        @column_id int,
        @column_name sysname,
        @key_seq int

DECLARE @pk_table TABLE
(
    table_qualifier sysname,
    table_owner sysname,
    table_name sysname,
    column_name sysname,
    key_seq int,
    pk_name sysname
)

DECLARE @merge_columns TABLE
(
    column_id int NOT NULL PRIMARY KEY,
    column_name sysname NOT NULL
)

-- ---------------------------------------------------------------------------
-- Argument validation. A NULL in any of these would turn the concatenated
-- statement into NULL, which sp_executesql would accept and silently ignore.
-- ---------------------------------------------------------------------------
IF @local_schema_name IS NULL OR @local_table_name IS NULL
   OR @remote_server_name IS NULL OR @remote_schema_name IS NULL OR @remote_table_name IS NULL
BEGIN
    RAISERROR ('The local schema/table and the remote server/schema/table names are all required.', 16, 1)
    RETURN 1
END

IF @merge_last_modified_enabled = 1 AND @merge_last_modified_column IS NULL
BEGIN
    RAISERROR ('@merge_last_modified_column is required when @merge_last_modified_enabled = 1.', 16, 1)
    RETURN 1
END

-- With a filtered source, NOT MATCHED BY SOURCE covers every target row that
-- falls outside the filter window, so DELETE would wipe unmodified rows.
IF @merge_last_modified_enabled = 1 AND @merge_delete_enabled = 1
BEGIN
    RAISERROR ('@merge_delete_enabled cannot be combined with @merge_last_modified_enabled: rows outside the filter window would be deleted.', 16, 1)
    RETURN 1
END

IF @merge_last_modified_enabled = 1 AND @merge_last_modified_dt IS NULL
BEGIN
    --SET A DEFAULT OF 3 DAYS IF NULL IS PASSED IN
    SET @merge_last_modified_dt = DATEADD(dd, DATEDIFF(dd, 0, GETUTCDATE()) - 3, 0)
END

-- ---------------------------------------------------------------------------
-- Local metadata: target object, its columns and its primary key.
-- ---------------------------------------------------------------------------
SET @target_name = QUOTENAME(@local_schema_name) + '.' + QUOTENAME(@local_table_name)
SET @target_alias = QUOTENAME(@local_schema_name + '_TARGET')
SET @source_alias = QUOTENAME(@local_schema_name + '_SOURCE')
SET @local_object_id = OBJECT_ID(@target_name)

IF @local_object_id IS NULL
BEGIN
    RAISERROR ('Local table %s.%s was not found.', 16, 1, @local_schema_name, @local_table_name)
    RETURN 1
END

INSERT INTO @merge_columns (column_id, column_name)
SELECT sc.column_id, sc.name
FROM sys.all_columns AS sc WITH (NOLOCK)
WHERE sc.object_id = @local_object_id

INSERT INTO @pk_table (table_qualifier, table_owner, table_name, column_name, key_seq, pk_name)
EXEC sp_pkeys @table_name = @local_table_name, @table_owner = @local_schema_name

IF NOT EXISTS (SELECT * FROM @pk_table)
BEGIN
    RAISERROR ('Local table %s.%s has no primary key to merge on.', 16, 1, @local_schema_name, @local_table_name)
    RETURN 1
END

-- ---------------------------------------------------------------------------
-- ON clause: one equality per primary key column, in key order.
-- ---------------------------------------------------------------------------
SELECT @key_seq = MIN(key_seq) FROM @pk_table
WHILE @key_seq IS NOT NULL
BEGIN
    SELECT @column_name = column_name FROM @pk_table WHERE key_seq = @key_seq

    SET @merge_join_clause =
        CASE WHEN @merge_join_clause IS NULL THEN 'ON ' ELSE @merge_join_clause + ' AND ' END
        + @target_alias + '.' + QUOTENAME(@column_name)
        + ' = '
        + @source_alias + '.' + QUOTENAME(@column_name)

    SELECT @key_seq = MIN(key_seq) FROM @pk_table WHERE key_seq > @key_seq
END

-- ---------------------------------------------------------------------------
-- Column lists, in column_id order. Built with a loop rather than
-- FOR XML PATH so that characters such as & < > in column names are not
-- entitized and the ordering is explicit.
-- ---------------------------------------------------------------------------
SELECT @column_id = MIN(column_id) FROM @merge_columns
WHILE @column_id IS NOT NULL
BEGIN
    SELECT @column_name = column_name FROM @merge_columns WHERE column_id = @column_id

    -- Remote side: names are left unquoted (see header).
    SET @remote_select_list = ISNULL(@remote_select_list + ',', '') + @column_name

    -- Local side: always bracket-quoted.
    SET @insert_column_list = ISNULL(@insert_column_list + ',', '') + QUOTENAME(@column_name)
    SET @insert_value_list = ISNULL(@insert_value_list + ',', '')
        + @source_alias + '.' + QUOTENAME(@column_name)
    SET @update_set_list = ISNULL(@update_set_list + ',', '')
        + @target_alias + '.' + QUOTENAME(@column_name)
        + ' = '
        + @source_alias + '.' + QUOTENAME(@column_name)

    SELECT @column_id = MIN(column_id) FROM @merge_columns WHERE column_id > @column_id
END

-- ---------------------------------------------------------------------------
-- Remote pass-through query, written as plain text here; its single quotes
-- are doubled once below when it is embedded in the OPENQUERY literal.
-- ---------------------------------------------------------------------------
SET @remote_object = ISNULL(@remote_database_name + '.', '')
    + @remote_schema_name + '.' + @remote_table_name

-- Style 120 = yyyy-mm-dd hh:mi:ss (24h)
SET @modified_since = CONVERT(char(19), @merge_last_modified_dt, 120)

SET @remote_query = 'SELECT ' + @remote_select_list + ' FROM ' + @remote_object
    + CASE
          WHEN @merge_last_modified_enabled = 1 AND @is_oracle = 1
              THEN ' WHERE ' + @merge_last_modified_column
                   + ' >= TO_DATE(''' + @modified_since + ''', ''yyyy-mm-dd hh24:mi:ss'')'
          WHEN @merge_last_modified_enabled = 1
              THEN ' WHERE ' + @merge_last_modified_column
                   + ' >= ''' + @modified_since + '.000'''
          ELSE ''
      END

-- ---------------------------------------------------------------------------
-- Final statement.
-- ---------------------------------------------------------------------------
SET @merge_dynamic_sql =
    CASE WHEN @output_enabled = 1
         THEN 'DECLARE @merge_output TABLE (table_name sysname,operation sysname);'
         ELSE ''
    END
    + ' '
    + 'MERGE ' + @target_name + ' AS ' + @target_alias + ' '
    + 'USING OPENQUERY(' + QUOTENAME(@remote_server_name) + ', '
    + '''' + REPLACE(@remote_query, '''', '''''') + ''') AS ' + @source_alias + ' '
    + @merge_join_clause
    + ' '
    + 'WHEN MATCHED THEN '
    + 'UPDATE SET ' + @update_set_list + ' '
    + 'WHEN NOT MATCHED BY TARGET THEN '
    + 'INSERT (' + @insert_column_list + ') '
    + 'VALUES (' + @insert_value_list + ') '
    + CASE WHEN @merge_delete_enabled = 1
           THEN 'WHEN NOT MATCHED BY SOURCE THEN DELETE '
           ELSE ''
      END
    + CASE WHEN @output_enabled = 1
           -- Only the action is captured: the key value was never reported,
           -- and forcing it into an int column broke non-integer keys.
           THEN 'OUTPUT N''' + REPLACE(@local_schema_name + '.' + @local_table_name, '''', '''''') + ''', '
                + '$action '
                + 'INTO @merge_output; '
                + 'SELECT table_name, '
                + 'operation, '
                + 'count(*) as total '
                + 'FROM @merge_output '
                + 'GROUP BY operation, '
                + 'table_name;'
           ELSE ';'
      END

-- Last line of defence against NULL propagation (e.g. a name too long for QUOTENAME).
IF @merge_dynamic_sql IS NULL
BEGIN
    RAISERROR ('The MERGE statement could not be generated from the supplied arguments.', 16, 1)
    RETURN 1
END

IF @debug_enabled = 0
BEGIN
    EXEC sp_executesql @merge_dynamic_sql;
END
ELSE
BEGIN
    -- Anything other than an explicit 0 (including NULL) is treated as debug mode.
    PRINT @merge_dynamic_sql
END
GO