创建批量服务

2022/5/4 23:18:24

本文主要是介绍创建批量服务,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

本指南将引导你完成创建一个基本的批量驱动的解决方案的过程。

What You Will build

你将建立一个服务,从CSV电子表格中导入数据,用自定义代码进行转换,并将最终结果存储在一个数据库中。

What You Need

  • 大约15分钟
  • 你喜欢的文本编辑器或IDE
  • JDK 1.8 以上
  • Graddle 4+ 或 Maven 3.2+
  • 你也可以直接将代码导入进你的IDE
    • Spring Tool Suite (STS)
    • InteelliJ IDEA

How to complete this guide

正如Spring大多数指南一样,你可以从头开始并且完成每一个步骤或者你可以绕开你已经熟悉的基本设置。无论哪种方式,你最终都是完成代码。

为了从头开始,移步到Starting with Spring Initializr

为了跳过基础,做如下步骤:

  • 下载并解压这篇指南的原仓库,或者使用GIt克隆git clone https://github.com/spring-guides/gs-batch-processing.git
  • 进入(cd)gs-batch-processing/initial
  • 移步到Create a Business Class

当你完成以后,你可以再次检查你的gs-batch-processing/complete的代码。

Business Data

通常情况下,你的客户或业务分析员会提供一个电子表格。对于这个简单的例子,你可以在src/main/resources/sample-data.csv中找到一些捏造的数据。

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

这个电子表格的每一行都包含一个名字和一个姓氏,用逗号分开。这是一个相当常见的模式,Spring可以处理,无需定制。

接下来,你需要写一个SQL脚本来创建一个表来存储数据。你可以在src/main/resources/schema-all.sql中找到这样一个脚本。

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);

Spring Boot在启动时自动运行schema-@platform@.sql-all是所有平台的默认值。

Starting with Spring Initializr

你可以使用预初始化项目点击Generate来下载ZIP文件。这一项目被配置为适合本篇指南。

为了手动初始化项目:

  1. 浏览https://start.spring.io/。这个服务拉取了你的应用所有需要的依赖并且已经为了做了多数配置。
  2. 选择Gradle或Maven与你想使用的语言。本文假设你选择Java。
  3. 点击Dependencies选择Spring BatchHyperSQL Database
  4. 点击Generate
  5. 下载ZIP文件,这是一个为你的选择配置好的网络应用文件。

如果你的IDE整合了Spring Initializr,你可以从你的IDE完成此步骤

你也可以从GitHub中fork此项目使用你的IDE或者编辑器打开。

Create a Business Class

现在你可以看到数据输入和输出的格式,你可以写代码来表示一行数据,正如下面的例子(来自src/main/java/com/example/batchprocessing/Person.java)所示。

package com.example.batchprocessing;

public class Person {

  private String lastName;
  private String firstName;

  public Person() {
  }

  public Person(String firstName, String lastName) {
    this.firstName = firstName;
    this.lastName = lastName;
  }

  public void setFirstName(String firstName) {
    this.firstName = firstName;
  }

  public String getFirstName() {
    return firstName;
  }

  public String getLastName() {
    return lastName;
  }

  public void setLastName(String lastName) {
    this.lastName = lastName;
  }

  @Override
  public String toString() {
    return "firstName: " + firstName + ", lastName: " + lastName;
  }

}

你可以通过构造函数将Person类名字和姓氏实例化,也可以通过setter属性来实现。

Create an Intermediate Processor

批处理中的一个常见范式是摄入数据,对其进行转换,然后将其输送到其他地方。在这里,你需要写一个简单的转换器,将名字转换为大写字母。下面的列表(来自src/main/java/com/example/batchprocessing/PersonItemProcessor.java)展示了如何做到这一点。

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {

  private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);

  @Override
  public Person process(final Person person) throws Exception {
    final String firstName = person.getFirstName().toUpperCase();
    final String lastName = person.getLastName().toUpperCase();

    final Person transformedPerson = new Person(firstName, lastName);

    log.info("Converting (" + person + ") into (" + transformedPerson + ")");

    return transformedPerson;
  }

}

PersonItemProcessor实现了Spring Batch的ItemProcessor接口。这使得代码很容易连接到你将在本指南后面定义的批处理作业中。根据该接口,你接收一个传入的Person对象,之后将其转换为大写的Person

输入和输出的类型不需要相同。事实上,在一个数据源被读取后,有时应用程序的数据流需要一个不同的数据类型。

Put Together a Batch Job

现在你需要把实际的批处理工作放在一起。Spring Batch提供了许多实用类,减少了编写自定义代码的需要。相反,你可以专注于业务逻辑。

为了配置你的工作,你必须首先创建一个Spring @Configuration类,就像下面src/main/java/com/exampe/batchprocessing/BatchConfiguration.java中的例子。

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

  @Autowired
  public JobBuilderFactory jobBuilderFactory;

  @Autowired
  public StepBuilderFactory stepBuilderFactory;

    ...

}

首先,@EnableBatchProcessing注解添加了许多支持作业的关键bean,为你节省了大量的腿部工作。这个例子使用了一个基于内存的数据库(由@EnableBatchProcessing提供),这意味着,当它完成后,数据就消失了。它还自动连接了下面需要的几个工厂。现在在你的BatchConfiguration类中添加以下Bean,以定义一个阅读器、一个处理器和一个写入器。

@Bean
public FlatFileItemReader<Person> reader() {
  return new FlatFileItemReaderBuilder<Person>()
    .name("personItemReader")
    .resource(new ClassPathResource("sample-data.csv"))
    .delimited()
    .names(new String[]{"firstName", "lastName"})
    .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
      setTargetType(Person.class);
    }})
    .build();
}

@Bean
public PersonItemProcessor processor() {
  return new PersonItemProcessor();
}

@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
  return new JdbcBatchItemWriterBuilder<Person>()
    .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
    .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
    .dataSource(dataSource)
    .build();
}

第一块代码定义了输入、处理器和输出。

  • reader()创建一个ItemReader。它寻找一个名为sample-data.csv的文件,并对每一行的信息进行解析,以便把它变成一个Person
  • processor()创建了一个你之前定义的PersonItemProcessor的实例,意在将数据转换为大写。
  • writer(DataSource)创建一个ItemWriter。这个是针对JDBC目标的,并自动获得由@EnableBatchProcessing创建的数据源的副本。它包括插入一个Peron所需的SQL语句,由Java bean属性驱动。

最后一块(来自src/main/java/com/example/batchprocessing/BatchConfiguration.java)显示了实际的作业配置。

@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
  return jobBuilderFactory.get("importUserJob")
    .incrementer(new RunIdIncrementer())
    .listener(listener)
    .flow(step1)
    .end()
    .build();
}

@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
  return stepBuilderFactory.get("step1")
    .<Person, Person> chunk(10)
    .reader(reader())
    .processor(processor())
    .writer(writer)
    .build();
}

第一种方法定义了job,第二种方法定义了单个步骤。工作是由步骤建立起来的,每个步骤都可以涉及一个读者、一个处理器和一个写者。

在这个作业定义中,你需要一个增量器,因为作业使用数据库来维持执行状态。然后你列出每个步骤,(尽管这个作业只有一个步骤)。作业结束后,Java API会产生一个完美配置的作业。

在步骤定义中,你定义一次要写多少数据。在本例中,它一次最多写入10条记录。接下来,你通过使用先前注入的bean来配置阅读器、处理器和写入器。

chunk()的前缀是<Person,Person>,因为它是一个通用方法。这代表了每个"块"处理的输入和输出类型,并与ItemReader<Person>ItemWriter<Person>保持一致。

批处理配置的最后一点是在作业完成时获得通知的方法。下面的例子(来自src/main/java/com/example/batchprocessing/JobCompletionNotificationListener.java)展示了这样一个类。

package com.example.batchprocessing;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

  private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);

  private final JdbcTemplate jdbcTemplate;

  @Autowired
  public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  @Override
  public void afterJob(JobExecution jobExecution) {
    if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
      log.info("!!! JOB FINISHED! Time to verify the results");

      jdbcTemplate.query("SELECT first_name, last_name FROM people",
        (rs, row) -> new Person(
          rs.getString(1),
          rs.getString(2))
      ).forEach(person -> log.info("Found <" + person + "> in the database."));
    }
  }
}

JobCompletionNotificationListener监听一个作业的BatchStatus.COMPLETED,然后使用JdbcTemplate来检查结果。

Make the Application Executable

虽然批处理可以被嵌入到Web应用程序和WAR文件中,但下面演示的更简单的方法可以创建一个独立的应用程序。你把所有东西都打包在一个可执行的JAR文件中,由一个好的老的Java main()方法驱动。

Spring Initializr为你创建了一个应用类。对于这个简单的例子,它不需要进一步修改就可以工作。下面的列表(来自src/main/java/com/example/batchprocessing/BatchProcessingApplication.java)显示了该应用类。

package com.example.batchprocessing;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BatchProcessingApplication {

  public static void main(String[] args) throws Exception {
    System.exit(SpringApplication.exit(SpringApplication.run(BatchProcessingApplication.class, args)));
  }
}

@SpringBootApplication是一个添加了如下展示的所有内容的方便注解:

  • @Configuration:对于应用上下文(context),将类标记为bean定义的来源。
  • @EnableAutoConfiguration:告诉Spring Boot开始根据类路径设置添加bean,其它bean,以及各种属性设置。例如,如果spring-webmvc在类路径中,这一注解将应用程序标记为web应用以及激活关键行为,例如设置DispatcherServlet
  • @ComponentScan:告诉Spring寻找com.example包下其它组件、配置以及服务,使其找到控制器。

main()方法使用Spring Boot的SpringApplication.run()方法来启动一个应用程序。你是否注意到没有一行的XML?也同样没有web.xml文件。这一个应用程序是100%纯Java并且你并没有必须处理配置任何一个plumbing或infrastructure。

请注意,SpringApplication.exit()System.exit()确保JVM在作业完成后退出。更多细节请参见Application Exit section in Spring Boot Reference documentation。

为了演示,代码可以创建一个JdbcTemplate,查询数据库,并打印出批处理作业插入的人名。

Build an executable JAR

你可以在命令行以Gradle或Maven运行应用程序。你也可以构建一个单一可执行的JAR包,其中包含所有必须的依赖、类以及资源并且运行。构建一个可执行jar可以易于传输、版本和部署服务作为应用,贯穿至整个开发周期,穿插于不同环境等等。

如果你使用Gradle,你可以通过使用./gradlew bootRun来运行应用程序。或者说,你可以使用./gradlew build来构建JAR包,然后运行JAR包,如下命令:

java -jar build/libs/gs-rest-service-0.1.0.jar

如果你使用Maven,你可以通过./mvnw spring-boot:run来运行应用程序。或者说,你可以使用./mvnw clean package来构建JAR包,通过如下命令执行JAR包。

java -jar target/gs-rest-service-0.1.0.jar

此处步骤描述了创建一个可执行JAR包。你也可以构造一个经典的WAR文件。

该作业为每个被转化的person打印出一行。工作运行后,你也可以看到查询数据库的输出。它应该类似于下面的输出。

Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE)
Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE)
Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE)
Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE)
Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE)
Found <firstName: JILL, lastName: DOE> in the database.
Found <firstName: JOE, lastName: DOE> in the database.
Found <firstName: JUSTIN, lastName: DOE> in the database.
Found <firstName: JANE, lastName: DOE> in the database.
Found <firstName: JOHN, lastName: DOE> in the database.


这篇关于创建批量服务的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程