Skip to content

Commit dd4c4fe

Browse files
committed
Implement wait handle to block a pJob until its all tasks (parallel subtasks and callback task) complete execution.
Oracle version simply utilizes the signaling mechanism which provided by SYS.DBMS_ALERT package; At present, SQL Serverv version imitates it by a polling loop. -- This version has not been tested yet.
1 parent f60a4a4 commit dd4c4fe

33 files changed

+651
-124
lines changed

TaskParallelFoundation/ConsoleTest/App.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?xml version="1.0" encoding="utf-8" ?>
22
<configuration>
33
<connectionStrings>
4-
<add name="TPW_Database" providerName="DDTek.Oracle" connectionString="TNSNames File=C:\Oracle\product\11.2.0\dbhome_1\NETWORK\ADMIN\tnsnames.ora;Data Source=ORADEV;Authentication Method=Client;Connection Timeout=600;Pooling=True;Min Pool Size=16;Max Pool Size=256;Max Pool Size Behavior=SoftCap;Initial Command Timeout=60;Application Name=TaskParallelPump"/>
4+
<add name="TPW_Database" providerName="DDTek.Oracle" connectionString="TNSNames File=C:\Oracle\product\11.2.0\dbhome_1\NETWORK\ADMIN\tnsnames.ora;Data Source=ORCL;Authentication Method=Client;Connection Timeout=600;Pooling=True;Min Pool Size=16;Max Pool Size=256;Max Pool Size Behavior=SoftCap;Initial Command Timeout=60;Application Name=TaskParallelPump"/>
55
<!--<add name="TPW_Database" providerName="System.Data.SqlClient" connectionString="Data Source=.\SQLEXPRESS;AttachDbFilename=E:\Projects\DbParallel\TaskParallelFoundation\ConsoleTest\SampleDatabase\SqlServer\TaskParallelWorkflow.mdf;Integrated Security=True;User Instance=True"/>-->
66
</connectionStrings>
77
<appSettings>

TaskParallelFoundation/ConsoleTest/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ static void Main(string[] args)
1515
PumpMain pump = new PumpMain();
1616

1717
pump.Start();
18+
Console.WriteLine("The Pump Service is running ...");
1819

1920
Task.Factory.StartNew(() =>
2021
{
Binary file not shown.
Binary file not shown.

TaskParallelFoundation/DataAccess/SqlServer/Booster/SqlLauncher.cs

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
using System;
2-
using System.Data.Common;
3-
using System.Configuration;
42
using System.Data.SqlClient;
53

64
namespace DbParallel.DataAccess.Booster.SqlServer
75
{
86
public class SqlLauncher : DbLauncher
97
{
10-
public SqlLauncher(DbProviderFactory dbProviderFactory, string connectionString, string destinationTableName, Action<SqlBulkCopyColumnMappingCollection> columnMappings = null,
8+
public SqlLauncher(string connectionString, string destinationTableName, Action<SqlBulkCopyColumnMappingCollection> columnMappings = null,
119
int multipleRockets = _DefaultMultipleRockets, int bulkSize = _DefaultBulkSize, int commandTimeout = _CommandTimeout)
1210
{
1311
if (multipleRockets < _MinMultipleRockets)
@@ -17,42 +15,20 @@ public SqlLauncher(DbProviderFactory dbProviderFactory, string connectionString,
1715
bulkSize = _MinBulkSize;
1816

1917
SqlConnection dbConnection;
20-
SqlBulkCopy bulkCopy = CreateBulkCopy(dbProviderFactory, connectionString, out dbConnection, destinationTableName, columnMappings, commandTimeout);
18+
SqlBulkCopy bulkCopy = CreateBulkCopy(connectionString, out dbConnection, destinationTableName, columnMappings, commandTimeout);
2119
_FillingRocket = new SqlRocket(bulkCopy, dbConnection, bulkSize);
2220

2321
for (int i = 1; i < multipleRockets; i++)
2422
{
25-
bulkCopy = CreateBulkCopy(dbProviderFactory, connectionString, out dbConnection, destinationTableName, columnMappings, commandTimeout);
23+
bulkCopy = CreateBulkCopy(connectionString, out dbConnection, destinationTableName, columnMappings, commandTimeout);
2624
_FreeQueue.Add(new SqlRocket(bulkCopy, dbConnection, bulkSize));
2725
}
2826
}
2927

30-
public SqlLauncher(string providerName, string connectionString, string destinationTableName, Action<SqlBulkCopyColumnMappingCollection> columnMappings = null,
31-
int multipleRockets = _DefaultMultipleRockets, int bulkSize = _DefaultBulkSize, int commandTimeout = _CommandTimeout)
32-
: this(DbProviderFactories.GetFactory(providerName), connectionString,
33-
destinationTableName, columnMappings, multipleRockets, bulkSize, commandTimeout)
34-
{
35-
}
36-
37-
public SqlLauncher(ConnectionStringSettings connSetting, string destinationTableName, Action<SqlBulkCopyColumnMappingCollection> columnMappings = null,
38-
int multipleRockets = _DefaultMultipleRockets, int bulkSize = _DefaultBulkSize, int commandTimeout = _CommandTimeout)
39-
: this(DbProviderFactories.GetFactory(connSetting.ProviderName), connSetting.ConnectionString,
40-
destinationTableName, columnMappings, multipleRockets, bulkSize, commandTimeout)
41-
{
42-
}
43-
44-
public SqlLauncher(string connectionStringKey, string destinationTableName, Action<SqlBulkCopyColumnMappingCollection> columnMappings = null,
45-
int multipleRockets = _DefaultMultipleRockets, int bulkSize = _DefaultBulkSize, int commandTimeout = _CommandTimeout)
46-
: this(ConfigurationManager.ConnectionStrings[connectionStringKey],
47-
destinationTableName, columnMappings, multipleRockets, bulkSize, commandTimeout)
48-
{
49-
}
50-
51-
private static SqlBulkCopy CreateBulkCopy(DbProviderFactory dbProviderFactory, string connectionString, out SqlConnection dbConnection,
28+
private static SqlBulkCopy CreateBulkCopy(string connectionString, out SqlConnection dbConnection,
5229
string destinationTableName, Action<SqlBulkCopyColumnMappingCollection> mapColumns, int commandTimeout)
5330
{
54-
dbConnection = dbProviderFactory.CreateConnection() as SqlConnection;
55-
dbConnection.ConnectionString = connectionString;
31+
dbConnection = new SqlConnection(connectionString);
5632

5733
SqlBulkCopy bulkCopy = new SqlBulkCopy(dbConnection);
5834

TaskParallelFoundation/Databases/Oracle/Scripts/1-Tables/01-TPW_PUMP_CONFIG.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ CREATE TABLE XYZ.TPW_PUMP_CONFIG
1010
CONSTRAINT CK_TPW_PUMP_CONFIG_VALUE CHECK (NUMBER_VALUE IS NOT NULL OR DATE_VALUE IS NOT NULL OR STRING_VALUE IS NOT NULL)
1111
)
1212
ORGANIZATION INDEX
13-
STORAGE (INITIAL 16K NEXT 8K);
13+
STORAGE (INITIAL 16K NEXT 8K BUFFER_POOL KEEP);
1414

1515
----------------------------------------------------------------------------------------------------
1616
--

TaskParallelFoundation/Databases/Oracle/Scripts/1-Tables/03-TPW_WF_ACTIVITY.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ CREATE TABLE XYZ.TPW_WF_ACTIVITY
1010
CONSTRAINT PK_TPW_WF_ACTIVITY PRIMARY KEY (ACTIVITY)
1111
)
1212
ORGANIZATION INDEX
13-
STORAGE (INITIAL 16K NEXT 8K);
13+
STORAGE (INITIAL 16K NEXT 8K BUFFER_POOL KEEP);
1414

1515
----------------------------------------------------------------------------------------------------
1616
--

TaskParallelFoundation/Databases/Oracle/Scripts/1-Tables/04-TPW_WF_STATE.sql

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
-- CREATE TABLE
22
CREATE TABLE XYZ.TPW_WF_STATE
33
(
4-
STATE_ID NUMBER(5) NOT NULL,
5-
ACTIVITY VARCHAR2(32) NOT NULL,
6-
STATE_NAME VARCHAR2(32) NOT NULL,
4+
STATE_ID NUMBER(5) NOT NULL,
5+
ACTIVITY VARCHAR2(32) NOT NULL,
6+
STATE_NAME VARCHAR2(32) NOT NULL,
7+
IS_DONE NUMBER(1) DEFAULT 0 NOT NULL,
78
DESCRIPTION_ VARCHAR2(256),
89
CONSTRAINT PK_TPW_WF_STATE PRIMARY KEY (STATE_ID),
910
CONSTRAINT UK_TPW_WF_STATE UNIQUE (ACTIVITY, STATE_NAME),
10-
CONSTRAINT FK_TPW_WF_STATE_ACTIVITY FOREIGN KEY (ACTIVITY) REFERENCES XYZ.TPW_WF_ACTIVITY (ACTIVITY)
11+
CONSTRAINT FK_TPW_WF_STATE_ACTIVITY FOREIGN KEY (ACTIVITY) REFERENCES XYZ.TPW_WF_ACTIVITY (ACTIVITY),
12+
CONSTRAINT CK_TPW_WF_STATE_DONE CHECK (IS_DONE = 0 OR IS_DONE = 1)
1113
)
1214
ORGANIZATION INDEX
13-
STORAGE (INITIAL 64K NEXT 64K);
15+
STORAGE (INITIAL 64K NEXT 64K BUFFER_POOL KEEP);
1416

1517
----------------------------------------------------------------------------------------------------
1618
--

TaskParallelFoundation/Databases/Oracle/Scripts/1-Tables/05-TPW_WF_EVENT.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ CREATE TABLE XYZ.TPW_WF_EVENT
1010
CONSTRAINT FK_TPW_WF_EVENT_ACTIVITY FOREIGN KEY (ACTIVITY) REFERENCES XYZ.TPW_WF_ACTIVITY (ACTIVITY)
1111
)
1212
ORGANIZATION INDEX
13-
STORAGE (INITIAL 32K NEXT 32K);
13+
STORAGE (INITIAL 32K NEXT 32K BUFFER_POOL KEEP);
1414

1515
----------------------------------------------------------------------------------------------------
1616
--

TaskParallelFoundation/Databases/Oracle/Scripts/1-Tables/07-TPW_WF_STATE_MACHINE.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ CREATE TABLE XYZ.TPW_WF_STATE_MACHINE
2121
CONSTRAINT FK_TPW_WF_STATE_MACHINE_EN FOREIGN KEY (ACTIVITY, EVENT_NAME) REFERENCES XYZ.TPW_WF_EVENT (ACTIVITY, EVENT_NAME),
2222
CONSTRAINT FK_TPW_WF_STATE_MACHINE_SN1 FOREIGN KEY (ACTIVITY, STATE_NAME_NEW) REFERENCES XYZ.TPW_WF_STATE (ACTIVITY, STATE_NAME)
2323
)
24-
STORAGE (INITIAL 128K NEXT 128K);
24+
STORAGE (INITIAL 128K NEXT 128K BUFFER_POOL KEEP);
2525

2626
CREATE UNIQUE INDEX XYZ.UK_TPW_WF_STATE_MACHINE_EV ON XYZ.TPW_WF_STATE_MACHINE (EVENT_ID, STATE_ID_OLD);
2727

0 commit comments

Comments
 (0)