Skip to content
Snippets Groups Projects
Commit 69dca455 authored by Filip Kollár's avatar Filip Kollár
Browse files

Merge branch 'analytics-etl' into 'master'

Etl from transaction service into analytics

See merge request !37
parents ce8a4571 57cc7338
No related branches found
No related tags found
1 merge request!37Etl from transaction service into analytics
Showing
with 396 additions and 10 deletions
......@@ -79,6 +79,15 @@
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
......
......@@ -2,10 +2,14 @@ package cz.muni.fi.obs;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@EnableTransactionManagement
@EnableFeignClients
@EnableScheduling
public class AnalyticsManagement {
public static void main(String[] args) {
......
package cz.muni.fi.obs.controller;
import cz.muni.fi.obs.etl.EtlExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Util controller for launching etl by hand
*/
@RestController
@RequestMapping("/v1/etl")
public class EtlController {
private final EtlExecutor etlExecutor;
@Autowired
public EtlController(EtlExecutor etlExecutor) {
this.etlExecutor = etlExecutor;
}
@PostMapping("/execute")
public void execute() {
etlExecutor.executeEtl();
}
}
......@@ -14,7 +14,7 @@ import lombok.Setter;
@AllArgsConstructor
@NoArgsConstructor
@Table(name = "account_dim")
public class Account extends Dbo {
public class AccountDimension extends Dbo {
@Column(nullable = false, unique = true)
String accountNumber;
}
package cz.muni.fi.obs.data.dbo;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@Entity
@AllArgsConstructor
@NoArgsConstructor
@Table(name = "currency_dim")
public class CurrencyDimension extends Dbo {
@Column(nullable = false, unique = true)
String symbol;
}
package cz.muni.fi.obs.data.dbo;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.ManyToOne;
import jakarta.persistence.Table;
import jakarta.persistence.*;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
......@@ -17,7 +14,7 @@ import java.math.BigDecimal;
@AllArgsConstructor
@Entity
@Table(name = "daily_transaction")
public class DailyTransaction extends Dbo {
public class DailyTransactionFact extends Dbo {
@Column(nullable = false)
Integer totalWithdrawalTransactions;
......@@ -41,8 +38,14 @@ public class DailyTransaction extends Dbo {
BigDecimal averageDepositAmount;
@ManyToOne
Account account;
@JoinColumn(name = "account_id")
AccountDimension accountDimension;
@ManyToOne
Date date;
@JoinColumn(name = "date_id")
DateDimension dateDimension;
@ManyToOne
@JoinColumn(name = "currency_id")
CurrencyDimension currencyDimension;
}
......@@ -14,7 +14,7 @@ import java.time.LocalDate;
@Entity
@NoArgsConstructor
@Table(name = "date_dim")
public class Date extends Dbo {
public class DateDimension extends Dbo {
@Column(nullable = false)
int yearNumber;
......@@ -28,7 +28,7 @@ public class Date extends Dbo {
@Column(nullable = false)
LocalDate fullDate;
public Date(int year, int month, int day) {
public DateDimension(int year, int month, int day) {
this.yearNumber = year;
this.monthNumber = month;
this.dayNumber = day;
......
package cz.muni.fi.obs.data.dbo;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "temp_account")
public class TempAccount extends Dbo{
@Column(nullable = false)
private String customerId;
@Column(nullable = false)
private String currencyCode;
@Column(nullable = false, unique = true)
private String accountId;
}
package cz.muni.fi.obs.data.repository;
import cz.muni.fi.obs.data.dbo.AccountDimension;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface AccountRepository extends JpaRepository<AccountDimension, String>{
}
package cz.muni.fi.obs.data;
package cz.muni.fi.obs.data.repository;
import cz.muni.fi.obs.data.dbo.DailyTransaction;
import cz.muni.fi.obs.data.dbo.DailyTransactionFact;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
......@@ -9,11 +9,11 @@ import java.math.BigDecimal;
import java.util.List;
@Repository
public interface AnalyticsRepository extends JpaRepository<DailyTransaction, String> {
@Query("SELECT dt FROM DailyTransaction dt WHERE dt.account.accountNumber = ?1 AND dt.date.yearNumber = ?2 AND dt.date.monthNumber = ?3")
List<DailyTransaction> getDailyTransactions(String accountNumber, int year, int month);
public interface AnalyticsRepository extends JpaRepository<DailyTransactionFact, String> {
@Query("SELECT dt FROM DailyTransactionFact dt WHERE dt.accountDimension.accountNumber = ?1 AND dt.dateDimension.yearNumber = ?2 AND dt.dateDimension.monthNumber = ?3")
List<DailyTransactionFact> getDailyTransactions(String accountNumber, int year, int month);
@Query("SELECT dt FROM DailyTransaction dt WHERE dt.account.accountNumber = ?1 AND dt.totalTransactionAmount BETWEEN ?2 AND ?3")
List<DailyTransaction> getDailyTransactionsByAmountRange(String accountNumber, BigDecimal minAmount, BigDecimal maxAmount);
@Query("SELECT dt FROM DailyTransactionFact dt WHERE dt.accountDimension.accountNumber = ?1 AND dt.totalTransactionAmount BETWEEN ?2 AND ?3")
List<DailyTransactionFact> getDailyTransactionsByAmountRange(String accountNumber, BigDecimal minAmount, BigDecimal maxAmount);
}
package cz.muni.fi.obs.data.repository;
import cz.muni.fi.obs.data.dbo.CurrencyDimension;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
public interface CurrencyRepository extends JpaRepository<CurrencyDimension, String> {
@Query("SELECT c FROM CurrencyDimension c WHERE c.symbol = :currencyCode")
Optional<CurrencyDimension> findByCurrencyCode(String currencyCode);
}
package cz.muni.fi.obs.data.repository;
import cz.muni.fi.obs.data.dbo.DateDimension;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
public interface DateRepository extends JpaRepository<DateDimension, String>{
@Query("SELECT d FROM DateDimension d WHERE d.yearNumber = ?1 AND d.monthNumber = ?2 AND d.dayNumber = ?3")
Optional<DateDimension> findByYearAndMonthAndDay(int year, int monthValue, int dayOfMonth);
}
package cz.muni.fi.obs.data.repository;
import cz.muni.fi.obs.data.dbo.TempAccount;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface TempAccountRepository extends JpaRepository<TempAccount, Long> {
}
package cz.muni.fi.obs.etl;
/**
* Exception for etl errors
*/
public class EtlException extends RuntimeException {
public EtlException(Throwable cause) {
super(cause);
}
}
package cz.muni.fi.obs.etl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.HashMap;
@Service
@Slf4j
public class EtlExecutor {
private final JobLauncher jobLauncher;
private final Job etlJob;
@Autowired
public EtlExecutor(JobLauncher jobLauncher, Job etlJob) {
this.jobLauncher = jobLauncher;
this.etlJob = etlJob;
}
/**
* Scheduled 1 am every day
*/
@Scheduled(cron = "0 0 1 * * *")
public void executeEtl() {
JobParameter<String> timeParameter = new JobParameter<>(Instant.now().toString(), String.class, true);
HashMap<String, JobParameter<?>> parameterMap = new HashMap<>();
parameterMap.put("job-execution-time", timeParameter);
JobParameters jobParameters = new JobParameters(parameterMap);
log.info("Starting etl...");
try {
jobLauncher.run(etlJob, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException |
JobParametersInvalidException e) {
throw new EtlException(e);
}
}
}
package cz.muni.fi.obs.etl;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class EtlJobListener implements JobExecutionListener {
@Override
public void beforeJob(@NotNull JobExecution jobExecution) {
log.info("Starting job: {} at {}", jobExecution.getJobInstance().getJobName(), jobExecution.getStartTime());
}
@Override
public void afterJob(@NotNull JobExecution jobExecution) {
log.info("Job finished with status: {} at {}", jobExecution.getStatus(), jobExecution.getEndTime());
}
}
package cz.muni.fi.obs.etl;
import cz.muni.fi.obs.data.dbo.DailyTransactionFact;
import cz.muni.fi.obs.data.dbo.TempAccount;
import cz.muni.fi.obs.etl.dto.AccountDto;
import cz.muni.fi.obs.etl.step.clean.accounts.CleanTempAccountsTasklet;
import cz.muni.fi.obs.etl.step.create.facts.FactCreatorProcessor;
import cz.muni.fi.obs.etl.step.create.facts.FactWriter;
import cz.muni.fi.obs.etl.step.create.facts.TempAccountReader;
import cz.muni.fi.obs.etl.step.read.accounts.AccountProcessor;
import cz.muni.fi.obs.etl.step.read.accounts.AccountReader;
import cz.muni.fi.obs.etl.step.read.accounts.AccountWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
public class JobConfiguration {
public static final int CHUNK_SIZE = 10;
public static final String READ_ACCOUNTS_STEP_NAME = "read-accounts-step";
public static final String CREATE_FACTS_STEP_NAME = "create-facts-step";
public static final String CLEAN_ACCOUNTS_STEP_NAME = "clean-accounts-step";
public static final String JOB_NAME = "daily-etl-job";
@Bean
public Step readAccountsStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
AccountReader accountReader,
AccountProcessor accountProcessor,
AccountWriter accountWriter) {
return new StepBuilder(READ_ACCOUNTS_STEP_NAME, jobRepository)
.<AccountDto, TempAccount>chunk(CHUNK_SIZE, transactionManager)
.reader(accountReader)
.processor(accountProcessor)
.writer(accountWriter)
.build();
}
@Bean
public Step computeAccountFactsStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
TempAccountReader reader,
FactCreatorProcessor processor,
FactWriter writer) {
return new StepBuilder(CREATE_FACTS_STEP_NAME, jobRepository)
.<TempAccount, DailyTransactionFact>chunk(CHUNK_SIZE, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Step cleanAccountsStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
CleanTempAccountsTasklet tasklet) {
return new StepBuilder(CLEAN_ACCOUNTS_STEP_NAME, jobRepository)
.tasklet(tasklet, transactionManager)
.build();
}
@Bean
public Job etlJob(JobRepository jobRepository,
EtlJobListener jobListener,
Step cleanAccountsStep,
Step readAccountsStep,
Step computeAccountFactsStep) {
return new JobBuilder(JOB_NAME, jobRepository)
.listener(jobListener)
.start(cleanAccountsStep)
.next(readAccountsStep)
.next(computeAccountFactsStep)
.build();
}
}
package cz.muni.fi.obs.etl.clients;
import cz.muni.fi.obs.etl.dto.AccountDto;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.data.domain.Page;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(
name = "accounts-client",
url = "${etl.transaction-service.url}"
)
public interface AccountsClient {
@GetMapping(value = "/v1/accounts/list", produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
Page<AccountDto> listAccounts(@RequestParam("page") Integer page, @RequestParam("pageSize") Integer pageSize);
}
package cz.muni.fi.obs.etl.clients;
import cz.muni.fi.obs.etl.dto.TransactionDto;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.data.domain.Page;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import java.time.LocalDate;
@FeignClient(
name = "transaction-client",
url = "${etl.transaction-service.url}"
)
public interface TransactionClient {
@GetMapping(value = "/v1/transactions/{accountId}/list",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE)
ResponseEntity<Page<TransactionDto>> listTransactions(
@PathVariable("accountId") String accountId,
@RequestParam("pageNumber") int pageNumber,
@RequestParam("pageSize") int pageSize,
@RequestParam("date") LocalDate date);
}
package cz.muni.fi.obs.etl.dto;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class AccountDto {
private String id;
private String customerId;
private String currencyCode;
private String accountNumber;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment