As organizations load an increasing amount of data from different sources into their data warehouses, monitoring these data loads becomes ever more crucial. Traditional database monitoring happens at the level of the whole database. By combining Python scripts with SQL, businesses can monitor data loads at the level of individual tables and receive daily reports on which loads have succeeded and which have failed. While no commercial tool sets for table-level data load monitoring are currently available, the process can be automated with code that generates reports teams can use to quickly repair failed loads.
Overview of load monitoring with SQL and Python
In a load monitoring system that utilizes both SQL and Python, SQL queries access the database directly, performing actions like counting rows and calculating averages, standard deviations, and thresholds. Python code then serves as a “wrapper” around the SQL queries. Once written, Python scripts can be automated to run daily, summarize findings in reports, and email the reports to technical teams, making it easy for the teams to identify and fix problems proactively rather than reactively. Adding Python to database querying extends what the monitoring can do, and the language is easy to understand and free to use.
For successful data monitoring, it is important to ensure that all data tables in the warehouse are monitored and that the monitoring of data loads is customized based on the structure of the loads. The basic steps to implement data monitoring are as follows:
- Create tables. This includes a table that stores the names of the tables to be monitored, as well as another table containing details and metadata about the data tables.
- Transform the data. This is accomplished by handling data types, merging tables, and performing calculations to determine how many loads were successful and which tables need to be checked.
- Report results. The script will automatically generate a report summarizing the results and then will send it as an email attachment.
Step-by-step code to implement data load monitoring
The load monitoring process is described in detail in the code snippets below.
Import Modules
Import all the required modules. These include those required to connect to the database, determine the date and time, manipulate dataframes, and access an email application.
import os
import math
import shutil
import warnings
from datetime import datetime

import numpy as np
import pandas as pd
import pyodbc
import sqlalchemy
import win32com.client as win32

warnings.filterwarnings("ignore")
Initialize all the server credentials
Initialize the credentials, such as the server name and database name, and create a connection to SQL Server using an ODBC driver. This provides access to the database that stores the data being monitored.
# Server connection
server = '<Server Name>'
database = '<Database Name>'
driver = '{ODBC Driver 13 for SQL Server}'
conn = pyodbc.connect(
    'DRIVER=' + driver +
    ';SERVER=' + server +
    ';DATABASE=' + database +
    ';Trusted_Connection=yes;'
)
conn.autocommit = True
cursor = conn.cursor()
Find yesterday’s date
This will be necessary in order to determine which tables were modified yesterday. Here is the Python code to calculate this.
# Date initialization
today = pd.to_datetime('today').strftime("%Y-%m-%d")
yesterday = pd.Series(today).astype('datetime64[ns]') - pd.Timedelta(1, unit='D')
today = pd.Series(today).astype('datetime64[ns]')
Create a list of tables
A list of tables to monitor needs to be created, along with their corresponding schema and databases. This step will list all of the tables in the database, regardless of when they were modified.
# Common table list creation
table_query = ("""select distinct table_catalog, table_schema, table_name
                  from <database name>.INFORMATION_SCHEMA.TABLES""")
df = pd.read_sql(table_query, conn)
df.rename(columns={'table_catalog': 'Database', 'table_schema': 'Schema'}, inplace=True)
Gather details about the above tables
The previous step provided a comprehensive list of tables from the specified database and schema. In this particular case, all the tables in the data warehouse contain a column known as BATCH_AS_OF_DATE, which facilitates the tracking of incremental load changes. In other data warehouses, this column may be referred to by a different name such as “CURRENT_LOAD_DATA” or “ETL_LOAD_DATE.” If this is the case, simply replace the term “BATCH_AS_OF_DATE” in the provided code snippet accordingly.
The next snippet calculates the average and standard deviation of the row counts loaded on weekdays, Saturdays, and Sundays over the past six months. These values provide insight into load patterns and support the comparisons made later in the process.
dbo_lst = pd.DataFrame()
df2 = pd.DataFrame()

for i in df.index:
    dbase = df['Database'][i]
    table = df['table_name'][i]
    schema = df['Schema'][i]

    # Historical averages and standard deviations for the table,
    # split into weekday, Saturday, and Sunday loads
    query = ("""
    WITH _WeekdayAverages AS (
        SELECT AVG([row_count]) AS weekday_average, '{0}' AS table_name
        FROM (
            SELECT COUNT(*) AS [row_count]
            FROM [{1}].[{2}].[{0}]
            WHERE [BATCH_AS_OF_DATE] >= DATEADD(d, -180, GETDATE())  --last 6 months of data
              AND DATEPART(dw, [BATCH_AS_OF_DATE]) IN (2,3,4,5,6)    --Monday through Friday
            GROUP BY [BATCH_AS_OF_DATE]
        ) RowCounts
    ),
    _WeekdayStdiv AS (
        SELECT ROUND(STDEV([row_count]), 0) AS weekday_stdev, '{0}' AS table_name
        FROM (
            SELECT COUNT(*) AS [row_count]
            FROM [{1}].[{2}].[{0}]
            WHERE [BATCH_AS_OF_DATE] >= DATEADD(d, -180, GETDATE())  --last 6 months of data
              AND DATEPART(dw, [BATCH_AS_OF_DATE]) IN (2,3,4,5,6)    --Monday through Friday
            GROUP BY [BATCH_AS_OF_DATE]
        ) RowCounts
    ),
    _SaturdayStdiv AS (
        SELECT ROUND(STDEV([row_count]), 0) AS saturday_stdiv, '{0}' AS table_name
        FROM (
            SELECT COUNT(*) AS [row_count]
            FROM [{1}].[{2}].[{0}]
            WHERE [BATCH_AS_OF_DATE] >= DATEADD(d, -180, GETDATE())  --last 6 months of data
              AND DATEPART(dw, [BATCH_AS_OF_DATE]) IN (7)            --Saturday
            GROUP BY [BATCH_AS_OF_DATE]
        ) RowCounts
    ),
    _SundayStdiv AS (
        SELECT ROUND(STDEV([row_count]), 0) AS sunday_stdiv, '{0}' AS table_name
        FROM (
            SELECT COUNT(*) AS [row_count]
            FROM [{1}].[{2}].[{0}]
            WHERE [BATCH_AS_OF_DATE] >= DATEADD(d, -180, GETDATE())  --last 6 months of data
              AND DATEPART(dw, [BATCH_AS_OF_DATE]) IN (1)            --Sunday
            GROUP BY [BATCH_AS_OF_DATE]
        ) RowCounts
    ),
    _SaturdayAverages AS (
        SELECT AVG([row_count]) AS saturday_average, '{0}' AS table_name
        FROM (
            SELECT COUNT(*) AS [row_count]
            FROM [{1}].[{2}].[{0}]
            WHERE [BATCH_AS_OF_DATE] >= DATEADD(d, -180, GETDATE())  --last 6 months of data
              AND DATEPART(dw, [BATCH_AS_OF_DATE]) IN (7)            --Saturday
            GROUP BY [BATCH_AS_OF_DATE]
        ) RowCounts
    ),
    _SundayAverages AS (
        SELECT AVG([row_count]) AS sunday_average, '{0}' AS table_name
        FROM (
            SELECT COUNT(*) AS [row_count]
            FROM [{1}].[{2}].[{0}]
            WHERE [BATCH_AS_OF_DATE] >= DATEADD(d, -180, GETDATE())  --last 6 months of data
              AND DATEPART(dw, [BATCH_AS_OF_DATE]) IN (1)            --Sunday
            GROUP BY [BATCH_AS_OF_DATE]
        ) RowCounts
    )
    SELECT db_name(DB_ID(N'{1}')) AS database_name
         , s.name AS schema_name
         , t.name AS table_name
         , us.last_user_update AS last_update
         , weekday_average
         , weekday_stdev
         , saturday_average
         , sunday_average
         , saturday_stdiv
         , sunday_stdiv
    FROM {1}.sys.dm_db_index_usage_stats us
    INNER JOIN {1}.sys.tables t ON us.object_id = t.object_id
    INNER JOIN {1}.sys.schemas s ON t.schema_id = s.schema_id
    LEFT JOIN _WeekdayAverages ON _WeekdayAverages.table_name = t.name
    LEFT JOIN _SaturdayAverages ON _SaturdayAverages.table_name = t.name
    LEFT JOIN _SundayAverages ON _SundayAverages.table_name = t.name
    LEFT JOIN _WeekdayStdiv ON _WeekdayStdiv.table_name = t.name
    LEFT JOIN _SaturdayStdiv ON _SaturdayStdiv.table_name = t.name
    LEFT JOIN _SundayStdiv ON _SundayStdiv.table_name = t.name
    WHERE us.database_id = DB_ID(N'{1}')  --the current database only
      AND ( t.name = '{0}' );
    """.format(table, dbase, schema))
    dbo_lst = pd.concat([dbo_lst, pd.read_sql(query, conn)])
    print(table, '---', dbase)

    # Yesterday's row count for the same table
    query1 = ("""
    WITH _Yest_count AS (
        SELECT [row_count] AS yest_count, '{0}' AS table_name, BATCH_AS_OF_DATE
        FROM (
            SELECT COUNT(*) AS [row_count], BATCH_AS_OF_DATE
            FROM [{1}].[{2}].[{0}]
            WHERE [BATCH_AS_OF_DATE] = '{3}'
            GROUP BY [BATCH_AS_OF_DATE]
        ) RowCounts
    )
    SELECT db_name(DB_ID(N'{1}')) AS database_name
         , s.name AS schema_name
         , t.name AS table_name
         , BATCH_AS_OF_DATE
         , yest_count
         , DATEPART(dw, [BATCH_AS_OF_DATE]) AS day_cd
    FROM {1}.sys.dm_db_index_usage_stats us
    INNER JOIN {1}.sys.tables t ON us.object_id = t.object_id
    INNER JOIN {1}.sys.schemas s ON t.schema_id = s.schema_id
    LEFT JOIN _Yest_count ON _Yest_count.table_name = t.name
    WHERE us.database_id = DB_ID(N'{1}')  --the current database only
      AND ( t.name = '{0}' );
    """.format(table, dbase, schema, str(yesterday.loc[0]).split(' ')[0]))
    df2 = pd.concat([df2, pd.read_sql(query1, conn)])
Convert Data Types
To prevent type errors and overflow issues when calculating averages and thresholds from the table counts, the relevant dataframe columns must be converted to numeric values. This conversion helps ensure the accuracy and integrity of the calculations and avoids unexpected errors.
# Create placeholder threshold columns
dbo_lst['weekday_threshold'] = ''
dbo_lst['saturday_threshold'] = ''
dbo_lst['sunday_threshold'] = ''

# Convert the numeric columns, replacing anything unparseable with 0
numeric_cols = ['sunday_average', 'saturday_stdiv', 'sunday_stdiv', 'weekday_stdev',
                'weekday_threshold', 'saturday_threshold', 'sunday_threshold']
for col in numeric_cols:
    dbo_lst[col] = pd.to_numeric(dbo_lst[col], errors='coerce').fillna(0)

dbo_lst['saturday_average'] = dbo_lst['saturday_average'].astype(float).fillna(0)
dbo_lst['weekday_average'] = dbo_lst['weekday_average'].astype(float).fillna(0)
dbo_lst['last_update'] = dbo_lst['last_update'].astype(str)

dbo_lst.rename(columns={'schema_name': 'schemaname'}, inplace=True)
dbo_lst.reset_index(drop=True, inplace=True)
Add data monitoring thresholds
In this code example, user-specified data monitoring thresholds are stored in a common table called "table_monitoring_threshold" within the specified database and schema (e.g., [<database name>].[dbo].[table_monitoring_threshold]). For example, a user might specify that a table should be flagged if the number of rows loaded is less than 80 percent of the average. Instead of hardcoding the thresholds in the Python code, the script reads this table and applies the averages, standard deviations, and thresholds dynamically. This approach ensures that the code does not need to be updated whenever the thresholds change in the "table_monitoring_threshold" table.
# Updating the threshold table
for u in dbo_lst.index:
    Database = dbo_lst['database_name'][u]
    Schema_name = dbo_lst['schemaname'][u]
    Table_name = dbo_lst['table_name'][u]
    Last_update = dbo_lst['last_update'][u]
    Week_avg = dbo_lst['weekday_average'][u]
    Week_stdiv = dbo_lst['weekday_stdev'][u]
    Sat_avg = dbo_lst['saturday_average'][u]
    Sun_avg = dbo_lst['sunday_average'][u]
    Sat_stdiv = dbo_lst['saturday_stdiv'][u]
    Sun_stdiv = dbo_lst['sunday_stdiv'][u]
    Weekday_threshold = dbo_lst['weekday_threshold'][u]
    Saturday_threshold = dbo_lst['saturday_threshold'][u]
    Sunday_threshold = dbo_lst['sunday_threshold'][u]

    cursor.execute("""
    IF EXISTS (SELECT * FROM [<database name>].[dbo].[table_monitoring_threshold]
               WHERE table_name = '{0}')
    BEGIN
        UPDATE [<database name>].[dbo].[table_monitoring_threshold]
        SET dbase_name = '{1}', schemaname = '{2}', table_name = '{0}',
            last_update = '{3}', weekday_average = {4}, weekday_stdev = {5},
            saturday_average = {6}, sunday_average = {7},
            saturday_stdiv = {8}, sunday_stdiv = {9},
            weekday_threshold = {10}, saturday_threshold = {11}, sunday_threshold = {12}
        WHERE table_name = '{0}'
    END
    ELSE
    BEGIN
        INSERT INTO [<database name>].[dbo].[table_monitoring_threshold]
        VALUES ('{1}', '{2}', '{0}', '{3}', {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}, {12})
    END
    """.format(Table_name, Database, Schema_name, Last_update,
               Week_avg, Week_stdiv, Sat_avg, Sun_avg, Sat_stdiv, Sun_stdiv,
               Weekday_threshold, Saturday_threshold, Sunday_threshold))
    print(u, ' Done')

# Re-read the threshold table so any user-edited thresholds are picked up
dbo_lst = pd.read_sql('select * from [<database name>].[dbo].[table_monitoring_threshold]', conn)
dbo_lst.rename(columns={'dbase_name': 'database_name', 'schemaname': 'schema_name'}, inplace=True)
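The remaining steps operate on a combined dataframe, master_data, that joins the historical statistics and thresholds (dbo_lst) with yesterday's counts (df2). The original snippets do not show how this join is performed, so the following is a minimal sketch under the assumption that the two dataframes are merged on the database, schema, and table names, with BATCH_AS_OF_DATE renamed to yest_date to match the column names used below.

# Minimal sketch (not from the original article): assemble master_data by joining
# the threshold/history dataframe with yesterday's counts. The merge keys and the
# yest_date rename are assumptions based on the column names used in later snippets.
df2 = df2.rename(columns={'BATCH_AS_OF_DATE': 'yest_date'})
master_data = dbo_lst.merge(
    df2[['database_name', 'schema_name', 'table_name', 'yest_date', 'yest_count', 'day_cd']],
    on=['database_name', 'schema_name', 'table_name'],
    how='left'
)
master_data.reset_index(drop=True, inplace=True)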
Check for Updates
To determine the success of the data load and ensure all counts match, a new column called "yest_update_flag" is added in the code below. This flag indicates whether a table received data yesterday, making it easy to identify missing loads and report the number of failed data uploads.
# Data week labelling
master_data['yest_update_flag'] = 0
master_data['day_flag'] = ''

for i in master_data.index:
    # Flag tables with no row count for yesterday (the load is missing)
    if pd.isna(master_data['yest_date'][i]):
        master_data.loc[i, 'yest_update_flag'] = 1
    else:
        master_data.loc[i, 'yest_update_flag'] = 0

    # Day-of-week labelling: 1 = Sunday, 7 = Saturday, 2-6 = weekdays
    try:
        if (master_data['day_cd'][i] > 1) and (master_data['day_cd'][i] < 7):
            master_data.loc[i, 'day_flag'] = 'weekday'
        elif master_data['day_cd'][i] == 1:
            master_data.loc[i, 'day_flag'] = 'sunday'
        elif master_data['day_cd'][i] == 7:
            master_data.loc[i, 'day_flag'] = 'saturday'
        else:
            master_data.loc[i, 'day_flag'] = 'Error'
    except Exception:
        master_data.loc[i, 'day_flag'] = np.nan
Flag the Data Upload Status
Each table's row count for yesterday is compared against the relevant average and standard deviation, and the table's status is updated accordingly so that the teams know which tables need to be checked.
# Tables with no load yesterday are reported separately as failures
Failed_df = master_data[['database_name', 'schema_name', 'table_name',
                         'last_update', 'yest_update_flag']]
Failed_df = Failed_df[Failed_df['yest_update_flag'] == 1].reset_index(drop=True)
master_data = master_data[master_data['yest_update_flag'] == 0].reset_index(drop=True)

# Pick the statistics to compare against, based on which day yesterday was.
# The comparison logic itself is the same for all three day types.
day_codes = list(set(master_data['day_cd'].tolist()))

if (day_codes[0] == 1) and (day_codes[-1] == 1):
    # Yesterday was a Sunday
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update',
                              'sunday_average', 'sunday_stdiv', 'yest_date', 'yest_count',
                              'day_cd', 'day_flag', 'yest_update_flag']]
    avg_col, stdev_col = 'sunday_average', 'sunday_stdiv'
elif (day_codes[0] == 7) and (day_codes[-1] == 7):
    # Yesterday was a Saturday
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update',
                              'saturday_average', 'saturday_stdiv', 'yest_date', 'yest_count',
                              'day_cd', 'day_flag', 'yest_update_flag']]
    avg_col, stdev_col = 'saturday_average', 'saturday_stdiv'
else:
    # Yesterday was a weekday
    final_data = master_data[['database_name', 'schema_name', 'table_name', 'last_update',
                              'weekday_average', 'weekday_stdev', 'yest_date', 'yest_count',
                              'day_cd', 'day_flag', 'yest_update_flag']]
    avg_col, stdev_col = 'weekday_average', 'weekday_stdev'

final_data['status'] = ''
for i in final_data.index:
    try:
        count = final_data['yest_count'][i]
        avg = final_data[avg_col][i]
        stdev = final_data[stdev_col][i]
        if count <= 0:
            final_data.loc[i, 'status'] = 'Upload Failed'
        elif (count > stdev) and (count > avg):
            final_data.loc[i, 'status'] = 'Successful upload'
        elif (count > stdev) and (count < avg):
            final_data.loc[i, 'status'] = 'Check Required'
        elif (count > avg) and (count < stdev):
            final_data.loc[i, 'status'] = 'Check Required'
        elif (count < avg) and (count < stdev):
            final_data.loc[i, 'status'] = 'Concerning Count'
    except Exception:
        final_data.loc[i, 'status'] = 'Data Or Threshold Missing'
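The status logic above compares yesterday's count against the historical average and standard deviation. The user-specified thresholds read from "table_monitoring_threshold" can be applied in a similar pass. The sketch below is an illustration only, assuming the weekday threshold is stored as a fraction (for example, 0.8 for the 80 percent rule mentioned earlier); adjust the column names and day types to match how the thresholds are actually populated.

# Sketch only: flag tables whose count falls below a user-specified fraction of the average.
# Assumes weekday_threshold holds a fraction such as 0.8 (80 percent of the weekday average).
for i in final_data.index:
    threshold = master_data['weekday_threshold'][i]
    if threshold > 0 and final_data['yest_count'][i] < threshold * master_data['weekday_average'][i]:
        final_data.loc[i, 'status'] = 'Check Required'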
Send Out the Results
The final step of the process is to send out an email with the data monitoring results. In the snippet below, SQL Server Database Mail runs a query against the threshold table, attaches the result as a .csv file, and sends it to an individual or a team.
# Send email with attachment via SQL Server Database Mail
cursor.execute("""
    DECLARE @tab char(1) = CHAR(9);
    EXEC msdb.dbo.sp_send_dbmail
        @profile_name = 'DATAWH-DEV-MAIL',
        @recipients = '<Email Address>',
        @query = N'SET NOCOUNT ON;
                   select * from [<database name>].[dbo].[table_monitoring_threshold];',
        @subject = 'Table Monitoring Status',
        @body = 'Hello All, Please find the table update status in the file attached.',
        @attach_query_result_as_file = 1,
        @query_attachment_filename = 'table_threshold.csv',
        @query_result_separator = @tab,
        @query_result_no_padding = 1,
        @exclude_query_output = 1,
        @append_query_error = 0,
        @query_result_header = 1;
""")
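If Database Mail is not configured, the results can instead be written to a .csv file and sent through the Outlook application using the win32com module imported earlier. The sketch below is one possible approach rather than part of the original script; the output path and recipient address are placeholders to adjust for your environment.

# Alternative sketch: export the results and email them through Outlook with win32com.
# The report path and recipient address are placeholders, not values from the original script.
report_path = os.path.join(os.getcwd(), 'table_threshold.csv')
final_data.to_csv(report_path, index=False)

outlook = win32.Dispatch('Outlook.Application')
mail = outlook.CreateItem(0)          # 0 = Outlook MailItem
mail.To = '<Email Address>'
mail.Subject = 'Table Monitoring Status'
mail.Body = 'Hello All, please find the table update status in the attached file.'
mail.Attachments.Add(report_path)
mail.Send()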
Creating an effective data load monitoring plan
Any organization that manages large amounts of data with SQL can benefit from utilizing Python—a free and open-source technology—to monitor its data loads at the table level. An automated monitoring utility can calculate daily and weekly load averages, standard deviations, and thresholds for all tables. This information is then presented in a comprehensive daily statistics report, allowing data warehouse teams to proactively fix any missing loads. This simple solution can help organizations expand their data management capabilities without incurring any additional costs.
About the Author:
Sankul Seth holds the position of assistant vice president, data and analytics, at a financial institution where his primary responsibility is overseeing the data and analytics strategy of the organization. With over 18 years of experience, he is a seasoned leader in IT, data, and analytics. His expertise spans various critical areas, including data analytics, cybersecurity, engineering, cloud framework, business intelligence, digital, campaign, and marketing automation. Sankul has an MBA and a bachelor's degree in computer science engineering. For additional information, contact sankul.seth@penair.org.