February 23, 2012 at 8:57 pm
I would like to call stored procs asynchronously, but there seems to be a delay in Service Broker, so that not all of the procedures finish at around the same time. The delay is significant: several seconds. I have max_queue_readers set higher than the number of procedures I am calling. I have tried using the same conversation, multiple conversations, receiving more than top(1) --- although for high concurrency top(1) should be best --- and using waitfor with a timeout. Here's what it looks like:
-- Result log: one row per executed request, with the time the requested
-- procedure started and finished.
create table AsyncExecResults (
    token uniqueidentifier primary key,
    start_time datetime null,
    finish_time datetime null);
go
-- Queue that receives the execution requests.
create queue AsyncExecQueue;
go
-- DEFAULT is not a keyword in the contract list; it is the name of the
-- built-in contract and therefore has to be written as a delimited
-- identifier ([DEFAULT]).
create service AsyncExecService on queue AsyncExecQueue ([DEFAULT]);
go
-- Drop any earlier version so the CREATE below succeeds.
IF OBJECT_ID('usp_AsyncExecActivated') IS NOT NULL
    DROP PROCEDURE usp_AsyncExecActivated;
GO
-- Activation procedure for AsyncExecQueue.
-- Waits up to 5 seconds for a single message, handles it and returns;
-- there is no loop, so each activation processes at most one message.
CREATE PROCEDURE usp_AsyncExecActivated
AS
SET NOCOUNT ON;

DECLARE @dialog     UNIQUEIDENTIFIER,
        @msgType    SYSNAME,
        @msgBody    VARBINARY(MAX),
        @request    XML,
        @procToRun  SYSNAME,
        @startedAt  DATETIME,
        @finishedAt DATETIME,
        @token      UNIQUEIDENTIFIER;

WAITFOR (
    RECEIVE TOP (1)
        @dialog  = conversation_handle,
        @msgType = message_type_name,
        @msgBody = message_body
    FROM AsyncExecQueue
), TIMEOUT 5000;

-- No message arrived before the timeout: nothing to do.
IF @dialog IS NULL
    RETURN;

IF @msgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
BEGIN
    -- The other side closed the dialog; close this side as well.
    END CONVERSATION @dialog;
END
ELSE IF @msgType = N'DEFAULT'
BEGIN
    -- The request body is <procedure><name>...</name></procedure>.
    SET @request   = CAST(@msgBody AS XML);
    SET @procToRun = @request.value('(//procedure/name)[1]', 'sysname');

    -- Run the requested procedure and time it.
    SET @startedAt = CURRENT_TIMESTAMP;
    EXEC (@procToRun);
    SET @finishedAt = CURRENT_TIMESTAMP;

    -- The conversation_id of the dialog is used as the result key.
    SELECT @token = conversation_id
    FROM sys.conversation_endpoints
    WHERE conversation_handle = @dialog;

    INSERT INTO AsyncExecResults (token, start_time, finish_time)
    VALUES (@token, @startedAt, @finishedAt);
END
GO
-- Enable the queue and attach usp_AsyncExecActivated as its activation
-- procedure, allowing up to 25 concurrent readers running as the owner.
ALTER QUEUE AsyncExecQueue
WITH
    STATUS = ON,
    RETENTION = OFF,
    ACTIVATION (
        STATUS = ON,
        PROCEDURE_NAME = usp_AsyncExecActivated,
        MAX_QUEUE_READERS = 25,
        EXECUTE AS OWNER
    );
GO
if object_id('usp_AsyncExecInvoke') is not null
    drop procedure usp_AsyncExecInvoke;
go
-- Queues a request to run @procedureName asynchronously.
--   @procedureName : name of the procedure to execute; it is sent as
--                    <procedure><name>...</name></procedure>.
--   @token (output): conversation_id of the dialog opened for the request.
create procedure usp_AsyncExecInvoke
    @procedureName sysname,
    @token uniqueidentifier output
as
declare @h uniqueidentifier,
        @xmlBody xml;
set nocount on;
-- Open a dialog from the service to itself within the current database.
begin dialog conversation @h
    from service AsyncExecService
    to service N'AsyncExecService', 'current database'
    with encryption = off;
-- Hand the conversation_id back to the caller; without this assignment the
-- output parameter would always come back NULL.
select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;
select @xmlBody = (
    select @procedureName as name
    for xml path('procedure'), type);
send on conversation @h (@xmlBody);
go
-- Drop any earlier version so the CREATE below succeeds.
IF OBJECT_ID('usp_MyLongRunningProcedure') IS NOT NULL
    DROP PROCEDURE usp_MyLongRunningProcedure;
GO
-- Stand-in workload: waits for ten seconds and does nothing else.
CREATE PROCEDURE usp_MyLongRunningProcedure
AS
BEGIN
    WAITFOR DELAY '00:00:10';
END
GO
-- Start from an empty result log.
TRUNCATE TABLE AsyncExecResults;
GO
-- Queue ten requests for the long-running procedure.
DECLARE @requestsLeft INT,
        @token UNIQUEIDENTIFIER;
SET @requestsLeft = 10;
WHILE @requestsLeft > 0
BEGIN
    EXEC usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token OUTPUT;
    SET @requestsLeft = @requestsLeft - 1;
END
GO
-- Show what has completed so far, earliest finish first.
SELECT token, start_time, finish_time
FROM AsyncExecResults
ORDER BY finish_time;
Code borrowed heavily from here, for reference.
I have really taken it down to just the bare-bones on purpose here so that I can test performance. What I would like is for there to be as little delay between the first start-time and last finish-time when results are sorted in ascending order. I have done some research and it seems the issue is that service broker is not constantly scanning the queue and then activating whatever is on it. It waits for a bit and then scans again. However, after I run all of the waitfor delay procedures, they should all enter the queue. So by the time SB checks a second time, they should surely all be there for SB to dispatch activation procedures.
Thanks for any help!
September 18, 2012 at 8:03 am
Your async handler routine should not exit after each message: Service Broker expects its handlers to keep processing messages for as long as more messages are available. As you discovered, it only rechecks after 5 seconds to see whether the queue has been emptied. Only if it has not does it start another handler, up to the limit set by max_queue_readers.
So the problem you need to fix is first of all in your handler procedure. Try this for example (not tested):
-- Drop any earlier version so the CREATE below succeeds.
IF OBJECT_ID('usp_AsyncExecActivated') IS NOT NULL
    DROP PROCEDURE usp_AsyncExecActivated;
GO
-- Activation procedure for AsyncExecQueue.
-- Keeps receiving one message at a time, each in its own transaction, and
-- exits only when a receive returns nothing within the 5 second timeout.
CREATE PROCEDURE usp_AsyncExecActivated
AS
SET NOCOUNT ON;

DECLARE @dialog     UNIQUEIDENTIFIER,
        @msgType    SYSNAME,
        @msgBody    VARBINARY(MAX),
        @request    XML,
        @procToRun  SYSNAME,
        @startedAt  DATETIME,
        @finishedAt DATETIME,
        @token      UNIQUEIDENTIFIER;

WHILE 1 = 1
BEGIN
    BEGIN TRANSACTION;

    WAITFOR (
        RECEIVE TOP (1)
            @dialog  = conversation_handle,
            @msgType = message_type_name,
            @msgBody = message_body
        FROM AsyncExecQueue
    ), TIMEOUT 5000;

    -- @@ROWCOUNT must be read directly after the receive.
    IF @@ROWCOUNT = 0
    BEGIN
        -- Timed out with nothing received: close the transaction and stop.
        COMMIT TRANSACTION;
        BREAK;
    END

    IF @msgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
        -- The other side closed the dialog; close this side as well.
        END CONVERSATION @dialog;
    END
    ELSE IF @msgType = N'DEFAULT'
    BEGIN
        -- The request body is <procedure><name>...</name></procedure>.
        SET @request   = CAST(@msgBody AS XML);
        SET @procToRun = @request.value('(//procedure/name)[1]', 'sysname');

        -- Run the requested procedure and time it.
        SET @startedAt = CURRENT_TIMESTAMP;
        EXEC (@procToRun);
        SET @finishedAt = CURRENT_TIMESTAMP;

        -- The conversation_id of the dialog is used as the result key.
        SELECT @token = conversation_id
        FROM sys.conversation_endpoints
        WHERE conversation_handle = @dialog;

        INSERT INTO AsyncExecResults (token, start_time, finish_time)
        VALUES (@token, @startedAt, @finishedAt);
    END

    COMMIT TRANSACTION;
END
GO
This will process your messages sequentially until a single instance has been unable to empty the queue for more than 5 seconds. If the queue is not empty 5 seconds after the 1st instance was started, a 2nd instance will be started and 2 messages will be processed simultaneously. If the queue is still not empty after 10 seconds, a 3rd instance is started and 3 messages are processed at the same time (provided your server has at least 3 CPUs; otherwise you'll just be context switching), and so on, until either the max_queue_readers limit is reached or the queue stays empty for at least 5 seconds (= the timeout value on your waitfor(receive) statement).
In practice, 25 queue readers is probably way too high.
September 19, 2012 at 1:47 am
For those trying to duplicate this experiment, for example to learn some Service broker basics from it, here is the same experimental code with documented improvements in it:
-- Scratch database for the experiment.
CREATE DATABASE t2;
GO
USE master;
GO
-- Turn Service Broker on for the scratch database.
ALTER DATABASE t2 SET ENABLE_BROKER;
GO
USE t2;
GO
-- Result log: one row per executed request, with the time the requested
-- procedure started and finished.
CREATE TABLE AsyncExecResults (
    token       UNIQUEIDENTIFIER PRIMARY KEY,
    start_time  DATETIME NULL,
    finish_time DATETIME NULL
);
GO
-- Queue that receives the execution requests.
CREATE QUEUE AsyncExecQueue;
GO
-- The contract name DEFAULT is an identifier, not a keyword, in this
-- position, so it has to be wrapped in square brackets.
CREATE SERVICE AsyncExecService
    AUTHORIZATION dbo
    ON QUEUE AsyncExecQueue
    ([DEFAULT]);
GO
if object_id('usp_AsyncExecActivated') is not null
    drop procedure dbo.usp_AsyncExecActivated;
go
-- Activation procedure for AsyncExecQueue.
-- Receives one message at a time, each in its own transaction, until a
-- receive returns nothing within the 5 second timeout. Handles three
-- message types: DEFAULT (run the requested procedure and log its timing),
-- EndDialog and Error (both close this side of the conversation).
--
-- The procedure was dropped by the batch above, so it has to be CREATEd;
-- ALTER PROCEDURE would fail because the object no longer exists.
create procedure dbo.usp_AsyncExecActivated
as
set nocount on;
declare @h uniqueidentifier,
    @messageTypeName sysname,
    @messageBody varbinary(max),
    @xmlBody xml,
    @procedureName sysname,
    @startTime datetime,
    @finishTime datetime,
    @token uniqueidentifier,
    @nCode int,
    @vchDescription nvarchar(2000),
    @xmlError xml,
    @procName sysname;
-- Name of this procedure, reported in the error message further down.
set @procName = object_name(@@procid);
while 1 = 1
begin
    begin tran;
    waitfor(
        receive top(1)
            @h = conversation_handle,
            @messageTypeName = message_type_name,
            @messageBody = message_body
        from AsyncExecQueue
    ), TIMEOUT 5000;
    -- @@rowcount must be read directly after the receive.
    if not @@rowcount > 0
    begin
        commit tran;
        break; -- exit while 1 = 1
    end
    if (@messageTypeName = N'DEFAULT')
    begin
        select @xmlBody = CAST(@messageBody as xml);
        select @procedureName = @xmlBody.value('(//procedure/name)[1]', 'sysname');
        select @startTime = CURRENT_TIMESTAMP;
        -- Invoke the procedure by name. exec(@procedureName) would run the
        -- message text as an arbitrary batch, so any statement placed in the
        -- message would be executed.
        exec @procedureName;
        select @finishTime = CURRENT_TIMESTAMP;
        select @token = conversation_id
        from sys.conversation_endpoints
        where conversation_handle = @h;
        insert AsyncExecResults (start_time, finish_time, token)
        values(
            @startTime,
            @finishTime,
            @token);
        -- The chosen protocol says the conversation ends after the 1st
        -- message is received. So let SSSB know we're done by ending the
        -- conversation after each received message. Not ending the conversation
        -- will leave all conversations open. When you get into the millions
        -- of open conversations (see sys.conversation_endpoints), your server
        -- will go slower and slower.
        end conversation @h;
    end
    else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
    begin
        end conversation @h;
    end
    -- Don't forget to handle error messages too.
    -- These also indicate the end of a conversation, just like EndDialog.
    else if (@messageTypeName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
    begin
        -- Read the xml document that we were sent in the message body.
        select @xmlError = convert(xml, @messageBody);
        with xmlnamespaces( default 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
        select
            @nCode = Errors.error.value( 'Code[1]', 'int'),
            @vchDescription = Errors.error.value( 'Description[1]', 'nvarchar(max)')
        from @xmlError.nodes( '/Error[1]') as Errors(error);
        -- Any text output by a SSSB queue reader will by default end up in the sql log,
        -- so just write the output with nowait to get errors shown in the sql log.
        -- Using "with log" would put the error in the sql log twice: once because of
        -- the with log, plus once more because all output from an auto activated procedure
        -- is written into the log. So don't specify with log here.
        raiserror('Service broker error received. Error %d:%s. (%s)', 0, 0, @nCode, @vchDescription, @procName) with nowait;
        -- An Error message means the conversation has ended, just like with an EndDialog message.
        -- So tell SSSB we got the message and have it clean up our end of the conversation too.
        end conversation @h;
    end
    commit tran;
end
go
-- Enable the queue and attach usp_AsyncExecActivated as its activation
-- procedure, allowing up to 25 concurrent readers running as the owner.
ALTER QUEUE dbo.AsyncExecQueue
WITH
    STATUS = ON,
    RETENTION = OFF,
    ACTIVATION (
        STATUS = ON,
        PROCEDURE_NAME = usp_AsyncExecActivated,
        MAX_QUEUE_READERS = 25,
        EXECUTE AS OWNER
    );
GO
if object_id('usp_AsyncExecInvoke') is not null
    drop procedure dbo.usp_AsyncExecInvoke;
go
-- Queues a request to run @procedureName asynchronously.
--   @procedureName : name of the procedure to execute; it is sent as
--                    <procedure><name>...</name></procedure>.
--   @token (output): conversation_id of the dialog opened for the request.
--
-- The procedure was dropped by the batch above, so it has to be CREATEd;
-- ALTER PROCEDURE would fail because the object no longer exists.
create procedure dbo.usp_AsyncExecInvoke
    @procedureName sysname,
    @token uniqueidentifier output
as
declare @h uniqueidentifier,
        @xmlBody xml;
set nocount on;
-- The target is addressed by service name only; no broker instance is given.
begin dialog conversation @h
    from service AsyncExecService
    to service N'AsyncExecService'
    with encryption = off;
-- Hand the conversation_id back to the caller; without this assignment the
-- output parameter would always come back NULL.
select @token = conversation_id
from sys.conversation_endpoints
where conversation_handle = @h;
select @xmlBody = (
    select @procedureName as name
    for xml path('procedure'), type);
send on conversation @h (@xmlBody);
go
-- Drop any earlier version so the CREATE below succeeds.
IF OBJECT_ID('usp_MyLongRunningProcedure') IS NOT NULL
    DROP PROCEDURE dbo.usp_MyLongRunningProcedure;
GO
-- Stand-in workload: waits for ten seconds and does nothing else.
CREATE PROCEDURE dbo.usp_MyLongRunningProcedure
AS
BEGIN
    WAITFOR DELAY '00:00:10';
END
GO
-- Start from an empty result log.
TRUNCATE TABLE AsyncExecResults;
GO
-- Queue ten requests for the long-running procedure.
DECLARE @requestsLeft INT,
        @token UNIQUEIDENTIFIER;
SET @requestsLeft = 10;
WHILE @requestsLeft > 0
BEGIN
    EXEC usp_AsyncExecInvoke N'usp_MyLongRunningProcedure', @token OUTPUT;
    SET @requestsLeft = @requestsLeft - 1;
END
GO
-- Show what has completed so far, in the order the procedures started.
SELECT token, start_time, finish_time
FROM AsyncExecResults
ORDER BY start_time, finish_time;
Viewing 3 posts - 1 through 3 (of 3 total)
You must be logged in to reply to this topic. Login to reply