Skip to content

Commit 3246bfc

Browse files
committed
kafka-messages has been added to Person and Salary service
1 parent e5f9485 commit 3246bfc

File tree

9 files changed

+174
-6
lines changed

9 files changed

+174
-6
lines changed

person-service/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ services:
3434
We will use github public repository for our configuration:
3535
https://github.com/cevheri/microservices-config-server
3636
37+
## Kafka Configuration
38+
39+
---
40+
### Create Kafka Topic
41+
```shell
42+
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 5 --topic salary
43+
```
44+
---
45+
46+
3747

3848
---
3949
## Development

person-service/pom.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,15 @@
8383
<artifactId>springfox-swagger-ui</artifactId>
8484
<version>3.0.0</version>
8585
</dependency>
86-
86+
<dependency>
87+
<groupId>org.springframework.kafka</groupId>
88+
<artifactId>spring-kafka</artifactId>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.springframework.kafka</groupId>
92+
<artifactId>spring-kafka-test</artifactId>
93+
<scope>test</scope>
94+
</dependency>
8795
</dependencies>
8896
<dependencyManagement>
8997
<dependencies>

person-service/src/main/java/com/cevher/ms/person/service/PersonService.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,28 @@ public class PersonService {
2020

2121
private final PersonRepository personRepository;
2222
private final RestTemplate restTemplate;
23+
private final SalaryProducer salaryProducer;
24+
25+
private final Double PERSON_DEFAULT_SALARY = 1000D;
26+
27+
private void sendFirstSalary(Person person) {
28+
log.info("sendFirstSalary method of PersonService for CurrentPerson");
29+
30+
SalaryMessage salaryMessage = SalaryMessage
31+
.builder()
32+
.personId(person.getId())
33+
.amount(PERSON_DEFAULT_SALARY)
34+
.build();
35+
salaryProducer.produce(salaryMessage.toString());
36+
37+
log.info("Send Kafka Message: " + salaryMessage);
38+
}
2339

2440
public Person savePerson(Person person) {
2541
log.info("savePerson method of PersonService");
26-
return personRepository.save(person);
42+
Person resultPerson = personRepository.save(person);
43+
sendFirstSalary(resultPerson);
44+
return resultPerson;
2745
}
2846

2947
public ResponseTempVM getPersonWithDepartment(Long personId) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.cevher.ms.person.service;
2+
3+
import lombok.*;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.kafka.core.KafkaTemplate;
6+
import org.springframework.stereotype.Service;
7+
8+
import java.io.Serializable;
9+
import java.util.UUID;
10+
11+
@Service
12+
@Slf4j
13+
@RequiredArgsConstructor
14+
public class SalaryProducer {
15+
private static final String KAFKA_TOPIC = "salary";
16+
17+
private final KafkaTemplate<String, String> kafkaTemplate;
18+
19+
public void produce(String message) {
20+
kafkaTemplate.send(KAFKA_TOPIC, message);
21+
}
22+
}
23+
24+
25+
@AllArgsConstructor
26+
@NoArgsConstructor
27+
@ToString
28+
@EqualsAndHashCode
29+
@Builder()
30+
class SalaryMessage implements Serializable {
31+
32+
private String toService;
33+
private String fromService;
34+
private String uuid;
35+
36+
@Getter
37+
@Setter
38+
private Long personId;
39+
40+
@Getter
41+
@Setter
42+
private Double amount;
43+
44+
public String getUuid() {
45+
return UUID.randomUUID().toString();
46+
}
47+
48+
public String getFromService() {
49+
return "person-service";
50+
}
51+
public String getToService() {
52+
return "salary-service";
53+
}
54+
}

person-service/src/main/resources/application.yml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,17 @@ spring:
33
name: person-service
44
zipkin:
55
base-url: ${ZIPKIN_URI:http://zipkin:9411/}
6-
6+
kafka:
7+
consumer:
8+
bootstrap-servers: localhost:9092
9+
group-id: salary_group
10+
auto-offset-reset: earliest
11+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
12+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
13+
producer:
14+
bootstrap-servers: localhost:9092
15+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
16+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
717
server:
818
port: 9002
919

@@ -28,4 +38,4 @@ eureka:
2838
#inform the Eureka service that client wants to be advertised by IP address.
2939
#Personally, we always set this attribute to true. Cloud-based microservices are sup-
3040
#posed to be ephemeral and stateless. They can be started up and shut down at will.
31-
#IP addresses are more appropriate for these types of services.
41+
#IP addresses are more appropriate for these types of services.

salary-service/src/main/java/com/cevher/ms/salary/domain/Salary.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
@ToString
1515
@EqualsAndHashCode
1616
@Entity
17-
@Table(name = "salary")
17+
@Table(name = "salary", catalog = "public")
1818
public class Salary implements Serializable {
1919

2020
@Id
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.cevher.ms.salary.service;
2+
3+
import lombok.*;
4+
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.kafka.annotation.KafkaListener;
6+
import org.springframework.stereotype.Service;
7+
8+
import java.io.Serializable;
9+
import java.util.UUID;
10+
11+
@Service
12+
@Slf4j
13+
@RequiredArgsConstructor
14+
public class SalaryConsumer {
15+
16+
private static final String KAFKA_TOPIC = "salary";
17+
private static final String KAFKA_CONSUMER_GROUP = "salary_group";
18+
19+
@KafkaListener(topics = KAFKA_TOPIC,
20+
groupId = KAFKA_CONSUMER_GROUP)
21+
public void consume(String message) {
22+
log.info("Consumer Group : " + message.toString());
23+
}
24+
}
25+
26+
@AllArgsConstructor
27+
@NoArgsConstructor
28+
@ToString
29+
@EqualsAndHashCode
30+
@Builder()
31+
class SalaryMessage implements Serializable {
32+
33+
private String toService;
34+
private String fromService;
35+
private String uuid;
36+
37+
@Getter
38+
@Setter
39+
private Long personId;
40+
41+
@Getter
42+
@Setter
43+
private Double amount;
44+
45+
public String getUuid() {
46+
return UUID.randomUUID().toString();
47+
}
48+
49+
public String getFromService() {
50+
return "person-service";
51+
}
52+
53+
public String getToService() {
54+
return "salary-service";
55+
}
56+
}

salary-service/src/main/java/com/cevher/ms/salary/rest/SalaryResource.java renamed to salary-service/src/main/java/com/cevher/ms/salary/web/rest/SalaryResource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.cevher.ms.salary.rest;
1+
package com.cevher.ms.salary.web.rest;
22

33
import com.cevher.ms.salary.dto.SalaryDto;
44
import com.cevher.ms.salary.service.SalaryService;

salary-service/src/main/resources/application.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@ spring:
33
name: salary-service
44
zipkin:
55
base-url: ${ZIPKIN_URI:http://zipkin:9411/}
6+
kafka:
7+
consumer:
8+
bootstrap-servers: localhost:9092
9+
group-id: salary_group
10+
auto-offset-reset: earliest
11+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
12+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
13+
producer:
14+
bootstrap-servers: localhost:9092
15+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
16+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
17+
618
server:
719
port: 9004
820

0 commit comments

Comments
 (0)