February 25, 2016 at 11:37 pm
I have designed a multithreaded ETL system that utilizes a few tables on SQL Server (2012) for controlling the queue. I have noticed a strange occurrence lately, and have been spinning my wheels trying to get to the bottom of it. First, the structure (scaled down to the relevant pieces):
Queue table:
CREATE TABLE ctrl.QueuedTasks
(
    QueuedTaskSK int IDENTITY(1,1) PRIMARY KEY
    ,TableName varchar(255)
    ,TableSchema varchar(25)
    ,PhaseName varchar(50)
    ,QueuedDttm datetime
    ,ExecuteStartDttm datetime
    ,ExecuteEndDttm datetime
    ,ExecutingFlag char(1)
    ,ExecutedFlag char(1)
)
Now, there are three different types of processes that write to this table. One process checks the last phase that executed for each table (ExecutedFlag = 'Y') against a control table, and if there is another phase that comes after the one that just executed, it inserts a record into the above table.
The second process is the "thread manager", which pulls the top n rows where ExecutedFlag = 'N' for each phase, updates them to ExecutingFlag = 'Y' and ExecuteStartDttm = GetDate(), and fires off their respective processes in parallel.
The last process comes from the individual "threads" themselves. After the thread finishes its execution, the record is updated to ExecutingFlag = 'N', ExecutedFlag = 'Y', ExecuteEndDttm = GetDate().
What I have noticed is I'll occasionally see 2 of the same phases for a particular table executed in a row. For example, I see:
TableName  PhaseName  QueuedDttm               ExecuteStartDttm         ExecuteEndDttm
Visit      Phase1     2016-02-24 00:05:25.322  2016-02-24 00:21:01.324  2016-02-24 00:21:04.253
Visit      Phase2     2016-02-24 00:21:07.234  2016-02-24 00:37:21.093  2016-02-24 00:42:04.611
Visit      Phase2     2016-02-24 00:42:04.421  2016-02-24 00:55:35.515  2016-02-24 01:01:01.166
Visit      Phase3     2016-02-24 01:03:09.033  2016-02-24 01:07:00.325  2016-02-24 01:08:01.111
Notice how there are two Phase2 entries for this table. Also, notice how the ExecuteEndDttm value for the first Phase2 entry is very close to the second Phase2 entry's QueuedDttm (less than 200ms difference). The process that inserts new records into this table checks for the last queued phase for each table, and above all else, will never insert a second instance of any phase back to back. The only way this could be happening is if, at the very moment it's checking the last queued phase, it doesn't "see" the first instance. When I initially wrote this insert into - select from, I was having many deadlock issues because at any given time, there could be 100+ threads for various tables and phases, and the process that inserts new queue records runs about every second. For this reason, I researched and implemented what I thought was supposed to eliminate this -- the WITH (UPDLOCK) hint.
As I understood it, if a thread attempted to update a row after the insert/select started, but before it escalated the lock, that's where you would encounter a deadlock, and this hint was supposed to immediately lock the necessary record(s) until the insert completed. However, obviously I'm not understanding this entirely, or there is something else at play here.
I'm starting to think that the single table design is a poor choice for how busy it is...especially since it sees updates. I considered having a separate table that the threads would write events to (e.g. "Table: Visit Phase1 started", "Table: Visit Phase2 completed", etc.) that the process that currently inserts new queue records would read from and update the queue records before running the inserts. This way, only two processes would be updating the queue, and they would never be concurrent since the "Update Queue" and "Start Threads" processes run sequentially during each iteration...and the individual threads would just be doing simple inserts into a separate table. I didn't want to go too far down that rabbit hole until I figured out if I was on the right track or if this too would encounter the same issues.
If you've read this far, thank you.
February 26, 2016 at 5:18 am
Hi Bantrim,
Thanks for posting the simplified table structure - details like that can be invaluable in understanding a problem.
In this case, we may need more details - more on that below.
First - the only way of not seeing an existing and committed row is when you use the READPAST hint. I assume that you would be aware of it if you were using that.
Another possible way of not seeing a row can only happen with uncommitted modifications, if you use one of the snapshot isolation levels. What is returned if you execute the query below in your database:
SELECT snapshot_isolation_state,
       snapshot_isolation_state_desc,
       is_read_committed_snapshot_on
FROM sys.databases
WHERE database_id = DB_ID();
If those theories do not explain the issue, then you probably should post more details - for each of the three processes, what exactly are the T-SQL statements you are running, from the start of the transaction scope until the commit or rollback? Also, is this whole process executed from T-SQL (stored procedures), or from a client that fires queries?
When I initially wrote this insert into - select from, I was having many deadlock issues because at any given time, there could be 100+ threads for various tables and phases, and the process that inserts new queue records runs about every second. For this reason, I researched and implemented what I thought was supposed to eliminate this -- the WITH (UPDLOCK) hint.
As I understood it, if a thread attempted to update a row after the insert/select started, but before it escalated the lock, that's where you would encounter a deadlock, and this hint was supposed to immediately lock the necessary record(s) until the insert completed. However, obviously I'm not understanding this entirely, or there is something else at play here.
To understand UPDLOCK, you have to know how locking works. If you insert, delete, or update a row, SQL Server will take an "X" (for "eXclusive") lock that blocks concurrent access to the resource until the transaction finishes. From now on, nobody can even read let alone modify the row. However, when you merely read a row, SQL Server takes only an "S" (for "Shared") lock, that blocks "X" locks (so nobody can change the data you are reading), but does not block other "S" locks (so other people can read at the same time).
The UPDLOCK is in-between those two. You use the hint on the SELECT query that reads the data when you expect that you might need to change the data later. SQL Server will now take a "U" lock, which does not allow concurrent other "U" locks or "X" locks, but does allow concurrent "S" locks - so once you have the "U" lock, people who want to read only are not blocked, but people who want to read and perhaps update later will have to wait until you are done. This can eliminate a lot of deadlocks, but only when properly and consistently implemented.
It will not, however, cause anyone to skip committed data.
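One way to see these lock modes in practice is to look at the locks a session holds while its transaction is still open. The following is only a sketch under stated assumptions: dbo.UpdlockDemo is a hypothetical scratch table (not part of the system described in this thread), it should be run in a test database, and reading sys.dm_tran_locks requires VIEW SERVER STATE permission.

```sql
-- Hypothetical scratch table; run in a test database only.
CREATE TABLE dbo.UpdlockDemo
(
    DemoSK       int     NOT NULL PRIMARY KEY,
    ExecutedFlag char(1) NOT NULL
);
INSERT INTO dbo.UpdlockDemo (DemoSK, ExecutedFlag) VALUES (1, 'N');

BEGIN TRANSACTION;

    -- Read the row and signal the intent to modify it later.
    SELECT DemoSK, ExecutedFlag
    FROM dbo.UpdlockDemo WITH (UPDLOCK)
    WHERE DemoSK = 1;

    -- List the locks this session currently holds. A "U" request_mode
    -- on the row's key is expected among the results.
    SELECT resource_type, request_mode, request_status
    FROM sys.dm_tran_locks
    WHERE request_session_id = @@SPID;

    -- The actual modification needs an "X" lock on the same row.
    UPDATE dbo.UpdlockDemo
    SET ExecutedFlag = 'Y'
    WHERE DemoSK = 1;

COMMIT TRANSACTION;

DROP TABLE dbo.UpdlockDemo;
```

While the transaction is open, a second session that runs the same SELECT with UPDLOCK should wait, whereas a plain SELECT without the hint should not.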
I'm starting to think that the single table design is a poor choice for how busy it is...especially since it sees updates.
Based on your description it is not yet entirely clear what you are trying to achieve, but based on what you did post I am thinking that perhaps SQL Server Service Broker would be a good tool for you.
But if you can give more details (in terms of functionality, not implementation) on what exactly you need to accomplish, others might have better ideas.
February 27, 2016 at 4:04 pm
Hugo,
Thank you for your detailed reply. To start, the results of the query you had me run are "0, OFF, 0". I'm assuming this doesn't point to an issue in this mode, but I could be wrong. I'm an "accidental DBA" at best, so I question the settings of my environment.
Let me post some actual code that runs throughout the process.
The first query is the "Queue Update", which is what adds new records to the queue based on satisfied pre-requisites.
INSERT INTO [BI-DEV].ctrl.ETL_QueuedTasks_Fact
(
    SrcServerName, TableName, TableSchema, QueueName, QueuedDttm, ExecutingFlag, ExecutedFlag, ExecuteStartDttm, ExecuteEndDttm
)
SELECT tbl.DataSourceName
    ,tbl.TableName
    ,tbl.TableSchema
    ,ph2.EtlPhaseName QueueName
    ,GetDate() QueuedDttm
    ,'N' ExecutingFlag
    ,'N' ExecutedFlag
    ,NULL ExecuteStartDttm
    ,NULL ExecuteEndDttm
FROM [BI-DEV].[ctrl].[ETL_Tables_Dim] tbl
INNER JOIN
(
    SELECT
        TableName
        ,TableSchema
        ,EtlPhaseName
        ,ROW_NUMBER() OVER(PARTITION BY TableName, TableSchema ORDER BY que.QueueOrder) rn
        ,ROW_NUMBER() OVER(PARTITION BY TableName, TableSchema ORDER BY que.QueueOrder DESC) rn_desc
    FROM [BI-DEV].ctrl.ETL_QueueControl_Dim ctrl
    INNER JOIN [BI-DEV].ctrl.ETL_Queue_Dim que ON ctrl.EtlPhaseName = que.QueueName
    WHERE ctrl.ActiveFlag = 'Y'
) ph1 ON ph1.TableName = tbl.TableName AND ph1.TableSchema = tbl.TableSchema
INNER JOIN
(
    SELECT
        TableName
        ,TableSchema
        ,EtlPhaseName
        ,ROW_NUMBER() OVER(PARTITION BY TableName, TableSchema ORDER BY que.QueueOrder) rn
        ,ROW_NUMBER() OVER(PARTITION BY TableName, TableSchema ORDER BY que.QueueOrder DESC) rn_desc
    FROM [BI-DEV].ctrl.ETL_QueueControl_Dim ctrl
    INNER JOIN [BI-DEV].ctrl.ETL_Queue_Dim que ON ctrl.EtlPhaseName = que.QueueName
    WHERE ctrl.ActiveFlag = 'Y'
) ph2 ON ph2.TableName = tbl.TableName AND ph2.TableSchema = tbl.TableSchema AND ph1.rn = ph2.rn - 1
INNER JOIN
(
    SELECT *
    FROM
    (
        SELECT TableName
            ,TableSchema
            ,QueueName
            ,QueuedDttm
            ,ROW_NUMBER() OVER(PARTITION BY TableName, TableSchema ORDER BY QueuedDttm DESC) rn
        FROM [BI-DEV].ctrl.ETL_QueuedTasks_Fact WITH (UPDLOCK)
        WHERE ExecutedFlag = 'Y'
    ) x
    WHERE x.rn = 1
) lst ON tbl.TableName = lst.TableName AND tbl.TableSchema = lst.TableSchema AND ph1.EtlPhaseName = lst.QueueName
LEFT JOIN [BI-DEV].ctrl.ETL_QueuedTasks_Fact exc WITH (UPDLOCK) ON exc.TableName = tbl.TableName AND exc.TableSchema = tbl.TableSchema AND
    exc.QueueName = ph2.EtlPhaseName AND exc.ExecutedFlag = 'N' AND exc.QueuedDttm > lst.QueuedDttm
LEFT JOIN
(
    SELECT
        t.TableSK
        ,tbd.TableName
        ,tbd.TableSchema
        ,DependentTableName
        ,DependentTableSchema
        ,tbd.EtlPhase
    FROM [BI-DEV].[ctrl].[ETL_TableDependency_Fact] tbd
    INNER JOIN [BI-DEV].ctrl.ETL_Tables_Dim t ON tbd.TableName = t.TableName AND tbd.TableSchema = t.TableSchema
    INNER JOIN [BI-DEV].ctrl.ETL_Tables_Dim t2 ON tbd.DependentTableName = t2.TableName AND tbd.DependentTableSchema = t2.TableSchema
    LEFT JOIN [BI-DEV].ctrl.ETL_QueuedTasks_Fact qd WITH (UPDLOCK) ON tbd.DependentTableName = qd.TableName AND tbd.DependentTableSchema = qd.TableSchema AND qd.QueuedDttm > CAST(@CurrentDttm as date) AND qd.QueueName = tbd.DependentEtlPhase AND qd.ExecutedFlag = 'Y'
    WHERE
        t.EtlEnabledFlag = 'Y'
        AND t2.EtlEnabledFlag = 'Y'
        AND qd.QueuedTaskSK IS NULL
) dep ON tbl.TableName = dep.TableName AND tbl.TableSchema = dep.TableSchema AND dep.EtlPhase = ph2.EtlPhaseName
WHERE tbl.EtlEnabledFlag = 'Y'
    AND dep.TableSK IS NULL
    AND exc.QueuedTaskSK IS NULL
To describe the various parts of this, the ph1 and ph2 subqueries are designed to grab each table's last executed task and get the next sequential task for that table. As you can probably tell, by themselves they return each table's whole task list and the respective "next task", which later gets filtered down to just the last executed and "next in line". For example:
TableName  EtlPhaseName1  EtlPhaseName2
Visit      Phase1         Phase2
Visit      Phase2         Phase3
Visit      Phase3         Phase4
Visit      Phase4         NULL
Would be what is returned for the "Visit" table. Then, whenever it gets joined to the lst subquery (last executed) on the table name, schema, and EtlPhaseName/QueueName, it would be reduced to the last executed, and next in line. For example, if Phase 2 was the last executed, then it would return:
TableName  EtlPhaseName1  EtlPhaseName2
Visit      Phase2         Phase3
So it's easy to determine what the next phase is for the table. The dep subquery is to check for dependencies. This is how I keep the loads organized when one phase may be dependent on another table/phase to complete. Basically, it is designed to look for any unexecuted phases for any tables logged as dependencies, and if any are returned, that table falls off of the result set until all dependencies are satisfied. Lastly, the exc aliased join is basically to ensure that what is happening doesn't happen (duplicate entries). It just checks to see if there is already a non-executed entry for the table's next in line phase. If there is, it falls off of the result set.
That's the big one. The rest aren't as daunting, I promise.
This query is run at the start of each thread to "check out" a table/task fed from the "thread manager".
UPDATE [ctrl].[ETL_QueuedTasks_Fact]
SET ExecutingFlag = 'Y', ExecuteStartDttm = GetDate()
WHERE QueuedTaskSK = @taskSK
and this query is run at the end of the thread to check the table/task back into the queue as completed:
UPDATE [ctrl].[ETL_QueuedTasks_Fact]
SET ExecutingFlag = 'N', ExecuteEndDttm = GetDate(), ExecutedFlag = 'Y', ErrorFlag = 'N'
WHERE QueuedTaskSK = @taskSK
Lastly, this query is what determines how many threads are open for a particular phase of the ETL cycle:
SELECT MIN(RowCnt) OpenThreads
FROM
(
    SELECT ISNULL(x.QueuedCount, 0) RowCnt
        ,qd.QueueName
    FROM [BI-DEV].ctrl.ETL_Queue_Dim qd
    LEFT JOIN
    (
        SELECT QueueName
            ,COUNT(*) QueuedCount
        FROM [BI-DEV].[ctrl].[ETL_QueuedTasks_Fact]
        WHERE ExecutedFlag = 'N'
        GROUP BY QueueName
    ) x ON x.QueueName = qd.QueueName
    WHERE qd.QueueName = 'Load into Greenplum'
    UNION ALL
    SELECT v.VariableValue - ISNULL(ExecutingCount, 0) RowCnt
        ,qd.QueueName
    FROM [BI-DEV].ctrl.ETL_Queue_Dim qd
    INNER JOIN [BI-DEV].ctrl.ETL_EnvironmentalVariables_Dim v ON qd.QueueName = v.Context
    LEFT JOIN
    (
        SELECT QueueName
            ,COUNT(*) ExecutingCount
        FROM [BI-DEV].[ctrl].[ETL_QueuedTasks_Fact]
        WHERE ExecutingFlag = 'Y'
        GROUP BY QueueName
    ) x ON x.QueueName = qd.QueueName
    WHERE qd.QueueName = 'Load into Greenplum'
) y
GROUP BY QueueName
It basically asks "what is currently running" and "what is queued to run" and "what is the max amount of threads I can open", and figures out how many tasks the "thread manager" can start. The "thread manager" just takes the number returned from this and does a
SELECT TOP n QueuedTaskSK FROM ctrl.ETL_QueuedTasks_Fact WHERE QueueName = '<name>' AND ExecutedFlag = 'N' ORDER BY QueuedDttm
and uses those QueuedTaskSK values to feed into the respective processes that begin and end with the updates above.
To answer your question about the process type...it actually executes quite a variety of processes. It can execute an SSIS package, FTP a file to a unix server, execute a shell script on the unix server, run a psql script on our Greenplum, run MSSQL stored procedures, etc. Those tasks are all encapsulated in the "thread" executables that I described earlier. Each of them are pretty basic, and follow the logic of:
So to circle back to the problem at hand, it seems that the very first query is inserting a record in the queue for the same table/task that was checked back into the queue almost immediately after. As in, Phase 2 for the Visit table was inserted with a dttm stamp of 2016-02-25 01:45:21.950 and the prior entry of Phase 2 for the Visit table was checked back in (ExecuteEndDttm) at 2016-02-25 01:45:22.110.
I'm going to feel really foolish if this proves to be something minor that I'm just glossing over, but I'm really pulling my hair out over this. Thank you for taking a look at this for me!
February 28, 2016 at 6:14 am
Hi Bantrim,
You post a lot of information, which is always good. However, the complexity of the problem may be beyond what can be solved -or rather, what I am prepared to do- in a free support forum. I would normally request that you simplify and trim down until you get a much simpler repro of your problem. But I have seen enough that I suspect that your setup is complex, and the very fact that you have no clue what causes your problem makes it hard to simplify it - especially because it is intermittent. I will give you a few pointers and perhaps others can chime in as well, but this may be the type of problem where you have no other choice but to pay a high-end consultant or Microsoft support.
bantrim (2/27/2016)
To start, the results of the query you had me run are "0, OFF, 0". I'm assuming this doesn't point to an issue in this mode, but I could be wrong. I'm an "accidental DBA" at best, so I question the settings of my environment.
You are right that this does not point to an issue. In fact, it rules out one possible explanation. If these settings had been different, you might have been using one of the snapshot isolation modes. Normally when SQL Server tries to access a row that is being modified by another process, it will block and wait until the modification is either committed or rolled back, so that you do not ever read data that has never been "officially" in the database. Using snapshot isolation changes this; in those modes the process that tries to read the data being modified will instead go back in time and read the value that was current before the modification started (the "last committed" data). I considered this as a possible explanation for your claim that a query does not see rows that you know for sure have been added - I figured that perhaps the insert had been executed but not committed yet, and hence would be skipped in snapshot isolation. Anyway, you do not use that, so this is not the explanation.
On to more viable theories.
One thing I notice is that the QueuedTasks table has no real key. It has a primary key on an identity column, which does nothing to prevent duplicates. If you add a UNIQUE constraint on the set of columns that should be unique in this table, you will not prevent whatever process inserts the duplicates from attempting to mess up, but it will fail with an error instead of succeeding. If you find the errors, you might be able to pinpoint the issue. And at least you will prevent the same task from executing twice.
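As a rough sketch of what such a constraint could look like: the queue table keeps history, so the same table and phase legitimately appear many times, and a plain UNIQUE constraint on those columns alone would reject normal reruns. One possible variation, shown here against the simplified ctrl.QueuedTasks table from the first post, is a filtered unique index that allows at most one unfinished entry per table and phase. Note that this swaps a filtered index in for a true constraint, and that both the index name and the choice of "what should be unique" are assumptions, not something stated in the thread.

```sql
-- Assumption: at most one unfinished (ExecutedFlag = 'N') entry may exist
-- per table and phase. The index name is illustrative only.
CREATE UNIQUE NONCLUSTERED INDEX UX_QueuedTasks_Unfinished
    ON ctrl.QueuedTasks (TableSchema, TableName, PhaseName)
    WHERE ExecutedFlag = 'N';
```

Creating the index fails if the table already contains such duplicates, so those would need to be cleaned up first. Once it exists, an attempt to queue a second unfinished row for the same table and phase should fail with a duplicate key error (error 2601) instead of silently succeeding.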
What you do not describe is how all the various elements are tied together. So one obvious explanation for the duplicates is that first the "Queue Update" query runs and adds some processes to execute. In the data you included in the first post, that would happen at 0:21:07.234 and add the first "Phase2" row. Execution of that starts at 0:37:21.093 and sets the ExecuteStartDttm timestamp and the ExecutingFlag column. That execution ends at 0:42:04.611, at which time the ExecuteEndDttm column is filled, the ExecutingFlag is reset and the ExecutedFlag is set. However, just before that at 0:42:04.421, the "Queue Update" process runs again. As you describe in your explanation of the query, the lst subquery is used to find the last row with ExecutedFlag set. For Phase2, that flag is not yet set at that time, so the last executed phase at that particular time is still Phase1, and Phase2 is inserted again. Perhaps you should add a test on the ExecutingFlag in this query?
(Note that I did not attempt to parse and understand the full query, so it is quite possible that I am overlooking something - however, Ctrl-F tells me that the "Queue Update" query does not reference ExecutingFlag at all).
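To make the suggested ExecutingFlag test concrete, here is a small self-contained sketch. It uses a table variable with the simplified column names from the first post and two sample rows that mirror the scenario above (Phase1 finished, Phase2 still running). The 'dbo' schema value and the exact shape of the NOT EXISTS test are assumptions for illustration; the real "Queue Update" query would need the equivalent condition worked into its lst subquery.

```sql
-- Sample data mirroring the scenario: Phase1 finished, Phase2 still running.
DECLARE @QueuedTasks TABLE
(
    QueuedTaskSK  int IDENTITY(1,1) PRIMARY KEY,
    TableName     varchar(255),
    TableSchema   varchar(25),
    PhaseName     varchar(50),
    QueuedDttm    datetime,
    ExecutingFlag char(1),
    ExecutedFlag  char(1)
);

INSERT INTO @QueuedTasks
    (TableName, TableSchema, PhaseName, QueuedDttm, ExecutingFlag, ExecutedFlag)
VALUES
    ('Visit', 'dbo', 'Phase1', '2016-02-24T00:05:25.322', 'N', 'Y'),
    ('Visit', 'dbo', 'Phase2', '2016-02-24T00:21:07.234', 'Y', 'N');

-- Last executed phase per table, skipping any table that has a task in flight.
SELECT x.TableName, x.TableSchema, x.PhaseName
FROM
(
    SELECT TableName, TableSchema, PhaseName,
           ROW_NUMBER() OVER (PARTITION BY TableName, TableSchema
                              ORDER BY QueuedDttm DESC) AS rn
    FROM @QueuedTasks
    WHERE ExecutedFlag = 'Y'
) AS x
WHERE x.rn = 1
  AND NOT EXISTS
      (
          SELECT 1
          FROM @QueuedTasks AS run
          WHERE run.TableName = x.TableName
            AND run.TableSchema = x.TableSchema
            AND run.ExecutingFlag = 'Y'
      );
```

With the sample rows, the query returns nothing, because Visit still has Phase2 executing. Without the NOT EXISTS test it would return Phase1 as the last executed phase, which is exactly the state in which Phase2 would be queued a second time.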
The above also tells me that the process that starts with checking out a task and ends with checking it back in (the two small queries) is not enclosed in a single transaction. If that had been the case, then the "Queue Update" query that ran at 0:42:04.421 would have been blocked until the check-in committed at 0:42:04.611, and no duplicate row would have been inserted. I don't know enough about your system to assess whether this should or should not be in a transaction, merely pointing out a fact.
Some additional comments follow that probably are not related to the issue you are asking help for, but that might still be important.
I also notice that your use of UPDLOCK is not consistent and not correct. The "Queue Update" query uses it, but not on rows it intends to update later (unless there is more to the same transaction that I do not see). So here you are needlessly getting more locks than you need, which can cause blocking and deadlocks. Also, the last two queries ("number of open threads" and "what is currently running/what is queued to run") do not use UPDLOCK, which is probably correct, but should change if in that same transaction you might need to later update the rows you read. You do not show the query that actually selects which task to run next (the query that fills @taskSK before executing the "check out" code). This is the query where you really need UPDLOCK, and then you should also ensure that this query and the checkout logic are combined in a single transaction.
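The select-and-check-out pattern described above could look roughly like the sketch below. It is not the poster's actual thread manager code, which is not shown in the thread. The @QueueName variable, its data type, and the extra ExecutingFlag = 'N' filter are assumptions; the table and column names are taken from the queries posted earlier.

```sql
-- Hypothetical parameter: the phase whose next task should be claimed.
DECLARE @QueueName varchar(50) = 'Load into Greenplum';
DECLARE @taskSK int;

BEGIN TRANSACTION;

    -- Pick the oldest task that is neither running nor finished, and hold a
    -- U lock on it so that no other session using this same pattern can
    -- claim it before the check-out below has been committed.
    SELECT TOP (1) @taskSK = QueuedTaskSK
    FROM ctrl.ETL_QueuedTasks_Fact WITH (UPDLOCK, ROWLOCK)
    WHERE QueueName = @QueueName
      AND ExecutedFlag = 'N'
      AND ExecutingFlag = 'N'
    ORDER BY QueuedDttm;

    -- Check the task out inside the same transaction.
    IF @taskSK IS NOT NULL
        UPDATE ctrl.ETL_QueuedTasks_Fact
        SET ExecutingFlag = 'Y', ExecuteStartDttm = GETDATE()
        WHERE QueuedTaskSK = @taskSK;

COMMIT TRANSACTION;

SELECT @taskSK AS ClaimedTaskSK;  -- NULL when nothing was waiting
```

How well this behaves under load depends on an index that supports the WHERE and ORDER BY; without one, the SELECT has to visit (and briefly lock) many more rows than the one it ends up claiming.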
Another big issue is the lack of indexes on the QueuedTasks table. The queries to check in and out use the primary key, which is great because the primary key is always automatically indexed. However, the "Queue Update" monster query filters by different columns and none of them are indexed. This forces SQL Server to scan all rows in the table to find the information it needs, which slows down your process. It also causes SQL Server to request locks on all those rows, which increases the likelihood of blocking and deadlocks. I assume that the root cause of the deadlocks you mention in the first post is actually the lack of indexes, and that better indexing would have fixed this far more efficiently than using an UPDLOCK hint.
And my final question, which I now realise I probably should have started with, is why you are building your own scheduling system. There are lots of commercial offerings available. Are your needs really so specific that none of them can be tailored for your needs? Building something yourself may leave some of the company's cash in the bank, but once you compute the cost of the time staff spends on building a custom solution, getting a third-party product is almost always the cheaper solution.
February 28, 2016 at 6:56 am
Hugo,
Again, thank you for the detailed reply, and taking the time to respond. I apologize that I did not include all of the necessary details -- the QueuedTasks table does have 5 nonclustered indexes on it that are tailored to these three processes, however it does not have any keys outside of the primary key you mentioned. This will be the first step I take in isolating the cause of these issues.
I was able to have a successful run last night by changing the "queue update" process to, rather than joining directly to the QueuedTasks table, selecting the entirety of queuedtasks into a temp table (with no hints), indexing the temp table, and using the temp table throughout the "queue update" process in place of the QueuedTasks table. It probably sounds a bit inefficient, and it probably is, but it runs in about 200ms, so it's a viable solution while I'm investigating the cause of the issue at hand. I definitely don't like the "temp table cure-all" approach, so this will eat away at me until I get it fully resolved, but I wanted to test the theory that the locking hints were misbehaving....which apparently they aren't and I'm just not implementing them correctly.
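For readers following along, the temp table workaround described above might look roughly like this. It is a sketch only: the column list is taken from the queries posted earlier, while the temp table name, the index name, and the index definition are assumptions, since the actual code is not shown.

```sql
-- Copy the queue once, with no locking hints, so that every later join in
-- the "Queue Update" statement works from the same copy of the data.
SELECT QueuedTaskSK, TableName, TableSchema, QueueName,
       QueuedDttm, ExecutingFlag, ExecutedFlag
INTO #QueuedTasks
FROM [BI-DEV].ctrl.ETL_QueuedTasks_Fact;

-- Illustrative index; the indexes actually used are not shown in the thread.
CREATE NONCLUSTERED INDEX IX_QueuedTasks_Lookup
    ON #QueuedTasks (TableName, TableSchema, QueueName, ExecutedFlag)
    INCLUDE (QueuedDttm, QueuedTaskSK);

-- The INSERT ... SELECT then joins to #QueuedTasks wherever it previously
-- joined to ETL_QueuedTasks_Fact (the lst, exc and qd references).

DROP TABLE #QueuedTasks;
```

The point of the design is that the three references to the queue table inside the big query can no longer observe the table in different states, because they all read the one copy taken up front.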
As far as the reason for a custom tailored solution...we haven't found a decent scheduler that we could customize to our needs. That is, one that handles multithreading to a degree that we need, including the ability to manage table/phase-dependencies. Perhaps you could suggest a product that may assist? We have purchased a whole multi-million dollar suite of products for our BI project, which includes an integration service manager for scheduling processes. Unfortunately, it isn't as robust as we need...and believe me, I was very excited when I heard we had an out-of-the-box solution that might be able to replace my custom one.
Basically, the key things that we need this scheduler to do are: multithreading, table/phase dependencies, cross-platform execution of processes (probably all of them do this), dynamic load frequencies (being able to set a table to "daily", "hourly", "5 minutes", etc. and having that table's process run with that frequency), and the ability to set "windows" of start/stop times throughout the day that the ETL can pull from particular systems. (i.e. Only pull from System A between 1AM and 7AM or 8PM and 11PM, System B between 3AM and 11AM, etc.). The current data volume I'm working with is about 7.9TB across ~12 billion rows in around 7k tables pulled from 13 different source systems. This is for a medical system, btw. 🙂 If it matters, the average daily deltas against the DW is around 500mil per day.
Again, thank you Hugo. I really appreciate your feedback and suggestions. It really helps, even to just converse with someone else in the field...and your ideas are definitely going to help zero in on the issue.
February 28, 2016 at 7:05 am
Hi Bantrim,
Unfortunately I cannot assist you in finding off-the-shelf packages that fit your needs. This is not an area I ever had to investigate. But I am happy to hear that you did and do consider this as an option, and that you are not working for one of the many companies that suffer from the NIH syndrome. ("NIH" = not invented here).
Thinking about the exchange so far and looking at the timestamps you posted, I am at this time 99% convinced that the issue is caused by the scenario I outlined in the previous post, where a new task is added while a task is executing, not executed. If I were you, I would start there, try to verify that this is indeed the case, and then either fix the root cause of the "Queue Update" process running early, or change it to look at the ExecutingFlag as well.
Oh, and definitely add those constraints. And after that, check if any of the UNIQUE constraints you added made existing indexes redundant (SQL Server automatically creates indexes for unique constraints, just as it does for the primary key).