Introduction
ETL processing generally involves copying or moving, transforming, and cleaning records/transactions from one or more sources. Most batch processing or warehousing projects process millions of such records on a daily or weekly basis. Typically there is a staging area and a production area: records are cleaned, transformed, filtered, and verified on their way from staging to production. This demands SQL set-theory-based queries and parallel processing across multiple processors/CPUs. This article focuses on the need for a SQL set theory approach and parallel processing when handling large volumes of ETL records with a programming approach. It is NOT a comparison between stored procedures and ETL tools, and it does NOT detail hardware features/requirements.
Parallel Data Processing - Concept
Typically, large warehousing or batch processing systems are equipped with good hardware. Processing of high-volume data usually happens in a time window when there is little other activity on the servers. Considering this, hardware resources and features such as multiple CPUs/processors can be used to their maximum strength during batch data loads into warehouse/ODS/ETL environments.
Most new database versions focus on database internals, building as much intelligence as possible into SQL query execution and other database activities. This article focuses on dividing huge data sets into smaller chunks that are executed/processed in parallel. By combining this division of large data volumes into smaller chunks with the database's internal parallelism in SQL execution, we can expect good performance gains.
Batch processing and ODS/DWH/ETL kinds of applications involve the activities below.
- Validating
- Filtering
- Transforming
- Copying/Moving to Production area
The first three steps may involve a few or many pieces of business logic. For each record, a set of condition checks and data transformations must be applied before moving it to the production space. These condition checks and transformations may involve joining other tables, looking for specific data values, and so on. Some business logic may require a set of operations/statements to be executed as part of validation/transformation, and all of this must be applied to every incoming record. Processing these records sequentially, record by record, can consume a lot of time. As much as possible, we need to apply SQL set theory, parallel data processing, and function-based approaches to process the data quickly; compare the record-by-record and set-based styles sketched just below.
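As a simple illustration (the table and column names here are hypothetical), a record-by-record cursor loop such as the commented pseudocode below can usually be collapsed into one set-based statement:

-- Record-by-record style: one statement per record (slow)
--   FOR each rec IN (SELECT RecordID, fldDOB FROM stgTransactions) LOOP
--       IF rec.fldDOB IS NULL THEN
--           UPDATE stgTransactions SET RecordStatus = 9 WHERE RecordID = rec.RecordID;
--       END IF;
--   END LOOP;

-- Set-based equivalent: one statement over the whole set
UPDATE stgTransactions
SET    RecordStatus = 9
WHERE  fldDOB IS NULL;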
Considering the improvements/enhancements delivered with new database releases, the scope of performance tuning is going beyond SQL query tuning. Database internals are becoming more and more intelligent in data reads, writes, and management; even poorly coded SQL queries are translated into reasonable execution plans by the optimizers. In such situations, there is a need to look beyond SQL queries, indexes, statistics, etc. That is what is driving the use of, and improvements in, partitions, materialized/indexed views, multiple intermediate storage areas/states of data, and so on. This article is an effort to share the idea of processing huge volumes of data in a parallel fashion with SQL programs.
Illustration
The sections below explain one way of applying the divide-and-conquer method to data parallelism. Pseudo code/algorithms are given instead of code to keep the approach independent of any particular database. A few tables required for recording status/history-audit information and for the customization parameters used during program execution are also explained.
Assume a daily feed arrives in .CSV format; it is read and its data is stored in staging tables in a database. From staging to production, the data needs to be processed by applying business rules in data validation and transformation steps.
Tables – Data Dictionary
tblFileStatus
To store information about the incoming .CSV files.
| Column Name | Data Type | Comments |
| --- | --- | --- |
| FileID | INT | Primary key |
| FileName | Char | Name of the incoming CSV file |
| FileType | Char | Type of the file, if any, to be recorded |
| TotalRecords | INT | Total number of records in the CSV file |
| IsProcessed | INT | Whether the file has been processed from the staging to the production area: 0 = No, 1 = Yes, 9 = Error (manual intervention is required to solve the problem) |
| CreateDateTime | DateTime | Audit column to record the creation date |
| UpdateDateTime | DateTime | Audit column to record the data update date |
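A minimal DDL sketch for this table, using SQL Server style data types (adjust the VARCHAR lengths and the DATETIME type to the target database); the other control tables below follow the same pattern:

CREATE TABLE tblFileStatus (
    FileID         INT           NOT NULL PRIMARY KEY,
    FileName       VARCHAR(255)  NOT NULL,           -- name of the incoming CSV file
    FileType       VARCHAR(50)   NULL,
    TotalRecords   INT           NOT NULL,
    IsProcessed    INT           NOT NULL DEFAULT 0, -- 0 = No, 1 = Yes, 9 = Error
    CreateDateTime DATETIME      NOT NULL,
    UpdateDateTime DATETIME      NULL
);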
tblParamCustomize
To record information about customizable parameters used during program execution. The values of these parameters can be changed as needed, depending on requirements.
| Column Name | Data Type | Comments |
| --- | --- | --- |
| ParamID | INT | Primary key |
| ParamName | Char | Name of the parameter |
| ParamValue | Char | Value of the parameter |
| ParamPurpose | Char | Purpose of the parameter |
| IsEnabled | INT | To identify whether the parameter is enabled or disabled |
| CreateDateTime | DateTime | Audit column to record the creation date |
| UpdateDateTime | DateTime | Audit column to record the data update date |
A few of the parameters in tblParamCustomize are explained below.
| ParamName | ParamValue | ParamPurpose |
| --- | --- | --- |
| MaxThreads | 5 | Maximum number of threads that can be spawned |
| MaxRecsInThread | 10000 | Maximum number of records that a thread should process |
| MaxThreadTryCount | 2 | Maximum number of times a thread can be retried in case of errors |
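Seeding these parameters could look like the sketch below (IsEnabled = 1 is assumed to mean enabled; the audit columns are left to defaults):

INSERT INTO tblParamCustomize (ParamID, ParamName, ParamValue, ParamPurpose, IsEnabled)
VALUES (1, 'MaxThreads', '5', 'Maximum threads that can be spawned', 1);

INSERT INTO tblParamCustomize (ParamID, ParamName, ParamValue, ParamPurpose, IsEnabled)
VALUES (2, 'MaxRecsInThread', '10000', 'Maximum records that a thread should process', 1);

INSERT INTO tblParamCustomize (ParamID, ParamName, ParamValue, ParamPurpose, IsEnabled)
VALUES (3, 'MaxThreadTryCount', '2', 'Maximum retries for a thread that reports errors', 1);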
tblFileProcessing
To record information about the threads/jobs spawned by the program for processing the data.
| Column Name | Data Type | Comments |
| --- | --- | --- |
| FileProcessingID | INT | Primary key |
| FileID | INT | Foreign key to tblFileStatus.FileID |
| ThreadInstance | INT | Sequential identification number of the spawned thread/job |
| ThreadStartRecord | INT | Record number at which the thread should start processing |
| ThreadEndRecord | INT | Record number at which the thread should end processing |
| ThreadStartTime | DateTime | Execution start time of the thread/job |
| ThreadEndTime | DateTime | Execution end time of the thread/job |
| ThreadLastBreath | DateTime | Heartbeat updated by the thread from step to step to show it is still alive |
| ThreadStatus | INT | Status of the thread: 0 = Not started, 1 = Completed, 2 = Under process, 3 = Processing validation rules, 4 = Processing data transformation rules, 5 = Copying records to production, 9 = Error, 10 = Thread is missing from tracking (no updates from it for a long time) |
| ThreadDBInfo1 | Char | Database-internal information about the running thread |
| ThreadDBInfo2 | Char | Database-internal information about the running thread |
| ThreadDBInfo3 | Char | Database-internal information about the running thread |
| CreateDateTime | DateTime | Audit column to record the creation date |
| UpdateDateTime | DateTime | Audit column to record the data update date |
tblFileProcessingHistory
To record every change that happens to tblFileProcessing data. When troubleshooting errors/problems in thread processing, this history of changes gives clarity on the processing, and the timing, of the different steps/stages of a thread/job.
The table structure is the same as tblFileProcessing, with the additional columns detailed below.
| Column Name | Data Type | Comments |
| --- | --- | --- |
| AudFileProcessingID | INT | Primary key |
| AudCreateDateTime | DateTime | Audit column to record the creation date of the history record. Since only inserts happen on this table, an UpdateDateTime column is not required. |
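One way to populate this history table, shown here as an Oracle PL/SQL sketch (trigger syntax varies by database, seqFileProcessingHistory is an assumed sequence, and some columns are omitted for brevity), is a row-level trigger on tblFileProcessing:

CREATE OR REPLACE TRIGGER trgFileProcessingHistory
AFTER INSERT OR UPDATE ON tblFileProcessing
FOR EACH ROW
BEGIN
    -- copy the new image of the row into the history/audit table
    INSERT INTO tblFileProcessingHistory
        (AudFileProcessingID, FileProcessingID, FileID, ThreadInstance,
         ThreadStartRecord, ThreadEndRecord, ThreadStatus, AudCreateDateTime)
    VALUES
        (seqFileProcessingHistory.NEXTVAL, :NEW.FileProcessingID, :NEW.FileID, :NEW.ThreadInstance,
         :NEW.ThreadStartRecord, :NEW.ThreadEndRecord, :NEW.ThreadStatus, SYSDATE);
END;
/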
tmpDataTransaction
Assume that the daily feed .CSV file data is loaded into one staging table. The data needs to be processed from this staging transactions table to production. Before reaching production, the data undergoes a lot of validation/transformation, and this validation/transformation logic is applied to many columns of every record. To carry out all these activities, let us have an intermediate table called tmpDataTransaction which acts as a working stage between the staging and production areas. It has the same structure as the staging table, with additional columns for recording errors/status, detailed below. The tmpDataTransaction table should be created with the nologging option; with nologging, log entries are not generated for insert/update/delete/truncate operations, which can save a good amount of time by avoiding writes to the redo/undo logs.
Additional columns required in tmpDataTransaction table
| Column Name | Data Type | Comments |
| --- | --- | --- |
| RecordStatus | INT | 0 = Not processed, 1 = Valid to process, 8 = Warning, 9 = Error |
| Errors | Char | For recording error(s) |
| RecordID | INT | Primary key, in case one is missing in the staging tables |
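On Oracle, for instance, the intermediate table could be created once as a nologging copy of the staging table's structure with the extra columns added (stgTransactions is an assumed staging table name):

CREATE TABLE tmpDataTransaction NOLOGGING AS
SELECT s.*,
       0                            AS RecordStatus,  -- 0 = not processed
       CAST(NULL AS VARCHAR2(1000)) AS Errors,
       ROWNUM                       AS RecordID       -- surrogate key if staging has none
FROM   stgTransactions s
WHERE  1 = 0;                                         -- copy structure only, no rows

Subsequent loads into it can then use direct-path inserts (INSERT /*+ APPEND */ on Oracle) so that redo generation stays minimal.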
Data Processing
Let us look at the steps involved in processing records from staging to production.
The algorithms of two programs are given below: a main program, which decides the number of threads to spawn and monitors/controls those threads, and a sub-program, which actually carries out the processing of the data.
Thread spawning Program
The main program spawns the threads/jobs and keeps track of their execution details and status.
Steps 1 and 2 cover picking the file for processing
Steps 3 and 4 cover spawning the jobs/threads
Steps 5 to 7 cover each thread's responsibility to update information about its own status/details
Step 8 covers handling errors, if any, during sub-program execution
Step 9 covers handling business logic that requires the complete set of incoming data
Step 10 covers concluding the processing
1. Scan the tblFileStatus table for unprocessed records, in ascending order of the CreateDateTime column.
2. Pick one file that has not yet been processed to production.
3. Based on the MaxThreads and MaxRecsInThread values in tblParamCustomize and the TotalRecords value in tblFileStatus, derive the number of threads to spawn for processing the current file. Enter the details in tblFileProcessing, filling ThreadInstance, ThreadStartRecord and ThreadEndRecord appropriately (a SQL sketch of this derivation is given after this list). Note that if the total number of records cannot be processed in one cycle of the maximum number of threads, the cycle is repeated until all records are processed.
4. Fire the jobs/threads - this is basically calling the same procedure/job, passing the starting and ending record numbers of the staging table to process. This step calls the "Data Processing program" (the sub-program explained below) and is repeated for the number of threads derived in step 3.
5. The required number of threads are now running, each processing the data between its respective starting and ending record numbers. Each thread should update the ThreadDBInfo columns in the tblFileProcessing table with the database process ID and other required details, and should also update the ThreadStatus column.
6. Each thread should update its ThreadEndTime and ThreadStatus in tblFileProcessing once it completes execution.
7. Every X minutes (for example, every 3 or 5 minutes), tblFileProcessing is checked for completion of all threads using the ThreadStatus column.
8. If any thread reported errors during execution, it should be fired again for processing. Based on the MaxThreadTryCount customizable parameter, stop executing a thread that reports errors repeatedly; manual intervention is required at that point.
9. Some business logic may need the complete set of incoming data to validate certain rules, for example eliminating duplicate records in the file. While individual threads run on subsets of the data, such validations cannot be performed, because each thread focuses only on its own sequential subset. Such rules can be executed at this step, or before the jobs/threads are spawned, i.e. before step 4.
10. If all the threads have completed processing without errors, update the IsProcessed status accordingly in tblFileStatus and exit the program.
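As an illustration of step 3, a minimal sketch is given below in SQL Server style SQL (other databases need WITH RECURSIVE for the row generator). MaxThreads (5) and MaxRecsInThread (10000) are hard-coded here for readability; in practice they would be read from tblParamCustomize. @FileID is the file picked in step 2, and the audit columns are assumed to have defaults.

WITH ThreadList (ThreadInstance) AS (
    SELECT 1
    UNION ALL
    SELECT ThreadInstance + 1 FROM ThreadList WHERE ThreadInstance < 5      -- MaxThreads = 5
)
INSERT INTO tblFileProcessing
    (FileID, ThreadInstance, ThreadStartRecord, ThreadEndRecord, ThreadStatus)
SELECT f.FileID,
       t.ThreadInstance,
       (t.ThreadInstance - 1) * 10000 + 1,                                  -- first record of this slice
       CASE WHEN t.ThreadInstance * 10000 > f.TotalRecords
            THEN f.TotalRecords
            ELSE t.ThreadInstance * 10000 END,                              -- last record of this slice
       0                                                                    -- 0 = not started
FROM tblFileStatus f
JOIN ThreadList t
  ON (t.ThreadInstance - 1) * 10000 < f.TotalRecords                        -- skip slices beyond the file
WHERE f.FileID = @FileID;

If TotalRecords exceeds MaxThreads x MaxRecsInThread, the main program repeats this derivation for the remaining records in the next cycle, as noted in step 3.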
Data Processing program
The actual data processing is done here. The validation/transformation and the copying to production are carried out for the range defined by the starting and ending record numbers.
Step 1 covers initiating the processing/setup of the thread/job
Steps 2 to 4 cover processing the transformation/validation checks
Step 5 covers copying records to the production area
Step 6 covers handling errors while copying records to the production area in bulk batches
Steps 7 and 8 cover monitoring the progress of the thread/job
1. Copy the records between the starting and ending record number parameters from the staging table to the tmpDataTransaction table. This step could instead be done in one shot, copying all records from staging to tmpDataTransaction before spawning the threads; keeping it here makes it a parallel task.
2. In executing steps 3 and 4, preference should be given to set-theory SQL statements and function-based SQL statements. Try to apply the business logic code/SQL to the complete set of records that the current thread needs to process. Avoid record-by-record processing as much as possible; even if a piece of business logic requires a set of statements to be performed, consider using function-based SQL and the database's function-based index feature. A few example set-theory-based SQL queries are given below.
a. UPDATE tmpDataTransaction SET RecordStatus = 9 WHERE fldFirstName IS NULL OR fldDOB IS NULL OR fldCity IS NULL
b. UPDATE tmpDataTransaction SET RecordStatus = 8 WHERE fnQuarterCalcTranscation(AccountID) < 5
c. UPDATE tmpDataTransaction t1, mstAccountType t2 SET t1.AccountName = t2.AccountName WHERE t1.AccountType = t2.AccountType (MySQL-style join update; use a correlated subquery or MERGE on other databases)
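To illustrate the function-based approach mentioned above: if fnQuarterCalcTranscation wraps several statements of business logic, example (b) can stay a single set-based UPDATE, and on Oracle, for instance, a function-based index can support it, provided the function is declared DETERMINISTIC (the index name is an assumption):

-- lets the WHERE fnQuarterCalcTranscation(AccountID) < 5 predicate use an index
CREATE INDEX idxQuarterCalc
    ON tmpDataTransaction (fnQuarterCalcTranscation(AccountID));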
Executing the validation checks
After each of the validation steps below, update the status column in a single set-based statement for all applicable records, rather than record by record. If a check requires more code than fits comfortably in one statement, introduce a function so that the check can still be applied to the whole set in a single scan.
3. Execute all data validation checks against the set of records and mark the RecordStatus column accordingly as valid, erroneous/invalid, or warning. Erroneous/invalid records need to be filtered out in the subsequent validation SQL/steps, i.e. those SQL statements should include the extra condition "WHERE RecordStatus != 9" (RecordStatus = 9 marks an erroneous/invalid record).
4. Execute all data transformation logic for the valid records.
5. After steps 3 and 4, the validated and transformed records need to be pushed to the production database/tables. This may involve copying records to one or multiple tables. Two options can be considered for this step.
a. Copy all valid records from the start record number to the end record number from tmpDataTransaction to the production tables in one statement. If there are constraints that prevent this, try option (b).
b. Copy records in bulk batches of, say, 500 or 1000 records in a loop.
Data commit/rollback statements, logging and settings should be handled properly in both options.
6. If errors are encountered during step 5, option (b), then for that particular batch of 500 or 1000 records the insertion needs to be done record by record. This isolates the particular erroneous records that are causing the bulk insert to fail (a sketch of this batching with a record-by-record fallback is given after this list).
7. Update tblFileProcessing.ThreadStatus periodically, for example after the validation checks, after the transformation rules, and between every bulk batch insert into the production area. This assists in monitoring the jobs/threads: if a thread has problems, hangs, or takes longer than expected, the ThreadStatus and the time columns in tblFileProcessing, plus the records in tblFileProcessingHistory, give an idea of the task's status/progress.
8. After completion of all steps, update tblFileProcessing.ThreadStatus as completed for this particular thread.
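A minimal sketch of steps 5(b) and 6 in Oracle PL/SQL style is shown below. prodTransactions and its column list are assumed names, the thread's record range is hard-coded for readability (it would normally be passed as parameters), and real code would also log errors to tblFileProcessing.

DECLARE
    vBatchSize   CONSTANT PLS_INTEGER := 1000;   -- bulk batch size
    vStartRecord PLS_INTEGER := 1;               -- thread's start record (normally a parameter)
    vEndRecord   PLS_INTEGER := 10000;           -- thread's end record (normally a parameter)
    vBatchStart  PLS_INTEGER := vStartRecord;
    vError       VARCHAR2(1000);
BEGIN
    WHILE vBatchStart <= vEndRecord LOOP
        BEGIN
            -- try the whole batch in one set-based insert
            INSERT INTO prodTransactions (RecordID, AccountID, AccountName)
            SELECT RecordID, AccountID, AccountName
            FROM   tmpDataTransaction
            WHERE  RecordStatus = 1                                   -- valid records only
            AND    RecordID BETWEEN vBatchStart AND vBatchStart + vBatchSize - 1;
            COMMIT;
        EXCEPTION
            WHEN OTHERS THEN
                ROLLBACK;
                -- the batch failed: fall back to record-by-record so that
                -- only the truly erroneous records are rejected
                FOR rec IN (SELECT RecordID, AccountID, AccountName
                            FROM   tmpDataTransaction
                            WHERE  RecordStatus = 1
                            AND    RecordID BETWEEN vBatchStart AND vBatchStart + vBatchSize - 1) LOOP
                    BEGIN
                        INSERT INTO prodTransactions (RecordID, AccountID, AccountName)
                        VALUES (rec.RecordID, rec.AccountID, rec.AccountName);
                    EXCEPTION
                        WHEN OTHERS THEN
                            vError := SUBSTR(SQLERRM, 1, 1000);
                            UPDATE tmpDataTransaction
                            SET    RecordStatus = 9, Errors = vError  -- 9 = error
                            WHERE  RecordID = rec.RecordID;
                    END;
                END LOOP;
                COMMIT;
        END;
        vBatchStart := vBatchStart + vBatchSize;
    END LOOP;
END;
/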
The above "Data Processing program" algorithm uses a single tmpDataTransaction table; based on its starting and ending record numbers, each thread works on its own slice of the data. If required, we can raise the maximum number of records processed by a thread and create an individual tmpDataTransaction table per thread, which lets each thread work on its own table for all validation/transformation. Since nologging is used while creating these temporary intermediate tables, the time to write information to the redo/undo logs is saved.
Additional
Additional thought can be given to partitioning the tmpDataTransaction table and the production tables based on the category/type of records, such as by city, date range, or transaction type. This helps the database optimizer work on independent sets of data. In many newer database versions, the optimizer tries to introduce as much parallelism as possible while executing a query. Reviewing/tuning the database settings based on project needs can also contribute to performance. A partitioning sketch follows below.
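As an illustration only, an Oracle-style list partitioning of the intermediate table by transaction type might look like the sketch below (the TransactionType column, partition names and values are assumptions):

CREATE TABLE tmpDataTransaction (
    RecordID        NUMBER,
    TransactionType VARCHAR2(20),
    RecordStatus    NUMBER,
    Errors          VARCHAR2(1000)
    -- ... remaining staging columns ...
)
NOLOGGING
PARTITION BY LIST (TransactionType)
(
    PARTITION pRetail    VALUES ('RETAIL'),      -- each thread can target one partition
    PARTITION pCorporate VALUES ('CORPORATE'),
    PARTITION pOther     VALUES (DEFAULT)
);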
Conclusion
As discussed in this article, when looking at performance in batch/ETL data processing, using set theory, processing data in chunks in parallel, and using nologging temporary intermediate stages for the validation/transformation logic yield a good gain in performance. While developing ETL/batch processing SQL programs, we can plan and implement such approaches in design and coding. By applying SQL set theory, function-based SQL and indexes, parallel processing in chunks, studying query execution plans for tuning purposes, and using other new features of the database, processing of huge volumes of data can be made faster.