Hello everyone,

Greetings today!

Today we will talk about the spring batch decider, which is mainly used to achieve conditional execution of steps.

JobExecutionDecider -- JobExecutionDecider is the interface we need to implement overriding the decide() method. From the decide() method, we can return different FlowExecutionStatus which we can use in batch job configuration for conditional flow.

Let's see how we can implement a conditional flow with the example below.

Create a project with the following dependency by going to Spring Initializr. & open project in IDE.



Here, the spring batch decider and spring boot will be used to create the conditional flow that is shown below.

First, create a model entity that will be mapped to the sales_enquiry table as shown below
package com.practice.model;

import lombok.Getter;
import lombok.Setter;

import javax.persistence.*;
import java.time.LocalDateTime;

@Entity
@Table(name = "sales_enquiry")
@Getter
@Setter
public class SalesEnquiry {

    @Id
    @Column
    @GeneratedValue(strategy = GenerationType.SEQUENCE,
    	generator = "id_sequence")
    @SequenceGenerator(name = "id_sequence",
    	sequenceName = "id_sequence")
    private Long id;

    @Column
    private String customerName;

    @Column
    private String customerContactNo;

    @Column
    private Boolean productSold;

    @Column
    private String productName;

    @Column
    private LocalDateTime enquiryDate;

}
Next, we will create a repository layer for the sales_enquiry  table using Spring JPA which will be used to query tables to check sales status using date.
package com.practice.dao;

import com.practice.model.SalesEnquiry;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.time.LocalDateTime;
import java.util.List;

@Repository
public interface SalesEnquiryRepository extends 
	JpaRepository<SalesEnquiry,Long> {

    List<SalesEnquiry> findByEnquiryDateBetweenAndProductSold
			(LocalDateTime startDate,
             LocalDateTime endDate,
             Boolean productSold);
}
Now we will create a DTO class to map the model entity and we will use this in creating CSV files
package com.practice.dto;

import lombok.*;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class SalesEnquiryDTO {

    private Long id;

    private String customerName;

    private String customerContactNo;

    private String productName;

}
We will now create a Processor by implementing ItemProcessor to map SalesEnquiry to SalesEnquiryDTO
package com.practice.processor;

import com.practice.dto.SalesEnquiryDTO;
import com.practice.model.SalesEnquiry;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class SalesReportProcessor implements 
		ItemProcessor<SalesEnquiry, SalesEnquiryDTO> {

    @Override
    public SalesEnquiryDTO process(SalesEnquiry salesEnquiry) 
    	throws Exception {
        
     return SalesEnquiryDTO.builder().id(salesEnquiry.getId()).
	  customerContactNo(salesEnquiry.getCustomerContactNo()).
       	customerName(salesEnquiry.getCustomerName()).
	  productName(salesEnquiry.getProductName())
      .build();
    }
}

Now we will create the below 2 deciders by implementing the JobExecutionDecider interface
  • SalesSuccessFlowDecider-It will check if any sale was successful today if yes it will return FlowExecutionStatus as SALES_SUCCESS else it will return flow execution status as COMPLETED
package com.practice.decider;

@Component
public class SalesSuccessFlowDecider 
	implements JobExecutionDecider {

    @Autowired
    private SalesEnquiryRepository salesEnquiryRepository;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution,
    	StepExecution stepExecution) {
        
    ZonedDateTime zonedDateTime=ZonedDateTime.now();

    List<SalesEnquiry> salesSuccessEnquiryList
    	=salesEnquiryRepository
        	.findByEnquiryDateBetweenAndProductSold(
				LocalDate.now().atStartOfDay(),
				LocalDate.now().plusDays(1)
                .atStartOfDay(),
                Boolean.TRUE);
        if(!salesSuccessEnquiryList.isEmpty()){
            return new FlowExecutionStatus("SALES_SUCCESS");
        }
        return new FlowExecutionStatus("COMPLETED");
    }
}
  • SalesUnsuccessfulDecider-- It will check if there are any sales inquiries today for which sale does not happen if yes then it will return the status as SALES_UN_SUCCESSFUL else it will return the status as COMPLETED as shown below
package com.practice.decider;

@Component
public class SalesUnsuccessfulDecider
		implements JobExecutionDecider {

    @Autowired
    private SalesEnquiryRepository salesEnquiryRepository;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution,
    	StepExecution stepExecution) {
        
        List<SalesEnquiry> salesUnsuccessfulFulEnquiryList
        		=salesEnquiryRepository
                .findByEnquiryDateBetweenAndProductSold
                (
                LocalDate.now().atStartOfDay(),
				LocalDate.now().plusDays(1)
                	.atStartOfDay(),
                Boolean.FALSE
                );
        
        if(!salesUnsuccessfulFulEnquiryList.isEmpty()){
            return new FlowExecutionStatus("SALES_UN_SUCCESSFUL");
        }
        return new FlowExecutionStatus("COMPLETED");
    }
}

Final step is to define flow where we will configure what batch job should do when it receives different flow execution status like SALES_UN_SUCCESSFUL, SALES_SUCCESS and COMPLETED
@Bean
public Flow salesSuccessReportFlow(){
  return 
  new FlowBuilder<SimpleFlow>("salesSuccessReportFlow")
   .start(salesSuccessFlowDecider).on("SALES_SUCCESS")
    .to(salesSuccessCSVStep()).next(salesUnsuccessfulReportFlow())
   .from(salesSuccessFlowDecider).on("COMPLETED")
     .to(salesUnsuccessfulReportFlow()).build();
}

@Bean
public Flow salesUnsuccessfulReportFlow(){
  return 
  new FlowBuilder<SimpleFlow>("salesUnsuccessfulReportFlow")
     .start(salesUnsuccessfulDecider).on("SALES_UN_SUCCESSFUL")
      .to(salesUnsuccessfulCSVStep())
     .from(salesUnsuccessfulDecider).on("COMPLETED")
      .end()
     .build();
}

So we have configured flow as per our initial flow diagram i.e. 

SalesSuccessReportFlow using SalesSuccessFlowDecider which we have seen above
  • If any sales inquiry is converted to the sale for the current day then we will get status as SALES_SUCCESS and it will go to salesSuccessCSVStep() which will basically generate a CSV file for sales done today, after generating the CSV file it will go to salesUnsuccessfulReportFlow()
  • If no sale was done today then based on status COMPLETED it will again go to salesUnsuccessfulReportFlow()
SalesUnsuccessfulReportFlow using SalesUnsuccessfulDecider
  • It will check if there is any sale inquiry for which a sale does not occur if yes it will get status as SALES_UN_SUCCESSFUL and it will go to salesUnsuccessfulCSVStep to generate CSV of such records else it will get status as COMPLETED which will end our flow 
Below is complete code for batch job configuration and CSV file generation, for example of a database to CSV export using spring batch you can refer to Spring Batch Example Database To CSV Export
package com.practice.config;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SalesSuccessFlowDecider salesSuccessFlowDecider;

    @Autowired
    private SalesUnsuccessfulDecider salesUnsuccessfulDecider;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private SalesReportProcessor salesReportProcessor;

    @Bean(name = "salesEnquiryReportJob")
    public Job salesReportJob() {
       return jobBuilderFactory.get("salesEnquiryReportJob")
        	.incrementer(new RunIdIncrementer())
             .start(salesSuccessReportFlow()).end().build();
    }

  @Bean
  public Flow salesSuccessReportFlow(){
    return new FlowBuilder<SimpleFlow>("salesSuccessReportFlow")
     .start(salesSuccessFlowDecider).on("SALES_SUCCESS")
       .to(salesSuccessCSVStep())
       .next(salesUnsuccessfulReportFlow())
     .from(salesSuccessFlowDecider).on("COMPLETED")
       .to(salesUnsuccessfulReportFlow()).build();
    }

  @Bean
  public Flow salesUnsuccessfulReportFlow(){
    return new FlowBuilder<SimpleFlow>
    ("salesUnsuccessfulReportFlow")
    .start(salesUnsuccessfulDecider)
     	.on("SALES_UN_SUCCESSFUL")
       	  .to(salesUnsuccessfulCSVStep())
     .from(salesUnsuccessfulDecider)
     	.on("COMPLETED")
        	.end()
     .build();
    }

 @Bean
 public Step salesSuccessCSVStep(){
    return stepBuilderFactory.get("salesSuccessCSVStep")
      .<SalesEnquiry, SalesEnquiryDTO>chunk(250)
        .reader(salesSuccessReader())
        .processor(salesReportProcessor)
        .writer(salesSuccessWriter())
        .build();
  }

 @Bean
 public JdbcCursorItemReader<SalesEnquiry> 
    salesSuccessReader(){
        
   JdbcCursorItemReader<SalesEnquiry> jdbcCursorItemReader
      =new JdbcCursorItemReader<>();
        
   jdbcCursorItemReader
     .setSql(
     "Select * from sales_enquiry where ENQUIRY_DATE
     >=trunc(sysdate) and 
     ENQUIRY_DATE<trunc(sysdate) + 1 and PRODUCT_SOLD=1"
     );
     jdbcCursorItemReader.setDataSource(dataSource);
     jdbcCursorItemReader.setRowMapper(
       new BeanPropertyRowMapper<SalesEnquiry>
       	(SalesEnquiry.class));
      return jdbcCursorItemReader;
  }

 @Bean
 public FlatFileItemWriter salesSuccessWriter(){
        
 FlatFileItemWriter salesSuccessWriter=new FlatFileItemWriter();
 salesSuccessWriter.setHeaderCallback((x)->x.write
 	("Id,Customer Name,Mobile No,Product Name"));
        
 salesSuccessWriter.setResource(
  new FileSystemResource(
 "F://SalesEnquiryReport//sales_success_report.csv"
 ));
     
 salesSuccessWriter.setLineAggregator
 	(salesEnquiryLineAggregator());
 
 return salesSuccessWriter;
 }

 @Bean
 public FlatFileItemWriter salesUnSuccessWriter()
 {
  FlatFileItemWriter salesSuccessWriter=new FlatFileItemWriter();
  salesSuccessWriter.setHeaderCallback((x)->x.write
  ("Id,Customer Name,Mobile No,Product Name")
  );
  
 salesSuccessWriter.setResource(
 new FileSystemResource(
	"F://SalesEnquiryReport//sales_unsuccessful_report.csv"
 ));
        
  salesSuccessWriter.setLineAggregator
  	(salesEnquiryLineAggregator());
  
  return salesSuccessWriter;
 }

 @Bean
 public DelimitedLineAggregator salesEnquiryLineAggregator()
 {
  DelimitedLineAggregator delimitedLineAggregator
   =new DelimitedLineAggregator();
        
  delimitedLineAggregator.setDelimiter(",");
  BeanWrapperFieldExtractor beanWrapperFieldExtractor
  	=new BeanWrapperFieldExtractor();
  beanWrapperFieldExtractor.setNames(
  	new String[]
    {"id","customerName","customerContactNo","productName"}
  );
   
  delimitedLineAggregator.
  	setFieldExtractor(beanWrapperFieldExtractor);
  
  return delimitedLineAggregator;
 }

 @Bean
 public Step salesUnsuccessfulCSVStep(){
  return stepBuilderFactory.get("salesUnsuccessfulCSVStep")
    .<SalesEnquiry,SalesEnquiryDTO>chunk(250)
    .reader(salesUnsuccessfulReport())
    .processor(salesReportProcessor)
    .writer(salesUnSuccessWriter()).build();
 }

 @Bean
 public JdbcCursorItemReader<SalesEnquiry> 
 	salesUnsuccessfulReport()
 {
 
 JdbcCursorItemReader<SalesEnquiry> jdbcCursorItemReader
    =new JdbcCursorItemReader<>();
        
 jdbcCursorItemReader
 .setSql("Select * from sales_enquiry where ENQUIRY_DATE
 	>=trunc(sysdate)
  and ENQUIRY_DATE<trunc(sysdate) + 1 and PRODUCT_SOLD=0");
       
 jdbcCursorItemReader.setDataSource(dataSource);
 jdbcCursorItemReader.setRowMapper(
 	new BeanPropertyRowMapper<SalesEnquiry>
    (SalesEnquiry.class));
 return jdbcCursorItemReader;
 }
}
Now we will add database configuration to the application. properties as shown below so that Spring Boot & Spring JPA can connect to the database, here I have used oracle DB

spring.datasource.initialize=true
spring.datasource.url=jdbc:oracle:thin:@localhost:1521:orcl
spring.datasource.password=add-your-password
spring.datasource.driver-class-name
	=oracle.jdbc.driver.OracleDriver
spring.jpa.properties.hibernate.dialect 
	= org.hibernate.dialect.Oracle10gDialect
spring.jpa.hibernate.ddl-auto = update
spring.datasource.username=sys as sysdba
spring.datasource.initialization-mode=always

Now you can add some dummy data in the sales_enquiry table, to test different scenarios like all sales inquiries were converted to sales then only sales success CSV should be generated 

We will add data using data.sql which will be executed on startup by spring boot

insert into sales_enquiry values(1,'9055447789','Suresh','Hyundai Creta',0,SYSTIMESTAMP );
insert into sales_enquiry values(2,'9011558888','Rajesh','Hyundai Venue',0,SYSTIMESTAMP );
insert into sales_enquiry values(3,'8755466447','Arjun','Hyundai i10',1,SYSTIMESTAMP );
insert into sales_enquiry values(4,'7895211488','Khyati','Hyundai i20',1,SYSTIMESTAMP );

Attaching the project structure for reference.


Let me know if you have any questions.

Other Spring Batch Post


Thank You 
Happy Learning!