From 7f8fcaeaaad52b7e076feeca71b489705f6bb78b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Filip=20Pit=C3=A1k?= <xpitak@fi.muni.cz>
Date: Sun, 21 Apr 2024 01:43:43 +0200
Subject: [PATCH] Migrate to local async processing simulating messaging

---
 .../AsyncMessagingConfiguration.java          | 20 ++++++++++
 .../messaging/MessagingService.java           | 40 +++++++++++++++++++
 .../messaging/ProcessListener.java            | 20 ++++++----
 .../messaging/ProcessProducer.java            | 20 ++++++----
 .../src/main/resources/application.yaml       |  9 +++--
 5 files changed, 89 insertions(+), 20 deletions(-)
 create mode 100644 transaction-processor/src/main/java/cz/muni/pa165/banking/application/configuration/AsyncMessagingConfiguration.java
 create mode 100644 transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/MessagingService.java

diff --git a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/configuration/AsyncMessagingConfiguration.java b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/configuration/AsyncMessagingConfiguration.java
new file mode 100644
index 0000000..7146acc
--- /dev/null
+++ b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/configuration/AsyncMessagingConfiguration.java
@@ -0,0 +1,20 @@
+package cz.muni.pa165.banking.application.configuration;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+public class AsyncMessagingConfiguration {
+
+    @Bean
+    public ThreadPoolTaskExecutor taskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(5);
+        executor.setMaxPoolSize(10);
+        executor.setThreadNamePrefix("Async-");
+        executor.initialize();
+        return executor;
+    }
+    
+}
diff --git a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/MessagingService.java b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/MessagingService.java
new file mode 100644
index 0000000..fc38675
--- /dev/null
+++ b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/MessagingService.java
@@ -0,0 +1,40 @@
+package cz.muni.pa165.banking.application.messaging;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+@Service
+public class MessagingService {
+
+    private final Logger logger = LoggerFactory.getLogger(MessagingService.class);
+    
+    private final ProcessListener processListener;
+    
+    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
+
+    public MessagingService(ProcessListener processListener) {
+        this.processListener = processListener;
+    }
+
+    public void addToQueue(String message) {
+        queue.add(message);
+        triggerRead();
+    }
+
+    @Async
+    void triggerRead() {
+        String message = queue.poll();
+        try {
+            processListener.onReceived(message);
+        } catch (JsonProcessingException e) {
+            logger.error("Error while processing message");
+        }
+    }
+
+}
diff --git a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessListener.java b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessListener.java
index d6bf974..896cb90 100644
--- a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessListener.java
+++ b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessListener.java
@@ -1,5 +1,7 @@
 package cz.muni.pa165.banking.application.messaging;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import cz.muni.pa165.banking.application.service.ProcessHandlerService;
 import cz.muni.pa165.banking.domain.messaging.ProcessRequest;
 import org.springframework.stereotype.Service;
@@ -7,18 +9,20 @@ import org.springframework.stereotype.Service;
 @Service
 public class ProcessListener {
     
-    private final ProcessHandlerService service;
+    private final ObjectMapper objectMapper;
+    
+    private final ProcessHandlerService processHandlerService;
 
-    public ProcessListener(ProcessHandlerService service) {
-        this.service = service;
+    public ProcessListener(ObjectMapper objectMapper, ProcessHandlerService processHandlerService) {
+        this.objectMapper = objectMapper;
+        this.processHandlerService = processHandlerService;
     }
 
-
-    // TODO Milestone-2/3, add messaging RabbitMq dependencies
-//    @RabbitListener(queues = "${messaging.exchange:process-request}")
-    public void onReceived(ProcessRequest message) {
+    public void onReceived(String message) throws JsonProcessingException {
         System.out.println(message);
-        service.handle(message);
+        
+        ProcessRequest request = objectMapper.readValue(message, ProcessRequest.class);
+        processHandlerService.handle(request);
     }
     
 }
diff --git a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessProducer.java b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessProducer.java
index 44cdc38..e625165 100644
--- a/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessProducer.java
+++ b/transaction-processor/src/main/java/cz/muni/pa165/banking/application/messaging/ProcessProducer.java
@@ -3,27 +3,31 @@ package cz.muni.pa165.banking.application.messaging;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import cz.muni.pa165.banking.domain.messaging.MessageProducer;
 import cz.muni.pa165.banking.domain.messaging.ProcessRequest;
+import cz.muni.pa165.banking.domain.transaction.TransactionType;
 import cz.muni.pa165.banking.exception.ServerError;
-import org.springframework.beans.factory.annotation.Value;
+import jakarta.annotation.PostConstruct;
 import org.springframework.stereotype.Service;
 
 import java.util.Map;
+import java.util.UUID;
 
 @Service
 public class ProcessProducer implements MessageProducer {
     
-    // TODO Milestone-2/3, add messaging RabbitMq dependencies
-//    private final RabbitTemplate template;
+    private final MessagingService messagingService;
     
     private final ObjectMapper mapper;
     
-    @Value("${messaging.exchange:process-request}")
-    private String EXCHANGE_NAME;
-
-    public ProcessProducer(ObjectMapper mapper) {
+    public ProcessProducer(MessagingService messagingService, ObjectMapper mapper) {
+        this.messagingService = messagingService;
         this.mapper = mapper;
     }
 
+    @PostConstruct
+    void testing() {
+        ProcessRequest data = new ProcessRequest(UUID.randomUUID(), TransactionType.DEPOSIT);
+        send(data);
+    }
     
     @Override
     public void send(ProcessRequest data) {
@@ -39,7 +43,7 @@ public class ProcessProducer implements MessageProducer {
                     )
             );
         }
-//        template.convertAndSend(EXCHANGE_NAME, "", dataAsJsonString); 
+        messagingService.addToQueue(dataAsJsonString);
     }
     
 }
diff --git a/transaction-processor/src/main/resources/application.yaml b/transaction-processor/src/main/resources/application.yaml
index 2dc85c3..e901a7e 100644
--- a/transaction-processor/src/main/resources/application.yaml
+++ b/transaction-processor/src/main/resources/application.yaml
@@ -1,8 +1,6 @@
 db:
   hostname: localhost
-spring:
-  application:
-    name: Transaction-Processor
+
 banking:
   apps:
     management:
@@ -13,7 +11,10 @@ banking:
       host: localhost
       port: 8081
       url: ${banking.apps.query.host}:${banking.apps.query.port}
-    
+
+spring:
+  application:
+    name: Transaction-Processor  
   datasource:
     url: jdbc:postgresql://${db.hostname}:5432/banking
     driver-class-name: org.postgresql.Driver
-- 
GitLab