Skip to content
Snippets Groups Projects
Commit 3b1b1267 authored by Filip Kollár's avatar Filip Kollár
Browse files

set up skeleton for etl in analytics service, added todo

parent ce8a4571
No related branches found
No related tags found
1 merge request: !37 "Etl from transaction service into analytics"
Showing
with 465 additions and 0 deletions
......@@ -79,6 +79,15 @@
<artifactId>spring-boot-starter-data-jpa</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
......
package cz.muni.fi.obs;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@EnableTransactionManagement
@EnableFeignClients
@EnableScheduling
@EnableBatchProcessing
public class AnalyticsManagement {
public static void main(String[] args) {
......
package cz.muni.fi.obs.controller;
import cz.muni.fi.obs.etl.EtlExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Utility REST controller that lets the ETL be launched by hand.
 * Every endpoint of this controller is mapped below {@code /v1/etl}.
 */
@RestController
@RequestMapping("/v1/etl")
public class EtlController {

    private final EtlExecutor etlExecutor;

    /**
     * Creates the controller.
     *
     * @param etlExecutor service used to start the ETL
     */
    @Autowired
    public EtlController(EtlExecutor etlExecutor) {
        this.etlExecutor = etlExecutor;
    }

    /**
     * Handles {@code POST /v1/etl/execute} by delegating to
     * {@link EtlExecutor#executeEtl()}; the response carries no body.
     */
    @PostMapping("/execute")
    public void execute() {
        this.etlExecutor.executeEtl();
    }
}
package cz.muni.fi.obs.data.dbo;
/**
 * Entity that is only used as temporary storage for accounts while the ETL runs.
 * It has no fields yet.
 * <p>
 * TODO: add fields, see {@link cz.muni.fi.obs.etl.AccountDto}.
 */
public class TempAccount {
}
package cz.muni.fi.obs.etl;
import lombok.Getter;
import lombok.Setter;
/**
 * Account data transfer object used by the ETL.
 * <p>
 * All four properties are plain strings; their getters and setters are
 * generated by Lombok through {@code @Getter} and {@code @Setter}.
 */
@Getter
@Setter
public class AccountDto {

    private String id;

    private String customerId;

    private String currencyCode;

    private String accountNumber;
}
package cz.muni.fi.obs.etl;
/**
 * Unchecked exception signalling that the ETL could not be carried out.
 * It always wraps the failure that triggered it.
 */
public class EtlException extends RuntimeException {

    /**
     * Wraps the given failure.
     *
     * @param cause the underlying error; kept as the cause of this exception
     */
    public EtlException(Throwable cause) {
        super(cause);
    }
}
package cz.muni.fi.obs.etl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.HashMap;
/**
 * Service that starts the ETL batch job, either on a schedule or when
 * {@link #executeEtl()} is called directly.
 */
@Service
@Slf4j
public class EtlExecutor {

    private final JobLauncher jobLauncher;
    private final Job etlJob;

    /**
     * Creates the executor.
     *
     * @param jobLauncher launcher used to run the job
     * @param etlJob      the ETL job to run
     */
    @Autowired
    public EtlExecutor(JobLauncher jobLauncher, Job etlJob) {
        this.jobLauncher = jobLauncher;
        this.etlJob = etlJob;
    }

    /**
     * Runs the ETL job with a fresh {@code job-execution-time} parameter.
     * Scheduled for 1 am every day.
     *
     * @throws EtlException wrapping any launch failure reported by the job launcher
     */
    @Scheduled(cron = "0 0 1 * * *")
    public void executeEtl() {
        // Parameters are built before logging, exactly as the launch time is captured first.
        JobParameters launchParameters = currentTimeParameters();

        log.info("Starting etl...");
        try {
            jobLauncher.run(etlJob, launchParameters);
        } catch (JobExecutionAlreadyRunningException
                 | JobRestartException
                 | JobInstanceAlreadyCompleteException
                 | JobParametersInvalidException launchFailure) {
            throw new EtlException(launchFailure);
        }
    }

    /**
     * Builds job parameters holding only the current instant under the key
     * {@code job-execution-time}. The parameter is created with its boolean
     * flag set to {@code true}.
     */
    private static JobParameters currentTimeParameters() {
        JobParameter<Instant> launchedAt = new JobParameter<>(Instant.now(), Instant.class, true);
        HashMap<String, JobParameter<?>> byName = new HashMap<>();
        byName.put("job-execution-time", launchedAt);
        return new JobParameters(byName);
    }
}
package cz.muni.fi.obs.etl;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
/**
 * Job-level listener for the ETL job. Both callbacks currently do nothing
 * beyond calling the default implementation of the interface.
 * <p>
 * TODO: check the job status before and after the run, and log the start and the end of the job.
 * Optional: use the {@link JobExecution} object to log some details (e.g. how many facts were
 * created in the compute-facts step).
 */
@Component
public class EtlJobListener implements JobExecutionListener {

    /**
     * Called before the job; delegates to the interface default.
     *
     * @param jobExecution execution that is about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        JobExecutionListener.super.beforeJob(jobExecution);
    }

    /**
     * Called after the job; delegates to the interface default.
     *
     * @param jobExecution execution that has ended
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        JobExecutionListener.super.afterJob(jobExecution);
    }
}
package cz.muni.fi.obs.etl;
import cz.muni.fi.obs.data.dbo.DailyTransaction;
import cz.muni.fi.obs.data.dbo.TempAccount;
import cz.muni.fi.obs.etl.step.clean.accounts.CleanTempAccountsTasklet;
import cz.muni.fi.obs.etl.step.create.facts.FactCreatorProcessor;
import cz.muni.fi.obs.etl.step.create.facts.FactWriter;
import cz.muni.fi.obs.etl.step.create.facts.TempAccountReader;
import cz.muni.fi.obs.etl.step.read.accounts.AccountProcessor;
import cz.muni.fi.obs.etl.step.read.accounts.AccountReader;
import cz.muni.fi.obs.etl.step.read.accounts.AccountWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Spring configuration that declares the three ETL steps and the job chaining them.
 * <p>
 * The steps take the transaction manager as a {@link PlatformTransactionManager}
 * rather than a concrete implementation, so the configuration works with whichever
 * transaction manager the application context provides.
 */
@Configuration
public class JobConfiguration {

    /** Number of items handled per chunk in the chunk-oriented steps. */
    public static final int CHUNK_SIZE = 10;

    public static final String READ_ACCOUNTS_STEP_NAME = "read-accounts-step";
    public static final String CREATE_FACTS_STEP_NAME = "create-facts-step";
    public static final String CLEAN_ACCOUNTS_STEP_NAME = "clean-accounts-step";
    public static final String JOB_NAME = "daily-etl-job";

    /**
     * Chunk-oriented step that turns {@link AccountDto} items into {@link TempAccount} items.
     *
     * @param jobRepository      repository the step is registered with
     * @param transactionManager transaction manager used around each chunk
     * @param accountReader      source of {@link AccountDto} items
     * @param accountProcessor   converts each DTO into a {@link TempAccount}
     * @param accountWriter      sink for the produced {@link TempAccount} items
     * @return the step named {@link #READ_ACCOUNTS_STEP_NAME}
     */
    @Bean
    public Step readAccountsStep(JobRepository jobRepository,
                                 PlatformTransactionManager transactionManager,
                                 AccountReader accountReader,
                                 AccountProcessor accountProcessor,
                                 AccountWriter accountWriter) {
        return new StepBuilder(READ_ACCOUNTS_STEP_NAME, jobRepository)
                .<AccountDto, TempAccount>chunk(CHUNK_SIZE, transactionManager)
                .reader(accountReader)
                .processor(accountProcessor)
                .writer(accountWriter)
                .build();
    }

    /**
     * Chunk-oriented step that turns {@link TempAccount} items into {@link DailyTransaction} facts.
     *
     * @param jobRepository      repository the step is registered with
     * @param transactionManager transaction manager used around each chunk
     * @param reader             source of {@link TempAccount} items
     * @param processor          creates a {@link DailyTransaction} for each account
     * @param writer             sink for the produced facts
     * @return the step named {@link #CREATE_FACTS_STEP_NAME}
     */
    @Bean
    public Step computeAccountFactsStep(JobRepository jobRepository,
                                        PlatformTransactionManager transactionManager,
                                        TempAccountReader reader,
                                        FactCreatorProcessor processor,
                                        FactWriter writer) {
        return new StepBuilder(CREATE_FACTS_STEP_NAME, jobRepository)
                .<TempAccount, DailyTransaction>chunk(CHUNK_SIZE, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    /**
     * Tasklet step that runs {@link CleanTempAccountsTasklet}.
     *
     * @param jobRepository      repository the step is registered with
     * @param transactionManager transaction manager used around the tasklet
     * @param tasklet            the clean-up tasklet to execute
     * @return the step named {@link #CLEAN_ACCOUNTS_STEP_NAME}
     */
    @Bean
    public Step cleanAccountsStep(JobRepository jobRepository,
                                  PlatformTransactionManager transactionManager,
                                  CleanTempAccountsTasklet tasklet) {
        return new StepBuilder(CLEAN_ACCOUNTS_STEP_NAME, jobRepository)
                .tasklet(tasklet, transactionManager)
                .build();
    }

    /**
     * Assembles the ETL job: clean accounts, then read accounts, then compute facts.
     *
     * @param jobRepository           repository the job is registered with
     * @param jobListener             listener attached to the job
     * @param cleanAccountsStep       first step of the job
     * @param readAccountsStep        second step of the job
     * @param computeAccountFactsStep last step of the job
     * @return the job named {@link #JOB_NAME}
     */
    @Bean
    public Job etlJob(JobRepository jobRepository,
                      EtlJobListener jobListener,
                      Step cleanAccountsStep,
                      Step readAccountsStep,
                      Step computeAccountFactsStep) {
        return new JobBuilder(JOB_NAME, jobRepository)
                .listener(jobListener)
                .start(cleanAccountsStep)
                .next(readAccountsStep)
                .next(computeAccountFactsStep)
                .build();
    }
}
package cz.muni.fi.obs.etl;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that provides the {@link JobLauncher} used to start batch jobs.
 */
@Configuration
public class LauncherConfig {

    /**
     * Builds a {@link TaskExecutorJobLauncher} backed by the given repository.
     * Only the job repository is configured here; no task executor is set explicitly.
     *
     * @param jobRepository repository handed to the launcher
     * @return the initialised launcher
     * @throws Exception if {@code afterPropertiesSet()} rejects the launcher's configuration
     */
    @Bean
    public JobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
        TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
        launcher.setJobRepository(jobRepository);
        // The launcher is created with `new`, so its initialisation callback is invoked here by hand.
        launcher.afterPropertiesSet();
        return launcher;
    }
}
package cz.muni.fi.obs.etl;
/**
 * Placeholder DTO for transactions used by the ETL; it has no fields yet.
 * <p>
 * TODO: add fields, see the transaction service for the fields.
 */
public class TransactionDto {
}
package cz.muni.fi.obs.etl.clients;
import cz.muni.fi.obs.etl.AccountDto;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
 * Feign client named {@code accounts-client} whose base URL is taken from the
 * {@code etl.transaction-service.url} property.
 */
@FeignClient(
name = "accounts-client",
url = "${etl.transaction-service.url}"
)
public interface AccountsClient {
/**
 * Requests one page of accounts via {@code GET /list}.
 * <p>
 * NOTE(review): the {@link Pageable} is annotated with {@code @RequestBody} on a GET
 * mapping - confirm that the configured Feign encoder sends it as query parameters
 * and that the remote endpoint expects it that way.
 *
 * @param pageable paging information for the request
 * @return the requested page of {@link AccountDto}
 */
@GetMapping(value = "/list")
Page<AccountDto> listAccounts(@RequestBody Pageable pageable);
}
package cz.muni.fi.obs.etl.clients;
import org.springframework.cloud.openfeign.FeignClient;
/**
 * Feign client named {@code transaction-client} whose base URL is taken from the
 * {@code etl.transaction-service.url} property. It declares no operations yet.
 * <p>
 * TODO: add an operation that lists the transactions of a specific account_id. Check whether
 * the transaction service already contains an endpoint for this: if it does, just copy it;
 * otherwise create the operation GET Page&lt;TransactionDto&gt; (account_id) in transaction_service.
 */
@FeignClient(
name = "transaction-client",
url = "${etl.transaction-service.url}"
)
public interface TransactionClient {
}
package cz.muni.fi.obs.etl.step.clean.accounts;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
 * Step-scoped tasklet meant to empty the temporary account storage; it performs no work yet.
 * <p>
 * TODO: create a TempAccount repository, inject it here and just call delete-all on the table.
 */
@Component
@StepScope
public class CleanTempAccountsTasklet implements Tasklet {

    /**
     * Executes the clean-up and reports that the tasklet is done.
     *
     * @param contribution step contribution supplied by the framework (unused so far)
     * @param chunkContext chunk context supplied by the framework (unused so far)
     * @return {@link RepeatStatus#FINISHED}, so the tasklet is not invoked again for this step
     * @throws Exception declared by the {@link Tasklet} contract; nothing is thrown yet
     */
    @Override
    @Transactional
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // Explicit status instead of null: states the intent and keeps callers free of null handling.
        return RepeatStatus.FINISHED;
    }
}
package cz.muni.fi.obs.etl.step.create.facts;
import cz.muni.fi.obs.data.dbo.DailyTransaction;
import cz.muni.fi.obs.data.dbo.TempAccount;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
/**
 * Step-scoped processor that is meant to build a {@link DailyTransaction} fact for each
 * {@link TempAccount}. It is still a stub.
 * <p>
 * TODO: use the transaction client to fetch the account's transactions for the current day,
 * do all computations over them within the method and create the fact {@link DailyTransaction}.
 */
@Component
@StepScope
public class FactCreatorProcessor implements ItemProcessor<TempAccount, DailyTransaction> {

    /**
     * Stub implementation.
     *
     * @param item account to create the fact for (ignored so far)
     * @return always {@code null} for now
     * @throws Exception declared by the interface; nothing is thrown yet
     */
    @Override
    public DailyTransaction process(TempAccount item) throws Exception {
        return null;
    }
}
package cz.muni.fi.obs.etl.step.create.facts;
import cz.muni.fi.obs.data.dbo.DailyTransaction;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
/**
 * Step-scoped writer for {@link DailyTransaction} facts. It is still a stub.
 * <p>
 * TODO: just write the chunk to the DB, nothing else is needed.
 */
@Component
@StepScope
public class FactWriter implements ItemWriter<DailyTransaction> {

    /**
     * Stub implementation that currently discards the chunk.
     *
     * @param chunk facts to persist (ignored so far)
     * @throws Exception declared by the interface; nothing is thrown yet
     */
    @Override
    public void write(Chunk<? extends DailyTransaction> chunk) throws Exception {
    }
}
package cz.muni.fi.obs.etl.step.create.facts;
import cz.muni.fi.obs.data.dbo.TempAccount;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Step-scoped reader of {@link TempAccount} items. It is still a stub.
 * <p>
 * TODO: implement reading. Intended approach:
 * <ul>
 *   <li>if accounts are present in the cached collection, pop the first element and return it;</li>
 *   <li>if the collection is empty, read the next page, store it in the field, increment the
 *       page counter and return the first account of the page just read;</li>
 *   <li>if no accounts are left, return {@code null} - doing so terminates the reading
 *       automatically.</li>
 * </ul>
 */
@Component
@StepScope
public class TempAccountReader implements ItemReader<TempAccount> {

    /** Intended to hold the accounts of the page that was read last. */
    private List<TempAccount> accountCache;

    /** Intended to track the page to read next. Might be an int? */
    private long currentPage;

    /**
     * Intended total number of pages: while it is 0, read the actual value of this parameter;
     * afterwards use it to determine whether there should be no more accounts.
     */
    private long totalPages;

    /**
     * Stub implementation.
     *
     * @return always {@code null} for now
     */
    @Override
    public TempAccount read() {
        return null;
    }
}
package cz.muni.fi.obs.etl.step.read.accounts;
import cz.muni.fi.obs.data.dbo.TempAccount;
import cz.muni.fi.obs.etl.AccountDto;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
/**
 * Step-scoped processor that is meant to turn an {@link AccountDto} into a
 * {@link TempAccount}. It is still a stub.
 * <p>
 * TODO: use a mapper to convert the DTO from the API into the object stored in the temp DB.
 */
@Component
@StepScope
public class AccountProcessor implements ItemProcessor<AccountDto, TempAccount> {

    /**
     * Stub implementation.
     *
     * @param item account DTO to convert (ignored so far)
     * @return always {@code null} for now
     * @throws Exception declared by the interface; nothing is thrown yet
     */
    @Override
    public TempAccount process(AccountDto item) throws Exception {
        return null;
    }
}
package cz.muni.fi.obs.etl.step.read.accounts;
import cz.muni.fi.obs.etl.AccountDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.stereotype.Component;
import java.time.Instant;
/**
 * Step-scoped reader of {@link AccountDto} items. It is still a stub.
 * <p>
 * TODO: read all accounts from the transaction service. The logic is similar to
 * {@link cz.muni.fi.obs.etl.step.create.facts.TempAccountReader}, but the client is used
 * here to fetch the accounts.
 */
@Component
@StepScope
@Slf4j
public class AccountReader implements ItemReader<AccountDto> {

    /** Instant captured when this reader instance is created; not read anywhere in this class yet. */
    private Instant instant = Instant.now();

    /**
     * Stub implementation.
     *
     * @return always {@code null} for now
     * @throws Exception declared together with the more specific batch exceptions;
     *                   nothing is thrown yet
     */
    @Override
    public AccountDto read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        return null;
    }
}
package cz.muni.fi.obs.etl.step.read.accounts;
import cz.muni.fi.obs.data.dbo.TempAccount;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
/**
 * Step-scoped writer for {@link TempAccount} items. It is still a stub.
 * <p>
 * TODO: use a repository to write the accounts.
 */
@Component
@StepScope
public class AccountWriter implements ItemWriter<TempAccount> {

    /**
     * Stub implementation that currently discards the chunk.
     *
     * @param chunk accounts to persist (ignored so far)
     * @throws Exception declared by the interface; nothing is thrown yet
     */
    @Override
    public void write(Chunk<? extends TempAccount> chunk) throws Exception {
    }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment