This article automates the SSIS Data Pump method described at http://www.sqlservercentral.com/articles/integration+services+(ssis)/72493/. The SSIS Data Pump is the most effective way to load large amounts of data into a SQL Server database because it makes optimal use of the transaction log, parallelism, locking, and pipelining. The referenced article explains the method in more detail.
Imagine the following scenario: I have a large filegroup containing 20 tables, and the filegroup has a lot of unused space. If I want to recover the unused space, the best way is to create a new filegroup, rename the old tables, create the new tables on the new filegroup, and then copy the rows from the old tables into the new ones. To copy 20 tables, I could manually create 20 SSIS packages and run them, but it would be nicer to have a T-SQL stored procedure that, given a T-SQL select statement, executes it and loads the resulting rows into a given table using the SSIS Data Pump.
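The scenario above can be sketched in T-SQL for a single table. This is only an illustration under stated assumptions: the database SalesDb, the filegroup FG_NEW, the table dbo.Orders with its columns, and the file path are all hypothetical placeholders, and Direct_Path_Insert (presented later in this article) is assumed to have been created in SalesDb.

```sql
-- Hypothetical names throughout: SalesDb, FG_NEW, dbo.Orders and the file path
-- are placeholders, not objects from the article.

-- 1. Create the new filegroup and give it a data file.
ALTER DATABASE SalesDb ADD FILEGROUP FG_NEW;
ALTER DATABASE SalesDb
    ADD FILE (NAME = N'SalesDb_fg_new_1',
              FILENAME = N'D:\Data\SalesDb_fg_new_1.ndf',
              SIZE = 1GB)
    TO FILEGROUP FG_NEW;
GO

USE SalesDb;
GO

-- 2. Rename the old table out of the way.
EXEC sp_rename N'dbo.Orders', N'Orders_old';
GO

-- 3. Recreate the table, empty, on the new filegroup.
CREATE TABLE dbo.Orders
(
    Order_ID    int      NOT NULL,
    Customer_ID int      NOT NULL,
    Order_Date  datetime NOT NULL
) ON FG_NEW;
GO

-- 4. Copy the rows with the procedure described in this article
--    (assumed here to live in SalesDb).
DECLARE @instance_name varchar(50) = @@SERVERNAME;
EXEC dbo.Direct_Path_Insert
     'Orders', 'SalesDb', 'SalesDb', @instance_name,
     'select Order_ID, Customer_ID, Order_Date from Orders_old where 1=1 ',
     '';
```

Repeating steps 2 to 4 for each of the 20 tables replaces the 20 hand-built SSIS packages with 20 procedure calls.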
This article presents an implemented and tested solution that loads data using a dynamically generated SSIS package. The solution is made up of two components:
- An SSIS package named BULK_TRANSFER_DATA.dtsx
- The stored procedure Direct_Path_Insert
Here is a snapshot of what BULK_TRANSFER_DATA looks like:
The core of the solution is the VB script Create_DTSX, which creates an SSIS package from values held in package variables that act as input parameters. The code is similar to that shown at these two links:
- http://msdn.microsoft.com/en-us/library/ms345167.aspx
- http://social.msdn.microsoft.com/forums/en-US/sqlintegrationservices/thread/3f513155-6b09-45bc-b913-a24906d1b580
The code is shown below:
Public Sub Main()
    ' Note: app, dataTransferPackage and orderNumber are not declared in this listing;
    ' they are assumed to be class-level fields of the script class.
    Dim packageName As String = Dts.Variables("DEST_TABLE_NAME").Value.ToString
    Dim threads As Integer = 1
    orderNumber = 1
    '--------------------------------------------------------------------------------------------
    app = New Application()
    Dts.Events.FireInformation(0, "Bulk Transfer Package", packageName, "", 0, True)
    Dim srcDbConnectionString As String = Dts.Variables("SOURCE_CONNECT").Value.ToString
    Dim destDbConnectionString As String = Dts.Variables("DEST_CONNECT").Value.ToString
    '--------------------------------------------------------------------------------------------
    'Create package
    dataTransferPackage = New Package()
    dataTransferPackage.LocaleID = 1029

    'Create source connection
    Dim srcConn As Microsoft.SqlServer.Dts.Runtime.ConnectionManager = dataTransferPackage.Connections.Add("OLEDB")
    srcConn.Name = "OLEDB Source"
    srcConn.ConnectionString = srcDbConnectionString

    'Create destination connection
    Dim destConn As Microsoft.SqlServer.Dts.Runtime.ConnectionManager = dataTransferPackage.Connections.Add("OLEDB")
    destConn.Name = "OLEDB Destination"
    destConn.ConnectionString = destDbConnectionString

    Dim SQLCommand As String = Dts.Variables("SOURCE_SELECT").Value.ToString
    Dim schemaTableName As String = Dts.Variables("DEST_TABLE_NAME").Value.ToString
    '------------------------------------------------------
    'Create DataFlow task
    Dim ePipeline As Executable = dataTransferPackage.Executables.Add("STOCK:PipelineTask")
    Dim thMainPipe As Microsoft.SqlServer.Dts.Runtime.TaskHost = CType(ePipeline, Microsoft.SqlServer.Dts.Runtime.TaskHost)
    thMainPipe.Name = Format(orderNumber, "000000") + "_copy_Table_Task_" + schemaTableName
    orderNumber += 1
    Dim dataFlowTask As MainPipe = CType(thMainPipe.InnerObject, MainPipe)
    dataFlowTask.DefaultBufferSize = 50 * 1024 * 1024
    ' FastLoadMaxInsertCommitSize will have the same value as DefaultBufferMaxRows
    ' It represents the size in rows of a committed batch
    ' To have everything in a single transaction, choose 0
    dataFlowTask.DefaultBufferMaxRows = 20000
    ' dataFlowTask.BLOBTempStoragePath = dataTransferCache
    ' dataFlowTask.BufferTempStoragePath = dataTransferCache
    Dts.Events.FireInformation(0, "DataFlow task created for table", schemaTableName, "", 0, True)
    '--------------------------------------------------------------------------------------------
    'Create OLEDB Source
    Dts.Events.FireInformation(0, "Creating OLEDB source - for table", schemaTableName, "", 0, True)
    Dim compSrc As IDTSComponentMetaData100 = dataFlowTask.ComponentMetaDataCollection.New()
    compSrc.ComponentClassID = "DTSAdapter.OleDbSource.2"
    compSrc.Name = "OLEDBSource_" + schemaTableName

    ' Initialize the component.
    Dim instSrc As CManagedComponentWrapper = compSrc.Instantiate()
    instSrc.ProvideComponentProperties()
    'Dts.Events.FireInformation(0, "Set Connection OLEDB source - for table", "[" + schemaName + "].[" + tableName + "]", "", 0, True)
    compSrc.RuntimeConnectionCollection(0).ConnectionManagerID = dataTransferPackage.Connections("OLEDB Source").ID
    compSrc.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.GetExtendedInterface(dataTransferPackage.Connections("OLEDB Source"))
    compSrc.CustomPropertyCollection.Item("DefaultCodePage").Value = 1250
    Dts.Events.FireInformation(0, "OLEDB source query", SQLCommand, "", 0, True)
    instSrc.SetComponentProperty("AccessMode", 2) ' Table or view ... 0, SQL Command ... 2, Table or view - fast 3
    instSrc.SetComponentProperty("SqlCommand", SQLCommand)

    ' Reinitialize the metadata.
    instSrc.AcquireConnections(vbNull)
    instSrc.ReinitializeMetaData()
    instSrc.ReleaseConnections()
    instSrc = Nothing
    compSrc.Name = "OLEDBSource_" + schemaTableName
    Dim srcOutput As IDTSOutput100 = compSrc.OutputCollection(0)
    Dts.Events.FireInformation(0, "SSIS Tasks created for table", schemaTableName, "", 0, True)
    '--------------------------------------------------------------------------------------------
    'Create OLEDB Destination
    Dts.Events.FireInformation(0, "Creating OLEDB dest - for table", schemaTableName, "", 0, True)
    Dim compDest As IDTSComponentMetaData100 = dataFlowTask.ComponentMetaDataCollection.New()
    compDest.ComponentClassID = "DTSAdapter.OleDbDestination.2"
    compDest.Name = "OLEDBDest_" + schemaTableName

    ' Initialize the component.
    Dim instDest As CManagedComponentWrapper = compDest.Instantiate()
    instDest.ProvideComponentProperties()
    compDest.RuntimeConnectionCollection(0).ConnectionManagerID = dataTransferPackage.Connections("OLEDB Destination").ID
    compDest.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.GetExtendedInterface(dataTransferPackage.Connections("OLEDB Destination"))
    compDest.CustomPropertyCollection.Item("DefaultCodePage").Value = 1250

    ' Set the custom properties
    instDest.SetComponentProperty("AccessMode", 3)
    instDest.SetComponentProperty("OpenRowset", "DBO." + schemaTableName)
    Dts.Events.FireInformation(0, "instDest", schemaTableName, "", 0, True)
    instDest.SetComponentProperty("FastLoadKeepIdentity", True)
    instDest.SetComponentProperty("FastLoadKeepNulls", True)
    instDest.SetComponentProperty("FastLoadMaxInsertCommitSize", dataFlowTask.DefaultBufferMaxRows)
    instDest.SetComponentProperty("FastLoadOptions", "CHECK_CONSTRAINTS ") ',ROWS_PER_BATCH = " + dataFlowTask.DefaultBufferMaxRows.ToString())

    ' Connect the source output to the destination input
    Dim destInput As IDTSInput100 = compDest.InputCollection(0)
    Dim path As IDTSPath100 = dataFlowTask.PathCollection.New()
    path.AttachPathAndPropagateNotifications(srcOutput, destInput)

    ' Reinitialize the metadata.
    instDest.AcquireConnections(vbNull)
    instDest.ReinitializeMetaData()
    compDest.Name = "OLEDBDest_" + schemaTableName
    Dts.Events.FireInformation(0, "instDest_ioriuno", schemaTableName, "", 0, True)

    ' Map every input column to the destination column of the same name
    Dim vdestInput As IDTSVirtualInput100 = destInput.GetVirtualInput()
    For Each vColumn As IDTSVirtualInputColumn100 In vdestInput.VirtualInputColumnCollection
        Dim vCol As IDTSInputColumn100 = instDest.SetUsageType(destInput.ID, vdestInput, vColumn.LineageID, DTSUsageType.UT_READWRITE)
        instDest.MapInputColumn(destInput.ID, vCol.ID, destInput.ExternalMetadataColumnCollection(vColumn.Name).ID)
    Next
    instDest.ReleaseConnections()
    instDest = Nothing
    '--------------------------------------------------------------------------------------------
    dataTransferPackage.MaxConcurrentExecutables = threads
    dataTransferPackage.MaximumErrorCount = 100
    app.SaveToXml(packageName + ".dtsx", dataTransferPackage, Nothing)
    Dts.TaskResult = ScriptResults.Success
End Sub
BULK_TRANSFER_DATA first creates an SSIS package named after the destination table. It then executes that package using the built-in SSIS Execute Package task and finally deletes it. The dynamically created package contains a single Data Flow with a simple OleDb Source to OleDb Destination mapping.
The OleDb Source is created from the value of the package variable SOURCE_SELECT, which is the concatenation of the variables SELECT_CLAUSE + WHERE_CONDITION. The OleDB Destination inserts data into a table named the same as the value of the variable DEST_TABLE_NAME.
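The concatenation itself happens inside the SSIS package, but its effect can be mimicked in T-SQL to see which statement the OleDb Source will run. The sketch below reuses the table and column names from the example later in this article; the column aliases SOURCE_SELECT and SOURCE_SELECT_no_filter are only illustrative labels. It shows why the select clause ends with `where 1=1`: the where condition can then either start with `and` or be an empty string.

```sql
DECLARE @select_clause varchar(2000) =
        'select CDR_ID, File_ID from GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=1 ';
DECLARE @wherecond varchar(2000) =
        ' and Charging_Start_Time between ''2011-03-01'' and ''2011-03-02'' ';

-- With a filter: the condition is appended after "where 1=1".
SELECT @select_clause + @wherecond AS SOURCE_SELECT;

-- Without a filter: an empty where condition still leaves a valid statement.
SELECT @select_clause + '' AS SOURCE_SELECT_no_filter;
```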
Let's now look at the stored procedure. The procedure first creates a Windows batch file named after the destination table and then launches it. The batch file changes to the working directory given by @work_dir and then launches BULK_TRANSFER_DATA with DTExec.
The prerequisites for Direct_Path_Insert to run are:
- The destination table exists
- The columns returned by the source select must have the same names (case-sensitive), data types, and lengths as the columns of the destination table.
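The second prerequisite can be checked before a load. The query below is a rough sketch, not part of the original solution: it compares the destination table with the source table from the example later in this article, assumes both tables live in the dbo schema, and forces a binary collation so that the name comparison is case-sensitive. It only compares character lengths, not numeric precision or scale.

```sql
-- Destination columns that have no exact (case-sensitive) match in the source table.
-- An empty result suggests that names, data types and character lengths line up.
SELECT COLUMN_NAME COLLATE Latin1_General_BIN AS column_name,
       DATA_TYPE   COLLATE Latin1_General_BIN AS data_type,
       CHARACTER_MAXIMUM_LENGTH               AS max_length
FROM   DWHSK_STAGE_PROD.INFORMATION_SCHEMA.COLUMNS
WHERE  TABLE_SCHEMA = 'dbo'
  AND  TABLE_NAME   = 'drop_GSM_CDR_PTC_TRAFFIC_ACTUAL'
EXCEPT
SELECT COLUMN_NAME COLLATE Latin1_General_BIN,
       DATA_TYPE   COLLATE Latin1_General_BIN,
       CHARACTER_MAXIMUM_LENGTH
FROM   DWHSK_TARGET_PROD.INFORMATION_SCHEMA.COLUMNS
WHERE  TABLE_SCHEMA = 'dbo'
  AND  TABLE_NAME   = 'GSM_CDR_PTC_TRAFFIC_ACTUAL';
```

If the source select computes or renames columns, the comparison would have to be made against the select list rather than against the source table.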
The procedure code is shown here:
/*
Created by F Iori 20110427
Performs a bulk insert by dynamically generating a dtsx package, running it, and deleting it
Setup : copy BULK_TRANSFER_DATA.dtsx into @work_dir on the server, and that is all
@dest_tab must be given without a schema name; the schema is dbo by default
@select_clause column names are case sensitive; they must match dest_tab in name, type and number,
otherwise the proc will fail
*/
/*
declare @dest_tab varchar(200) = 'drop_GSM_CDR_PTC_TRAFFIC_ACTUAL' -- Schema is always dbo
, @src_db varchar(200) = 'DWHSK_TARGET_PROD'
, @dest_db varchar(200) = 'DWHSK_STAGE_PROD'
, @instance_name varchar(50) = 'NTSQLDWHSKT01\I03'
-- select clause column names are case sensitive
, @select_clause varchar(2000) = 'select top 350 CDR_ID, File_ID, Exchange_ID , Charging_Start_Time , Create_Date from GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=1 '
, @wherecond varchar(2000) = ' and Charging_Start_Time between ''2011-03-01'' and ''2011-03-02'' ' -- or empty string
exec dwhsk_warehouse.dbo.Direct_Path_Insert @dest_tab , @src_db , @dest_db , @instance_name , @select_clause , @wherecond
*/
CREATE proc [dbo].[Direct_Path_Insert]
( @dest_tab varchar(200) ,
  @src_db varchar(50),
  @dest_db varchar(50),
  @instance_name varchar(50),
  @select_clause varchar(2000) ,
  @wherecond varchar(2000)
)
as
declare @ret int ,
        @cmd varchar(8000) = '' ,
        @cmds varchar(8000) = '' ,
        @work_dir varchar(100)= 'N:\Data1\SS_Direct_Path_Insert\'
begin
    -- Build the DTExec command line that passes the parameters as package variables
    set @cmd = ' DTexec.exe /File "BULK_TRANSFER_DATA.dtsx" '
        + ' /SET "\package.Variables[User::DEST_TABLE_NAME].Value";"'+@dest_tab+'" '
        + ' /SET "\package.Variables[User::SOURCE_DB].Value";"'+@src_db+'" '
        + ' /SET "\package.Variables[User::DEST_DB].Value";"'+@dest_db+'" '
        + ' /SET "\package.Variables[User::INSTANCE_NAME].Value";"'+@instance_name+'" '
        + ' /SET "\package.Variables[User::SELECT_CLAUSE].Value";"'+@select_clause+'" '
        + ' /SET "\package.Variables[User::WHERE_CONDITION].Value";"'+@wherecond+'" '
    print @cmd

    -- Remove leftovers from a previous run
    set @cmds= ' del '+@work_dir+@dest_tab+'.* '
    exec @ret= master.sys.xp_cmdshell @cmds , no_output

    -- Write the batch file: switch drive, change directory, run DTExec
    set @cmds= 'echo '+LEFT(@work_dir,1)+': > '+@work_dir+@dest_tab+'.bat '
    exec @ret= master.sys.xp_cmdshell @cmds , no_output
    set @cmds= 'echo cd '+@work_dir+' >> '+@work_dir+@dest_tab+'.bat '
    exec @ret= master.sys.xp_cmdshell @cmds , no_output
    set @cmds=' echo '+@cmd+' >> '+@work_dir+@dest_tab+'.bat '
    exec @ret= master.sys.xp_cmdshell @cmds , no_output

    -- Launch the batch file
    set @cmds = ' '+@work_dir+@dest_tab+'.bat '
    exec @ret= master.sys.xp_cmdshell @cmds , no_output
    if @ret<>0
    BEGIN
        declare @errmsg varchar(300) = @dest_tab+'.bat failed '
        raiserror (@errmsg, 12, 1)
        RETURN
    END

    -- Clean up the batch file
    set @cmds= ' del '+@work_dir+@dest_tab+'.bat '
    exec @ret= master.sys.xp_cmdshell @cmds , no_output
end
Installing the solution is straightforward:
- On the database server, create the folder whose path is assigned to the variable @work_dir in the stored procedure Direct_Path_Insert.
- Copy the SSIS package BULK_TRANSFER_DATA.dtsx into the @work_dir folder.
- Create the stored procedure Direct_Path_Insert in any database on the instance.
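Because Direct_Path_Insert calls xp_cmdshell, which is disabled by default on SQL Server, the instance may also need the option switched on. A minimal sketch follows, assuming you have the rights to change server configuration and that enabling xp_cmdshell is acceptable under your security policy; the path in the last statement is the default @work_dir from the procedure above.

```sql
-- Enable xp_cmdshell (requires sysadmin-level rights on the instance).
EXEC sp_configure 'show advanced options', 1;
RECONFIGURE;
EXEC sp_configure 'xp_cmdshell', 1;
RECONFIGURE;
GO

-- Confirm that the package is where the procedure expects to find it.
EXEC master.sys.xp_cmdshell
     'dir "N:\Data1\SS_Direct_Path_Insert\BULK_TRANSFER_DATA.dtsx"';
```

If the dir command reports that the file cannot be found, the folder or the copy step above needs to be checked before the procedure is called.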
The stored procedure can now be called from a T-SQL script as follows:
exec Direct_Path_Insert <destination_table>, <source_db_name>, <destination_db_name>, @@servername, <source_select>, <where_condition for source select>
Example:
use DWHSK_STAGE_PROD
go

-- Create destination table
select top 250 [CDR_ID] ,[File_ID] ,[Exchange_ID] , Charging_Start_Time , Create_Date
into dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL
from dwhsk_target_prod..GSM_CDR_PTC_TRAFFIC_ACTUAL
where 1=2

-- Loads some rows into destination table
declare @dest_tab varchar(200) = 'drop_GSM_CDR_PTC_TRAFFIC_ACTUAL' -- Schema is always dbo
, @src_db varchar(200) = 'DWHSK_TARGET_PROD'
, @dest_db varchar(200) = 'DWHSK_STAGE_PROD'
, @instance_name varchar(50) = 'NTSQLDWHSKT01\I03'
, @select_clause varchar(2000) = ' select CDR_ID, File_ID, Exchange_ID , Charging_Start_Time , Create_Date from GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=1 '
, @wherecond varchar(2000) = ' and Charging_Start_Time between ''2011-03-01'' and ''2011-03-02'' ' -- or empty string

exec dwhsk_warehouse.dbo.Direct_Path_Insert @dest_tab , @src_db , @dest_db , @instance_name , @select_clause , @wherecond

select * from dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL

drop table dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL
At the end, the dynamically generated SSIS packages and Windows batch files are deleted from the working directory on the database server.
Please note that the insert performed by Direct_Path_Insert is not atomic: rows are committed in batches of 20000. That number is the value of dataFlowTask.DefaultBufferMaxRows in the VB script, which is also assigned to FastLoadMaxInsertCommitSize. If an error occurs, only the last batch is rolled back; batches committed before it remain in the destination table.
To have atomic behaviour, just set FastLoadMaxInsertCommitSize to 0 and all the inserted rows will be committed or rolled back in a single batch.
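If the batched commit is kept, the caller has to deal with partially loaded rows itself. One possible approach, sketched here under the assumption that the destination table was empty before the load (so truncating it loses nothing), is to wrap the call in TRY...CATCH. The procedure raises its error with severity 12, which is high enough to transfer control to the CATCH block. The object names are those of the example above.

```sql
DECLARE @dest_tab      varchar(200)  = 'drop_GSM_CDR_PTC_TRAFFIC_ACTUAL'
      , @src_db        varchar(200)  = 'DWHSK_TARGET_PROD'
      , @dest_db       varchar(200)  = 'DWHSK_STAGE_PROD'
      , @instance_name varchar(50)   = 'NTSQLDWHSKT01\I03'
      , @select_clause varchar(2000) = ' select CDR_ID, File_ID, Exchange_ID , Charging_Start_Time , Create_Date from GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=1 '
      , @wherecond     varchar(2000) = '';

BEGIN TRY
    EXEC dwhsk_warehouse.dbo.Direct_Path_Insert
         @dest_tab, @src_db, @dest_db, @instance_name, @select_clause, @wherecond;
END TRY
BEGIN CATCH
    DECLARE @msg nvarchar(2048) = ERROR_MESSAGE();

    -- Assumption: the table held no rows before the load, so removing
    -- everything also removes the batches that were already committed.
    TRUNCATE TABLE dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL;

    -- Re-raise the original message for the caller.
    RAISERROR('%s', 16, 1, @msg);
END CATCH;
```

When the destination table already contains rows, truncating is not an option, and setting FastLoadMaxInsertCommitSize to 0 as described above is the safer choice.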
The name of the stored procedure, Direct_Path_Insert, was chosen because it offers functionality similar to Oracle's Direct Path Insert (an insert with the APPEND hint).