Apache Camel

作者:胡小根

邮箱:hxg@haomo-studio.com

Camel是什么?

系统间的粘合剂

如何用?

// 使用伪代码
from(Source)
.transform(Transformer)
.to(Destination)

一般来说,使用Camel的步骤如下:

  • 创建一个CamelContext对象。
  • 向CamelContext对象中添加Endpoints或者是Components
  • 向CamelContext对象中添加路由(routes)规则
  • 调用CamelContext的start()方法启动Camel引擎
  • 通过Endpoint发送或接收消息
  • 调用CamelContext的stop()方法时
package org.jeecg;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class CamelTest {
    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("timer://foo?period=1000")
                        .setBody(simple("测试日志"))
                        .log("body >>> ${body}");
            }
        });
        context.start();
        Thread.sleep(10000);
        context.stop();
    }
}

核心概念

Camel上下文

Camel上下文环境是某种容器,你可以将它看作Camel的运行时系统,它将Camel的所有功能组合在一起

路由 Route

路由首先从创建交换的节点开始(DSL中的“from”),在每个处理步骤中,上一个步骤输出的消息是下一个步骤的输入消息。

路由可以实现如下功能:

  • 动态确认客户端调用哪个服务器
  • 提供灵活的方法进行数据处理
  • 客户端和服务端分离
  • 不同的系统完成不同的业务,通过路由进行通讯,促进更好的设计实践
  • 增强某些系统(如消息代理和esb)的特性和功能
  • 允许对路由上的某个节点进行模拟测试
from("file:data/inbox")
    .filter().xpath("/order[not(@test)]")
    .to("jms:queue:order");

处理器 Processor

处理器,表示在路由中,够创建、使用、修改传入交换Exchange的处理节点。处理器可以是EIP的实现、特定的组件,也可以是你自定义的处理器。camel提供了大量内置Processor,用于逻辑运算、过滤等。

// 消息转换
from("amqp:queue:order")
.process(new XmlToJsonProcessor())
.to("bean:orderHandler");
// 一个自定义处理器的实现
public class OtherProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Message message = exchange.getIn();
        String body = message.getBody().toString();
        //===============
        // 您可以在这里进行数据格式转换
        // 并且将结果存储到out message中
        //===============
        // 存入到exchange的out区域
        if(exchange.getPattern() == ExchangePattern.InOut) {
            Message outMessage = exchange.getOut();
            outMessage.setBody(body + " || other out");
        }
    }
}

组件 Component

组件Component是Camel中的主要扩展点,到目前为止,Camel生态里已经有280多个组件。

端点 Endpoint

端点Endpoint是Camel对最终数据通道的一个抽象,系统可以通过该通道发送或接收消息。在Camel中,通过使用uri配置端点,比如 file:data/inbox?delay=5000

endpoint URI

可以简单理解成消息的地址

  • 对于消费者(from方法)来说,表示消息从哪里来
  • 对于生产者(to方法)来说,表示消息到哪里去

生产者

指的是能够向端点发送消息的实体。

消费者

消费者本身也是一项服务,它接收一些外部系统生成的消息,将它们封装在交换器中,并将它们发送给要处理的消息。消费者是在Camel中路由的交换的来源。

Camel有两种消费者:事件驱动消费者和轮询消费者。

  • 事件驱动消费者:如TCP/IP端口、JMS队列、Twitter句柄、Amazon SQS队列、WebSocket等等。
  • 轮询消费者:主动地从特定的源获取消息。如FTP服务器;文件、FTP和电子邮件组件都使用定期轮询消费者。

Camel消息

分两类:

  • Message,即消息,是系统在使用消息传递通道时用来相互通信的实体。消息Message具有正文Body(消息载体Payload)、头Header和可选的附件Attachment;
  • Exchange,是消息在路由期间的容器,交换还定义了系统之间的交互模式,也称为message exchange patterns (MEP),MEP用于区分单向消息传递和请求-响应消息传递;

  • 同一个数据路由从数据开始传递到数据传递结束的整个生命周期里,交换对象保持不变,消息对象则可能随着路由路线的行进而改变

参考

results matching ""

    No results matching ""