
Ingest and Retrieve data from an ADX Cluster Using Python


Introduction

Azure Data Explorer (ADX) helps analyze high volumes of data in near real time. The Azure Machine Learning (AML) Python SDK v2 is the updated Python SDK package; it lets you build a single command or a chain of commands, like Python functions, into a single step or a complex workflow. In this article, I walk through an end-to-end solution for ingesting data into, and retrieving data from, an ADX Cluster database using AML Python notebooks.

Here are the Azure Resources used:

  1. Azure Storage
  2. Azure Data Explorer
  3. Azure Machine Learning
  4. App Registrations

Steps

The following steps explain how to create the Azure resources and then ingest data into, and retrieve data from, the ADX Cluster database using a Storage Account blob. The Python notebook is created in the AML Workspace and executed on an AML Workspace Compute Instance.

Step 1

Create a Storage Account first. Then, create a container named ml-adx-demo inside the Storage Account.
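
If you prefer to script this step, here is a minimal sketch using the azure-storage-blob Python SDK (the connection string placeholder is an assumption; the portal works just as well).

# Create the ml-adx-demo container with the azure-storage-blob SDK
from azure.storage.blob import BlobServiceClient

CONNECTION_STRING = "<storage-account-connection-string>"   # placeholder; copy from the Storage Account Access keys blade
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
blob_service_client.create_container("ml-adx-demo")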

Step 2

Upload a CSV file to the container named ml-adx-demo inside the Storage Account.
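
This step can also be scripted; a minimal sketch, reusing the connection string placeholder from the previous sketch and assuming the file is named titanic_data.csv locally:

# Upload the CSV file into the ml-adx-demo container
from azure.storage.blob import BlobServiceClient

CONNECTION_STRING = "<storage-account-connection-string>"   # placeholder
container_client = BlobServiceClient.from_connection_string(CONNECTION_STRING).get_container_client("ml-adx-demo")
with open("titanic_data.csv", "rb") as data:
    container_client.upload_blob(name="titanic_data.csv", data=data, overwrite=True)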

Step 3

Go to Entra ID and add an App Registration from the top menu.

Step 4

Register an application named ml_adx_demo_app.

Step 5

Create an ADX Cluster named ml-adx-demo-cluster with the minimum configuration to optimize cost. Keep the default options in the other tabs.

Step 6

Create a database named ml_adx_demo_db in the ADX Cluster.

Step 7

Go to the ADX Cluster database and grant Database Admin permission to the App Registration created in the earlier steps.
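
As an alternative to the portal, the same permission can be granted by running a management command in the database query pane; a sketch, with placeholders for the Application (client) ID and Directory (tenant) ID taken from the App Registration:

// Grant the registered app Database Admin on ml_adx_demo_db (the placeholders are assumptions)
.add database ml_adx_demo_db admins ('aadapp=<application-client-id>;<tenant-id>') 'ml_adx_demo_app'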

Step 8

Create an Azure Machine Learning Studio workspace named ml_adx_demo_ws. While creating the ML Workspace, you also need to provide Storage Account, Application Insights, and Key Vault details. Keep the default options in the other tabs.

Step 9

Go to Machine Learning Studio and create a Compute Instance named ml-adx-demo-compute. The lowest-cost Virtual Machine size is selected. Keep the default options for the other tabs.

Step 10

Once the Compute Instance is in the Running state, go to the Notebooks menu. Open a Terminal and install the two Kusto libraries on the Compute Instance created in the last step:

pip install azure-kusto-data
pip install azure-kusto-ingest

Step 11

Create a notebook to transfer the CSV file data to the ml_adx_demo_db ADX database.

Step 12

Write and execute the following code in the notebook step-by-step.

Import the required library modules

# Import the required library modules
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Define all the required constants to be used for data ingestion and retrieval

# Define all the required constants to be used for data ingestion and retrieval
cluster = "https://ml-adx-demo-cluster.eastus2.kusto.windows.net"
KUSTO_INGEST_URI = "https://ingest-ml-adx-demo-cluster.eastus2.kusto.windows.net"
client_id = "58d388fa-fad5-4f6d-a727-1e58e42409df"
client_secret = "1a48Q~VjRAut_Txq1HPC_GWymFL625YjlZgbzbEG"
authority_id = "xxxxe25b-xxxx-xxxx-xxxx-2938c046xxxx"
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(cluster, client_id, client_secret, authority_id)
kcsb_ingest = KustoConnectionStringBuilder.with_aad_application_key_authentication(KUSTO_INGEST_URI, client_id, client_secret, authority_id)
client = KustoClient(kcsb)

The client_id, client_secret, and authority_id (Directory (tenant) ID) values are copied from the App Registration.

 

The cluster and KUSTO_INGEST_URI values are copied from the ADX Cluster (the URI and Data Ingestion URI, respectively).

Define the input file details

# define the input file details
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, ReportLevel, ReportMethod
CONTAINER = "ml-adx-demo"
ACCOUNT_NAME = "sdarticlestorageacc"
SAS_TOKEN = "sv=2022-11-02&ss=b&srt=sco&sp=rwdlatf&se=2024-02-27T19:20:39Z&st=2024-02-27T11:20:39Z&spr=https&sig=JFpZtw6sRetYhZPo2e1WFqyvWcbp92fsOe0.0000001GxeNGGo"  
FILE_PATH = "titanic_data.csv"
FILE_SIZE = 29480.96    # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \ 
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
print(BLOB_PATH)

Define the destination database and table name where the data needs to be populated

# Define the destination database and table name where the data needs to be populated
KUSTO_DATABASE = "ml_adx_demo_db"
DESTINATION_TABLE = "titanic"
DESTINATION_TABLE_COLUMN_MAPPING = "titanic_mapping"

Create the destination table and print its structure

# create the destination table and print its structure
CREATE_TABLE_COMMAND = ".create-merge table ['titanic']  (['PassengerId']:long, ['Survived']:long, ['Pclass']:long, ['Name']:string, ['Gender']:string, ['Age']:real, ['SibSp']:long, ['Parch']:long, ['Ticket']:string, ['Fare']:real, ['Cabin']:string, ['Embarked']:string)"
RESPONSE = client.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
df = dataframe_from_result_table(RESPONSE.primary_results[0])
print(df)

Create the ingestion mapping for the destination table

# Create the ingestion mapping for the destination table
CREATE_MAPPING_COMMAND = """.create-or-alter table ['titanic'] ingestion csv mapping 'titanic_mapping' '[{"column":"PassengerId", "Properties":{"Ordinal":"0"}},{"column":"Survived", "Properties":{"Ordinal":"1"}},{"column":"Pclass", "Properties":{"Ordinal":"2"}},{"column":"Name", "Properties":{"Ordinal":"3"}},{"column":"Gender", "Properties":{"Ordinal":"4"}},{"column":"Age", "Properties":{"Ordinal":"5"}},{"column":"SibSp", "Properties":{"Ordinal":"6"}},{"column":"Parch", "Properties":{"Ordinal":"7"}},{"column":"Ticket", "Properties":{"Ordinal":"8"}},{"column":"Fare", "Properties":{"Ordinal":"9"}},{"column":"Cabin", "Properties":{"Ordinal":"10"}},{"column":"Embarked", "Properties":{"Ordinal":"11"}}]'"""
RESPONSE = client.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
df = dataframe_from_result_table(RESPONSE.primary_results[0])
print(df)

Input data is queued up for ingestion in the destination table

# Input data is queued up for ingestion in the destination table
INGESTION_CLIENT = QueuedIngestClient(kcsb_ingest)
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer: ' + DESTINATION_TABLE)

Query the destination table to validate the data population from the source file

# Query the destination table to validate the data population from the source file
QUERY = "titanic | count"
RESPONSE = client.execute_query(KUSTO_DATABASE, QUERY)
df = dataframe_from_result_table(RESPONSE.primary_results[0])
print("titanic | count")
print(df)

The data is now available in the destination table.
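
To retrieve the ingested rows into a pandas DataFrame from the same notebook, a small follow-up query can be run (the take 10 query below is just an illustration):

# Retrieve a sample of the ingested rows into a pandas DataFrame
SAMPLE_QUERY = "titanic | take 10"
RESPONSE = client.execute_query(KUSTO_DATABASE, SAMPLE_QUERY)
sample_df = dataframe_from_result_table(RESPONSE.primary_results[0])
print(sample_df)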

Step 13

Go to the ADX Cluster database. Check for the newly created table and data inside the table.

Conclusion

The ML Workspace script can be reused to populate different ADX database tables from different sets of input files available in the Storage Account. Apart from CSV files, Parquet files can also be used as input; in that case, the data format and table mapping need to be changed from CSV to Parquet (see the sketch after the monitoring query below). Data ingestion is queued in the ADX table, and it may take 4-5 minutes for the data to appear there. Data ingestion can be monitored with the following KQL query:

".show commands  | where CommandType == "DataIngestPull" |order by StartedOn desc| "
