Skip to content

Commit 147a139

Browse files
committed
Enable read from replica
It's configurable in case if we have replica then we could use to read from replica, otherwise by default it will use the master configuration from read postgres client too
1 parent 71df29a commit 147a139

File tree

11 files changed

+110
-57
lines changed

11 files changed

+110
-57
lines changed

src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,21 +150,34 @@ public ClientRegistryClient clientRegistryClient(@Qualifier("customBuilder") Web
150150
}
151151

152152
@Bean
153-
public LockedUsersRepository lockedUsersRepository(DbOptions dbOptions) {
154-
return new LockedUsersRepository(pgPool(dbOptions));
153+
public LockedUsersRepository lockedUsersRepository(@Qualifier("readWriteClient") PgPool pgPool) {
154+
return new LockedUsersRepository(pgPool);
155155
}
156156

157-
@Bean
158-
public PgPool pgPool(DbOptions dbOptions) {
157+
@Bean("readWriteClient")
158+
public PgPool readWriteClient(DbOptions dbOptions) {
159159
PgConnectOptions connectOptions = new PgConnectOptions()
160160
.setPort(dbOptions.getPort())
161161
.setHost(dbOptions.getHost())
162162
.setDatabase(dbOptions.getSchema())
163163
.setUser(dbOptions.getUser())
164164
.setPassword(dbOptions.getPassword());
165165

166-
PoolOptions poolOptions = new PoolOptions()
167-
.setMaxSize(dbOptions.getPoolSize());
166+
PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getPoolSize());
167+
168+
return PgPool.pool(connectOptions, poolOptions);
169+
}
170+
171+
@Bean("readOnlyClient")
172+
public PgPool readOnlyClient(DbOptions dbOptions) {
173+
PgConnectOptions connectOptions = new PgConnectOptions()
174+
.setPort(dbOptions.getReplica().getPort())
175+
.setHost(dbOptions.getReplica().getHost())
176+
.setDatabase(dbOptions.getSchema())
177+
.setUser(dbOptions.getReplica().getUser())
178+
.setPassword(dbOptions.getReplica().getPassword());
179+
180+
PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getReplica().getPoolSize());
168181

169182
return PgPool.pool(connectOptions, poolOptions);
170183
}
@@ -325,7 +338,6 @@ ReactiveRedisOperations<String, String> stringReactiveRedisOperations(
325338
return new ReactiveRedisTemplate<>(factory, context);
326339
}
327340

328-
329341
@ConditionalOnProperty(value = "consentmanager.cacheMethod", havingValue = "guava", matchIfMissing = true)
330342
@Bean({"cacheForReplayAttack"})
331343
public CacheAdapter<String, LocalDateTime> stringLocalDateTimeCacheAdapter() {

src/main/java/in/projecteka/consentmanager/DbOptions.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,23 @@ public class DbOptions {
1616
private final String user;
1717
private final String password;
1818
private final int poolSize;
19+
private final boolean replicaReadEnabled;
20+
private final Replica replica;
21+
22+
public Replica getReplica() {
23+
return replica != null && replicaReadEnabled
24+
? replica
25+
: new Replica(host, port, user, password, getReadPoolSize());
26+
}
27+
28+
private int getReadPoolSize() {
29+
return poolSize / 2 + poolSize % 2;
30+
}
31+
32+
public int getPoolSize() {
33+
return replica != null && replicaReadEnabled
34+
? poolSize
35+
: poolSize / 2;
36+
}
1937
}
38+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package in.projecteka.consentmanager;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Getter;
5+
6+
@AllArgsConstructor
7+
@Getter
8+
public class Replica {
9+
private final String host;
10+
private final int port;
11+
private final String user;
12+
private final String password;
13+
private final int poolSize;
14+
}

src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@
3636
public class ConsentConfiguration {
3737

3838
@Bean
39-
public ConsentRequestRepository consentRequestRepository(PgPool pgPool) {
40-
return new ConsentRequestRepository(pgPool);
39+
public ConsentRequestRepository consentRequestRepository(@Qualifier("readWriteClient") PgPool readWriteClient,
40+
@Qualifier("readOnlyClient") PgPool readOnlyClient) {
41+
return new ConsentRequestRepository(readWriteClient, readOnlyClient);
4142
}
4243

4344
@Bean
44-
public ConsentArtefactRepository consentArtefactRepository(PgPool pgPool) {
45+
public ConsentArtefactRepository consentArtefactRepository(@Qualifier("readWriteClient") PgPool pgPool) {
4546
return new ConsentArtefactRepository(pgPool);
4647
}
4748

src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import io.vertx.pgclient.PgPool;
1212
import io.vertx.sqlclient.Row;
1313
import io.vertx.sqlclient.RowSet;
14-
import io.vertx.sqlclient.Tuple;
14+
import lombok.AllArgsConstructor;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
1717
import reactor.core.publisher.Flux;
@@ -27,24 +27,28 @@
2727
import static in.projecteka.consentmanager.common.Serializer.from;
2828
import static in.projecteka.consentmanager.common.Serializer.to;
2929
import static in.projecteka.consentmanager.consent.model.ConsentStatus.GRANTED;
30+
import static io.vertx.sqlclient.Tuple.of;
31+
import static reactor.core.publisher.Mono.create;
3032

33+
@AllArgsConstructor
3134
public class ConsentRequestRepository {
3235
private static final Logger logger = LoggerFactory.getLogger(ConsentRequestRepository.class);
3336
private static final String SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS;
3437
private static final String SELECT_CONSENT_REQUEST_BY_ID;
3538
private static final String SELECT_CONSENT_REQUESTS_BY_STATUS;
3639
private static final String SELECT_CONSENT_DETAILS_FOR_PATIENT;
3740
private static final String SELECT_CONSENT_REQUEST_COUNT = "SELECT COUNT(*) FROM consent_request " +
38-
"WHERE LOWER(patient_id) = $1 and status != $3 and (status=$2 OR $2 IS NULL)";
41+
"WHERE LOWER(patient_id) = $1 and status != $3 and (status= $2 OR $2 IS NULL)";
3942
private static final String INSERT_CONSENT_REQUEST_QUERY = "INSERT INTO consent_request " +
4043
"(request_id, patient_id, status, details) VALUES ($1, $2, $3, $4)";
41-
private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status=$1, " +
42-
"date_modified=$2 WHERE request_id=$3";
44+
private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status = $1, " +
45+
"date_modified= $2 WHERE request_id= $3";
4346
private static final String FAILED_TO_SAVE_CONSENT_REQUEST = "Failed to save consent request";
4447
private static final String UNKNOWN_ERROR_OCCURRED = "Unknown error occurred";
4548
private static final String FAILED_TO_GET_CONSENT_REQUESTS_BY_STATUS = "Failed to get consent requests by status";
4649

47-
private final PgPool dbClient;
50+
private final PgPool readWriteClient;
51+
private final PgPool readOnlyClient;
4852

4953
static {
5054
String s = "SELECT request_id, status, details, date_created, date_modified FROM consent_request " +
@@ -57,14 +61,10 @@ public class ConsentRequestRepository {
5761
SELECT_CONSENT_REQUESTS_BY_STATUS = s + "status=$1";
5862
}
5963

60-
public ConsentRequestRepository(PgPool dbClient) {
61-
this.dbClient = dbClient;
62-
}
63-
6464
public Mono<Void> insert(RequestedDetail requestedDetail, UUID requestId) {
65-
return Mono.create(monoSink ->
66-
dbClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY)
67-
.execute(Tuple.of(requestId.toString(),
65+
return create(monoSink ->
66+
readWriteClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY)
67+
.execute(of(requestId.toString(),
6868
requestedDetail.getPatient().getId(),
6969
ConsentStatus.REQUESTED.name(),
7070
new JsonObject(from(requestedDetail))),
@@ -82,22 +82,21 @@ public Mono<ListResult<List<ConsentRequestDetail>>> requestsForPatient(String pa
8282
int limit,
8383
int offset,
8484
String status) {
85-
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT)
86-
.execute(Tuple.of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()),
85+
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT)
86+
.execute(of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()),
8787
handler -> {
8888
List<ConsentRequestDetail> requestList = getConsentRequestDetails(handler);
89-
dbClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT)
90-
.execute(Tuple.of(patientId, status, GRANTED.toString()), counter -> {
89+
readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT)
90+
.execute(of(patientId, status, GRANTED.toString()),
91+
counter -> {
9192
if (handler.failed()) {
9293
logger.error(handler.cause().getMessage(), handler.cause());
9394
monoSink.error(new DbOperationError());
9495
return;
9596
}
96-
Integer count = counter.result().iterator()
97-
.next().getInteger("count");
97+
var count = counter.result().iterator().next().getInteger("count");
9898
monoSink.success(new ListResult<>(requestList, count));
99-
}
100-
);
99+
});
101100
}));
102101
}
103102

@@ -115,15 +114,13 @@ private List<ConsentRequestDetail> getConsentRequestDetails(AsyncResult<RowSet<R
115114
}
116115

117116
public Mono<ConsentRequestDetail> requestOf(String requestId, String status, String patientId) {
118-
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS)
119-
.execute(Tuple.of(requestId, status, patientId),
120-
consentRequestHandler(monoSink)));
117+
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS)
118+
.execute(of(requestId, status, patientId), consentRequestHandler(monoSink)));
121119
}
122120

123121
public Mono<ConsentRequestDetail> requestOf(String requestId) {
124-
return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID)
125-
.execute(Tuple.of(requestId),
126-
consentRequestHandler(monoSink)));
122+
return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID)
123+
.execute(of(requestId), consentRequestHandler(monoSink)));
127124
}
128125

129126
private Handler<AsyncResult<RowSet<Row>>> consentRequestHandler(MonoSink<ConsentRequestDetail> monoSink) {
@@ -162,8 +159,8 @@ private ConsentRequestDetail mapToConsentRequestDetail(Row result) {
162159
}
163160

164161
public Mono<Void> updateStatus(String id, ConsentStatus status) {
165-
return Mono.create(monoSink -> dbClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY)
166-
.execute(Tuple.of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id),
162+
return create(monoSink -> readWriteClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY)
163+
.execute(of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id),
167164
updateHandler -> {
168165
if (updateHandler.failed()) {
169166
monoSink.error(new Exception("Failed to update status"));
@@ -178,8 +175,8 @@ private ConsentStatus getConsentStatus(String status) {
178175
}
179176

180177
public Flux<ConsentRequestDetail> getConsentsByStatus(ConsentStatus status) {
181-
return Flux.create(fluxSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS)
182-
.execute(Tuple.of(status.toString()),
178+
return Flux.create(fluxSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS)
179+
.execute(of(status.toString()),
183180
handler -> {
184181
if (handler.failed()) {
185182
logger.error(handler.cause().getMessage(), handler.cause());

src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public DataFlowRequester dataRequest(@Qualifier("customBuilder") WebClient.Build
6565
}
6666

6767
@Bean
68-
public DataFlowRequestRepository dataRequestRepository(PgPool pgPool) {
68+
public DataFlowRequestRepository dataRequestRepository(@Qualifier("readWriteClient") PgPool pgPool) {
6969
return new DataFlowRequestRepository(pgPool);
7070
}
7171

src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import in.projecteka.consentmanager.clients.UserServiceClient;
99
import in.projecteka.consentmanager.clients.properties.GatewayServiceProperties;
1010
import in.projecteka.consentmanager.clients.properties.LinkServiceProperties;
11-
import in.projecteka.consentmanager.clients.properties.LinkTokenCacheProperties;
1211
import in.projecteka.consentmanager.common.CentralRegistry;
1312
import in.projecteka.consentmanager.common.ServiceAuthentication;
1413
import in.projecteka.consentmanager.common.cache.CacheAdapter;
@@ -36,12 +35,12 @@
3635
public class LinkConfiguration {
3736

3837
@Bean
39-
public DiscoveryRepository discoveryRepository(PgPool pgPool) {
38+
public DiscoveryRepository discoveryRepository(@Qualifier("readWriteClient") PgPool pgPool) {
4039
return new DiscoveryRepository(pgPool);
4140
}
4241

4342
@Bean
44-
public LinkRepository linkRepository(PgPool pgPool) {
43+
public LinkRepository linkRepository(@Qualifier("readWriteClient") PgPool pgPool) {
4544
return new LinkRepository(pgPool);
4645
}
4746

@@ -127,5 +126,5 @@ public CacheAdapter<String, String> createRedisCacheAdapter(
127126
ReactiveRedisOperations<String, String> stringReactiveRedisOperations) {
128127
return new RedisCacheAdapter(stringReactiveRedisOperations, 5);
129128
}
130-
129+
131130
}

src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,16 @@
88
import lombok.AllArgsConstructor;
99
import org.slf4j.Logger;
1010
import org.slf4j.LoggerFactory;
11-
import org.springframework.stereotype.Repository;
1211
import reactor.core.publisher.Mono;
1312

1413
import java.util.ArrayList;
1514
import java.util.List;
1615

17-
18-
@Repository
1916
@AllArgsConstructor
2017
public class OtpAttemptRepository {
2118

2219
private final static Logger logger = LoggerFactory.getLogger(OtpAttemptRepository.class);
2320

24-
2521
private static final String INSERT_OTP_ATTEMPT = "INSERT INTO " +
2622
"otp_attempt (session_id ,cm_id, identifier_type, identifier_value, status, action) VALUES ($1,$2,$3,$4,$5,$6)";
2723

src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,13 @@
44
import in.projecteka.consentmanager.clients.model.ErrorCode;
55
import in.projecteka.consentmanager.user.model.OtpAttempt;
66
import lombok.AllArgsConstructor;
7-
import org.springframework.stereotype.Service;
87
import reactor.core.publisher.Mono;
98

109
import java.time.LocalDateTime;
1110
import java.time.ZoneOffset;
1211
import java.util.ArrayList;
1312
import java.util.List;
1413

15-
16-
@Service
1714
@AllArgsConstructor
1815
public class OtpAttemptService {
1916

@@ -52,9 +49,9 @@ public Mono<Void> removeMatchingAttempts(OtpAttempt otpAttempt) {
5249
return otpAttemptRepository.removeAttempts(otpAttempt);
5350
}
5451

55-
public <T> Mono<T> handleInvalidOTPError(ClientError error, OtpAttempt attempt){
52+
public <T> Mono<T> handleInvalidOTPError(ClientError error, OtpAttempt attempt) {
5653
Mono<T> invalidOTPError = Mono.error(error);
57-
if(error.getErrorCode().equals(ErrorCode.OTP_INVALID)) {
54+
if (error.getErrorCode().equals(ErrorCode.OTP_INVALID)) {
5855
return saveOTPAttempt(attempt.toBuilder().attemptStatus(OtpAttempt.AttemptStatus.FAILURE).build()).then(invalidOTPError);
5956
}
6057
return invalidOTPError;

src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public UserService userService(UserRepository userRepository,
5353
}
5454

5555
@Bean
56-
public UserRepository userRepository(PgPool pgPool) {
56+
public UserRepository userRepository(@Qualifier("readWriteClient") PgPool pgPool) {
5757
return new UserRepository(pgPool);
5858
}
5959

@@ -141,7 +141,7 @@ public SessionService sessionService(
141141
}
142142

143143
@Bean
144-
public TransactionPinRepository transactionPinRepository(PgPool dbClient) {
144+
public TransactionPinRepository transactionPinRepository(@Qualifier("readWriteClient") PgPool dbClient) {
145145
return new TransactionPinRepository(dbClient);
146146
}
147147

@@ -169,4 +169,15 @@ public TransactionPinService transactionPinService(TransactionPinRepository tran
169169
public ProfileService profileService(UserService userService, TransactionPinService transactionPinService) {
170170
return new ProfileService(userService, transactionPinService);
171171
}
172+
173+
@Bean
174+
public OtpAttemptRepository otpAttemptRepository(@Qualifier("readWriteClient") PgPool readWriteClient) {
175+
return new OtpAttemptRepository(readWriteClient);
176+
}
177+
178+
@Bean
179+
public OtpAttemptService otpAttemptService(OtpAttemptRepository otpAttemptRepository,
180+
UserServiceProperties userServiceProperties) {
181+
return new OtpAttemptService(otpAttemptRepository, userServiceProperties);
182+
}
172183
}

0 commit comments

Comments
 (0)