
Enhancing SQL Server Searches with Elasticsearch and Python


As a seasoned SQL developer and tech enthusiast, I often encounter scenarios where traditional SQL Server searches struggle to meet performance and flexibility requirements. One such challenge is efficiently searching a large person database with multiple parameters and fuzzy logic. In this article, I'll explore how integrating Elasticsearch with SQL Server using Python can offer a robust solution to this problem.

This article is also available as a notebook

Business Problem

Imagine you're tasked with developing a search functionality for a large database containing information about people. Users need the ability to search using various parameters such as:

  • First Name
  • Last Name
  • Preferred Name
  • City
  • State
  • Zip Code
  • DOB
  • Email

The complexity increases as searches can occur on any combination of these fields, and there's a need to implement fuzzy logic to handle typos and partial matches.

Technical Challenges with SQL Server

The first problem is inefficient indexing. Creating indexes on every possible combination of the searchable columns isn't practical: the number of required indexes grows exponentially with each additional column (as the sketch after this list illustrates), leading to:

  • Increased storage requirements
  • Slower write operations due to index maintenance
  • Diminishing returns on query performance
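
To put a number on it, here is a minimal sketch, for illustration only, of how many composite indexes you would need to cover every possible combination of the eight searchable columns (and this ignores column-order permutations, which make matters worse):

from itertools import combinations

# Illustration only: count the distinct column combinations that would each
# need a covering composite index for fully indexed ad-hoc search.
columns = ["FirstName", "LastName", "PreferredName", "City",
           "State", "ZipCode", "DOB", "Email"]

index_count = sum(1 for r in range(1, len(columns) + 1)
                  for _ in combinations(columns, r))
print(index_count)  # 255, i.e. 2^8 - 1, before even considering column order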

Implementing fuzzy logic is also hard. SQL Server offers some capabilities, such as the LIKE operator, SOUNDEX, and full-text search (see the sketch after this list), but these methods are limited:

  • Fuzzy searches can be slow on large datasets.
  • Implementing advanced fuzzy matching (e.g., handling typos, synonyms) requires intricate queries and functions that can degrade performance.
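
For context, here is a rough sketch of what such a "fuzzy" lookup often ends up looking like in T-SQL. It uses the execute_SQL_Query helper and db_name variable defined later in this article; both predicates are non-sargable, so they force a scan on a large table:

# Sketch only: leading-wildcard LIKE and SOUNDEX comparisons in T-SQL.
# Neither predicate can use an index seek, so both scan the Persons table.
fuzzy_sql = '''
SELECT FirstName, LastName, Email
FROM Persons
WHERE FirstName LIKE ?                      -- e.g. '%Rahol%'
   OR SOUNDEX(FirstName) = SOUNDEX(?)       -- coarse phonetic match
'''
rows = execute_SQL_Query(db_name, fuzzy_sql, ('%Rahol%', 'Rahol'))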

High-Level Solution: Integrating Elasticsearch Using Python

To overcome these challenges, I recommend integrating Elasticsearch into your search architecture. Here's why:

  • Optimized for Search: Elasticsearch is designed for lightning-fast full-text searches across multiple fields.
  • Fuzzy Matching: It natively supports fuzzy logic, allowing for more flexible and user-friendly search experiences.
  • Scalability: Built to handle large volumes of data and queries efficiently.
  • Existing Infrastructure: Many production environments already use Elasticsearch for logging, making integration smoother.
  • Cost-Effective: The basic version is free and sufficient for many use cases. For advanced security features, consider the paid tiers.

Implementation Overview with Python

While Elasticsearch can be integrated using various programming languages, I'll demonstrate how to use Python for this purpose. Python's rich ecosystem and simplicity make it an excellent choice for interfacing with Elasticsearch.

To begin with, let's install the necessary Python libraries.

%pip install pyodbc
%pip install elasticsearch

Preparing Source Data

The idea here is to generate enough random data in the source database. For this, I'll use a Python script to connect to the database and generate random data.

This Python script creates a SQL Server database (PersonSearchDB) and performs bulk data insertion into a Persons table. It defines functions to create the database (create_database_if_not_exists), establish a SQL connection (get_sql_connection), and execute SQL queries (execute_SQL_Query).

The main function, setup_database_and_bulk_insert_data, generates random person data (e.g., names, DOB, and emails) and inserts 1 million records into the Persons table in batches of 10,000, improving performance.

Finally, the script initializes the database and populates it with the generated data.

import os
import pyodbc
import random
import datetime
from elasticsearch import Elasticsearch, helpers
import time
# Define server name as a global variable so it can be set once
server = 'localhost'
db_name = 'PersonSearchDB'
def create_database_if_not_exists(db_name):
    # Connection to 'master' for creating the database
    master_conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE=master;Trusted_Connection=yes;'
    master_conn = pyodbc.connect(master_conn_str)
    # Set autocommit to True to disable transactions
    master_conn.autocommit = True
    master_cursor = master_conn.cursor()
    # Check if the database exists, if not, create it
    create_db_query = f"IF DB_ID('{db_name}') IS NULL CREATE DATABASE {db_name};"
    master_cursor.execute(create_db_query)
    # Close master connection
    master_cursor.close()
    master_conn.close()
# Function to return SQL Server connection
def get_sql_connection(db_name):
    """
    Returns a SQL Server connection object for the given database name.
    If the database does not exist, the function connects to 'master' first to create it.
    
    Parameters:
    db_name (str): The name of the database to connect to.
    Returns:
    pyodbc.Connection: A connection object to the specified database.
    """    
    # Connection to the newly created or existing database
    conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={db_name};Trusted_Connection=yes;'
    return pyodbc.connect(conn_str)
def execute_SQL_Query(db_name, query, params=None):
    """
    Executes a SQL query on the given database. Uses the connection obtained from get_sql_connection.
    
    Parameters:
    db_name (str): The name of the database.
    query (str): The SQL query to execute.
    params (tuple): Parameters to pass to the query (optional).
    
    Returns:
    list: The result of the query if it's a SELECT query, otherwise None.
    """
    conn = get_sql_connection(db_name)
    cursor = conn.cursor()
    if params:
        cursor.execute(query, params)
    else:
        cursor.execute(query)
    
    if query.strip().upper().startswith("SELECT"):
        result = cursor.fetchall()  # Fetch all results if it's a SELECT query
    else:
        conn.commit()
        result = None
    cursor.close()
    conn.close()
    
    return result

def setup_database_and_bulk_insert_data(db_name, record_count=1000, batch_size=100):
    # List of Indian first names, last names, cities, and states
    first_names = [
        'Rahul', 'Anjali', 'Amit', 'Pooja', 'Rajesh', 'Sneha', 'Vikram', 'Neha', 'Suresh', 'Sunita',
        'Arjun', 'Kiran', 'Ravi', 'Priya', 'Nikhil', 'Meera', 'Kunal', 'Rina', 'Aakash', 'Divya',
        'Sanjay', 'Anita', 'Deepak', 'Kavita', 'Manish', 'Shweta', 'Rohit', 'Preeti', 'Vijay', 'Swati',
        'Ajay', 'Nisha', 'Gaurav', 'Shalini', 'Alok', 'Tanvi', 'Varun', 'Shruti', 'Vivek', 'Rashmi'
    ]
    last_names = [
        'Sharma', 'Patel', 'Gupta', 'Mehta', 'Jain', 'Agarwal', 'Reddy', 'Singh', 'Kumar', 'Verma',
        'Chopra', 'Desai', 'Iyer', 'Joshi', 'Kapoor', 'Malhotra', 'Nair', 'Pandey', 'Rao', 'Saxena',
        'Bose', 'Chatterjee', 'Das', 'Mukherjee', 'Banerjee', 'Bhat', 'Pillai', 'Menon', 'Choudhury', 'Trivedi',
        'Shah', 'Parekh', 'Chauhan', 'Patil', 'Dutta', 'Nayar', 'Kulkarni', 'Bhattacharya', 'Hegde', 'Sinha'
    ]
    cities = [
        'Mumbai', 'Delhi', 'Bangalore', 'Chennai', 'Hyderabad', 'Ahmedabad', 'Kolkata', 'Pune', 'Jaipur', 'Lucknow',
        'Surat', 'Kanpur', 'Nagpur', 'Indore', 'Thane', 'Bhopal', 'Visakhapatnam', 'Patna', 'Vadodara', 'Ghaziabad'
    ]
    states = [
        'MH', 'DL', 'KA', 'TN', 'TS', 'GJ', 'WB', 'MH', 'RJ', 'UP',
        'MP', 'AP', 'BR', 'HR', 'PB', 'KL', 'OR', 'AS', 'JK', 'CH'
    ]
    def random_name():
        return random.choice(first_names), random.choice(last_names)
    def random_email(first_name, last_name):
        return f"{first_name.lower()}.{last_name.lower()}@randommail.com"
    def random_DOB():
        start_date = datetime.date(1950, 1, 1)
        end_date = datetime.date(2005, 12, 31)
        time_between_dates = end_date - start_date
        days_between_dates = time_between_dates.days
        random_number_of_days = random.randrange(days_between_dates)
        random_date = start_date + datetime.timedelta(days=random_number_of_days)
        return random_date
    def random_zipcode():
        return str(random.randint(100000, 999999))  # Indian zip codes are 6 digits
    # Create Persons table using execute_SQL_Query
    create_table_query = '''
    IF OBJECT_ID('Persons', 'U') IS NOT NULL DROP TABLE Persons;
    CREATE TABLE Persons (
        FirstName NVARCHAR(50),
        LastName NVARCHAR(50),
        PreferredName NVARCHAR(50),
        City NVARCHAR(50),
        State NVARCHAR(50),
        ZipCode NVARCHAR(10),
        DOB DATE,
        Email NVARCHAR(100)
    );
    '''
    execute_SQL_Query(db_name, create_table_query)
    # Insert records in batches
    for batch_start in range(0, record_count, batch_size):
        values = []
        for _ in range(batch_size):
            first_name, last_name = random_name()
            preferred_name = first_name  # Assume preferred name is the first name
            city = random.choice(cities)
            state = random.choice(states)
            zipcode = random_zipcode()
            dob = random_DOB()
            dob_str = dob.strftime('%Y-%m-%d')
            email = random_email(first_name, last_name)
            values.append(f"SELECT '{first_name}', '{last_name}', '{preferred_name}', '{city}', '{state}', '{zipcode}', '{dob_str}', '{email}'")
        # Create bulk insert query using INSERT INTO ... SELECT
        insert_query = '''
        INSERT INTO Persons (FirstName, LastName, PreferredName, City, State, ZipCode, DOB, Email)
        ''' + " UNION ALL ".join(values)
        execute_SQL_Query(db_name, insert_query)
        print(f"Inserted batch starting at record {batch_start}")
        
# Master database creation
create_database_if_not_exists(db_name)
# Set up the database and bulk insert data.
# I am inserting a million records to demo performance gains. Feel free to adjust for testing purposes.
setup_database_and_bulk_insert_data(db_name, record_count=1000000, batch_size=10000)

Setting Up Elasticsearch

To set up Elasticsearch, download and install it from the official website and follow the installation guide at elasticsearch/reference/current/zip-windows.html.

I initially struggled to find the password, so pay close attention to the setup output: the password for the elastic user and the enrollment token are printed to your terminal.
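
Before loading any data, it's worth confirming that the Python client can actually reach the cluster. Here is a minimal sketch; substitute the password printed during setup, and note that the self-signed development certificate is not verified here:

from elasticsearch import Elasticsearch

elastic_password = "xxx"   # placeholder: use the password printed by the setup output

es = Elasticsearch(
    ["https://localhost:9200"],
    verify_certs=False,                     # dev-only: skip verification of the self-signed cert
    basic_auth=("elastic", elastic_password),
)
print(es.info()["version"]["number"])       # prints the cluster version if the connection works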

Creating and Populating Elasticsearch Index

In this step, we will populate our Elastic index with the source data. This is going to take some time. On my machine, 1 million records took around 5 minutes to load.

The following code connects to an Elasticsearch server hosted locally, deletes the existing index if it exists, and creates a new index with mappings for person-related data. It then retrieves data from the Persons table in SQL Server in batches and indexes it into Elasticsearch using the helpers.bulk method. If an error occurs during a bulk operation, the error details are printed.

from elasticsearch import Elasticsearch, helpers
import urllib3
# Suppress warnings about insecure connections (optional)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
   
# Replace with your actual password
elastic_password = "xxx"
# Initialize the Elasticsearch client with SSL and authentication
# make sure elastic search is running on port 9200, 
# Use the following command to start elastic search
# .\elasticsearch-8.11.1\bin\elasticsearch.bat

es = Elasticsearch(
    ["https://localhost:9200"],
    ca_certs=False,          # Disable SSL certificate verification
    verify_certs=False,      # Disable SSL cert verification (use with caution)
    basic_auth=("elastic", elastic_password),
)
def index_data_to_elasticsearch(db_name, index_name, batch_size=10000):
    
    # Delete the existing index if it exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        print(f"Deleted existing index: {index_name}")
    # Create a new index with mappings
    index_mappings = {
        "mappings": {
            "properties": {
                "FirstName": {"type": "text"},
                "LastName": {"type": "text"},
                "PreferredName": {"type": "text"},
                "City": {"type": "text"},
                "State": {"type": "keyword"},
                "ZipCode": {"type": "keyword"},
                "DOB": {"type": "date"},
                "Email": {"type": "keyword"}
            }
        }
    }
    es.indices.create(index=index_name, body=index_mappings)
    print(f"Created new index: {index_name}")
    # Get total number of records
    total_records_query = "SELECT COUNT(*) FROM Persons"
    total_records_result = execute_SQL_Query(db_name, total_records_query)
    total_records = total_records_result[0][0]
    offset = 0
    while offset < total_records:
        query = f'''
        SELECT FirstName, LastName, PreferredName, City, State, ZipCode, DOB, Email
        FROM Persons
        ORDER BY FirstName
        OFFSET {offset} ROWS FETCH NEXT {batch_size} ROWS ONLY
        '''
        rows = execute_SQL_Query(db_name, query)
        actions = []
        for row in rows:
            doc = {
                "_index": index_name,
                "_source": {
                    "FirstName": row[0],
                    "LastName": row[1],
                    "PreferredName": row[2],
                    "City": row[3],
                    "State": row[4],
                    "ZipCode": row[5],
                    "DOB": row[6].strftime('%Y-%m-%d') if row[6] else None,
                    "Email": row[7]
                }
            }
            actions.append(doc)
            
        try:
            helpers.bulk(es, actions)
        except helpers.BulkIndexError as e:
            print(f"Bulk indexing error: {e}")
            for error in e.errors:
                print(error)
        
        offset += batch_size
        print(f"Indexed {offset}/{total_records} records")
# Example usage: setting up the database and bulk inserting data
# setup_database_and_bulk_insert_data(db_name, record_count=1000, batch_size=100)
index_data_to_elasticsearch('PersonSearchDB', 'person_index', batch_size=1000)
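
Once indexing finishes, a quick sanity check (sketched below) is to compare the document count in person_index against the row count in the Persons table:

# Sanity check: the index should hold roughly the same number of documents
# as the Persons table has rows.
es.indices.refresh(index="person_index")
es_count = es.count(index="person_index")["count"]
sql_count = execute_SQL_Query(db_name, "SELECT COUNT(*) FROM Persons")[0][0]
print(f"Elasticsearch: {es_count:,} documents, SQL Server: {sql_count:,} rows")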

Implementing Search Queries

Let's see how we can query the newly created Elasticsearch index. I'll showcase some basic scenarios: searching on first name, a combination of first and last name, and a date-range query. I'll also demonstrate fuzzy search, which is difficult to implement in SQL Server without a performance hit.

This code defines several functions for performing Elasticsearch queries. The main function, ExecuteElasticSearch, runs a search against person_index and prints the results. The first_name_search function matches a specified first name in a given column. The multi_field_wildcard_search function uses wildcards for partial matches on the first and last names while filtering by birth year. The fuzzy_logic_search function looks for similar first names using fuzzy logic, which tolerates small differences or typos. Finally, the boolean_logic_search function combines conditions with boolean logic, using wildcards on the first and last names plus a date-of-birth range and requiring that at least one condition matches. All searches are executed through ExecuteElasticSearch, which handles query execution and output.

You can adapt these functions to any other fields you like.

def ExecuteElasticSearch(search_query):
    batch_size=10
    # Execute the search
    response = es.search(index="person_index", body=search_query, size=batch_size)
    
    # Process the results
    print(f"Found {response['hits']['total']['value']} documents:")
    for hit in response['hits']['hits']:
        print(f"\n\n\n{hit['_source']}")

        
def first_name_search(column_name, first_name):
    search_query = {
        "query": {
            "match": {
                column_name: first_name
            }
        }
    }
    ExecuteElasticSearch(search_query)
def multi_field_wildcard_search(first_name, last_name, birth_year):
    search_query = {
        "query": {
            "bool": {
                "must": [
                    # Wildcard patterns are not analyzed, so lowercase them to match the analyzed text fields
                    {"wildcard": {"FirstName": f"*{first_name.lower()}*"}},
                    {"wildcard": {"LastName": f"*{last_name.lower()}*"}},
                    {
                        "range": {
                            "DOB": {
                                "gte": f"{birth_year}-01-01",
                                "lte": f"{birth_year}-12-31"
                            }
                        }
                    }
                ]
            }
        }
    }
    ExecuteElasticSearch(search_query)
def fuzzy_logic_search(first_name):
    search_query = {
        "query": {
            "fuzzy": {
                "FirstName": {
                    "value": first_name,
                    "fuzziness": 2
                }
            }
        }
    }    
    ExecuteElasticSearch(search_query)
    
def boolean_logic_search(first_name, last_name, birth_year):
    search_query = {
        "query": {
            "bool": {
                "should": [
                    {"wildcard": {"FirstName": f"*{first_name.lower()}*"}},
                    {"wildcard": {"LastName": f"*{last_name.lower()}*"}},
                    {
                        "range": {
                            "DOB": {
                                "gte": f"{birth_year}-01-01",
                                "lte": f"{birth_year}-12-31"
                            }
                        }
                    }
                ],
                "minimum_should_match": 1
            }
        }
    }
    ExecuteElasticSearch(search_query)
# Example usage of search functions
first_name_search("FirstName", "Rahul")
#Multi Field Wildcard Search
multi_field_wildcard_search("Rahul", "Sharma", 1990)
#Fuzzy Logic Search
fuzzy_logic_search("Rahul")
#Boolean Logic Search
boolean_logic_search("Rahul", "Sharma", 1990)

Performance Comparison

Now let's see whether it's all really worth it.

The following code compares the performance of SQL Server and Elasticsearch for a specific query. The compare_performance function runs a search on both systems, measuring the time it takes to retrieve records matching a given first name, last name, and preferred-name pattern. For SQL Server, it runs a parameterized query through execute_SQL_Query and records the execution time over a specified number of iterations (default is 1000), then calculates the average. It constructs and runs an equivalent Elasticsearch query with es.search, again measuring and averaging the execution time over the same number of iterations. Finally, the script prints the average time for each system and calculates the performance gain Elasticsearch offers over SQL Server. The results are color-coded in the output for clarity.

On my system, the measured performance gain was approximately 100% on a million rows, meaning the average Elasticsearch response time was a small fraction of the SQL Server time. With larger datasets, the gap is likely to widen further.

def compare_performance(first_name, last_name, preferred_name, iterations=1000):
    # SQL query
    sql_query = '''
    SELECT FirstName, LastName, PreferredName
    FROM Persons
    WHERE FirstName = ? AND LastName = ? AND PreferredName LIKE ?
    '''
    # Warm-up run for SQL Server
    execute_SQL_Query(db_name, sql_query, (first_name, last_name, preferred_name))
    # Record execution times for SQL Server
    sql_execution_times = []
    for _ in range(iterations):
        start_time = time.perf_counter()
        execute_SQL_Query(db_name, sql_query, (first_name, last_name, preferred_name))
        end_time = time.perf_counter()
        sql_execution_times.append(end_time - start_time)
    # Calculate average execution time for SQL Server
    average_time_sql = sum(sql_execution_times) / iterations
    # Elasticsearch query
    es_query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"FirstName": first_name}},
                    {"match": {"LastName": last_name}},
                    # SQL's '%' wildcard maps to '*' in Elasticsearch; lowercase to match the analyzed text field
                    {"wildcard": {"PreferredName": preferred_name.replace('%', '*').lower()}}
                ]
            }
        }
    }
    # Warm-up run for Elasticsearch
    es.search(index="person_index", body=es_query)
    # Record execution times for Elasticsearch
    es_execution_times = []
    for _ in range(iterations):
        start_time = time.perf_counter()
        es.search(index="person_index", body=es_query)
        end_time = time.perf_counter()
        es_execution_times.append(end_time - start_time)
    # Calculate average execution time for Elasticsearch
    average_time_es = sum(es_execution_times) / iterations
    print(f"\033[91mAverage SQL execution time: {average_time_sql:.6f} seconds\033[0m")
    print(f"\033[92mAverage Elasticsearch execution time: {average_time_es:.6f} seconds\033[0m")
    performance_gain = (average_time_sql - average_time_es) / average_time_sql * 100
    print(f"\033[93mElasticsearch performance gain: {performance_gain:.2f}%\033[0m")
    
#Performance Comparison
compare_performance("Rahul", "Sharma", "Rahul%")

Conclusion

Integrating Elasticsearch with SQL Server can significantly enhance search capabilities, especially for large datasets requiring flexible and fuzzy searches. While it introduces additional components to your architecture, the performance gains and improved user experience can be well worth the effort.

Remember, handling PII necessitates a careful approach to security. By following best practices and possibly opting for Elasticsearch's paid tiers for advanced security features, you can create a powerful, secure, and efficient search solution.
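
As a starting point, here is a hedged sketch of a more production-minded client setup: verify the cluster's CA certificate instead of disabling SSL checks, and keep the password out of the source code. The certificate path below is an assumption; point it at the http_ca.crt generated by your own installation.

import os
from elasticsearch import Elasticsearch

# Sketch only: certificate verification enabled and credentials read from the
# environment. The ca_certs path is hypothetical; use your installation's http_ca.crt.
es_secure = Elasticsearch(
    ["https://localhost:9200"],
    ca_certs=r"C:\elasticsearch-8.11.1\config\certs\http_ca.crt",
    verify_certs=True,
    basic_auth=("elastic", os.environ["ELASTIC_PASSWORD"]),
)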
