Skip to content

Commit ba5f85b

Browse files
committed
Add db initializers to ts and csharp
1 parent dc8c729 commit ba5f85b

File tree

8 files changed

+432
-13
lines changed

8 files changed

+432
-13
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
using MongoDB.Driver;
2+
3+
namespace CreditCardEnrollment.Common.Util;
4+
5+
public class MongoInitializer
6+
{
7+
private readonly MongoSessionPool _sessionPool;
8+
private readonly string _databaseName;
9+
private readonly ILogger<MongoInitializer> _logger;
10+
11+
public MongoInitializer(
12+
MongoSessionPool sessionPool,
13+
string databaseName,
14+
ILogger<MongoInitializer> logger)
15+
{
16+
_sessionPool = sessionPool;
17+
_databaseName = databaseName;
18+
_logger = logger;
19+
}
20+
21+
public void Initialize()
22+
{
23+
using var session = _sessionPool.StartSession();
24+
var database = session.Client.GetDatabase(_databaseName);
25+
26+
try
27+
{
28+
_logger.LogInformation("Creating collections");
29+
CreateCollectionIfNotExists(database, "CreditCard_Enrollment_Enrollment");
30+
CreateCollectionIfNotExists(database, "CreditCard_Enrollment_ProductName");
31+
CreateCollectionIfNotExists(database, "CreditCard_Enrollment_ProductActiveStatus");
32+
_logger.LogInformation("Created collections");
33+
34+
_logger.LogInformation("Creating indexes");
35+
var enrollmentCollection = database.GetCollection<object>("CreditCard_Enrollment_Enrollment");
36+
var indexKeysDefinition = Builders<object>.IndexKeys.Ascending("userId");
37+
enrollmentCollection.Indexes.CreateOne(new CreateIndexModel<object>(indexKeysDefinition));
38+
_logger.LogInformation("Created indexes");
39+
}
40+
catch (Exception e)
41+
{
42+
_logger.LogError(e, "Error initializing MongoDB");
43+
throw;
44+
}
45+
}
46+
47+
private void CreateCollectionIfNotExists(IMongoDatabase database, string collectionName)
48+
{
49+
try
50+
{
51+
var collections = database.ListCollectionNames().ToList();
52+
if (!collections.Contains(collectionName))
53+
{
54+
database.CreateCollection(collectionName);
55+
_logger.LogInformation("Created collection {CollectionName}", collectionName);
56+
}
57+
else
58+
{
59+
_logger.LogInformation("Collection {CollectionName} already exists", collectionName);
60+
}
61+
}
62+
catch (Exception e)
63+
{
64+
_logger.LogWarning(e, "Error creating collection {CollectionName}", collectionName);
65+
throw;
66+
}
67+
}
68+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
using Npgsql;
2+
3+
namespace CreditCardEnrollment.Common.Util;
4+
5+
public class PostgresInitializer
6+
{
7+
private readonly PostgresConnectionPool _connectionPool;
8+
private readonly string _databaseName;
9+
private readonly string _tableName;
10+
private readonly string _replicationUsername;
11+
private readonly string _replicationPassword;
12+
private readonly string _replicationPublication;
13+
private readonly ILogger<PostgresInitializer> _logger;
14+
15+
public PostgresInitializer(
16+
PostgresConnectionPool connectionPool,
17+
string databaseName,
18+
string tableName,
19+
string replicationUsername,
20+
string replicationPassword,
21+
string replicationPublication,
22+
ILogger<PostgresInitializer> logger)
23+
{
24+
_connectionPool = connectionPool;
25+
_databaseName = databaseName;
26+
_tableName = tableName;
27+
_replicationUsername = replicationUsername;
28+
_replicationPassword = replicationPassword;
29+
_replicationPublication = replicationPublication;
30+
_logger = logger;
31+
}
32+
33+
public void Initialize()
34+
{
35+
using var connection = _connectionPool.OpenConnection();
36+
try
37+
{
38+
// Create table
39+
_logger.LogInformation("Creating table {TableName}", _tableName);
40+
ExecuteStatementIgnoreErrors(connection, $"""
41+
CREATE TABLE IF NOT EXISTS {_tableName} (
42+
id BIGSERIAL NOT NULL,
43+
event_id TEXT NOT NULL UNIQUE,
44+
aggregate_id TEXT NOT NULL,
45+
aggregate_version BIGINT NOT NULL,
46+
causation_id TEXT NOT NULL,
47+
correlation_id TEXT NOT NULL,
48+
recorded_on TEXT NOT NULL,
49+
event_name TEXT NOT NULL,
50+
json_payload TEXT NOT NULL,
51+
json_metadata TEXT NOT NULL,
52+
PRIMARY KEY (id));
53+
""");
54+
55+
// Create replication user
56+
_logger.LogInformation("Creating replication user");
57+
ExecuteStatementIgnoreErrors(connection,
58+
$"CREATE USER {_replicationUsername} REPLICATION LOGIN PASSWORD '{_replicationPassword}';");
59+
60+
// Grant permissions to user
61+
_logger.LogInformation("Granting permissions to replication user");
62+
ExecuteStatementIgnoreErrors(connection,
63+
$"""GRANT CONNECT ON DATABASE "{_databaseName}" TO {_replicationUsername};""");
64+
65+
_logger.LogInformation("Granting select to replication user");
66+
ExecuteStatementIgnoreErrors(connection,
67+
$"GRANT SELECT ON TABLE {_tableName} TO {_replicationUsername};");
68+
69+
// Create publication
70+
_logger.LogInformation("Creating publication for table");
71+
ExecuteStatementIgnoreErrors(connection,
72+
$"CREATE PUBLICATION {_replicationPublication} FOR TABLE {_tableName};");
73+
74+
// Create indexes
75+
_logger.LogInformation("Creating aggregate id, aggregate version index");
76+
ExecuteStatementIgnoreErrors(connection,
77+
$"CREATE UNIQUE INDEX event_store_idx_event_aggregate_id_version ON {_tableName}(aggregate_id, aggregate_version);");
78+
79+
_logger.LogInformation("Creating id index");
80+
ExecuteStatementIgnoreErrors(connection,
81+
$"CREATE UNIQUE INDEX event_store_idx_event_id ON {_tableName}(event_id);");
82+
83+
_logger.LogInformation("Creating causation index");
84+
ExecuteStatementIgnoreErrors(connection,
85+
$"CREATE INDEX event_store_idx_event_causation_id ON {_tableName}(causation_id);");
86+
87+
_logger.LogInformation("Creating correlation index");
88+
ExecuteStatementIgnoreErrors(connection,
89+
$"CREATE INDEX event_store_idx_event_correlation_id ON {_tableName}(correlation_id);");
90+
91+
_logger.LogInformation("Creating recording index");
92+
ExecuteStatementIgnoreErrors(connection,
93+
$"CREATE INDEX event_store_idx_occurred_on ON {_tableName}(recorded_on);");
94+
95+
_logger.LogInformation("Creating event name index");
96+
ExecuteStatementIgnoreErrors(connection,
97+
$"CREATE INDEX event_store_idx_event_name ON {_tableName}(event_name);");
98+
}
99+
finally
100+
{
101+
connection.Close();
102+
}
103+
}
104+
105+
private void ExecuteStatementIgnoreErrors(NpgsqlConnection connection, string sqlStatement)
106+
{
107+
try
108+
{
109+
_logger.LogInformation("Executing SQL: {SqlStatement}", sqlStatement);
110+
using var cmd = new NpgsqlCommand(sqlStatement, connection);
111+
cmd.ExecuteNonQuery();
112+
}
113+
catch (Exception e)
114+
{
115+
_logger.LogWarning(e, "Caught exception when executing SQL statement");
116+
}
117+
}
118+
}

application/backend-credit-card-enrollment/backend-c#/Program.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,19 @@
3030

3131
return new PostgresTransactionalEventStore(pool, serializer, deserializer, eventStoreTable, logger);
3232
});
33+
builder.Services.AddSingleton<PostgresInitializer>(provider => {
34+
var pool = provider.GetRequiredService<PostgresConnectionPool>();
35+
var logger = provider.GetRequiredService<ILogger<PostgresInitializer>>();
36+
return new PostgresInitializer(
37+
pool,
38+
GetEnvVar("EVENT_STORE_DATABASE_NAME"),
39+
GetEnvVar("EVENT_STORE_CREATE_TABLE_WITH_NAME"),
40+
GetEnvVar("EVENT_STORE_CREATE_REPLICATION_USER_WITH_USERNAME"),
41+
GetEnvVar("EVENT_STORE_CREATE_REPLICATION_USER_WITH_PASSWORD"),
42+
GetEnvVar("EVENT_STORE_CREATE_REPLICATION_PUBLICATION"),
43+
logger
44+
);
45+
});
3346

3447
var mongoConnectionString =
3548
$"mongodb://{GetEnvVar("MONGODB_PROJECTION_DATABASE_USERNAME")}:{GetEnvVar("MONGODB_PROJECTION_DATABASE_PASSWORD")}@" +
@@ -44,6 +57,15 @@
4457
var logger = provider.GetRequiredService<ILogger<MongoTransactionalProjectionOperator>>();
4558
return new MongoTransactionalProjectionOperator(sessionPool, mongoDatabaseName, logger);
4659
});
60+
builder.Services.AddSingleton<MongoInitializer>(provider => {
61+
var pool = provider.GetRequiredService<MongoSessionPool>();
62+
var logger = provider.GetRequiredService<ILogger<MongoInitializer>>();
63+
return new MongoInitializer(
64+
pool,
65+
GetEnvVar("MONGODB_PROJECTION_DATABASE_NAME"),
66+
logger
67+
);
68+
});
4769

4870
AddScopedInheritors<CommandController>(builder.Services);
4971
AddScopedInheritors<CommandHandler>(builder.Services);
@@ -87,6 +109,14 @@
87109
});
88110

89111
var app = builder.Build();
112+
113+
// Initialize databases
114+
var postgresInitializer = app.Services.GetRequiredService<PostgresInitializer>();
115+
var mongoInitializer = app.Services.GetRequiredService<MongoInitializer>();
116+
postgresInitializer.Initialize();
117+
mongoInitializer.Initialize();
118+
119+
// Register app exception handler
90120
app.UseExceptionHandler(errorApp =>
91121
{
92122
errorApp.Run(async context =>
@@ -106,6 +136,8 @@ await context.Response.WriteAsJsonAsync(new {
106136
});
107137
});
108138
});
139+
140+
// Run app
109141
app.MapControllers();
110142
app.Run();
111143
return;
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import { injectable, inject } from 'tsyringe';
2+
import { MongoClient } from 'mongodb';
3+
import { MongoSessionPool } from './MongoSessionPool';
4+
import { log } from './Logger';
5+
6+
@injectable()
7+
export class MongoInitializerService {
8+
private readonly client: MongoClient;
9+
10+
constructor(
11+
@inject(MongoSessionPool) private readonly sessionPool: MongoSessionPool,
12+
@inject("mongoDatabaseName") private readonly databaseName: string
13+
) {
14+
this.client = this.sessionPool.getClient();
15+
}
16+
17+
async initialize(): Promise<void> {
18+
log.info('Initializing MongoDB collections and indexes...');
19+
20+
try {
21+
await this.client.connect();
22+
const db = this.client.db(this.databaseName);
23+
24+
// Create collections
25+
log.info('Creating collections...');
26+
await Promise.all([
27+
this.ensureCollection(db, 'CreditCard_Enrollment_Enrollment'),
28+
this.ensureCollection(db, 'CreditCard_Enrollment_ProductName'),
29+
this.ensureCollection(db, 'CreditCard_Enrollment_ProductActiveStatus')
30+
]);
31+
log.info('Collections created successfully');
32+
33+
// Create indexes
34+
log.info('Creating indexes...');
35+
await this.createIndexes(db);
36+
log.info('Indexes created successfully');
37+
38+
} catch (error) {
39+
log.error('Error initializing MongoDB:', error as Error);
40+
throw error;
41+
}
42+
}
43+
44+
private async ensureCollection(db: any, collectionName: string): Promise<void> {
45+
try {
46+
const collections = await db.listCollections({ name: collectionName }).toArray();
47+
if (collections.length === 0) {
48+
await db.createCollection(collectionName);
49+
log.debug(`Collection ${collectionName} created`);
50+
} else {
51+
log.debug(`Collection ${collectionName} already exists`);
52+
}
53+
} catch (error) {
54+
log.error(`Error ensuring collection ${collectionName}:`, error as Error);
55+
throw error;
56+
}
57+
}
58+
59+
private async createIndexes(db: any): Promise<void> {
60+
try {
61+
const enrollmentCollection = db.collection('CreditCard_Enrollment_Enrollment');
62+
63+
await enrollmentCollection.createIndex(
64+
{ userId: 1 },
65+
{
66+
background: true,
67+
name: 'userId_asc'
68+
}
69+
);
70+
log.debug('Index created on CreditCard_Enrollment_Enrollment.userId');
71+
} catch (error) {
72+
log.error('Error creating indexes:', error as Error);
73+
throw error;
74+
}
75+
}
76+
}

0 commit comments

Comments
 (0)