I had a couple of clients who were moving content from development catalogs to production catalogs for the first time. They wanted to copy the schema and data from tables, views, and volumes.
So I wrote a Python notebook to handle this task. It creates the objects in the new catalog and then changes the owner to a selected user/group. You can find the full notebook on GitHub.
I’ll walk through some of my code here. This notebook is a bit “quick and dirty”. It is not comprehensive (e.g., it doesn’t include external tables because I didn’t have any, doesn’t copy files into managed volumes, and there is no real error handling). But I thought it still might be helpful for others to see/borrow.
Getting Started
Before you can use this script, your target catalog, target external location, and intended owner user/group must be created.
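If those don't exist yet, a metastore admin can create the catalog and external location up front. The snippet below is just an illustration, not part of the notebook; prod_catalog, prod_ext_location, prod_cred, the ADLS URL, and the data-engineers group are placeholder names for whatever your environment uses.
# one-time setup by a metastore admin (all names and the URL below are placeholders)
spark.sql("CREATE CATALOG IF NOT EXISTS prod_catalog")

# an external location requires an existing storage credential
spark.sql("""
    CREATE EXTERNAL LOCATION IF NOT EXISTS prod_ext_location
    URL 'abfss://data@prodstorageaccount.dfs.core.windows.net/'
    WITH (STORAGE CREDENTIAL prod_cred)
""")

# make sure the intended owner group can use the new catalog
spark.sql("GRANT USE CATALOG, CREATE SCHEMA ON CATALOG prod_catalog TO `data-engineers`")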
The general pattern, sketched briefly after this list, is:
- Create a dataframe with the objects (tables/views/volumes) to be copied.
- Create a function that dynamically generates the SQL DDL to create the object and executes it.
- Loop through the rows in the dataframe, check if the object already exists, and call the function to create the object as needed.
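Schematically, each object type gets handled like this (copy_object here just stands in for the real functions shown later in the notebook):
# schematic only -- copy_object stands in for copy_table / migrate_vol defined later
df_objects = spark.sql(f"Select * from {source_catalog}.information_schema.tables where table_type = 'MANAGED'")

for index, row in df_objects.toPandas().iterrows():
    schema, name = row[1], row[2]   # table_schema, table_name
    if not spark.catalog.tableExists(f"{target_catalog}.{schema}.{name}"):
        copy_object(source_catalog, target_catalog, schema, name)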
The notebook creates 5 widgets that require input.
- source_catalog – the name of the catalog that contains the content you want to migrate
- target_catalog – the name of the catalog to which you want to migrate content
- source_storage – the ADLS storage account name used in views or volumes in the source catalog
- target_storage – the ADLS storage account name to be used in views or volumes in the target catalog
- assigned_owner – the user or group that should be made owner of the objects created in the target catalog
Notebook Functionality
First, I create the widgets and assign the input values to some variables.
# create input widgets (only need to run once)
dbutils.widgets.text("source_catalog", "")
dbutils.widgets.text("target_catalog", "")
dbutils.widgets.text("source_storage", "")
dbutils.widgets.text("target_storage", "")
dbutils.widgets.text("assigned_owner", "")
# assign widget values to variables
source_catalog = dbutils.widgets.get("source_catalog")
target_catalog = dbutils.widgets.get("target_catalog")
source_storage = dbutils.widgets.get("source_storage")
target_storage = dbutils.widgets.get("target_storage")
assigned_owner = dbutils.widgets.get("assigned_owner")
To copy tables, I created three cells. The first gives me a dataframe containing one table per row. The second is the function I call to copy a table. The third loops through the tables and calls the function.
# get dataframe of managed tables
df_tables = spark.sql(f"Select * from {source_catalog}.information_schema.tables where table_type = 'MANAGED'")
display(df_tables)
# function for copying tables
def copy_table(source_catalog, target_catalog, schema, table):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_catalog}.{schema};")
    if spark.catalog.tableExists(f"{target_catalog}.{schema}.{table}"):
        print(f"Skip table {target_catalog}.{schema}.{table}, because the table already exists.")
    else:
        print(f"Copy from {source_catalog}.{schema}.{table} into {target_catalog}.{schema}.{table}.")
        source_df = spark.table(f"{source_catalog}.{schema}.{table}")
        # replace any spaces in column names before writing the target table
        renamed_cols = [col.replace(" ", "_") for col in source_df.columns]
        renamed_df = source_df.toDF(*renamed_cols)
        renamed_df.write.mode("overwrite").saveAsTable(f"{target_catalog}.{schema}.{table}")
# loop to call the function for each table
for index, row in df_tables.toPandas().iterrows():
    schema = row[1]  # table_schema column
    table = row[2]   # table_name column
    copy_table(source_catalog, target_catalog, schema, table)
After that is done, I check to see if there are any tables that somehow didn’t get copied.
# check to see if there are any tables that did not get created in target
df_tables_s = spark.sql(f"Select table_schema, table_name from {source_catalog}.information_schema.tables where table_type = 'MANAGED'")
df_tables_t = spark.sql(f"Select table_schema, table_name from {target_catalog}.information_schema.tables where table_type = 'MANAGED'")
df_tables_left = df_tables_s.exceptAll(df_tables_t)
display(df_tables_left)
Volumes are similar, although I organized my code slightly differently.
First, I have my function. It uses SQL with the DESCRIBE VOLUME command to get the volume definition. Because external volumes reference a storage location, I had to replace the source storage account with the target storage account in the generated SQL. This assumes the folder structure in the source and target storage accounts is the same and that there is only one storage location for each.
# function to copy EXTERNAL volumes
def migrate_vol(source_catalog, target_catalog, schema, volnm):
    voldef = spark.sql(f"DESCRIBE VOLUME {source_catalog}.{schema}.{volnm};")
    voldefp = voldef.toPandas()
    crstr = f"CREATE EXTERNAL VOLUME {target_catalog}.{schema}.{volnm} LOCATION '"
    defstr = voldefp['storage_location'][0]
    # swap the source storage account for the target storage account in the location path
    defstrstr = defstr.replace(source_storage, target_storage)
    finalstr = crstr + defstrstr + "';"
    print(finalstr)
    spark.sql(finalstr)
I put my dataframe and my loop together in this next cell, with the existence check inside the for loop. When I tried to do the same thing as with tables, I encountered an error, so the Databricks Assistant rewrote it for me this way. It works great.
# get df of external volumes
df_vols = spark.sql(f"Select * from {source_catalog}.information_schema.volumes WHERE volume_type = 'EXTERNAL'")

for index, row in df_vols.toPandas().iterrows():
    schema = row[1]
    volnm = row[2]

    # Check if the volume exists
    vol_exists = spark.sql(f"""
        SELECT COUNT(*)
        FROM {target_catalog}.information_schema.volumes
        WHERE volume_schema = '{schema}'
        AND volume_name = '{volnm}'
    """).collect()[0][0] > 0

    if vol_exists:
        print(f"Skip volume {target_catalog}.{schema}.{volnm}, because the volume already exists.")
    else:
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_catalog}.{schema};")
        migrate_vol(source_catalog, target_catalog, schema, volnm)
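The notebook doesn't include it, but the same exceptAll check used for tables could confirm that every external volume made it across:
# check to see if there are any volumes that did not get created in target
df_vols_s = spark.sql(f"Select volume_schema, volume_name from {source_catalog}.information_schema.volumes where volume_type = 'EXTERNAL'")
df_vols_t = spark.sql(f"Select volume_schema, volume_name from {target_catalog}.information_schema.volumes where volume_type = 'EXTERNAL'")
df_vols_left = df_vols_s.exceptAll(df_vols_t)
display(df_vols_left)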
To change the owner of an object, you generally run an ALTER command on the object and assign it to a new user or group.
The code below changes the owner of a volume.
# function to change volume owner
def migrate_volowner(schema, volnm):
    crstr = f"Alter VOLUME {target_catalog}.{schema}.{volnm} OWNER TO `{assigned_owner}`;"
    print(crstr)
    spark.sql(crstr)
# get df of volumes that need owner changed
df_vols = spark.sql(f"Select * from {target_catalog}.information_schema.volumes WHERE volume_owner <> '{assigned_owner}'")

# loop through volumes
for index, row in df_vols.toPandas().iterrows():
    schema = row[1]
    volnm = row[2]

    # Check if the volume exists with correct owner
    vol_exists = spark.sql(f"""
        SELECT COUNT(*)
        FROM {target_catalog}.information_schema.volumes
        WHERE volume_schema = '{schema}'
        AND volume_name = '{volnm}'
        AND volume_owner = '{assigned_owner}'
    """).collect()[0][0] > 0

    if vol_exists:
        print(f"Skip volume {target_catalog}.{schema}.{volnm}, because the volume owner already changed.")
    else:
        migrate_volowner(schema, volnm)
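The notebook only shows the volume version, but the same ALTER ... OWNER TO pattern extends to the other object types. Here is a sketch (not from the notebook, reusing the same widget variables) for managed tables and schemas:
# sketch: apply the same ALTER ... OWNER TO pattern to tables and schemas in the target catalog
def migrate_tableowner(schema, table):
    spark.sql(f"ALTER TABLE {target_catalog}.{schema}.{table} OWNER TO `{assigned_owner}`;")

def migrate_schemaowner(schema):
    spark.sql(f"ALTER SCHEMA {target_catalog}.{schema} OWNER TO `{assigned_owner}`;")

# managed tables whose owner still needs to change
df_tbls = spark.sql(f"Select table_schema, table_name from {target_catalog}.information_schema.tables where table_type = 'MANAGED' and table_owner <> '{assigned_owner}'")
for index, row in df_tbls.toPandas().iterrows():
    migrate_tableowner(row["table_schema"], row["table_name"])

# schemas whose owner still needs to change
df_schemas = spark.sql(f"Select schema_name from {target_catalog}.information_schema.schemata where schema_owner <> '{assigned_owner}' and schema_name <> 'information_schema'")
for index, row in df_schemas.toPandas().iterrows():
    migrate_schemaowner(row["schema_name"])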
One thing I want to note that isn't shown in this sample of the code: even though information_schema.views looks like it should contain the definition of every view, I have found that is not the case. Instead of pulling the definition from that view, I get it by using the SHOW CREATE TABLE command, which works consistently.
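The view-copying cells aren't reproduced in this post, but the approach is roughly the sketch below. The catalog and storage string replacements are the assumptions to watch; depending on how SHOW CREATE TABLE quotes names in your workspace, you may need to adjust them.
# sketch: copy a view by rewriting its SHOW CREATE TABLE output for the target catalog
def copy_view(source_catalog, target_catalog, schema, view):
    if spark.catalog.tableExists(f"{target_catalog}.{schema}.{view}"):
        print(f"Skip view {target_catalog}.{schema}.{view}, because it already exists.")
        return
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_catalog}.{schema};")
    # SHOW CREATE TABLE also returns the DDL for views
    ddl = spark.sql(f"SHOW CREATE TABLE {source_catalog}.{schema}.{view}").collect()[0][0]
    # point the definition at the target catalog and target storage account
    ddl = ddl.replace(f"{source_catalog}.", f"{target_catalog}.").replace(f"`{source_catalog}`", f"`{target_catalog}`")
    ddl = ddl.replace(source_storage, target_storage)
    print(ddl)
    spark.sql(ddl)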
Get the Notebook
I have made the notebook publicly available for your use on GitHub. It's in .ipynb format, so you should be able to download it and upload it to your Databricks workspace.
I wouldn't use this for normal DevOps migration processes, but it was good for populating an empty catalog. I have some clients who keep their schema definitions in DBSchema and script out their changes via its schema compare functionality, a process very similar to what many people do for SQL Server. But for people who do not have a "database project" of sorts, my script can be helpful for the initial population.
If nothing else, this increased my knowledge of metadata in Databricks Unity Catalog.
What is your process?
If you are using Unity Catalog, how do you store your schema definitions? And what do you do to promote changes from one catalog to another? Let me know in the comments.