Skip to content

Commit f10cd31

Browse files
ganesanarunmddubey
authored andcommitted
Enable read from replica
It's configurable in case if we have replica then we could use to read from replica, otherwise by default it will use the master configuration from read postgres client too
1 parent 0634d29 commit f10cd31

File tree

11 files changed

+116
-53
lines changed

11 files changed

+116
-53
lines changed

consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -163,21 +163,34 @@ public ClientRegistryClient clientRegistryClient(@Qualifier("customBuilder") Web
163163
}
164164

165165
@Bean
166-
public LockedUsersRepository lockedUsersRepository(DbOptions dbOptions) {
167-
return new LockedUsersRepository(pgPool(dbOptions));
166+
public LockedUsersRepository lockedUsersRepository(@Qualifier("readWriteClient") PgPool pgPool) {
167+
return new LockedUsersRepository(pgPool);
168168
}
169169

170-
@Bean
171-
public PgPool pgPool(DbOptions dbOptions) {
170+
@Bean("readWriteClient")
171+
public PgPool readWriteClient(DbOptions dbOptions) {
172172
PgConnectOptions connectOptions = new PgConnectOptions()
173173
.setPort(dbOptions.getPort())
174174
.setHost(dbOptions.getHost())
175175
.setDatabase(dbOptions.getSchema())
176176
.setUser(dbOptions.getUser())
177177
.setPassword(dbOptions.getPassword());
178178

179-
PoolOptions poolOptions = new PoolOptions()
180-
.setMaxSize(dbOptions.getPoolSize());
179+
PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getPoolSize());
180+
181+
return PgPool.pool(connectOptions, poolOptions);
182+
}
183+
184+
@Bean("readOnlyClient")
185+
public PgPool readOnlyClient(DbOptions dbOptions) {
186+
PgConnectOptions connectOptions = new PgConnectOptions()
187+
.setPort(dbOptions.getReplica().getPort())
188+
.setHost(dbOptions.getReplica().getHost())
189+
.setDatabase(dbOptions.getSchema())
190+
.setUser(dbOptions.getReplica().getUser())
191+
.setPassword(dbOptions.getReplica().getPassword());
192+
193+
PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getReplica().getPoolSize());
181194

182195
return PgPool.pool(connectOptions, poolOptions);
183196
}
@@ -344,7 +357,6 @@ ReactiveRedisOperations<String, String> stringReactiveRedisOperations(
344357
return new ReactiveRedisTemplate<>(factory, context);
345358
}
346359

347-
348360
@ConditionalOnProperty(value = "consentmanager.cacheMethod", havingValue = "guava", matchIfMissing = true)
349361
@Bean({"cacheForReplayAttack"})
350362
public CacheAdapter<String, LocalDateTime> stringLocalDateTimeCacheAdapter() {

consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@
3737
public class ConsentConfiguration {
3838

3939
@Bean
40-
public ConsentRequestRepository consentRequestRepository(PgPool pgPool) {
41-
return new ConsentRequestRepository(pgPool);
40+
public ConsentRequestRepository consentRequestRepository(@Qualifier("readWriteClient") PgPool readWriteClient,
41+
@Qualifier("readOnlyClient") PgPool readOnlyClient) {
42+
return new ConsentRequestRepository(readWriteClient, readOnlyClient);
4243
}
4344

4445
@Bean
45-
public ConsentArtefactRepository consentArtefactRepository(PgPool pgPool) {
46+
public ConsentArtefactRepository consentArtefactRepository(@Qualifier("readWriteClient") PgPool pgPool) {
4647
return new ConsentArtefactRepository(pgPool);
4748
}
4849

consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import io.vertx.pgclient.PgPool;
1212
import io.vertx.sqlclient.Row;
1313
import io.vertx.sqlclient.RowSet;
14-
import io.vertx.sqlclient.Tuple;
14+
import lombok.AllArgsConstructor;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
1717
import reactor.core.publisher.Flux;
@@ -27,24 +27,28 @@
2727
import static in.projecteka.consentmanager.consent.model.ConsentStatus.GRANTED;
2828
import static in.projecteka.library.common.Serializer.from;
2929
import static in.projecteka.library.common.Serializer.to;
30+
import static io.vertx.sqlclient.Tuple.of;
31+
import static reactor.core.publisher.Mono.create;
3032

33+
@AllArgsConstructor
3134
public class ConsentRequestRepository {
3235
private static final Logger logger = LoggerFactory.getLogger(ConsentRequestRepository.class);
3336
private static final String SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS;
3437
private static final String SELECT_CONSENT_REQUEST_BY_ID;
3538
private static final String SELECT_CONSENT_REQUESTS_BY_STATUS;
3639
private static final String SELECT_CONSENT_DETAILS_FOR_PATIENT;
3740
private static final String SELECT_CONSENT_REQUEST_COUNT = "SELECT COUNT(*) FROM consent_request " +
38-
"WHERE LOWER(patient_id) = $1 and status != $3 and (status=$2 OR $2 IS NULL)";
41+
"WHERE LOWER(patient_id) = $1 and status != $3 and (status= $2 OR $2 IS NULL)";
3942
private static final String INSERT_CONSENT_REQUEST_QUERY = "INSERT INTO consent_request " +
4043
"(request_id, patient_id, status, details) VALUES ($1, $2, $3, $4)";
41-
private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status=$1, " +
42-
"date_modified=$2 WHERE request_id=$3";
44+
private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status = $1, " +
45+
"date_modified= $2 WHERE request_id= $3";
4346
private static final String FAILED_TO_SAVE_CONSENT_REQUEST = "Failed to save consent request";
4447
private static final String UNKNOWN_ERROR_OCCURRED = "Unknown error occurred";
4548
private static final String FAILED_TO_GET_CONSENT_REQUESTS_BY_STATUS = "Failed to get consent requests by status";
4649

47-
private final PgPool dbClient;
50+
private final PgPool readWriteClient;
51+
private final PgPool readOnlyClient;
4852

4953
static {
5054
String s = "SELECT request_id, status, details, date_created, date_modified FROM consent_request " +
@@ -57,14 +61,10 @@ public class ConsentRequestRepository {
5761
SELECT_CONSENT_REQUESTS_BY_STATUS = s + "status=$1";
5862
}
5963

60-
public ConsentRequestRepository(PgPool dbClient) {
61-
this.dbClient = dbClient;
62-
}
63-
6464
public Mono<Void> insert(RequestedDetail requestedDetail, UUID requestId) {
65-
return Mono.create(monoSink ->
66-
dbClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY)
67-
.execute(Tuple.of(requestId.toString(),
65+
return create(monoSink ->
66+
readWriteClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY)
67+
.execute(of(requestId.toString(),
6868
requestedDetail.getPatient().getId(),
6969
ConsentStatus.REQUESTED.name(),
7070
new JsonObject(from(requestedDetail))),
@@ -82,22 +82,21 @@ public Mono<ListResult<List<ConsentRequestDetail>>> requestsForPatient(String pa
8282
int limit,
8383
int offset,
8484
String status) {
85-
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT)
86-
.execute(Tuple.of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()),
85+
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT)
86+
.execute(of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()),
8787
handler -> {
8888
List<ConsentRequestDetail> requestList = getConsentRequestDetails(handler);
89-
dbClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT)
90-
.execute(Tuple.of(patientId, status, GRANTED.toString()), counter -> {
89+
readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT)
90+
.execute(of(patientId, status, GRANTED.toString()),
91+
counter -> {
9192
if (handler.failed()) {
9293
logger.error(handler.cause().getMessage(), handler.cause());
9394
monoSink.error(new DbOperationError());
9495
return;
9596
}
96-
Integer count = counter.result().iterator()
97-
.next().getInteger("count");
97+
var count = counter.result().iterator().next().getInteger("count");
9898
monoSink.success(new ListResult<>(requestList, count));
99-
}
100-
);
99+
});
101100
}));
102101
}
103102

@@ -115,15 +114,13 @@ private List<ConsentRequestDetail> getConsentRequestDetails(AsyncResult<RowSet<R
115114
}
116115

117116
public Mono<ConsentRequestDetail> requestOf(String requestId, String status, String patientId) {
118-
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS)
119-
.execute(Tuple.of(requestId, status, patientId),
120-
consentRequestHandler(monoSink)));
117+
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS)
118+
.execute(of(requestId, status, patientId), consentRequestHandler(monoSink)));
121119
}
122120

123121
public Mono<ConsentRequestDetail> requestOf(String requestId) {
124-
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID)
125-
.execute(Tuple.of(requestId),
126-
consentRequestHandler(monoSink)));
122+
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID)
123+
.execute(of(requestId), consentRequestHandler(monoSink)));
127124
}
128125

129126
private Handler<AsyncResult<RowSet<Row>>> consentRequestHandler(MonoSink<ConsentRequestDetail> monoSink) {
@@ -162,8 +159,8 @@ private ConsentRequestDetail mapToConsentRequestDetail(Row result) {
162159
}
163160

164161
public Mono<Void> updateStatus(String id, ConsentStatus status) {
165-
return Mono.create(monoSink -> dbClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY)
166-
.execute(Tuple.of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id),
162+
return create(monoSink -> readWriteClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY)
163+
.execute(of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id),
167164
updateHandler -> {
168165
if (updateHandler.failed()) {
169166
monoSink.error(new Exception("Failed to update status"));
@@ -178,8 +175,8 @@ private ConsentStatus getConsentStatus(String status) {
178175
}
179176

180177
public Flux<ConsentRequestDetail> getConsentsByStatus(ConsentStatus status) {
181-
return Flux.create(fluxSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS)
182-
.execute(Tuple.of(status.toString()),
178+
return Flux.create(fluxSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS)
179+
.execute(of(status.toString()),
183180
handler -> {
184181
if (handler.failed()) {
185182
logger.error(handler.cause().getMessage(), handler.cause());

consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public DataFlowRequester dataRequest(@Qualifier("customBuilder") WebClient.Build
6565
}
6666

6767
@Bean
68-
public DataFlowRequestRepository dataRequestRepository(PgPool pgPool) {
68+
public DataFlowRequestRepository dataRequestRepository(@Qualifier("readWriteClient") PgPool pgPool) {
6969
return new DataFlowRequestRepository(pgPool);
7070
}
7171

consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@
66
import in.projecteka.consentmanager.clients.DiscoveryServiceClient;
77
import in.projecteka.consentmanager.clients.LinkServiceClient;
88
import in.projecteka.consentmanager.clients.UserServiceClient;
9+
import in.projecteka.consentmanager.clients.properties.GatewayServiceProperties;
10+
import in.projecteka.consentmanager.clients.properties.LinkServiceProperties;
11+
import in.projecteka.consentmanager.common.CentralRegistry;
12+
import in.projecteka.consentmanager.common.ServiceAuthentication;
13+
import in.projecteka.consentmanager.common.cache.CacheAdapter;
14+
import in.projecteka.consentmanager.common.cache.LoadingCacheAdapter;
15+
import in.projecteka.consentmanager.common.cache.RedisCacheAdapter;
916
import in.projecteka.consentmanager.link.discovery.Discovery;
1017
import in.projecteka.consentmanager.link.discovery.DiscoveryRepository;
1118
import in.projecteka.consentmanager.link.hiplink.UserAuthInitAction;
@@ -36,12 +43,12 @@
3643
public class LinkConfiguration {
3744

3845
@Bean
39-
public DiscoveryRepository discoveryRepository(PgPool pgPool) {
46+
public DiscoveryRepository discoveryRepository(@Qualifier("readWriteClient") PgPool pgPool) {
4047
return new DiscoveryRepository(pgPool);
4148
}
4249

4350
@Bean
44-
public LinkRepository linkRepository(PgPool pgPool) {
51+
public LinkRepository linkRepository(@Qualifier("readWriteClient") PgPool pgPool) {
4552
return new LinkRepository(pgPool);
4653
}
4754

consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,27 @@ public class DbOptions {
1616
private final String user;
1717
private final String password;
1818
private final int poolSize;
19+
private final boolean replicaReadEnabled;
20+
private final Replica replica;
21+
22+
public Replica getReplica() {
23+
return replica != null && replicaReadEnabled
24+
? replica
25+
: new Replica(host, port, user, password, getReadPoolSize());
26+
}
27+
28+
private int getReadPoolSize() {
29+
return poolSize / 2 + poolSize % 2;
30+
}
31+
32+
public int getPoolSize() {
33+
return replica != null && replicaReadEnabled
34+
? poolSize
35+
: poolSize / 2;
36+
}
1937

2038
public in.projecteka.library.common.DbOptions toHeartBeat() {
2139
return new in.projecteka.library.common.DbOptions(host, port);
2240
}
2341
}
42+

consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,16 @@
88
import lombok.AllArgsConstructor;
99
import org.slf4j.Logger;
1010
import org.slf4j.LoggerFactory;
11-
import org.springframework.stereotype.Repository;
1211
import reactor.core.publisher.Mono;
1312

1413
import java.util.ArrayList;
1514
import java.util.List;
1615

17-
18-
@Repository
1916
@AllArgsConstructor
2017
public class OtpAttemptRepository {
2118

2219
private final static Logger logger = LoggerFactory.getLogger(OtpAttemptRepository.class);
2320

24-
2521
private static final String INSERT_OTP_ATTEMPT = "INSERT INTO " +
2622
"otp_attempt (session_id ,cm_id, identifier_type, identifier_value, status, action) VALUES ($1,$2,$3,$4,$5,$6)";
2723

consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import in.projecteka.consentmanager.user.model.OtpAttempt;
44
import in.projecteka.library.clients.model.ClientError;
55
import lombok.AllArgsConstructor;
6-
import org.springframework.stereotype.Service;
76
import reactor.core.publisher.Mono;
87

98
import java.time.LocalDateTime;
@@ -55,8 +54,8 @@ public Mono<Void> removeMatchingAttempts(OtpAttempt otpAttempt) {
5554

5655
public <T> Mono<T> handleInvalidOTPError(ClientError error, OtpAttempt attempt) {
5756
Mono<T> invalidOTPError = Mono.error(error);
58-
if (error.getErrorCode().equals(OTP_INVALID)) {
59-
return saveOTPAttempt(attempt.toBuilder().attemptStatus(FAILURE).build()).then(invalidOTPError);
57+
if (error.getErrorCode().equals(ErrorCode.OTP_INVALID)) {
58+
return saveOTPAttempt(attempt.toBuilder().attemptStatus(OtpAttempt.AttemptStatus.FAILURE).build()).then(invalidOTPError);
6059
}
6160
return invalidOTPError;
6261
}

consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public UserService userService(UserRepository userRepository,
5454
}
5555

5656
@Bean
57-
public UserRepository userRepository(PgPool pgPool) {
57+
public UserRepository userRepository(@Qualifier("readWriteClient") PgPool pgPool) {
5858
return new UserRepository(pgPool);
5959
}
6060

@@ -145,7 +145,7 @@ public SessionService sessionService(
145145
}
146146

147147
@Bean
148-
public TransactionPinRepository transactionPinRepository(PgPool dbClient) {
148+
public TransactionPinRepository transactionPinRepository(@Qualifier("readWriteClient") PgPool dbClient) {
149149
return new TransactionPinRepository(dbClient);
150150
}
151151

@@ -173,4 +173,15 @@ public TransactionPinService transactionPinService(TransactionPinRepository tran
173173
public ProfileService profileService(UserService userService, TransactionPinService transactionPinService) {
174174
return new ProfileService(userService, transactionPinService);
175175
}
176+
177+
@Bean
178+
public OtpAttemptRepository otpAttemptRepository(@Qualifier("readWriteClient") PgPool readWriteClient) {
179+
return new OtpAttemptRepository(readWriteClient);
180+
}
181+
182+
@Bean
183+
public OtpAttemptService otpAttemptService(OtpAttemptRepository otpAttemptRepository,
184+
UserServiceProperties userServiceProperties) {
185+
return new OtpAttemptService(otpAttemptRepository, userServiceProperties);
186+
}
176187
}

consent/src/main/resources/application.yml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ consentmanager:
6262
schema: ${CONSENT_MANAGER_DB_NAME}
6363
user: ${POSTGRES_USER}
6464
password: ${POSTGRES_PASSWORD}
65-
poolSize: 5
65+
poolSize: ${MASTER_POOL_SIZE:5}
66+
replica-read-enabled: {REPLICA_READ_ENABLED:false}
67+
replica:
68+
host: ${POSTGRES_HOST}
69+
port: ${POSTGRES_PORT:5432}
70+
user: ${POSTGRES_USER}
71+
password: ${POSTGRES_PASSWORD}
72+
poolSize: ${REPLICA_POOL_SIZE:3}
6673
dataflow:
6774
consentmanager:
6875
url: ${CONSENT_MANAGER_URL}

0 commit comments

Comments
 (0)