Skip to content

Commit 5869061

Browse files
Copilotdandrejvv
andcommitted
Update Kafka, Solace, and Aws.Sqs to support IMessageBus and composite mode
Co-authored-by: dandrejvv <[email protected]>
1 parent f0595de commit 5869061

File tree

9 files changed

+171
-17
lines changed

9 files changed

+171
-17
lines changed

Modules/Intent.Modules.Aws.Sqs/Intent.Modules.Aws.Sqs.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
<ItemGroup>
2424
<ProjectReference Include="..\Intent.Modules.Constants\Intent.Modules.Constants.csproj" PrivateAssets="All" />
25+
<ProjectReference Include="..\Intent.Modules.Eventing.Contracts\Intent.Modules.Eventing.Contracts.csproj" PrivateAssets="All" />
2526
</ItemGroup>
2627

2728
<Import Project="..\Intent.Modules.Integration.IaC.Shared.AwsSqs\Intent.Modules.Integration.IaC.Shared.AwsSqs.projitems" Label="Shared" />

Modules/Intent.Modules.Aws.Sqs/Templates/SqsConfiguration/SqsConfigurationTemplatePartial.cs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,29 @@ public SqsConfigurationTemplate(IOutputTarget outputTarget, object model = null)
4848
method.AddParameter("IServiceCollection", "services", param => param.WithThisModifier());
4949
method.AddParameter("IConfiguration", "configuration");
5050

51+
var compositeTemplate = GetTemplate<ICSharpFileBuilderTemplate>(TemplateDependency.OnTemplate("Intent.Eventing.Contracts.CompositeMessageBusConfiguration"));
52+
var isCompositeMode = compositeTemplate != null;
53+
54+
if (isCompositeMode)
55+
{
56+
method.AddParameter(this.GetMessageBrokerRegistryName(), "registry");
57+
}
58+
5159
method.AddStatement("services.AddAWSService<AmazonSQSClient>();");
5260
method.AddStatement("services.AddSingleton(typeof(IAmazonSQS), sp => sp.GetRequiredService<AmazonSQSClient>());", stmt => stmt.SeparatedFromPrevious());
5361

5462
// Register event bus
55-
method.AddStatement($"services.AddScoped<{this.GetEventBusInterfaceName()}, {this.GetTypeName(SqsEventBusTemplate.TemplateId)}>();", stmt => stmt.SeparatedFromPrevious());
63+
if (isCompositeMode)
64+
{
65+
method.AddStatement("// Register as concrete type for composite message bus");
66+
method.AddStatement($"services.AddScoped<{this.GetTypeName(SqsEventBusTemplate.TemplateId)}>();", stmt => stmt.SeparatedFromPrevious());
67+
}
68+
else
69+
{
70+
method.AddStatement("// Register as IMessageBus and IEventBus for standalone mode");
71+
method.AddStatement($"services.AddScoped<{this.GetMessageBusInterfaceName()}, {this.GetTypeName(SqsEventBusTemplate.TemplateId)}>();", stmt => stmt.SeparatedFromPrevious());
72+
method.AddStatement($"services.AddScoped<{this.GetEventBusInterfaceName()}, {this.GetTypeName(SqsEventBusTemplate.TemplateId)}>();");
73+
}
5674

5775
// Register dispatcher
5876
method.AddStatement($"services.AddSingleton<{this.GetTypeName(SqsMessageDispatcherTemplate.TemplateId)}>();");
@@ -93,6 +111,17 @@ public SqsConfigurationTemplate(IOutputTarget outputTarget, object model = null)
93111
}));
94112
}
95113

114+
if (isCompositeMode && publishers.Any())
115+
{
116+
method.AddStatement("");
117+
method.AddStatement("// Register message types with the composite message bus registry");
118+
foreach (var publisher in publishers)
119+
{
120+
var messageType = publisher.GetModelTypeName(this);
121+
method.AddStatement($"registry.Register<{messageType}, {this.GetTypeName(SqsEventBusTemplate.TemplateId)}>();");
122+
}
123+
}
124+
96125
method.AddReturn("services", stmt => stmt.SeparatedFromPrevious());
97126
});
98127
});

Modules/Intent.Modules.Aws.Sqs/Templates/SqsEventBus/SqsEventBusTemplatePartial.cs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public SqsEventBusTemplate(IOutputTarget outputTarget, object model = null) : ba
3636
.AddUsing("Microsoft.Extensions.Options")
3737
.AddClass($"SqsEventBus", @class =>
3838
{
39-
@class.ImplementsInterface(this.GetEventBusInterfaceName());
39+
@class.ImplementsInterface(this.GetMessageBusInterfaceName());
4040
@class.AddField("IAmazonSQS", "_sqsClient", field => field.PrivateReadOnly());
4141
@class.AddField("List<MessageEntry>", "_messageQueue", field => field.PrivateReadOnly().WithAssignment(new CSharpStatement("[]")));
4242
@class.AddField("Dictionary<string, PublisherEntry>", "_lookup", field => field.PrivateReadOnly());
@@ -60,14 +60,34 @@ public SqsEventBusTemplate(IOutputTarget outputTarget, object model = null) : ba
6060
method.AddStatement("_messageQueue.Add(new MessageEntry(message));");
6161
});
6262

63+
@class.AddMethod("void", "Publish", method =>
64+
{
65+
method.AddGenericParameter("T", out var T);
66+
method.AddGenericTypeConstraint(T, c => c.AddType("class"));
67+
method.AddParameter(T, "message");
68+
method.AddParameter("IDictionary<string, object>", "additionalData");
69+
70+
method.AddStatement("// Note: AWS SQS does not support additional data in this implementation, ignoring parameter");
71+
method.AddStatement("Publish(message);");
72+
});
73+
6374
@class.AddMethod("void", "Send", method =>
6475
{
6576
method.AddGenericParameter("T", out var T);
6677
method.AddGenericTypeConstraint(T, c => c.AddType("class"));
6778
method.AddParameter(T, "message");
6879

69-
method.AddStatement("ValidateMessage(message);");
70-
method.AddStatement("_messageQueue.Add(new MessageEntry(message));");
80+
method.AddStatement("Publish(message);");
81+
});
82+
83+
@class.AddMethod("void", "Send", method =>
84+
{
85+
method.AddGenericParameter("T", out var T);
86+
method.AddGenericTypeConstraint(T, c => c.AddType("class"));
87+
method.AddParameter(T, "message");
88+
method.AddParameter("IDictionary<string, object>", "additionalData");
89+
90+
method.AddStatement("Publish(message, additionalData);");
7191
});
7292

7393
@class.AddMethod("Task", "FlushAllAsync", method =>

Modules/Intent.Modules.Eventing.Kafka/Intent.Modules.Eventing.Kafka.csproj

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
</PropertyGroup>
66

77
<ItemGroup>
8-
<PackageReference Include="Intent.Modules.Common" Version="3.7.2" />
9-
<PackageReference Include="Intent.Modules.Common.CSharp" Version="3.8.1" />
8+
<PackageReference Include="Intent.Modules.Common" Version="3.7.3" />
9+
<PackageReference Include="Intent.Modules.Common.CSharp" Version="3.9.0" />
1010
<PackageReference Include="Intent.Modules.Eventing.Contracts" Version="5.0.3" />
11-
<PackageReference Include="Intent.Modules.Modelers.Eventing" Version="5.1.0" />
12-
<PackageReference Include="Intent.Modules.Modelers.Services" Version="3.7.3" />
13-
<PackageReference Include="Intent.Modules.Modelers.Services.EventInteractions" Version="1.0.4" />
11+
<PackageReference Include="Intent.Modules.Modelers.Eventing" Version="6.0.1" />
12+
<PackageReference Include="Intent.Modules.Modelers.Services" Version="3.8.3" />
13+
<PackageReference Include="Intent.Modules.Modelers.Services.EventInteractions" Version="1.1.0" />
1414
<PackageReference Include="Intent.Packager" Version="3.6.0">
1515
<PrivateAssets>all</PrivateAssets>
1616
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
@@ -22,6 +22,7 @@
2222
<ItemGroup>
2323
<ProjectReference Include="..\Intent.Modules.Constants\Intent.Modules.Constants.csproj" PrivateAssets="All" />
2424
<ProjectReference Include="..\Intent.Modules.Common.UnitOfWork\Intent.Modules.Common.UnitOfWork.csproj" PrivateAssets="All" />
25+
<ProjectReference Include="..\Intent.Modules.Eventing.Contracts\Intent.Modules.Eventing.Contracts.csproj" PrivateAssets="All" />
2526
</ItemGroup>
2627

2728
<ItemGroup>

Modules/Intent.Modules.Eventing.Kafka/Templates/KafkaConfiguration/KafkaConfigurationTemplatePartial.cs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,14 @@ public KafkaConfigurationTemplate(IOutputTarget outputTarget, object model = nul
8787
method.Static();
8888
method.AddParameter("IServiceCollection", "services", p => p.WithThisModifier());
8989

90+
var compositeTemplate = GetTemplate<ICSharpFileBuilderTemplate>(TemplateDependency.OnTemplate("Intent.Eventing.Contracts.CompositeMessageBusConfiguration"));
91+
var isCompositeMode = compositeTemplate != null;
92+
93+
if (isCompositeMode)
94+
{
95+
method.AddParameter(this.GetMessageBrokerRegistryName(), "registry");
96+
}
97+
9098
method.AddInvocationStatement("services.AddSingleton<ISchemaRegistryClient>", invocation =>
9199
{
92100
invocation.AddArgument(new CSharpLambdaBlock("serviceProvider"), statement =>
@@ -101,7 +109,18 @@ public KafkaConfigurationTemplate(IOutputTarget outputTarget, object model = nul
101109
block.AddStatement("return new CachedSchemaRegistryClient(schemaRegistryConfig);", s => s.SeparatedFromPrevious());
102110
});
103111
});
104-
method.AddStatement($"services.AddScoped<{this.GetEventBusInterfaceName()}, {this.GetKafkaEventBusName()}>();");
112+
113+
if (isCompositeMode)
114+
{
115+
method.AddStatement("// Register as concrete type for composite message bus");
116+
method.AddStatement($"services.AddScoped<{this.GetKafkaEventBusName()}>();");
117+
}
118+
else
119+
{
120+
method.AddStatement("// Register as IMessageBus and IEventBus for standalone mode");
121+
method.AddStatement($"services.AddScoped<{this.GetMessageBusInterfaceName()}, {this.GetKafkaEventBusName()}>();");
122+
method.AddStatement($"services.AddScoped<{this.GetEventBusInterfaceName()}, {this.GetKafkaEventBusName()}>();");
123+
}
105124
method.AddStatement($"services.AddScoped(typeof({this.GetKafkaEventDispatcherInterfaceName()}<>), typeof({this.GetKafkaEventDispatcherName()}<>));");
106125
method.AddStatement($"services.AddHostedService<{this.GetKafkaConsumerBackgroundServiceName()}>();");
107126

@@ -116,6 +135,16 @@ public KafkaConfigurationTemplate(IOutputTarget outputTarget, object model = nul
116135
{
117136
method.AddStatement($"services.AddSingleton(serviceProvider => {GetProducerFactoryStatement(message)});");
118137
}
138+
139+
if (isCompositeMode && _publishedMessageModels.Value.Any())
140+
{
141+
method.AddStatement("");
142+
method.AddStatement("// Register message types with the composite message bus registry");
143+
foreach (var message in _publishedMessageModels.Value)
144+
{
145+
method.AddStatement($"registry.Register<{this.GetIntegrationEventMessageName(message)}, {this.GetKafkaEventBusName()}>();");
146+
}
147+
}
119148
});
120149

121150
if (_publishedMessageModels.Value.Any())

Modules/Intent.Modules.Eventing.Kafka/Templates/KafkaEventBus/KafkaEventBusTemplatePartial.cs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public KafkaEventBusTemplate(IOutputTarget outputTarget, object model = null) :
3232
.AddUsing("Microsoft.Extensions.DependencyInjection")
3333
.AddClass($"KafkaEventBus", @class =>
3434
{
35-
@class.ImplementsInterface(this.GetEventBusInterfaceName());
35+
@class.ImplementsInterface(this.GetMessageBusInterfaceName());
3636
@class.AddField($"ConcurrentDictionary<Type, IProducer>", "_producersByMessageType", f => f
3737
.PrivateReadOnly()
3838
.WithAssignment(new CSharpStatement("new ConcurrentDictionary<Type, IProducer>()")));
@@ -52,6 +52,36 @@ public KafkaEventBusTemplate(IOutputTarget outputTarget, object model = null) :
5252
method.AddStatement("producer.EnqueueMessage(message);");
5353
});
5454

55+
@class.AddMethod("void", "Publish", method =>
56+
{
57+
method.AddGenericParameter("T", out var t);
58+
method.AddGenericTypeConstraint(t, c => c.AddType("class"));
59+
method.AddParameter(t, "message");
60+
method.AddParameter("IDictionary<string, object>", "additionalData");
61+
62+
method.AddStatement("// Note: Kafka does not support additional data in this implementation, ignoring parameter");
63+
method.AddStatement("Publish(message);");
64+
});
65+
66+
@class.AddMethod("void", "Send", method =>
67+
{
68+
method.AddGenericParameter("T", out var t);
69+
method.AddGenericTypeConstraint(t, c => c.AddType("class"));
70+
method.AddParameter(t, "message");
71+
72+
method.AddStatement("Publish(message);");
73+
});
74+
75+
@class.AddMethod("void", "Send", method =>
76+
{
77+
method.AddGenericParameter("T", out var t);
78+
method.AddGenericTypeConstraint(t, c => c.AddType("class"));
79+
method.AddParameter(t, "message");
80+
method.AddParameter("IDictionary<string, object>", "additionalData");
81+
82+
method.AddStatement("Publish(message, additionalData);");
83+
});
84+
5585
@class.AddMethod("void", "FlushAllAsync", method =>
5686
{
5787
method.Async();

Modules/Intent.Modules.Eventing.Solace/Intent.Modules.Eventing.Solace.csproj

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
</PropertyGroup>
66

77
<ItemGroup>
8-
<PackageReference Include="Intent.Modules.Common" Version="3.7.2" />
9-
<PackageReference Include="Intent.Modules.Common.CSharp" Version="3.8.1" />
8+
<PackageReference Include="Intent.Modules.Common" Version="3.7.3" />
9+
<PackageReference Include="Intent.Modules.Common.CSharp" Version="3.9.0" />
1010
<PackageReference Include="Intent.Modules.Common.Types" Version="4.0.0" />
1111
<PackageReference Include="Intent.Modules.Eventing.Contracts" Version="5.0.3" />
12-
<PackageReference Include="Intent.Modules.Modelers.Eventing" Version="5.1.1" />
13-
<PackageReference Include="Intent.Modules.Modelers.Services" Version="3.7.3" />
12+
<PackageReference Include="Intent.Modules.Modelers.Eventing" Version="6.0.1" />
13+
<PackageReference Include="Intent.Modules.Modelers.Services" Version="3.8.3" />
1414
<PackageReference Include="Intent.Modules.Modelers.Services.EventInteractions" Version="1.1.0" />
1515
<PackageReference Include="Intent.Packager" Version="3.5.0">
1616
<PrivateAssets>all</PrivateAssets>
@@ -23,6 +23,7 @@
2323
<ItemGroup>
2424
<ProjectReference Include="..\Intent.Modules.Constants\Intent.Modules.Constants.csproj" PrivateAssets="All" />
2525
<ProjectReference Include="..\Intent.Modules.Common.UnitOfWork\Intent.Modules.Common.UnitOfWork.csproj" PrivateAssets="All" />
26+
<ProjectReference Include="..\Intent.Modules.Eventing.Contracts\Intent.Modules.Eventing.Contracts.csproj" PrivateAssets="All" />
2627
</ItemGroup>
2728

2829
<ItemGroup>

Modules/Intent.Modules.Eventing.Solace/Templates/SolaceConfiguration/SolaceConfigurationTemplatePartial.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ public SolaceConfigurationTemplate(IOutputTarget outputTarget, object model = nu
4343
method.AddParameter("IServiceCollection", "services", p => p.WithThisModifier());
4444
method.AddParameter("IConfiguration", "configuration");
4545

46+
var compositeTemplate = GetTemplate<ICSharpFileBuilderTemplate>(TemplateDependency.OnTemplate("Intent.Eventing.Contracts.CompositeMessageBusConfiguration"));
47+
var isCompositeMode = compositeTemplate != null;
48+
49+
if (isCompositeMode)
50+
{
51+
method.AddParameter(this.GetMessageBrokerRegistryName(), "registry");
52+
}
53+
4654
method.AddStatements(@"var config = configuration.GetSection(""Solace"").Get<SolaceConfig>();
4755
if (config == null)
4856
{
@@ -85,7 +93,19 @@ public SolaceConfigurationTemplate(IOutputTarget outputTarget, object model = nu
8593
method.AddStatement($"services.AddSingleton<{this.GetMessageRegistryName()}>();");
8694
method.AddStatement($"services.AddHostedService<{this.GetSolaceConsumingServiceName()}>();");
8795
method.AddStatement($"services.AddScoped(typeof({this.GetSolaceEventDispatcherInterfaceName()}<>),typeof({this.GetSolaceEventDispatcherName()}<>) );");
88-
method.AddStatement($"services.AddScoped<{this.GetEventBusInterfaceName()},{this.GetSolaceEventBusName()}>();");
96+
97+
if (isCompositeMode)
98+
{
99+
method.AddStatement("// Register as concrete type for composite message bus");
100+
method.AddStatement($"services.AddScoped<{this.GetSolaceEventBusName()}>();");
101+
}
102+
else
103+
{
104+
method.AddStatement("// Register as IMessageBus and IEventBus for standalone mode");
105+
method.AddStatement($"services.AddScoped<{this.GetMessageBusInterfaceName()}, {this.GetSolaceEventBusName()}>();");
106+
method.AddStatement($"services.AddScoped<{this.GetEventBusInterfaceName()}, {this.GetSolaceEventBusName()}>();");
107+
}
108+
89109
method.AddStatement($"services.AddTransient<{this.GetSolaceConsumerName()}>();");
90110

91111
@class.AddMethod("void", "HandleSessionMessageEvent", method =>

0 commit comments

Comments
 (0)