Skip to content

Commit 2822fdd

Browse files
committed
Queue updates
1 parent 0968b1e commit 2822fdd

File tree

6 files changed

+226
-218
lines changed

6 files changed

+226
-218
lines changed

020_DIRECT_Framework/Direct_Framework/Direct_Framework.sqlproj

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
<ItemGroup>
6363
<Folder Include="Functions" />
6464
<Folder Include="Properties" />
65-
<Folder Include="Reference_Databases" />
6665
<Folder Include="Schemas" />
6766
<Folder Include="Scripts" />
6867
<Folder Include="Stored Procedures" />
@@ -132,12 +131,12 @@
132131
<Build Include="Views\omd_reporting.vw_MODULE_FAILURES.sql" />
133132
<None Include="Functions\omd_processing.NextModuleInQueue.sql" />
134133
<None Include="Scripts\InsertReferenceData.sql" />
135-
<None Include="Scripts\Queue_Job_Batch.sql"/>
136-
<None Include="Scripts\Queue_Job_Module.sql"/>
134+
<None Include="Scripts\Queue_Job_Batch.sql" />
135+
<None Include="Scripts\Queue_Job_Module.sql" />
137136
<None Include="Stored Procedures\omd.EndDating.sql" />
138137
<None Include="Views\omd_processing.vw_CURRENT_PROCESSING.sql" />
139-
<None Include="Views\omd_processing.vw_QUEUE_BATCH_PROCESSING.sql"/>
140-
<None Include="Views\omd_processing.vw_QUEUE_MODULE_PROCESSING.sql"/>
138+
<None Include="Views\omd_processing.vw_QUEUE_BATCH_PROCESSING.sql" />
139+
<None Include="Views\omd_processing.vw_QUEUE_MODULE_PROCESSING.sql" />
141140
<None Include="Views\omd_reporting.vw_QUEUE_PROGRESS.sql" />
142141
</ItemGroup>
143142
<ItemGroup>
@@ -154,11 +153,6 @@
154153
<SuppressMissingDependenciesErrors>False</SuppressMissingDependenciesErrors>
155154
<DatabaseSqlCmdVariable>msdb</DatabaseSqlCmdVariable>
156155
</ArtifactReference>
157-
<ArtifactReference Include="Reference_Databases\SSISDB.dacpac">
158-
<HintPath>Reference_Databases\SSISDB.dacpac</HintPath>
159-
<SuppressMissingDependenciesErrors>False</SuppressMissingDependenciesErrors>
160-
<DatabaseSqlCmdVariable>SSISDB</DatabaseSqlCmdVariable>
161-
</ArtifactReference>
162156
</ItemGroup>
163157
<ItemGroup>
164158
<SqlCmdVariable Include="master">
@@ -174,4 +168,4 @@
174168
<Value>$(SqlCmdVar__3)</Value>
175169
</SqlCmdVariable>
176170
</ItemGroup>
177-
</Project>
171+
</Project>
Binary file not shown.

020_DIRECT_Framework/Direct_Framework/Scripts/Queue_Job_Batch.sql

Lines changed: 97 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ GO
44
IF EXISTS (SELECT name FROM msdb.dbo.sysjobs WHERE name='Queue_Batch')
55
EXEC msdb.dbo.sp_delete_job @job_name='Queue_Batch', @delete_unused_schedule=1
66

7-
/****** Object: Job [Queue_Batch] Script Date: 4/3/2020 8:06:05 PM ******/
87
BEGIN TRANSACTION
98
DECLARE @ReturnCode INT
109
SELECT @ReturnCode = 0
11-
/****** Object: JobCategory [[Uncategorized (Local)]] Script Date: 4/3/2020 8:06:05 PM ******/
10+
1211
IF NOT EXISTS (SELECT name FROM msdb.dbo.syscategories WHERE name=N'[Uncategorized (Local)]' AND category_class=1)
1312
BEGIN
1413
EXEC @ReturnCode = msdb.dbo.sp_add_category @class=N'JOB', @type=N'LOCAL', @name=N'[Uncategorized (Local)]'
@@ -18,102 +17,112 @@ END
1817

1918
DECLARE @jobId BINARY(16)
2019
EXEC @ReturnCode = msdb.dbo.sp_add_job @job_name=N'Queue_Batch',
21-
@enabled=1,
22-
@notify_level_eventlog=0,
23-
@notify_level_email=0,
24-
@notify_level_netsend=0,
25-
@notify_level_page=0,
26-
@delete_level=0,
27-
@description=N'No description available.',
28-
@category_name=N'[Uncategorized (Local)]',
29-
@owner_login_name=N'sa', @job_id = @jobId OUTPUT
20+
@enabled=1,
21+
@notify_level_eventlog=0,
22+
@notify_level_email=0,
23+
@notify_level_netsend=0,
24+
@notify_level_page=0,
25+
@delete_level=0,
26+
@description=N'No description available.',
27+
@category_name=N'[Uncategorized (Local)]',
28+
@owner_login_name=N'sa', @job_id = @jobId OUTPUT
3029
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
31-
/****** Object: Step [Continuous_Integration] Script Date: 4/3/2020 8:06:06 PM ******/
30+
3231
EXEC @ReturnCode = msdb.dbo.sp_add_jobstep @job_id=@jobId, @step_name=N'Continuous_Integration',
33-
@step_id=1,
34-
@cmdexec_success_code=0,
35-
@on_success_action=1,
36-
@on_success_step_id=0,
37-
@on_fail_action=2,
38-
@on_fail_step_id=0,
39-
@retry_attempts=0,
40-
@retry_interval=0,
41-
@os_run_priority=0, @subsystem=N'TSQL',
42-
@command=N'CREATE PROCEDURE #runningJobs @NUM_JOBS int OUTPUT AS
32+
@step_id=1,
33+
@cmdexec_success_code=0,
34+
@on_success_action=1,
35+
@on_success_step_id=0,
36+
@on_fail_action=2,
37+
@on_fail_step_id=0,
38+
@retry_attempts=0,
39+
@retry_interval=0,
40+
@os_run_priority=0, @subsystem=N'TSQL',
41+
@command=N'/*
42+
Create a temporary in-memory stored procedure that can be used to return the number of currently running processes as per the control framework.
43+
*/
44+
CREATE OR ALTER PROCEDURE #runningJobs
45+
@NUMBER_OF_RUNNING_PROCESSES INT OUTPUT
46+
AS
4347
(
44-
SELECT @NUM_JOBS = (SELECT COUNT(*) FROM [900_Direct_Framework].omd.MODULE_INSTANCE WHERE EXECUTION_STATUS_CODE=''E'')
48+
SELECT @NUMBER_OF_RUNNING_PROCESSES = (SELECT COUNT(*) FROM [900_Direct_Framework].omd.BATCH_INSTANCE WHERE EXECUTION_STATUS_CODE=''E'')
4549
)
4650
GO
47-
--Only execute the queue order when the number of executing ETLs is smaller than the maximum concurrency parameters
48-
49-
DECLARE @DEBUG_FLAG INT
50-
DECLARE @MAX_CONCURRENCY INT
51-
DECLARE @NUM_RUNNING_JOBS INT
52-
DECLARE @DELAY_TIME VARCHAR(8)
53-
DECLARE @JOBNAME as VARCHAR(256)
54-
DECLARE @CURRENT_TIME VARCHAR(19)
55-
DECLARE @PRINT_MSG VARCHAR(1000)
56-
DECLARE @SQL_STRING NVARCHAR(4000)
57-
SELECT @MAX_CONCURRENCY = 5
58-
SELECT @DELAY_TIME =''00:00:05'' -- This is the time the queue waits upon detecting concurrency
59-
SELECT @DEBUG_FLAG =0
51+
52+
DECLARE @DEBUG_FLAG INT = 1; -- Debug is enabled by default
53+
DECLARE @MAX_CONCURRENCY INT = 2 -- Determines how many processes can be run in parallel / concurrent
54+
DECLARE @NUMBER_OF_RUNNING_PROCESSES INT = 0;
55+
DECLARE @NUMBER_OF_QUEUED_PROCESSES INT = 0;
56+
DECLARE @DELAY_TIME VARCHAR(8) = ''00:00:05'' -- This is the time the queue waits upon detecting concurrency
57+
DECLARE @PROCESS_NAME as VARCHAR(256);
58+
DECLARE @CURRENT_TIMESTAMP VARCHAR(19);
59+
DECLARE @PRINT_MESSAGE VARCHAR(1000);
60+
DECLARE @SQL_STRING NVARCHAR(4000);
6061
6162
WHILE 1 = 1
6263
BEGIN
63-
EXEC #runningJobs @NUM_RUNNING_JOBS OUTPUT
64-
SELECT @CURRENT_TIME = (SELECT CAST(SYSDATETIME() AS VARCHAR(19)))
65-
IF @DEBUG_FLAG =1 PRINT ''Initial @NUM_RUNNING_JOBS= ''+CAST (@NUM_RUNNING_JOBS AS VARCHAR(10))
66-
IF @DEBUG_FLAG =1 PRINT ''Initial @MAX_CONCURRENCY= ''+CAST (@MAX_CONCURRENCY AS VARCHAR(10))
67-
IF @DEBUG_FLAG =1 PRINT ''Initial Log Date/Time = ''+ @CURRENT_TIME
68-
--Whenever the number of jobs exceeds the parameter, wait for a bit (as per the delay time)
69-
WHILE (@NUM_RUNNING_JOBS >= @MAX_CONCURRENCY)
70-
BEGIN
71-
IF @DEBUG_FLAG =1
72-
SET @PRINT_MSG = ''WAITFOR ''+@DELAY_TIME+'', currently still @NUM_RUNNING_JOBS at ''+CAST (@NUM_RUNNING_JOBS AS VARCHAR(10))
73-
RAISERROR (@PRINT_MSG, 0, 1) WITH NOWAIT; -- Raise Error used to flush to the debug window immediately, PRINT has a large delay
74-
WAITFOR DELAY @DELAY_TIME
75-
EXEC #runningJobs @NUM_RUNNING_JOBS OUTPUT
76-
END
77-
IF @DEBUG_FLAG =1 PRINT ''After wait @NUM_RUNNING_JOBS= ''+CAST (@NUM_RUNNING_JOBS AS VARCHAR(10))
78-
IF @DEBUG_FLAG =1 PRINT ''After wait @MAX_CONCURRENCY= ''+CAST (@MAX_CONCURRENCY AS VARCHAR(10))
79-
IF @DEBUG_FLAG =1 PRINT ''After wait Log Date/Time = ''+ @CURRENT_TIME
80-
-- When a spot becomes available, run the next ETL(s) from the queue
81-
SELECT TOP 1 @JOBNAME = ETL_PROCESS_NAME
82-
FROM
83-
84-
( -- Select the Batch that hasn''t run the longest (oldest age)
85-
SELECT *
86-
FROM [900_Direct_Framework].[omd_processing].[vw_QUEUE_BATCH_PROCESSING]
87-
) ETL_QUERY
88-
ORDER BY END_DATETIME ASC
64+
--Only execute the queue order when the number of executing ETLs is smaller than the maximum concurrency parameters
65+
EXEC #runningJobs @NUMBER_OF_RUNNING_PROCESSES OUTPUT
66+
67+
SELECT @CURRENT_TIMESTAMP = (SELECT CAST(SYSDATETIME() AS VARCHAR(19)))
68+
SELECT @NUMBER_OF_QUEUED_PROCESSES = (SELECT COUNT(*) AS NUMBER_OF_QUEUED_PROCESSED FROM [900_Direct_Framework].[omd_processing].[vw_QUEUE_BATCH_PROCESSING])
69+
70+
IF @DEBUG_FLAG = 1 PRINT ''@NUMBER_OF_RUNNING_PROCESSES= ''+CAST (@NUMBER_OF_RUNNING_PROCESSES AS VARCHAR(10))
71+
IF @DEBUG_FLAG = 1 PRINT ''@MAX_CONCURRENCY= ''+CAST (@MAX_CONCURRENCY AS VARCHAR(10))
72+
IF @DEBUG_FLAG = 1 PRINT ''@CURRENT_TIMESTAMP = ''+ @CURRENT_TIMESTAMP
73+
IF @DEBUG_FLAG = 1 PRINT ''@NUMBER_OF_QUEUED_PROCESSES = ''+ +CAST (@NUMBER_OF_QUEUED_PROCESSES AS VARCHAR(10))
74+
75+
--Whenever the number of jobs exceeds the parameter, wait for a bit (as per the delay time)
76+
WHILE (@NUMBER_OF_RUNNING_PROCESSES >= @MAX_CONCURRENCY)
77+
BEGIN
78+
IF @DEBUG_FLAG =1
79+
BEGIN
80+
SET @PRINT_MESSAGE = ''WAITFOR ''+@DELAY_TIME+'', currently still @NUMBER_OF_RUNNING_PROCESSES at ''+CAST (@NUMBER_OF_RUNNING_PROCESSES AS VARCHAR(10))
81+
RAISERROR (@PRINT_MESSAGE, 0, 1) WITH NOWAIT; -- Raise Error used to flush to the debug window immediately, PRINT has a large delay
82+
END
83+
84+
WAITFOR DELAY @DELAY_TIME -- Perform the wait / delay.
85+
EXEC #runningJobs @NUMBER_OF_RUNNING_PROCESSES OUTPUT -- Check again if the next process is good to g.
86+
END
87+
88+
IF @DEBUG_FLAG =1 PRINT ''After wait @NUMBER_OF_RUNNING_PROCESSES= ''+CAST (@NUMBER_OF_RUNNING_PROCESSES AS VARCHAR(10))
89+
IF @DEBUG_FLAG =1 PRINT ''After wait @MAX_CONCURRENCY= ''+CAST (@MAX_CONCURRENCY AS VARCHAR(10))
90+
IF @DEBUG_FLAG =1 PRINT ''After wait @CURRENT_TIMESTAMP = ''+ @CURRENT_TIMESTAMP
91+
92+
-- When a spot becomes available, run the process from the queue
93+
SELECT TOP 1 @PROCESS_NAME = BATCH_CODE
94+
FROM
95+
(
96+
-- Select the Batch that hasn''t run the longest (oldest age)
97+
SELECT BATCH_CODE, END_DATETIME
98+
FROM [900_Direct_Framework].[omd_processing].[vw_QUEUE_BATCH_PROCESSING]
99+
) batchQueue
100+
ORDER BY END_DATETIME ASC
89101
90-
SET @PRINT_MSG = ''EXECUTING JOB: ''+@JOBNAME
91-
92-
IF @DEBUG_FLAG =1 RAISERROR (@PRINT_MSG, 0, 1) WITH NOWAIT
93-
BEGIN TRY
94-
Declare @execution_id bigint
95-
EXEC [SSISDB].[catalog].[create_execution] @package_name=@JOBNAME, @execution_id=@execution_id OUTPUT, @folder_name=N''EDW'', @project_name=N''Enterprise_Data_Warehouse'', @use32bitruntime=False, @reference_id=Null
96-
Select @execution_id
97-
DECLARE @var0 smallint = 1
98-
EXEC [SSISDB].[catalog].[set_execution_parameter_value] @execution_id, @object_type=50, @parameter_name=N''LOGGING_LEVEL'', @parameter_value=@var0
99-
EXEC [SSISDB].[catalog].[start_execution] @execution_id
100-
END TRY
101-
BEGIN CATCH
102-
SET @JOBNAME = SUBSTRING(@JOBNAME,1,LEN(@JOBNAME)-5)
103-
SET @SQL_STRING = N''UPDATE [900_Direct_Framework].omd.BATCH SET INACTIVE_INDICATOR=''''Y'''' WHERE BATCH_CODE=''''''+@JOBNAME+''''''''
104-
SET @PRINT_MSG = ''ERROR EXECUTING JOB: ''+@JOBNAME+''.dtsx. DEACTIVATE QUERY: ''+@SQL_STRING+''''
105-
RAISERROR (@PRINT_MSG, 0, 1) WITH NOWAIT
106-
EXECUTE sp_executesql @SQL_STRING
107-
END CATCH
108-
109-
--Also functions as delayer
110-
WAITFOR DELAY ''00:00:05'' -- This is mandatory! Otherwise processes will be spawned too fast! This prevents the same process to be kicked off many times before OMD has had the chance to register
102+
SET @PRINT_MESSAGE = ''Running process: ''+@PROCESS_NAME
103+
104+
IF @DEBUG_FLAG =1 RAISERROR (@PRINT_MESSAGE, 0, 1) WITH NOWAIT
105+
BEGIN TRY
106+
EXEC [900_Direct_Framework].[omd].[RunBatch]
107+
@BatchCode = @PROCESS_NAME
108+
END TRY
109+
BEGIN CATCH
110+
SET @PROCESS_NAME = SUBSTRING(@PROCESS_NAME,1,LEN(@PROCESS_NAME)-5)
111+
SET @SQL_STRING = N''UPDATE [900_Direct_Framework].omd.BATCH SET INACTIVE_INDICATOR=''''Y'''' WHERE BATCH_CODE=''''''+@PROCESS_NAME+''''''''
112+
SET @PRINT_MESSAGE = ''ERROR EXECUTING JOB: ''+@PROCESS_NAME+'' DEACTIVATE QUERY: ''+@SQL_STRING+''''
113+
RAISERROR (@PRINT_MESSAGE, 0, 1) WITH NOWAIT
114+
EXECUTE sp_executesql @SQL_STRING
115+
END CATCH
116+
117+
-- Also functions as delayer
118+
-- This is mandatory! Otherwise processes will be spawned too fast! This prevents the same process to be kicked off many times before OMD has had the chance to register
119+
WAITFOR DELAY ''00:00:05''
111120
END
112-
DROP PROCEDURE #runningJobs
113-
',
114-
@database_name=N'master',
115-
@output_file_name=N'D:\Logs\Job_Master',
116-
@flags=2
121+
122+
DROP PROCEDURE #runningJobs',
123+
@database_name=N'master',
124+
@output_file_name=N'D:\Logs\Job_Master',
125+
@flags=2
117126
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
118127
EXEC @ReturnCode = msdb.dbo.sp_update_job @job_id = @jobId, @start_step_id = 1
119128
IF (@@ERROR <> 0 OR @ReturnCode <> 0) GOTO QuitWithRollback
@@ -124,5 +133,4 @@ GOTO EndSave
124133
QuitWithRollback:
125134
IF (@@TRANCOUNT > 0) ROLLBACK TRANSACTION
126135
EndSave:
127-
128136
GO

0 commit comments

Comments
 (0)