diff --git a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/configuration/AsyncMessagingConfiguration.java b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/configuration/AsyncMessagingConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..7146acc5621a547e260540d86fceab6af9af9b65 --- /dev/null +++ b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/configuration/AsyncMessagingConfiguration.java @@ -0,0 +1,20 @@ +package cz.muni.pa165.banking.application.configuration; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +public class AsyncMessagingConfiguration { + + @Bean + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setThreadNamePrefix("Async-"); + executor.initialize(); + return executor; + } + +} diff --git a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/MessagingService.java b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/MessagingService.java new file mode 100644 index 0000000000000000000000000000000000000000..fc38675999d1da2621818ef013418abbb58c478a --- /dev/null +++ b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/MessagingService.java @@ -0,0 +1,40 @@ +package cz.muni.pa165.banking.application.messaging; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Service +public class MessagingService { + + private final Logger logger = LoggerFactory.getLogger(MessagingService.class); + + private final ProcessListener processListener; + + private final Queue<String> queue = new ConcurrentLinkedQueue<>(); + + public MessagingService(ProcessListener processListener) { + this.processListener = processListener; + } + + public void addToQueue(String message) { + queue.add(message); + triggerRead(); + } + + @Async + void triggerRead() { + String message = queue.poll(); + try { + processListener.onReceived(message); + } catch (JsonProcessingException e) { + logger.error("Error while processing message"); + } + } + +} diff --git a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessListener.java b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessListener.java index d6bf974e6db5935d3006487872558fd08a0d2726..896cb904bebbc898371c3c4919ac58d35340e860 100644 --- a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessListener.java +++ b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessListener.java @@ -1,5 +1,7 @@ package cz.muni.pa165.banking.application.messaging; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import cz.muni.pa165.banking.application.service.ProcessHandlerService; import cz.muni.pa165.banking.domain.messaging.ProcessRequest; import org.springframework.stereotype.Service; @@ -7,18 +9,20 @@ import org.springframework.stereotype.Service; @Service public class ProcessListener { - private final ProcessHandlerService service; + private final ObjectMapper objectMapper; + + private final ProcessHandlerService processHandlerService; - public ProcessListener(ProcessHandlerService service) { - this.service = service; + public ProcessListener(ObjectMapper objectMapper, ProcessHandlerService processHandlerService) { + this.objectMapper = objectMapper; + this.processHandlerService = processHandlerService; } - - // TODO Milestone-2/3, add messaging RabbitMq dependencies -// @RabbitListener(queues = "${messaging.exchange:process-request}") - public void onReceived(ProcessRequest message) { + public void onReceived(String message) throws JsonProcessingException { System.out.println(message); - service.handle(message); + + ProcessRequest request = objectMapper.readValue(message, ProcessRequest.class); + processHandlerService.handle(request); } } diff --git a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessProducer.java b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessProducer.java index 44cdc381f250ed25160a6138efbbb89cf074ac8a..e625165d8618b02c5faf37edfde0338cd87ca6c9 100644 --- a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessProducer.java +++ b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessProducer.java @@ -3,27 +3,31 @@ package cz.muni.pa165.banking.application.messaging; import com.fasterxml.jackson.databind.ObjectMapper; import cz.muni.pa165.banking.domain.messaging.MessageProducer; import cz.muni.pa165.banking.domain.messaging.ProcessRequest; +import cz.muni.pa165.banking.domain.transaction.TransactionType; import cz.muni.pa165.banking.exception.ServerError; -import org.springframework.beans.factory.annotation.Value; +import jakarta.annotation.PostConstruct; import org.springframework.stereotype.Service; import java.util.Map; +import java.util.UUID; @Service public class ProcessProducer implements MessageProducer { - // TODO Milestone-2/3, add messaging RabbitMq dependencies -// private final RabbitTemplate template; + private final MessagingService messagingService; private final ObjectMapper mapper; - @Value("${messaging.exchange:process-request}") - private String EXCHANGE_NAME; - - public ProcessProducer(ObjectMapper mapper) { + public ProcessProducer(MessagingService messagingService, ObjectMapper mapper) { + this.messagingService = messagingService; this.mapper = mapper; } + @PostConstruct + void testing() { + ProcessRequest data = new ProcessRequest(UUID.randomUUID(), TransactionType.DEPOSIT); + send(data); + } @Override public void send(ProcessRequest data) { @@ -39,7 +43,7 @@ public class ProcessProducer implements MessageProducer { ) ); } -// template.convertAndSend(EXCHANGE_NAME, "", dataAsJsonString); + messagingService.addToQueue(dataAsJsonString); } } diff --git a/transaction-processor/src/main/resources/application.yaml b/transaction-processor/src/main/resources/application.yaml index 2dc85c373a9d07b3ebca98472350bbea9a9f0c20..e901a7e85a223446b84f1254289f06810e7b04f3 100644 --- a/transaction-processor/src/main/resources/application.yaml +++ b/transaction-processor/src/main/resources/application.yaml @@ -1,8 +1,6 @@ db: hostname: localhost -spring: - application: - name: Transaction-Processor + banking: apps: management: @@ -13,7 +11,10 @@ banking: host: localhost port: 8081 url: ${banking.apps.query.host}:${banking.apps.query.port} - + +spring: + application: + name: Transaction-Processor datasource: url: jdbc:postgresql://${db.hostname}:5432/banking driver-class-name: org.postgresql.Driver