百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

使用SpringBatch读取csv文件

bigegpt 2024-09-20 13:58 3 浏览


1、需求

系统每日从某个固定的目录中读取csv文件,并在控制台上打印。

2、解决方案

要解决上述需求,可以使用的方法有很多,此处选择使用Spring Batch来实现。

3、注意事项

1、文件路径的获取

此处简单处理,读取 JobParameters 中的日期,然后构建一个文件路径,并将文件路径放入到 ExecutionContext中。此处为了简单,文件路径会在程序中写死,但是同时也会将文件路径存入到 ExecutionContext 中,并且在具体的某个Step中从ExecutionContext中获取路径。

注意:
ExecutionContext中存入的数据虽然在各个Step中都可以获取到,但是不推荐存入比较大的数据到ExecutionContext中,因为这个对象的数据需要存入到数据库中。

2、各个Step如果获取到ExecutionContext中的值

  1. 类上加入 @StepScope 注解
  2. 通过 @Value("#{jobExecutionContext['importPath']}") 来获取

eg:

@Bean
@StepScope
public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
    // 读取数据
    return new FlatFileItemReaderBuilder<Person>()
            .name("read-csv-file")
            .resource(new ClassPathResource(importPath))
            .delimited().delimiter(",")
            .names("username", "age", "sex")
            .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
            .build();
}

解释:在程序实例化FlatFileItemReader的时候,此时是没有jobExecutionContext的,那么就会报错,如果加上@StepScope,此时就没有问题了。@StepScope表示到达Step阶段才实例化这个Bean

3、FlatFileItemReader使用注意

当我们使用FlatFileItemReader来读取我们的csv文件时,此处需要返回 FlatFileItemReader类型,而不能直接返回ItemReader,否则可能出现如下错误 Reader must be open before it can be read

4、实现步骤

1、导入依赖,配置

1、导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
</dependencies>

2、初始化SpringBatch数据库

spring.datasource.username=root
spring.datasource.password=root@1993
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/spring-batch?useUnicode=true&characterEncoding=utf8&autoReconnectForPools=true&useSSL=false
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# 程序启动时,默认不执行job
spring.batch.job.enabled=false
spring.batch.jdbc.initialize-schema=always
# 初始化spring-batch数据库脚本
spring.batch.jdbc.schema=classpath:org/springframework/batch/core/schema-mysql.sql

2、构建文件读取路径

此处我的想法是,在JobExecutionListener中完成文件路径的获取,并将之放入到ExecutionContext,然后在各个Step中就可以获取到文件路径的值了。

/**
 * 在此监听器中,获取到具体的需要读取的文件路径,并保存到 ExecutionContext
 *
 * @author huan.fu
 * @date 2022/8/30 - 22:22
 */
@Slf4j
public class AssemblyReadCsvPathListener implements JobExecutionListener {
    @Override
    public void beforeJob(JobExecution jobExecution) {
        ExecutionContext executionContext = jobExecution.getExecutionContext();
        JobParameters jobParameters = jobExecution.getJobParameters();
        String importDate = jobParameters.getString("importDate");
        log.info("从 job parameter 中获取的 importDate 参数的值为:[{}]", importDate);
        String readCsvPath = "data/person.csv";
        log.info("根据日期组装需要读取的csv路径为:[{}],此处排除日期,直接写一个死的路径", readCsvPath);
        executionContext.putString("importPath", readCsvPath);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {

    }
}

3、构建Tasklet,输出文件路径

@Slf4j
@Component
@StepScope
public class PrintImportFilePathTaskLet implements Tasklet {

    @Value("#{jobExecutionContext['importPath']}")
    private String importFilePath;

    @Value("#{jobParameters['importDate']}")
    private String importDate;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

        log.info("从job parameter 中获取到的 importDate:[{}],从 jobExecutionContext 中获取的 importPath:[{}]",
                importDate, importFilePath);

        return RepeatStatus.FINISHED;
    }
}

需要注意的是,此类上加入了 @StepScope注解

4、编写实体类

@AllArgsConstructor
@Getter
@ToString
public class Person {
    /**
     * 用户名
     */
    private String username;
    /**
     * 年龄
     */
    private Integer age;
    /**
     * 性别
     */
    private String sex;
}

5、编写Job配置

@Configuration
@AllArgsConstructor
@Slf4j
public class ImportPersonJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private final PrintImportFilePathTaskLet printImportFilePathTaskLet;
    private final ItemReader<Person> readCsvItemReader;

    @Bean
    public Job importPersonJob() {
        // 获取一个job builder, jobName可以是不存在的
        return jobBuilderFactory.get("import-person-job")
                // 添加job execution 监听器
                .listener(new AssemblyReadCsvPathListener())
                // 打印 job parameters 和 ExecutionContext 中的值
                .start(printParametersAndContextVariables())
                // 读取csv的数据并处理
                .next(handleCsvFileStep())
                .build();
    }

    /**
     * 读取数据
     * 注意:此处需要返回 FlatFileItemReader类型,而不要返回ItemReader
     * 否则可能报如下异常 Reader must be open before it can be read
     *
     * @param importPath 文件路径
     * @return reader
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Person> readCsvItemReader(@Value("#{jobExecutionContext['importPath']}") String importPath) {
        // 读取数据
        return new FlatFileItemReaderBuilder<Person>()
                .name("read-csv-file")
                .resource(new ClassPathResource(importPath))
                .delimited().delimiter(",")
                .names("username", "age", "sex")
                .fieldSetMapper(new RecordFieldSetMapper<>(Person.class))
                .build();
    }

    @Bean
    public Step handleCsvFileStep() {

        // 每读取一条数据,交给这个处理
        ItemProcessor<Person, Person> processor = item -> {
            if (item.getAge() > 25) {
                log.info("用户[{}]的年龄:[{}>25]不处理", item.getUsername(), item.getAge());
                return null;
            }
            return item;
        };

        // 读取到了 chunk 大小的数据后,开始执行写入
        ItemWriter<Person> itemWriter = items -> {
            log.info("开始写入数据");
            for (Person item : items) {
                log.info("{}", item);
            }
        };

        return stepBuilderFactory.get("handle-csv-file")
                // 每读取2条数据,执行一次write,当每read一条数据后,都会执行process
                .<Person, Person>chunk(2)
                // 读取数据
                .reader(readCsvItemReader)
                // 读取一条数据就开始处理
                .processor(processor)
                // 当读取的数据的数量到达 chunk 时,调用该方法进行处理
                .writer(itemWriter)
                .build();
    }

    /**
     * 打印 job parameters 和 ExecutionContext 中的值
     * <p>
     * TaskletStep是一个非常简单的接口,仅有一个方法——execute。
     * TaskletStep会反复的调用这个方法直到获取一个RepeatStatus.FINISHED返回或者抛出一个异常。
     * 所有的Tasklet调用都会包装在一个事物中。
     *
     * @return Step
     */
    private Step printParametersAndContextVariables() {
        return stepBuilderFactory.get("print-context-params")
                .tasklet(printImportFilePathTaskLet)
                // 当job重启时,如果达到了3此,则该step不在执行
                .startLimit(3)
                // 当job重启时,如果该step的是已经处理完成即COMPLETED状态时,下方给false表示该step不在重启,即不在执行
                .allowStartIfComplete(false)
                // 添加 step 监听
                .listener(new CustomStepExecutionListener())
                .build();
    }
}

6、编写Job启动类

@Component
@Slf4j
public class StartImportPersonJob {

    @Autowired
    private Job importPersonJob;
    @Autowired
    private JobLauncher jobLauncher;

    @PostConstruct
    public void startJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("importDate", LocalDate.of(2022, 08, 31).format(DateTimeFormatter.ofPattern("yyyyMMdd")))
                .toJobParameters();
        JobExecution execution = jobLauncher.run(importPersonJob, jobParameters);
        log.info("job invoked");
    }
}

7、自动配置SpringBatch

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchReadCsvApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchReadCsvApplication.class, args);
    }
}

主要是 @EnableBatchProcessing 注解

5、执行结果

执行结果

6、完整代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/spring-batch/spring-batch-read-csv

相关推荐

方差分析简介(方差分析通俗理解)

介绍方差分析(ANOVA,AnalysisofVariance)是一种广泛使用的统计方法,用于比较两个或多个组之间的均值。单因素方差分析是方差分析的一种变体,旨在检测三个或更多分类组的均值是否存在...

正如404页面所预示,猴子正成为断网元凶--吧嗒吧嗒真好吃

吧嗒吧嗒,绘图:MakiNaro你可以通过加热、冰冻、水淹、模塑、甚至压溃压力来使网络光缆硬化。但用猴子显然是不行的。光缆那新挤压成型的塑料外皮太尼玛诱人了,无法阻挡一场试吃盛宴的举行。印度政府正...

Python数据可视化:箱线图多种库画法

概念箱线图通过数据的四分位数来展示数据的分布情况。例如:数据的中心位置,数据间的离散程度,是否有异常值等。把数据从小到大进行排列并等分成四份,第一分位数(Q1),第二分位数(Q2)和第三分位数(Q3)...

多组独立(完全随机设计)样本秩和检验的SPSS操作教程及结果解读

作者/风仕在上一期,我们已经讲完了两组独立样本秩和检验的SPSS操作教程及结果解读,这期开始讲多组独立样本秩和检验,我们主要从多组独立样本秩和检验介绍、两组独立样本秩和检验使用条件及案例的SPSS操作...

方差分析 in R语言 and Excel(方差分析r语言例题)

今天来写一篇实际中比较实用的分析方法,方差分析。通过方差分析,我们可以确定组别之间的差异是否超出了由于随机因素引起的差异范围。方差分析分为单因素方差分析和多因素方差分析,这一篇先介绍一下单因素方差分析...

可视化:前端数据可视化插件大盘点 图表/图谱/地图/关系图

前端数据可视化插件大盘点图表/图谱/地图/关系图全有在大数据时代,很多时候我们需要在网页中显示数据统计报表,从而能很直观地了解数据的走向,开发人员很多时候需要使用图表来表现一些数据。随着Web技术的...

matplotlib 必知的 15 个图(matplotlib各种图)

施工专题,我已完成20篇,施工系列几乎覆盖Python完整技术栈,目标只总结实践中最实用的东西,直击问题本质,快速帮助读者们入门和进阶:1我的施工计划2数字专题3字符串专题4列表专题5流程控制专题6编...

R ggplot2常用图表绘制指南(ggplot2绘制折线图)

ggplot2是R语言中强大的数据可视化包,基于“图形语法”(GrammarofGraphics),通过分层方式构建图表。以下是常用图表命令的详细指南,涵盖基本语法、常见图表类型及示例,适合...

Python数据可视化:从Pandas基础到Seaborn高级应用

数据可视化是数据分析中不可或缺的一环,它能帮助我们直观理解数据模式和趋势。本文将全面介绍Python中最常用的三种可视化方法。Pandas内置绘图功能Pandas基于Matplotlib提供了简洁的绘...

Python 数据可视化常用命令备忘录

本文提供了一个全面的Python数据可视化备忘单,适用于探索性数据分析(EDA)。该备忘单涵盖了单变量分析、双变量分析、多变量分析、时间序列分析、文本数据分析、可视化定制以及保存与显示等内容。所...

统计图的种类(统计图的种类及特点图片)

统计图是利用几何图形或具体事物的形象和地图等形式来表现社会经济现象数量特征和数量关系的图形。以下是几种常见的统计图类型及其适用场景:1.条形图(BarChart)条形图是用矩形条的高度或长度来表示...

实测,大模型谁更懂数据可视化?(数据可视化和可视化分析的主要模型)

大家好,我是Ai学习的老章看论文时,经常看到漂亮的图表,很多不知道是用什么工具绘制的,或者很想复刻类似图表。实测,大模型LaTeX公式识别,出乎预料前文,我用Kimi、Qwen-3-235B...

通过AI提示词让Deepseek快速生成各种类型的图表制作

在数据分析和可视化领域,图表是传达信息的重要工具。然而,传统图表制作往往需要专业的软件和一定的技术知识。本文将介绍如何通过AI提示词,利用Deepseek快速生成各种类型的图表,包括柱状图、折线图、饼...

数据可视化:解析箱线图(box plot)

箱线图/盒须图(boxplot)是数据分布的图形表示,由五个摘要组成:最小值、第一四分位数(25th百分位数)、中位数、第三四分位数(75th百分位数)和最大值。箱子代表四分位距(IQR)。IQR是...

[seaborn] seaborn学习笔记1-箱形图Boxplot

1箱形图Boxplot(代码下载)Boxplot可能是最常见的图形类型之一。它能够很好表示数据中的分布规律。箱型图方框的末尾显示了上下四分位数。极线显示最高和最低值,不包括异常值。seaborn中...