概述
首先,對camel的基本概念和用法以及Kafka的基本概念和用法沒有嘮叨。
這篇文章假設(shè)你對二者都有基本的認識。camel 本身是一個路由引擎,通過 camel 你可以定義路由規(guī)則,指定從哪里(源)接收消息,如何處理這些消息,以及發(fā)往哪里(目標)。camel-kafka 就是 camel 的其中一個組件,它從指定的 kafka topic 獲取消息來源進行處理。
有些小伙伴可能有疑問了,kafka 本身不就是生產(chǎn)者-消費者模式嗎?原生 kafka 發(fā)布消息,然后消費進行消息處理不就行了,為啥還用 camel-kafka 呢?
首先恭喜你是一個愛思考的小伙伴!這個問題的答案是這樣,camel 本身提供的是高層次的抽象,你可以選擇從 kafka 作為源接收數(shù)據(jù),也可以使用其它組件,比如mq,文件等。camel 讓你能使用相同的api和處理流程,處理不同協(xié)議和數(shù)據(jù)類型的系統(tǒng)。
所有總結(jié)下,(下面這句話很重要,讀三遍)
camel實現(xiàn)了客戶端與服務(wù)端的解耦, 生產(chǎn)者和消費者的解耦。
比如我們可以選擇從kafka獲取消息,然后發(fā)送到j(luò)ms(activemq)。
from("kafka:test?brokers=localhost:9092") .to("jms:queue:test.mq.queue")詳解camel-kafka
camel對每個組件約定一個發(fā)送和接受的 endpoint uri,kafka 的uri格式是,
kafka:topic[?options]option中有很多選項,比如最重要的 brokers 指定 kafka 服務(wù)的地址。然后是uri的參數(shù),類似http uri的參數(shù)格式。下面是個示例:
from("kafka:test?brokers=localhost:9200" + "&maxPollRecords=5000" + "&consumersCount=1" + "&seekTo=beginning" + "&groupId=kafkaGroup") .routeId("FromKafka") .to("stream:out");每個參數(shù)的意思我就不一一解釋了,在文章最后我會給出官方的鏈接,大家可以自己去查閱。
說了這么多,我們還是運行一個程序看看效果。這個程序來自 apache camel 官方example,完整的代碼在文章的最后有鏈接。
首先,pom引入依賴,
<dependency> <groupId>org.a;/groupId> <artifactId>camel-kafka</artifactId> <version>2.24.1</version> </dependency>我需要在本地啟動一個 kafka 的server,具體過程網(wǎng)上很多,這里不啰嗦了。唯一要注意的是 kafka server 的版本最好跟 camel-kafka 引入的 kafka-client 版本一致,以免踩坑。
kafka環(huán)境安裝好之后,創(chuàng)建兩個topic,
bogon:kafka_2.11-2.2.0 ponyma$ ./bin --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic TestLog Created topic TestLog. bogon:kafka_2.11-2.2.0 ponyma$ ./bin --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic AccessLog Created topic AccessLog.先來看看消費者部分的代碼,
public static void main(String[] args) throws Exception { Sy("About to run Kafka-camel integration2..."); CamelContext camelContext = new DefaultCamelContext(); // Add route to send messages to Kafka camelCon(new RouteBuilder() { public void configure() { PropertiesComponent pc = getContext().getComponent("properties", Pro); ("classpa;); Sy("About to start route: Kafka Server -> Log "); from("kafka:?brokers=:" + "&maxPollRecords=" + "&consumersCount=" + "&seekTo=" + "&groupId=") .routeId("FromKafka") .to("stream:out"); } }); camelCon(); // let it run for 5 minutes before shutting down T(5 * 60 * 1000); camelCon(); }這個代碼的核心就是camel的路由配置,也很簡單,當(dāng)前這個路由的意思是,從 kafka 某個 topic 讀取數(shù)據(jù),不做任何處理直接發(fā)送到標準輸出。
再來看看生產(chǎn)者,
camelCon(new RouteBuilder() { public void configure() { PropertiesComponent pc = getContext().getComponent("properties", Pro); ("classpa;); // setup kafka component with the brokers KafkaComponent kafka = new KafkaComponent(); ka(":"); camelCon("kafka", kafka); from("direct:kafkaStart").routeId("DirectToKafka") .to("kafka:").log, "${headers}"); // Topic can be set in header as well. from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic") .to("kafka:dummy") .log, "${headers}"); // Use custom partitioner based on the key. /** * partitioner指定分區(qū)發(fā)送 */ from("direct:kafkaStartWithPartitioner").routeId("kafkaStartWithPartitioner") .to("kafka:?partitioner=") .log("${headers}"); // Takes input from the command line. from("stream:in").setHeader, simple("0")) .setHeader, simple("1")).to("direct:kafkaStart"); } });- 第一個 from to 意思是監(jiān)聽 direct:kafkaStart ,發(fā)送到指定的 topic。
- 第二個 from to 也是監(jiān)聽某個 direct,但是沒有發(fā)送的 kafka的topic上。
- 第三個 from to 是監(jiān)聽 direct:kafkaStartWithPartitioner,發(fā)送到特定 topic 的特定分區(qū)上。
- 第四個 from to 是監(jiān)聽我們在控制臺的輸入,發(fā)送到 direct:kafkaStart。
上面四個 from to 對應(yīng) 下面四個發(fā)送的示例,通過日志打印我們可以看看數(shù)據(jù)是否被正確的進行路由了。
, 0); , "1"); ("direct:kafkaStart", testKafkaMessage, headers);這段代碼的意思是,生產(chǎn)者發(fā)送數(shù)據(jù)到 direct:kafkaStart 這個endpoint上, headers指定了所有的消息都會發(fā)送到 kafka topic 的第一個分區(qū)。
, "2"); , "TestLog"); ("direct:kafkaStartNoTopic", testKafkaMessage, headers);生產(chǎn)者發(fā)送數(shù)據(jù)到 direct:kafkaStartNoTopic 這個endpoint上,對應(yīng)上面第二個 from to ,雖然沒有指定發(fā)送目標的 kafka topic,但是我們在 header 里指定了 topic,所以跟第一個 from to 其實可以達到同樣的效果。
后面兩個就不貼出代碼了,一個是發(fā)送到分區(qū)0,一個發(fā)送到分區(qū)1。分區(qū)的原則是 header 里指定的key,分區(qū)器是自定義的,在源碼 中。這里不表。
先啟動消費者端,然后啟動生產(chǎn)者端,結(jié)果如下:
可以看到,運行的結(jié)果跟我們分析的是一致的。
1.《【camel是什么意思】技術(shù)分享:kafka系列之camel-kafka》援引自互聯(lián)網(wǎng),旨在傳遞更多網(wǎng)絡(luò)信息知識,僅代表作者本人觀點,與本網(wǎng)站無關(guān),侵刪請聯(lián)系頁腳下方聯(lián)系方式。
2.《【camel是什么意思】技術(shù)分享:kafka系列之camel-kafka》僅供讀者參考,本網(wǎng)站未對該內(nèi)容進行證實,對其原創(chuàng)性、真實性、完整性、及時性不作任何保證。
3.文章轉(zhuǎn)載時請保留本站內(nèi)容來源地址,http://f99ss.com/yule/1964066.html