Real-Time SQL Server to BigQuery Streaming ETL using CDC
This script enables real-time ETL from SQL Server to Google BigQuery by combining Change Data Capture (CDC) with BigQuery's streaming insert API. It tracks inserts, updates, and deletes in SQL Server and pushes the changed rows directly into BigQuery as they are detected, making it useful for continuous, low-latency data synchronization.
Instructions:
- Prerequisites:
  - Ensure CDC (Change Data Capture) is enabled on the SQL Server database and on the source table (see the setup sketch after this list).
  - Install the Google Cloud SDK and configure it for BigQuery access.
  - Ensure your SQL Server user can read the CDC change tables and that your Google Cloud credentials can write to the BigQuery dataset.
- Step 1: Capture Changed Data in SQL Server using CDC:
  - The script identifies inserts, updates, and deletes by querying the CDC change tables in SQL Server, which contain the rows modified since the last run.
- Step 2: Stream Changes to BigQuery:
  - Instead of batch uploading, the script uses BigQuery's streaming API to insert changes directly into the BigQuery table in real time.
- Step 3: Handle Errors:
  - Basic error handling retries failed inserts to keep data consistent between SQL Server and BigQuery.
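Before the script can read changes, CDC has to be enabled once for the database and once for the source table. The snippet below is a minimal one-time setup sketch, separate from the streaming script itself; it assumes the same placeholder names used in the script (your_server_name, your_db_name, a source table dbo.your_table) and a login with the permissions required by sys.sp_cdc_enable_db and sys.sp_cdc_enable_table. The full streaming script then follows.

import pyodbc

setup_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server_name;DATABASE=your_db_name;UID=your_username;PWD=your_password"

with pyodbc.connect(setup_conn_str, autocommit=True) as conn:
    cursor = conn.cursor()
    # Enable CDC at the database level (run once per database)
    cursor.execute("EXEC sys.sp_cdc_enable_db")
    # Enable CDC for the source table; this creates a change table such as cdc.dbo_your_table_CT
    cursor.execute(
        "EXEC sys.sp_cdc_enable_table "
        "@source_schema = N'dbo', @source_name = N'your_table', @role_name = NULL"
    )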
import time

import pyodbc
from google.cloud import bigquery
from datetime import datetime, timedelta
# SQL Server Connection Configuration
sql_conn_str = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=your_server_name;DATABASE=your_db_name;UID=your_username;PWD=your_password"
# BigQuery Configuration
bq_client = bigquery.Client()
bq_table_id = "your_project_id.your_dataset.your_table"
# Expected schema of the destination BigQuery table (the table must already exist
# with these columns; insert_rows_json does not take a schema argument)
schema = [
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("column1", "STRING"),
    bigquery.SchemaField("column2", "STRING"),
    bigquery.SchemaField("operation_type", "STRING"),  # Tracks inserts, updates, deletes
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
]
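# Optional (not part of the original script): streaming inserts require the destination
# table to already exist. Assuming the dataset exists and the credentials may create
# tables, it can be created from the schema above with:
# bq_client.create_table(bigquery.Table(bq_table_id, schema=schema), exists_ok=True)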
# Step 1: Retrieve changes from CDC in SQL Server
def fetch_cdc_changes(last_sync_time):
    conn = pyodbc.connect(sql_conn_str)
    cursor = conn.cursor()
    # Query the CDC change table for rows modified since the last sync (inserts/updates/deletes).
    # __$start_lsn is a binary log sequence number, so the last sync time is first mapped to an
    # LSN with sys.fn_cdc_map_time_to_lsn. The change table is named cdc.<capture_instance>_CT
    # (by default cdc.<schema>_<table>_CT); adjust the name to match your capture instance.
    query = """
        SELECT __$operation, id, column1, column2, __$start_lsn, __$seqval
        FROM cdc.your_table_CT
        WHERE __$start_lsn > sys.fn_cdc_map_time_to_lsn('largest less than or equal', ?)
        ORDER BY __$start_lsn ASC
    """
    cursor.execute(query, last_sync_time)
    changes = cursor.fetchall()
    cursor.close()
    conn.close()
    return changes
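# Note (documented alternative, not used here): SQL Server also exposes
# cdc.fn_cdc_get_all_changes_<capture_instance>(@from_lsn, @to_lsn, 'all'), which reads
# changes between two LSNs without querying the _CT change table directly.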
# Step 2: Stream data to BigQuery
def stream_to_bigquery(rows, max_retries=3):
    # Stream rows via the BigQuery streaming insert API, retrying only the rows that failed
    pending = rows
    for attempt in range(1, max_retries + 1):
        errors = bq_client.insert_rows_json(bq_table_id, pending)
        if not errors:
            print(f"Successfully streamed {len(rows)} rows to BigQuery.")
            return
        print(f"Attempt {attempt}: {len(errors)} rows failed: {errors}")
        pending = [pending[e["index"]] for e in errors]  # keep only the failed rows
        time.sleep(2 ** attempt)  # simple exponential backoff before retrying
    print(f"Giving up after {max_retries} attempts; {len(pending)} rows were not streamed.")
# Helper: Map CDC operation codes to human-readable operations
def map_operation(op_code):
    # CDC __$operation codes: 1 = delete, 2 = insert, 3 = update (before image), 4 = update (after image)
    operation_map = {1: 'DELETE', 2: 'INSERT', 3: 'UPDATE_BEFORE', 4: 'UPDATE'}
    return operation_map.get(op_code, 'UNKNOWN')
# Main ETL Process
def real_time_etl(last_sync_time):
    # Fetch changes from SQL Server
    changes = fetch_cdc_changes(last_sync_time)
    rows_to_insert = []
    for change in changes:
        operation_type = map_operation(change[0])  # Map CDC operation codes
        if operation_type == 'UPDATE_BEFORE':
            continue  # Skip the pre-update image; only the post-update row is streamed
        rows_to_insert.append({
            "id": change[1],
            "column1": change[2],
            "column2": change[3],
            "operation_type": operation_type,
            # insert_rows_json needs JSON-serializable values, so format the UTC time as a string
            "timestamp": datetime.utcnow().isoformat()
        })
    if rows_to_insert:
        # Stream the changes to BigQuery in real time
        stream_to_bigquery(rows_to_insert)
if __name__ == "__main__":
    # Step 3: Track the last synchronization time
    # For the first run, initialize last_sync_time with a past date, or load it from external
    # storage (see the persistence sketch below)
    last_sync_time = datetime.utcnow() - timedelta(minutes=5)  # Example: sync changes from the last 5 minutes
    # Real-time ETL loop
    while True:
        real_time_etl(last_sync_time)
        # Update last_sync_time after each successful sync (ideally persisted externally)
        last_sync_time = datetime.utcnow()
        # Sleep for a short interval for near-real-time processing (here, every 60 seconds)
        time.sleep(60)
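The loop above keeps last_sync_time only in memory, so a restart can replay or miss changes. The sketch below shows one way to persist the watermark between runs; it assumes a local file named sync_state.txt is acceptable (a small SQL Server table or any other durable store would work the same way), and the helper names are illustrative rather than part of the original script.

from datetime import datetime
from pathlib import Path

STATE_FILE = Path("sync_state.txt")  # hypothetical location for the sync watermark

def load_last_sync_time(default):
    # Read the persisted watermark, falling back to the supplied default on the first run
    if STATE_FILE.exists():
        return datetime.fromisoformat(STATE_FILE.read_text().strip())
    return default

def save_last_sync_time(sync_time):
    # Persist the watermark so the next run resumes where this one stopped
    STATE_FILE.write_text(sync_time.isoformat())

In the main loop, last_sync_time would then be initialized with load_last_sync_time(datetime.utcnow() - timedelta(minutes=5)) and written back with save_last_sync_time(last_sync_time) after each successful call to real_time_etl.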