Overview
In this article, I will show you how to set up an upsert operation in Cosmos DB using Azure Data Factory. An upsert either inserts a new record or updates an existing one in a single operation; to decide which, it compares incoming rows to existing records using one or more key columns. We will use an Azure SQL database as the source and Cosmos DB as the sink.
Solution
Let's create a sample table in the Azure SQL database. It will be used as the source in ADF:
CREATE TABLE TBL_SQL_TO_COSMOS_INCR (SALES_ORDER_ID INT, SALES_AMOUNT INT)
I will insert some values in the table:
INSERT INTO TBL_SQL_TO_COSMOS_INCR VALUES (10,10000), (11,10500), (12,11000), (13,20000), (14,30000)
Let's see the table data:
SELECT * FROM TBL_SQL_TO_COSMOS_INCR
We see 5 records in the table:
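Based on the INSERT statement above, the result set is:

SALES_ORDER_ID   SALES_AMOUNT
10               10000
11               10500
12               11000
13               20000
14               30000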
I created a database and a container in Cosmos DB as shown below; this container will be used as the sink in ADF:
Pipeline Design
We will use a copy data activity in Azure Data Factory:
Let's edit the Source. We are using an Azure SQL Database dataset as the source here. The query selects data from the table we created above:
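The exact query is not critical; a simple select over the source table, such as the sketch below, is enough:

SELECT SALES_ORDER_ID, SALES_AMOUNT FROM TBL_SQL_TO_COSMOS_INCR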
Next, we will edit the sink. Here we are using a Cosmos DB connection. In the Write behavior setting, both Insert and Upsert options are available. Let's select Upsert:
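For reference, the same setting in the copy activity's JSON definition looks roughly like the snippet below (based on the Cosmos DB SQL API sink; property names may vary slightly by connector version):

"sink": {
    "type": "CosmosDbSqlApiSink",
    "writeBehavior": "upsert"
}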
Now, running the pipeline:
Let's check the data in Cosmos DB. As expected, we see 5 documents in the container:
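To list the documents, a standard Cosmos DB SQL query such as the one below can be run in Data Explorer:

SELECT * FROM c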
Let's insert a record and update a record in the source table:
INSERT INTO TBL_SQL_TO_COSMOS_INCR VALUES (15,35000);
UPDATE TBL_SQL_TO_COSMOS_INCR SET SALES_AMOUNT=19000 WHERE SALES_ORDER_ID=10;
Now we check the Azure SQL database and see that the table data has changed as below:
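Based on the statements above, the table now contains:

SALES_ORDER_ID   SALES_AMOUNT
10               19000
11               10500
12               11000
13               20000
14               30000
15               35000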
Let's run the pipeline again and see the result in Cosmos DB. The pipeline executes again successfully.
We should have 1 row updated and 1 row inserted, so the total number of documents should be the same as the number of rows in the SQL table, i.e. 6. Now, let's look at the data in Cosmos DB:
As you can see from the image above, there are 11 documents in total. Although we selected the Upsert write behavior in the copy data activity, it looks like only inserts happened.
To understand why, let's look at the Cosmos DB item structure. Every item has the following properties (a sample item is shown after the list):
- id - User-defined string that must be unique within a logical partition.
- _rid - System-generated unique identifier of the item.
- _self - System-generated addressable URI of the item.
- _etag - Entity tag used for optimistic concurrency control.
- _ts - Timestamp of the last update of the item.
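For illustration, an item written by the first pipeline run looks roughly like this (the id is a system-generated GUID; all values are illustrative):

{
  "SALES_ORDER_ID": 10,
  "SALES_AMOUNT": 10000,
  "id": "3f2a9d6e-1b7c-4c0e-9a2d-5e8f0c1d2b3a",
  "_rid": "...",
  "_self": "...",
  "_etag": "...",
  "_attachments": "attachments/",
  "_ts": 1700000000
}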
We are interested in the id property, which uniquely identifies an item. In the result above, the id values were generated by the system, so Cosmos DB treated every incoming row as a new item and the upsert behaved like a plain insert. To fix this, we will map the id property to the key column (SALES_ORDER_ID).
Let's change the query in the copy data activity source. We add a new column, ID, derived from SALES_ORDER_ID:
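Since the Cosmos DB id property is a string, one way to derive it from the key column is a sketch like this (the exact query in the activity may differ):

SELECT CAST(SALES_ORDER_ID AS VARCHAR(20)) AS ID, SALES_ORDER_ID, SALES_AMOUNT FROM TBL_SQL_TO_COSMOS_INCR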
In the Mapping tab of the copy data activity, we will make a few changes:
- Click Import schemas to import both the source and sink schemas
- Map the source ID column to the sink id column
- Uncheck Include for the _rid, _self, _etag, _attachments and _ts columns.
Now the copy data activity is ready:
We will delete the data from both the SQL table and the Cosmos DB container, and then insert the same sample rows as before.
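On the SQL side, clearing the table can be done with a statement like the one below (the Cosmos DB items were deleted separately, for example through Data Explorer), after which the sample rows are reloaded:

TRUNCATE TABLE TBL_SQL_TO_COSMOS_INCR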
INSERT INTO TBL_SQL_TO_COSMOS_INCR VALUES (10,10000), (11,10500), (12,11000), (13,20000), (14,30000)
Now the table data looks like below:
Let's run the pipeline and see the data in Cosmos DB. Now the id property is generated from the SALES_ORDER_ID column:
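With the new mapping, an item now looks roughly like this (system properties omitted, values illustrative):

{
  "id": "10",
  "SALES_ORDER_ID": 10,
  "SALES_AMOUNT": 10000
}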
Now, we will insert one record and update one record in the source table, as we did before:
INSERT INTO TBL_SQL_TO_COSMOS_INCR VALUES (15,35000);
UPDATE TBL_SQL_TO_COSMOS_INCR SET SALES_AMOUNT=19000 WHERE SALES_ORDER_ID=10;
And running the pipeline again:
Let's see the result in Cosmos DB. Now we are getting the expected values: the total number of documents is 6, and the updated SALES_AMOUNT value of 19000 for SALES_ORDER_ID 10 is reflected in the result set:
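To double-check the updated item, a query like this can be run in Data Explorer (assuming the property names from the mapping above):

SELECT c.id, c.SALES_AMOUNT FROM c WHERE c.id = '10'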
Conclusion
In this article, we have learned how to upsert data into Cosmos DB using the copy data activity in Azure Data Factory, and why mapping the source key column to the Cosmos DB id property is essential for the upsert to update existing items instead of inserting duplicates.