diff --git a/consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java index 18d0085e9..5b4534a39 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/ConsentManagerConfiguration.java @@ -163,12 +163,12 @@ public ClientRegistryClient clientRegistryClient(@Qualifier("customBuilder") Web } @Bean - public LockedUsersRepository lockedUsersRepository(DbOptions dbOptions) { - return new LockedUsersRepository(pgPool(dbOptions)); + public LockedUsersRepository lockedUsersRepository(@Qualifier("readWriteClient") PgPool pgPool) { + return new LockedUsersRepository(pgPool); } - @Bean - public PgPool pgPool(DbOptions dbOptions) { + @Bean("readWriteClient") + public PgPool readWriteClient(DbOptions dbOptions) { PgConnectOptions connectOptions = new PgConnectOptions() .setPort(dbOptions.getPort()) .setHost(dbOptions.getHost()) @@ -176,8 +176,21 @@ public PgPool pgPool(DbOptions dbOptions) { .setUser(dbOptions.getUser()) .setPassword(dbOptions.getPassword()); - PoolOptions poolOptions = new PoolOptions() - .setMaxSize(dbOptions.getPoolSize()); + PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getPoolSize()); + + return PgPool.pool(connectOptions, poolOptions); + } + + @Bean("readOnlyClient") + public PgPool readOnlyClient(DbOptions dbOptions) { + PgConnectOptions connectOptions = new PgConnectOptions() + .setPort(dbOptions.getReplica().getPort()) + .setHost(dbOptions.getReplica().getHost()) + .setDatabase(dbOptions.getSchema()) + .setUser(dbOptions.getReplica().getUser()) + .setPassword(dbOptions.getReplica().getPassword()); + + PoolOptions poolOptions = new PoolOptions().setMaxSize(dbOptions.getReplica().getPoolSize()); return PgPool.pool(connectOptions, poolOptions); } @@ -344,7 +357,6 @@ ReactiveRedisOperations stringReactiveRedisOperations( return new ReactiveRedisTemplate<>(factory, context); } - @ConditionalOnProperty(value = "consentmanager.cacheMethod", havingValue = "guava", matchIfMissing = true) @Bean({"cacheForReplayAttack"}) public CacheAdapter stringLocalDateTimeCacheAdapter() { diff --git a/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java index 4bfa6bf72..56b192250 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentConfiguration.java @@ -37,12 +37,13 @@ public class ConsentConfiguration { @Bean - public ConsentRequestRepository consentRequestRepository(PgPool pgPool) { - return new ConsentRequestRepository(pgPool); + public ConsentRequestRepository consentRequestRepository(@Qualifier("readWriteClient") PgPool readWriteClient, + @Qualifier("readOnlyClient") PgPool readOnlyClient) { + return new ConsentRequestRepository(readWriteClient, readOnlyClient); } @Bean - public ConsentArtefactRepository consentArtefactRepository(PgPool pgPool) { + public ConsentArtefactRepository consentArtefactRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new ConsentArtefactRepository(pgPool); } diff --git a/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java b/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java index e9296c954..1547a3bd4 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java +++ b/consent/src/main/java/in/projecteka/consentmanager/consent/ConsentRequestRepository.java @@ -11,7 +11,7 @@ import io.vertx.pgclient.PgPool; import io.vertx.sqlclient.Row; import io.vertx.sqlclient.RowSet; -import io.vertx.sqlclient.Tuple; +import lombok.AllArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -27,7 +27,10 @@ import static in.projecteka.consentmanager.consent.model.ConsentStatus.GRANTED; import static in.projecteka.library.common.Serializer.from; import static in.projecteka.library.common.Serializer.to; +import static io.vertx.sqlclient.Tuple.of; +import static reactor.core.publisher.Mono.create; +@AllArgsConstructor public class ConsentRequestRepository { private static final Logger logger = LoggerFactory.getLogger(ConsentRequestRepository.class); private static final String SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS; @@ -35,16 +38,17 @@ public class ConsentRequestRepository { private static final String SELECT_CONSENT_REQUESTS_BY_STATUS; private static final String SELECT_CONSENT_DETAILS_FOR_PATIENT; private static final String SELECT_CONSENT_REQUEST_COUNT = "SELECT COUNT(*) FROM consent_request " + - "WHERE LOWER(patient_id) = $1 and status != $3 and (status=$2 OR $2 IS NULL)"; + "WHERE LOWER(patient_id) = $1 and status != $3 and (status= $2 OR $2 IS NULL)"; private static final String INSERT_CONSENT_REQUEST_QUERY = "INSERT INTO consent_request " + "(request_id, patient_id, status, details) VALUES ($1, $2, $3, $4)"; - private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status=$1, " + - "date_modified=$2 WHERE request_id=$3"; + private static final String UPDATE_CONSENT_REQUEST_STATUS_QUERY = "UPDATE consent_request SET status = $1, " + + "date_modified= $2 WHERE request_id= $3"; private static final String FAILED_TO_SAVE_CONSENT_REQUEST = "Failed to save consent request"; private static final String UNKNOWN_ERROR_OCCURRED = "Unknown error occurred"; private static final String FAILED_TO_GET_CONSENT_REQUESTS_BY_STATUS = "Failed to get consent requests by status"; - private final PgPool dbClient; + private final PgPool readWriteClient; + private final PgPool readOnlyClient; static { String s = "SELECT request_id, status, details, date_created, date_modified FROM consent_request " + @@ -57,14 +61,10 @@ public class ConsentRequestRepository { SELECT_CONSENT_REQUESTS_BY_STATUS = s + "status=$1"; } - public ConsentRequestRepository(PgPool dbClient) { - this.dbClient = dbClient; - } - public Mono insert(RequestedDetail requestedDetail, UUID requestId) { - return Mono.create(monoSink -> - dbClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY) - .execute(Tuple.of(requestId.toString(), + return create(monoSink -> + readWriteClient.preparedQuery(INSERT_CONSENT_REQUEST_QUERY) + .execute(of(requestId.toString(), requestedDetail.getPatient().getId(), ConsentStatus.REQUESTED.name(), new JsonObject(from(requestedDetail))), @@ -82,22 +82,21 @@ public Mono>> requestsForPatient(String pa int limit, int offset, String status) { - return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT) - .execute(Tuple.of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()), + return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_DETAILS_FOR_PATIENT) + .execute(of(patientId.toLowerCase(), limit, offset, status, GRANTED.toString()), handler -> { List requestList = getConsentRequestDetails(handler); - dbClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT) - .execute(Tuple.of(patientId, status, GRANTED.toString()), counter -> { + readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_COUNT) + .execute(of(patientId, status, GRANTED.toString()), + counter -> { if (handler.failed()) { logger.error(handler.cause().getMessage(), handler.cause()); monoSink.error(new DbOperationError()); return; } - Integer count = counter.result().iterator() - .next().getInteger("count"); + var count = counter.result().iterator().next().getInteger("count"); monoSink.success(new ListResult<>(requestList, count)); - } - ); + }); })); } @@ -115,15 +114,13 @@ private List getConsentRequestDetails(AsyncResult requestOf(String requestId, String status, String patientId) { - return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS) - .execute(Tuple.of(requestId, status, patientId), - consentRequestHandler(monoSink))); + return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID_AND_STATUS) + .execute(of(requestId, status, patientId), consentRequestHandler(monoSink))); } public Mono requestOf(String requestId) { - return Mono.create(monoSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID) - .execute(Tuple.of(requestId), - consentRequestHandler(monoSink))); + return create(monoSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUEST_BY_ID) + .execute(of(requestId), consentRequestHandler(monoSink))); } private Handler>> consentRequestHandler(MonoSink monoSink) { @@ -162,8 +159,8 @@ private ConsentRequestDetail mapToConsentRequestDetail(Row result) { } public Mono updateStatus(String id, ConsentStatus status) { - return Mono.create(monoSink -> dbClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY) - .execute(Tuple.of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id), + return create(monoSink -> readWriteClient.preparedQuery(UPDATE_CONSENT_REQUEST_STATUS_QUERY) + .execute(of(status.toString(), LocalDateTime.now(ZoneOffset.UTC), id), updateHandler -> { if (updateHandler.failed()) { monoSink.error(new Exception("Failed to update status")); @@ -178,8 +175,8 @@ private ConsentStatus getConsentStatus(String status) { } public Flux getConsentsByStatus(ConsentStatus status) { - return Flux.create(fluxSink -> dbClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS) - .execute(Tuple.of(status.toString()), + return Flux.create(fluxSink -> readOnlyClient.preparedQuery(SELECT_CONSENT_REQUESTS_BY_STATUS) + .execute(of(status.toString()), handler -> { if (handler.failed()) { logger.error(handler.cause().getMessage(), handler.cause()); diff --git a/consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java index 098b50420..2d966a79f 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/dataflow/DataFlowConfiguration.java @@ -65,7 +65,7 @@ public DataFlowRequester dataRequest(@Qualifier("customBuilder") WebClient.Build } @Bean - public DataFlowRequestRepository dataRequestRepository(PgPool pgPool) { + public DataFlowRequestRepository dataRequestRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new DataFlowRequestRepository(pgPool); } diff --git a/consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java index 98041f423..ba03e8ec9 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/link/LinkConfiguration.java @@ -6,6 +6,13 @@ import in.projecteka.consentmanager.clients.DiscoveryServiceClient; import in.projecteka.consentmanager.clients.LinkServiceClient; import in.projecteka.consentmanager.clients.UserServiceClient; +import in.projecteka.consentmanager.clients.properties.GatewayServiceProperties; +import in.projecteka.consentmanager.clients.properties.LinkServiceProperties; +import in.projecteka.consentmanager.common.CentralRegistry; +import in.projecteka.consentmanager.common.ServiceAuthentication; +import in.projecteka.consentmanager.common.cache.CacheAdapter; +import in.projecteka.consentmanager.common.cache.LoadingCacheAdapter; +import in.projecteka.consentmanager.common.cache.RedisCacheAdapter; import in.projecteka.consentmanager.link.discovery.Discovery; import in.projecteka.consentmanager.link.discovery.DiscoveryRepository; import in.projecteka.consentmanager.link.hiplink.UserAuthInitAction; @@ -36,12 +43,12 @@ public class LinkConfiguration { @Bean - public DiscoveryRepository discoveryRepository(PgPool pgPool) { + public DiscoveryRepository discoveryRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new DiscoveryRepository(pgPool); } @Bean - public LinkRepository linkRepository(PgPool pgPool) { + public LinkRepository linkRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new LinkRepository(pgPool); } diff --git a/consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java b/consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java index cf1950327..806dc95b3 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java +++ b/consent/src/main/java/in/projecteka/consentmanager/properties/DbOptions.java @@ -16,8 +16,27 @@ public class DbOptions { private final String user; private final String password; private final int poolSize; + private final boolean replicaReadEnabled; + private final Replica replica; + + public Replica getReplica() { + return replica != null && replicaReadEnabled + ? replica + : new Replica(host, port, user, password, getReadPoolSize()); + } + + private int getReadPoolSize() { + return poolSize / 2 + poolSize % 2; + } + + public int getPoolSize() { + return replica != null && replicaReadEnabled + ? poolSize + : poolSize / 2; + } public in.projecteka.library.common.DbOptions toHeartBeat() { return new in.projecteka.library.common.DbOptions(host, port); } } + diff --git a/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java b/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java index 982977bea..99f34222d 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java +++ b/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptRepository.java @@ -8,20 +8,16 @@ import lombok.AllArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Repository; import reactor.core.publisher.Mono; import java.util.ArrayList; import java.util.List; - -@Repository @AllArgsConstructor public class OtpAttemptRepository { private final static Logger logger = LoggerFactory.getLogger(OtpAttemptRepository.class); - private static final String INSERT_OTP_ATTEMPT = "INSERT INTO " + "otp_attempt (session_id ,cm_id, identifier_type, identifier_value, status, action) VALUES ($1,$2,$3,$4,$5,$6)"; diff --git a/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java b/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java index fe5bec0bc..92610d4d2 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java +++ b/consent/src/main/java/in/projecteka/consentmanager/user/OtpAttemptService.java @@ -3,7 +3,6 @@ import in.projecteka.consentmanager.user.model.OtpAttempt; import in.projecteka.library.clients.model.ClientError; import lombok.AllArgsConstructor; -import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; import java.time.LocalDateTime; @@ -55,8 +54,8 @@ public Mono removeMatchingAttempts(OtpAttempt otpAttempt) { public Mono handleInvalidOTPError(ClientError error, OtpAttempt attempt) { Mono invalidOTPError = Mono.error(error); - if (error.getErrorCode().equals(OTP_INVALID)) { - return saveOTPAttempt(attempt.toBuilder().attemptStatus(FAILURE).build()).then(invalidOTPError); + if (error.getErrorCode().equals(ErrorCode.OTP_INVALID)) { + return saveOTPAttempt(attempt.toBuilder().attemptStatus(OtpAttempt.AttemptStatus.FAILURE).build()).then(invalidOTPError); } return invalidOTPError; } diff --git a/consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java b/consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java index 40bff4e59..88a5a2dc6 100644 --- a/consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java +++ b/consent/src/main/java/in/projecteka/consentmanager/user/UserConfiguration.java @@ -54,7 +54,7 @@ public UserService userService(UserRepository userRepository, } @Bean - public UserRepository userRepository(PgPool pgPool) { + public UserRepository userRepository(@Qualifier("readWriteClient") PgPool pgPool) { return new UserRepository(pgPool); } @@ -145,7 +145,7 @@ public SessionService sessionService( } @Bean - public TransactionPinRepository transactionPinRepository(PgPool dbClient) { + public TransactionPinRepository transactionPinRepository(@Qualifier("readWriteClient") PgPool dbClient) { return new TransactionPinRepository(dbClient); } @@ -173,4 +173,15 @@ public TransactionPinService transactionPinService(TransactionPinRepository tran public ProfileService profileService(UserService userService, TransactionPinService transactionPinService) { return new ProfileService(userService, transactionPinService); } + + @Bean + public OtpAttemptRepository otpAttemptRepository(@Qualifier("readWriteClient") PgPool readWriteClient) { + return new OtpAttemptRepository(readWriteClient); + } + + @Bean + public OtpAttemptService otpAttemptService(OtpAttemptRepository otpAttemptRepository, + UserServiceProperties userServiceProperties) { + return new OtpAttemptService(otpAttemptRepository, userServiceProperties); + } } diff --git a/consent/src/main/resources/application.yml b/consent/src/main/resources/application.yml index 8dddb6bbf..45feeb9e9 100644 --- a/consent/src/main/resources/application.yml +++ b/consent/src/main/resources/application.yml @@ -62,7 +62,14 @@ consentmanager: schema: ${CONSENT_MANAGER_DB_NAME} user: ${POSTGRES_USER} password: ${POSTGRES_PASSWORD} - poolSize: 5 + poolSize: ${MASTER_POOL_SIZE:5} + replica-read-enabled: {REPLICA_READ_ENABLED:false} + replica: + host: ${POSTGRES_HOST} + port: ${POSTGRES_PORT:5432} + user: ${POSTGRES_USER} + password: ${POSTGRES_PASSWORD} + poolSize: ${REPLICA_POOL_SIZE:3} dataflow: consentmanager: url: ${CONSENT_MANAGER_URL} diff --git a/src/main/java/in/projecteka/consentmanager/Replica.java b/src/main/java/in/projecteka/consentmanager/Replica.java new file mode 100644 index 000000000..90d7724e6 --- /dev/null +++ b/src/main/java/in/projecteka/consentmanager/Replica.java @@ -0,0 +1,14 @@ +package in.projecteka.consentmanager; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +@Getter +public class Replica { + private final String host; + private final int port; + private final String user; + private final String password; + private final int poolSize; +}