Apache Camel 路由
2021/10/12 23:15:10
本文主要是介绍Apache Camel 路由,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1. 简介
1.1 路由简介
Camel的最重要的特性是路由,没有路由,Camel就只是一个传输连接库。
在企业软件系统间进行消息传递的场景下,路由就是将消息从输入队列中取出并根据一组预设的条件发送到多个输出队列中的过程,如下图所示:
输入和输出队列并不知道消息传递的条件。消息传递的逻辑与消息的生产者和消费者之间是解耦的。
输入和输出队列并不知道消息传递的条件。消息传递的逻辑与消息的生产者和消费者之间是解耦的。
1.2 路由作用
在Camel中,路由是一个更普通的概念。消息从我们称之为消费者的端点进入路由,Camel指挥消息一步一步地移动,这就是路由。
消费者端点可以从外部服务接收消息,也可以从外部数据源上轮询得到消息,甚至可以直接创建一个消息。
这些消息会在Camel的路由定义中流经处理节点,处理节点可以是企业集成模式(EIP)、处理器、拦截器或另一个自定义组件。
消息最终被发送到称之为生产者的目标端点。
一个路由可以包含多个处理节点,每个处理节点都可以对消息进行修改或者将其发送到另一个位置。
路由也可以没有处理组件,没有处理节点时,路由就是一个简单的、接通数据源和数据目标的管道。
2. 具体场景下举例
2.1 虚拟场景
假设有一家摩托车配件生产销售为主营业务的公司,名叫“骑手摩配”。这家公司发展了很多年,接收订单的方式一变再变。
最初,客户需要将CSV文件上传到FTP服务器来下订单,消息格式后来更改为XML。
再后来,公司提供了一个网站,通过该网站,订单可以通过HTTP以XML消息的形式提交。
“骑手摩配”现在要求新客户使用HTTP接口下单,但是由于之前与老客户之间签订了服务水平协议(SLA),公司必须保持所有老的数据交换接口可以以老的数据格式继续正常运行。
公司在处理这些订单之前,都会将其转换为一个普通Java对象(POJO)之后再进行处理。订单处理系统的简单架构图如图下图所示。
很多公司一样面临着同一个问题:经过多年的运营,每个版本的数据传输方式和数据格式就成了现在的技术包袱。
使用Camel这样的集成框架可以轻而易举的解决这些问题。
首先,我们将在“骑手摩配”公司的前置系统中,实现一个FTP模块,实现这个FTP前置模块包括以下步骤:
- 从FTP服务器上检查并下载新订单文件
- 将订单文件转换为JMS消息
- 将消息发送到JMS的incomingOrders队列
要完成第1步和第3步,我们需要了解如何使用Camel端点建立与FTP和JMS的通信。
要完成整个任务,我们还需要了解如何使用Java DSL进行路由。
2.2 理解Endpoint终端
Endpoint终端是一种抽象,它是Camel对消息通道末端的建模,软件系统可以通过这些通道发送或接收消息。
2.2.1 从FTP 终端消费数据
Camel简单易用的一个主要原因是Endpoint终端URI的设计。通过Endpoint终端URI,可以标识要使用的组件和对该组件的相关配置。然后可以决定将该组件作为消息生产者,发送消息到由该URI配置的组件,还是将该组件作为消息消费者,从该组件中获取消息。
结合2.1 “骑手摩配”的业务场景。要从FTP服务器下载新订单,需要执行以下操作:
- 使用默认端口21连接到
rider.com
的FTP服务器。 - 提供用户名
rider
和密码secret
。 - 更改FTP当前目录为
orders
文件夹。 - 如果有新的订单文件,则进行下载。
如下图所示,可以非常轻松地使用一串URI来配置Camel实现这点:
首先,Camel在组件的注册表中查找FTP的连接方案,最终通过注册表解析得到使用为FTP组件(FtpComponent
);
然后,Camel使用FTP组件作为工厂,根据上下文路径和参数来创建FTP端点(FtpEndpoint
);上下文路径rider.com/orders告诉FTP组件它应该通过默认FTP端口登录到rider.com的FTP服务器,并将目录更改为orders;
最后,Options选项指定了用户名和密码,它们用于登录FTP服务器。
Ftp组件并不在camel-core模块内,所以我们需要给你的工程添加一个额外的依赖包,如下:
// 使用maven 添加如下的依赖到pom文件中 <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ftp</artifactId> <version>2.20.1</version> </dependency>
这个Endpoint终端URI无论是作为使用者还是作为生产者都会生效。
当前场景下,我们是用它从FTP服务器上下载订单,所以还需要在Camel DSL的from节点中使用它:
from("ftp://rider.com/orders?username=rider&password=secret")
2.2.2 发送消息到JMS 队列
1、什么是JMS?
Java Message Service (JMS)是一个Java API,它可以创建、发送、接收和读取消息。
JMS要求消息传递是异步的,具有一些可靠性的设计,例如JMS可以提供一次消息发送仅有一次消息接收的方案。
JMS可能是Java社区中部署最广泛的消息传递解决方案。
在JMS中,消息使用者和生产者通过一个中介(JMS目的地)相互通信。如下图所示,目的地可以是队列也可以是主题:
队列是严格的点对点消息通讯,每条消息只会被一个消费者消费。
主题基于发布/订阅模式,如果有多个用户订阅了该主题,那么一条消息将发送给多个用户。
JMS同样会提供一个ConnectionFactory
,以便于客户端(例如Camel)可以使用它来创建一个与JMS服务的通讯连接。
JMS服务有时又被称之为JMS消息代理,因为它们代替业务系统来实现消息生产者和消费者之间的消息通讯。
2、如何配置Camel以连接JMS服务
我们使用适当的ConnectionFactory
来配置Camel的JMS组件,以便将Camel连接到特定的JMS系统。
Apache ActiveMQ是最流行的开源JMS提供者之一,它是Camel团队用来测试JMS组件的主要JMS代理,这里我们也将使用它来进行演示。
在使用Apache ActiveMQ的情况下,我们创建一个ActiveMQConnectionFactory
,由它来指定正在运行的ActiveMQ代理地址:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
URIvm://localhost
意味着我们应该连接到运行在当前JVM中,一个名为localhost的嵌入式代理。如果代理还没有运行,ActiveMQ中的vm传输连接器将按需创建一个代理,因此非常适合拿来快速构建一个测试用的JMS应用程序。
对于生产场景,建议连接到已经运行的代理。此外,在生产场景中,我们建议在连接到JMS代理时使用连接池。
接下来,在创建CamelContext
时,可以添加JMS组件,如下所示:
CamelContext context = new DefaultCamelContext(); context.addComponent("jms",JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
JMS组件和Activemq的ConnectionFactory不是camel-core模块的一部分。所以需要将依赖项添加到maven项目中:
// 使用maven 添加如下的依赖到pom文件中 <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-jms</artifactId> <version>2.20.1</version> </dependency>
JMS的连接工厂来自于ActiveMQ的相关API,所以需要以下依赖项:
// 使用maven 添加如下的依赖到pom文件中 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.2</version> </dependency>
现在,我们就已经将JMS组件配置为连接到特定的JMS代理。
3、使用URI来指定目的地
配置好JMS组件后,我们可以发送和接收JMS消息了。因为使用的是uri,所以配置起来非常简单。
假设我们希望向名为incomingOrders
的队列发送一条JMS消息,URI如下:
jms:queue:incomingOrders
这串URI本身就已经说明了它的用途:
- jms前缀表示你正在使用之前配置的jms组件;
- 通过指定queue,JMS组件知道目的地是名为
incomingOrders
的队列
这里甚至可以省略queue那一部分,因为JMS组件的默认行为就是发送到队列而不是主题。
使用Camel的Java DSL,我们可以使用to关键字像这样发送消息到incomingOrders
队列:
... to("jms:queue:incomingOrders")
这一串URI可以读作:发送到to
名为incomingOrders
的JMS队列queue
。
2.3 在Java中创建一个数据路由
我们可以使用RouteBuilder
来创建一个路由,并且每个CamelContext
可以包含多个路由。不过CamelContext
在运行时并不会使用RouteBuilder
作为最终路由定义,RouteBuilder
只是一个路由的构建器,由它构建出的一个或者多个路由,会添加到CamelContext
中进行运行,如下图所示:
特别注意,RouteBuilder和Route在概念上的区别非常重要。在RouteBuilder中编写的DSL代码,无论是Java DSL还是XML DSL,都只是一个设计时的构造,Camel在启动时只使用一次。例如,你可以在IDE中调试从RouteBuilder构建路由。
CamelContext
的addRoutes
方法接收一个RoutesBuilder
,注意,这个方法接收的不是RouteBuilder
。RoutesBuilder
接口里面只有一个简单的方法定义:
void addRoutesToCamelContext(CamelContext context) throws Exception;
理论上,我们可以使用自己的自定义类来构建Camel路由,但不建议这么做。
Camel提供了RouteBuilder
类,它实现了RoutesBuilder
接口,并且还可以使用Camel的Java DSL来创建路由。
2.3.1 使用RouteBuilder
Camel的org.apache.camel.builder.RouteBuilder
抽象类会非常频繁的出现,我们需要使用它来通过Java DSL构建路由。
要使用RouteBuilder
类,需要自己写有一个类来继承它,然后实现其中的configure
方法,例如:
public class MyRouteBuilder extends RouteBuilder { public void configure() throws Exception { ... } }
然后需要将它的实例通过addRoutes
方法添加到CamelContext
中:
CamelContext context = new DefaultCamelContext(); context.addRoutes(new MyRouteBuilder());
或者,也可以通过直接在CamelContext
中添加一个匿名RouteBuilder
类来合并RouteBuilder
和CamelContext
配置,如下:
CamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { public void configure() throws Exception { ... } });
在configure
方法中,可以使用Java DSL定义路由。
要开始一个路由,我们应该使用from方法,所有的路由都以一个from节点开始。
from方法接受端点URI作为参数,这里添加一个FTP端点URI来连接到“骑手摩配”的订单服务器,如下所示:
from("ftp://rider.com/orders?username=rider&password=secret")
from方法会返回一个RouteDefinition
对象,我们可以在该对象上调用实现EIP和其他消息传递概念中的各种方法。
2.3.2 使用Java DSL
领域限定语言(Domain-specific languages)即DSL,是针对特定问题领域的计算机语言,它与大多数编程语言不同,多数编程语言例如Java,是针对通用领域的计算机语言。例如,我们可能已经使用正则表达式DSL来匹配文本字符串,正则是一种匹配字符串的简洁方法,而在Java中不同正则实现相同的功能就没那么容易。正则表达式DSL是一个外部的DSL、它有自定义的语法,因此需要一个单独的编译器或解释器来执行。与外部DSL相对的是内部DSL,它使用现有的通用语言(如Java),使DSL感觉像是来自特定领域的语言。最简单的方法是通过特定的方法命令和参数来匹配相关领域的概念。
实现内部DSL的另一种比较流行的方法是使用链式编程接口(也称为链式构建器)。在使用链式编程接口时,可以将一串方法调用链接在一起来构建对象。最终执行一个操作,返回构建对象实例。
Camel的领域是企业集成,它的Java DSL是一组链式构建器,这个链式构建器包含各种EIP的术语命名的方法。
1、清单2.1拉取FTP消息,并发送到incomingOrders队列
import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.impl.DefaultCamelContext; public class FtpToJMSExample { public static void main(String args[]) throws Exception { CamelContext context = new DefaultCamelContext(); ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); context.addRoutes(new RouteBuilder() { public void configure() { from("ftp://rider.com/orders" + "?username=rider&password=secret") .to("jms:incomingOrders"); // 使用java语言构建路由 } }); context.start(); Thread.sleep(10000); context.stop(); } }
from方法告诉Camel使用来自FTP端点的消息,to方法告诉Camel将消息发送到JMS端点。
这个简单路由中的消息流可以看作是一个基本管道,消费者的输出作为生产者的输出,如图下图所示:
我们没有做任何从FTP文件类型到JMS消息类型的数据转换,这一步是通过Camel的类型转换工具自动完成的。
Caml允许使用者在路由执行过程中的任何节点上做强制类型转换,但是大部分情况下我们不需要自己做转换。
如果我们想看看路由中间到底发生了什么,我们可以使用Camel提供的流式钩子和行为特性注入的方式对其执行流程进行控制。
下面我们通过使用Processor来访问消息的简单方法。
2、添加一个Processor
Camel中的Processor接口是复杂路由的重要组成部分。它是一个简单的接口,只有一个方法:
public void process(Exchange exchange) throws Exception;
这个方法提供了对消息交换的完全访问权,可以对消息负载或消息头执行几乎任何想要的操作。
Camel中的所有EIP都是作为Processor实现的。我们甚至可以添加一个简单的内联Processor到指定路由:
from("ftp://rider.com/orders?username=rider&password=secret") .process(new Processor() { public void process(Exchange exchange) throws Exception { System.out.println("We just downloaded: " + exchange.getIn().getHeader("CamelFileName")); } }) .to("jms:incomingOrders");
此路由现在将在控制台中打印已下载的订单的文件名,然后再将其发送到JMS队列。
通过将这个Processor添加到路由的中间,就可以将它添加到前面提到的路由的管道中,如下图2.9所示:
FTP消费者的输出作为输入,输入到Processor,Processor不修改消息负载或消息头,Exchange将这个Processor的输出到作为输入的JMS生成器。
Camel创建路由的主要方法之一是通过Java DSL。毕竟,它是内置在Camel-core模块中的。
不过,还有其他创建路由的方法,其中一些可能更适合你的业务场景。例如,Camel为在XML中编写路由提供了扩展。
文献:《Camel In Action》 Routing with Camel
这篇关于Apache Camel 路由的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享