SpringBoot + MySQL + SpringBatch Job – How to import CSV data to MySQL Database using Spring Batch Job

Spring Batch is a powerful module for implementing batch processing over large volumes of data conveniently.
This tutorial guides you through importing CSV data into a MySQL database using a Spring Batch Job.

Related Articles:
How to start with Spring Batch using Spring Boot
How to use Spring JDBC Template for Postgres Database

I. Technology

– Java 1.8
– Maven 3.3.9
– Spring Tool Suite – Version 3.8.1.RELEASE
– Spring Boot: 1.4.0.RELEASE

II. Overview

1. Goal

To build a simple application that shows how to use a Spring Batch Job Step (including ItemReader, ItemProcessor, ItemWriter and JobExecutionListener) to read Customer data from a CSV file, then write it to a MySQL table named ‘customer’.
batchcsvsql-architect
– Reader: reads the content of the CSV file, then maps each row to the fields of the DataModel Customer.
– Processor: converts each Customer record’s content to new content (for example, generates a random ID and uppercases the name Strings) which will be written to the database table.
– Writer: writes batches of records to the MySQL database using the DAO.
– Listener: runs after the Step, reading data back from the database table to verify the result and show logs.

– RestController: runs the Job using JobLauncher, then returns a completion String to the client.
– DAO: interacts with the database.

2. Project Structure

batchcsvsql-structure

3. Step by step

– Create Spring Boot project
– Configure application properties
– Create a DataModel
– Create a DAO
– Create Job Step: Reader, Processor, Writer, Listener
– Create Batch Configuration
– Create a WebController
– Run Spring Boot Application & Enjoy Result

III. Practice

1. Create Spring Boot project

– Open Spring Tool Suite, on the Menu, choose File -> New -> Spring Starter Project, then fill in each field:
batchcsvsql-startproj

Click Next, in:
I/O: choose Batch
SQL: choose JDBC and MySQL
Web: choose Web
batchcsvsql-pom-config

Click Finish. Spring Boot project will be created successfully.

2. Configure application properties, add SQL script file & CSV file

– Add configuration for the datasource and disable Spring Batch Job auto-run in application.properties (located in src/main/resources):


spring.datasource.url=jdbc:mysql://localhost:3306/testcsvdb
spring.datasource.username=root
spring.datasource.password=123
spring.datasource.platform=mysql
spring.batch.job.enabled=false

– under src/main/resources:
+ schema-mysql.sql


DROP TABLE IF EXISTS customer;

CREATE TABLE customer (
    id BIGINT PRIMARY KEY NOT NULL,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);

+ customer-data.csv


0,Jack,Smith
1,Adam,Johnson
2,Kim,Smith
3,David,Williams
4,Peter,Davis

3. Create a DataModel

Under package model, create class Customer.
Content of Customer.java:


package com.javasampleapproach.batchcsvMySQL.model;

public class Customer {
	private long id;
	private String firstName;
	private String lastName;

	public Customer() {
	}

	public Customer(long id, String firstName, String lastName) {
		this.id = id;
		this.firstName = firstName;
		this.lastName = lastName;
	}

	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public String getFirstName() {
		return firstName;
	}

	public void setFirstName(String firstName) {
		this.firstName = firstName;
	}

	public String getLastName() {
		return lastName;
	}

	public void setLastName(String lastName) {
		this.lastName = lastName;
	}

	@Override
	public String toString() {
		return String.format("Customer[id=%d , firstName='%s', lastName='%s']", id, firstName, lastName);
	}
}

4. Create a DAO

– Under package dao, create interface CustomerDao:


package com.javasampleapproach.batchcsvMySQL.dao;

import java.util.List;

import com.javasampleapproach.batchcsvMySQL.model.Customer;

public interface CustomerDao {
	void insert(List<? extends Customer> customers);
	List<Customer> loadAllCustomers();
}

– Under package dao.impl, create the implementation of CustomerDao, CustomerDaoImpl:


package com.javasampleapproach.batchcsvMySQL.dao.impl;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import org.springframework.stereotype.Repository;

import com.javasampleapproach.batchcsvMySQL.dao.CustomerDao;
import com.javasampleapproach.batchcsvMySQL.model.Customer;

@Repository
public class CustomerDaoImpl extends JdbcDaoSupport implements CustomerDao {

	@Autowired
	DataSource dataSource;

	@PostConstruct
	private void initialize() {
		setDataSource(dataSource);
	}

	@Override
	public void insert(List<? extends Customer> customers) {
		String sql = "INSERT INTO customer (id, first_name, last_name) VALUES (?, ?, ?)";
		getJdbcTemplate().batchUpdate(sql, new BatchPreparedStatementSetter() {
			public void setValues(PreparedStatement ps, int i) throws SQLException {
				Customer customer = customers.get(i);
				ps.setLong(1, customer.getId());
				ps.setString(2, customer.getFirstName());
				ps.setString(3, customer.getLastName());
			}

			public int getBatchSize() {
				return customers.size();
			}
		});
	}

	@Override
	public List<Customer> loadAllCustomers() {
		String sql = "SELECT * FROM customer";
		List<Map<String, Object>> rows = getJdbcTemplate().queryForList(sql);

		List<Customer> result = new ArrayList<>();
		for (Map<String, Object> row : rows) {
			Customer customer = new Customer();
			customer.setId((Long) row.get("id"));
			customer.setFirstName((String) row.get("first_name"));
			customer.setLastName((String) row.get("last_name"));
			result.add(customer);
		}

		return result;
	}
}
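Under the hood, JdbcTemplate.batchUpdate drives the BatchPreparedStatementSetter by asking for getBatchSize() and then calling setValues once per index before sending the whole batch to the database in one round trip. A plain-Java sketch of that loop (the Setter interface and the boundRows recorder are made-up stand-ins for illustration, not the real Spring JDBC types):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchSetterSketch {
    // Made-up recorder standing in for a PreparedStatement's bound rows.
    static List<String> boundRows = new ArrayList<>();

    // Simplified stand-in for BatchPreparedStatementSetter.
    interface Setter {
        void setValues(int i);   // bind parameters for row i
        int getBatchSize();      // total number of rows in the batch
    }

    // Mirrors how batchUpdate consumes the setter: one setValues call
    // per index in [0, getBatchSize()), then a single batch execution.
    static void batchUpdate(Setter setter) {
        for (int i = 0; i < setter.getBatchSize(); i++) {
            setter.setValues(i);
        }
        // ...here the real JdbcTemplate would execute the batch.
    }

    public static void main(String[] args) {
        List<String[]> customers = Arrays.asList(
                new String[] { "0", "Jack", "Smith" },
                new String[] { "1", "Adam", "Johnson" });

        batchUpdate(new Setter() {
            public void setValues(int i) {
                boundRows.add(String.join(",", customers.get(i)));
            }
            public int getBatchSize() {
                return customers.size();
            }
        });

        System.out.println(boundRows);
    }
}
```

This is why the DAO only has to describe how to bind one row; grouping the statements into a single batch is handled by JdbcTemplate.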

5. Create Job Step: Reader, Processor, Writer, Listener

Under package step:
Reader.java:


package com.javasampleapproach.batchcsvMySQL.step;

import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.core.io.ClassPathResource;

import com.javasampleapproach.batchcsvMySQL.model.Customer;

public class Reader {
	public static FlatFileItemReader<Customer> reader(String path) {
		FlatFileItemReader<Customer> reader = new FlatFileItemReader<Customer>();

		reader.setResource(new ClassPathResource(path));
		reader.setLineMapper(new DefaultLineMapper<Customer>() {
			{
				setLineTokenizer(new DelimitedLineTokenizer() {
					{
						setNames(new String[] { "id", "firstName", "lastName" });
					}
				});
				setFieldSetMapper(new BeanWrapperFieldSetMapper<Customer>() {
					{
						setTargetType(Customer.class);
					}
				});
			}
		});
		return reader;
	}
}

Processor.java:


package com.javasampleapproach.batchcsvMySQL.step;

import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

import com.javasampleapproach.batchcsvMySQL.model.Customer;

public class Processor implements ItemProcessor<Customer, Customer> {

	private static final Logger log = LoggerFactory.getLogger(Processor.class);

	@Override
	public Customer process(Customer customer) throws Exception {
		Random r = new Random();
		
		final String firstName = customer.getFirstName().toUpperCase();
		final String lastName = customer.getLastName().toUpperCase();

		final Customer fixedCustomer = new Customer(r.nextLong(), firstName, lastName);

		log.info("Converting (" + customer + ") into (" + fixedCustomer + ")");

		return fixedCustomer;
	}
}

Writer.java:


package com.javasampleapproach.batchcsvMySQL.step;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.javasampleapproach.batchcsvMySQL.dao.CustomerDao;
import com.javasampleapproach.batchcsvMySQL.model.Customer;

public class Writer implements ItemWriter<Customer> {

	private final CustomerDao customerDao;

	public Writer(CustomerDao customerDao) {
		this.customerDao = customerDao;
	}

	@Override
	public void write(List<? extends Customer> customers) throws Exception {
		customerDao.insert(customers);
	}
}

Listener.java:


package com.javasampleapproach.batchcsvMySQL.step;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;

import com.javasampleapproach.batchcsvMySQL.dao.CustomerDao;
import com.javasampleapproach.batchcsvMySQL.model.Customer;

public class Listener extends JobExecutionListenerSupport {
	private static final Logger log = LoggerFactory.getLogger(Listener.class);

	private final CustomerDao customerDao;

	public Listener(CustomerDao customerDao) {
		this.customerDao = customerDao;
	}

	@Override
	public void afterJob(JobExecution jobExecution) {
		if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
			log.info("Finish Job! Check the results");

			List<Customer> customers = customerDao.loadAllCustomers();

			for (Customer customer : customers) {
				log.info("Found <" + customer + "> in the database.");
			}
		}
	}
}

JobExecutionListenerSupport is a convenience implementation of JobExecutionListener.
We can run our own logic before a Job starts (by overriding the beforeJob method) and after it ends, whether it completed normally or failed (by overriding the afterJob method).
The annotations corresponding to this interface are @BeforeJob and @AfterJob; @BeforeStep and @AfterStep belong to the step-level StepExecutionListener instead.

6. Create Batch Configuration

Under package config, create BatchConfig.java:


package com.javasampleapproach.batchcsvMySQL.config;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.javasampleapproach.batchcsvMySQL.dao.CustomerDao;
import com.javasampleapproach.batchcsvMySQL.model.Customer;
import com.javasampleapproach.batchcsvMySQL.step.Listener;
import com.javasampleapproach.batchcsvMySQL.step.Processor;
import com.javasampleapproach.batchcsvMySQL.step.Reader;
import com.javasampleapproach.batchcsvMySQL.step.Writer;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

	@Autowired
	public JobBuilderFactory jobBuilderFactory;

	@Autowired
	public StepBuilderFactory stepBuilderFactory;

	@Autowired
	public CustomerDao customerDao;

	@Bean
	public Job job() {
		return jobBuilderFactory.get("job").incrementer(new RunIdIncrementer()).listener(new Listener(customerDao))
				.flow(step1()).end().build();
	}

	@Bean
	public Step step1() {
		return stepBuilderFactory.get("step1").<Customer, Customer>chunk(2)
				.reader(Reader.reader("customer-data.csv"))
				.processor(new Processor()).writer(new Writer(customerDao)).build();
	}
}

The parameter of the chunk method of StepBuilder specifies the number of items to read and process before the accumulated chunk is passed to the ItemWriter in a single write call.
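The resulting read-process-write cycle can be sketched in plain Java. The sketch below simulates a chunk size of 2 over the five sample customers, so the writer receives chunks of sizes 2, 2 and 1 (a simplified illustration of the chunk-oriented loop, not the actual Spring Batch engine):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ChunkSketch {
    // Records the size of each chunk handed to the "writer".
    static List<Integer> chunkSizes = new ArrayList<>();

    // Simplified chunk-oriented loop: read items one at a time,
    // "process" them (uppercase), and write every chunkSize items.
    static void runStep(List<String> items, int chunkSize) {
        List<String> chunk = new ArrayList<>();
        Iterator<String> reader = items.iterator();
        while (reader.hasNext()) {
            chunk.add(reader.next().toUpperCase()); // processor step
            if (chunk.size() == chunkSize || !reader.hasNext()) {
                chunkSizes.add(chunk.size());       // writer gets the chunk
                chunk.clear();
            }
        }
    }

    public static void main(String[] args) {
        runStep(Arrays.asList("Jack", "Adam", "Kim", "David", "Peter"), 2);
        System.out.println(chunkSizes); // [2, 2, 1]
    }
}
```

With chunk(2) and five CSV rows, the Writer above is therefore invoked three times, which is also why the DAO's batch insert receives small lists rather than single records.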

7. Create a WebController

Under package controller, create WebController.java:


package com.javasampleapproach.batchcsvMySQL.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class WebController {
	@Autowired
	JobLauncher jobLauncher;

	@Autowired
	Job job;

	@RequestMapping("/runjob")
	public String handle() throws Exception {
		Logger logger = LoggerFactory.getLogger(this.getClass());
		try {
			JobParameters jobParameters = new JobParametersBuilder().addLong("time", System.currentTimeMillis())
					.toJobParameters();
			jobLauncher.run(job, jobParameters);
		} catch (Exception e) {
			logger.info(e.getMessage());
		}
		return "Done! Check Console Window for more details";
	}
}

8. Run Spring Boot Application & Enjoy Result

– Configure the Maven build:
clean install
– Run the project as a Spring Boot App
– Check the results by accessing:
http://localhost:8080/runjob
batchcsvsql-req

System shows:


Job: [FlowJob: [name=job]] launched with the following parameters: [{time=1474771650234}]
Executing step: [step1]
Converting (Customer[id=0 , firstName='Jack', lastName='Smith']) into (Customer[id=6323287188840625065 , firstName='JACK', lastName='SMITH'])
Converting (Customer[id=1 , firstName='Adam', lastName='Johnson']) into (Customer[id=-5061932588533513687 , firstName='ADAM', lastName='JOHNSON'])
Converting (Customer[id=2 , firstName='Kim', lastName='Smith']) into (Customer[id=250312719692371085 , firstName='KIM', lastName='SMITH'])
Converting (Customer[id=3 , firstName='David', lastName='Williams']) into (Customer[id=2361081371024882848 , firstName='DAVID', lastName='WILLIAMS'])
Converting (Customer[id=4 , firstName='Peter', lastName='Davis']) into (Customer[id=-8239802131547868532 , firstName='PETER', lastName='DAVIS'])
Finish Job! Check the results
Found <Customer[id=6323287188840625065 , firstName='JACK', lastName='SMITH']> in the database.
Found <Customer[id=-5061932588533513687 , firstName='ADAM', lastName='JOHNSON']> in the database.
Found <Customer[id=250312719692371085 , firstName='KIM', lastName='SMITH']> in the database.
Found <Customer[id=2361081371024882848 , firstName='DAVID', lastName='WILLIAMS']> in the database.
Found <Customer[id=-8239802131547868532 , firstName='PETER', lastName='DAVIS']> in the database.
Job: [FlowJob: [name=job]] completed with the following parameters: [{time=1474771650234}] and the following status: [COMPLETED]

Open MySQL Workbench (or any MySQL client) to check table customer in database testcsvdb:
batchcsvsql-postgresqldb

IV. Source Code

springbatch-csv-postgresql
