Flume-1.6.0 Study Notes (6): Kafka Source

Lu Chunli's work notes. The palest ink is better than the best memory.


Flume 1.6.0 adds full support for Kafka:

Flume Sink and Source for Apache Kafka
A new channel that uses Kafka

Kafka Source(http://flume.apache.org/FlumeUserGuide.html#kafka-source)

    Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.

    If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topic.
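
To picture what "a unique set of partitions" means, here is a minimal sketch of a range-style split of partitions among the members of one consumer group. It is an illustration only and not Flume's or Kafka's actual code; the class name `GroupAssignmentSketch`, the method `assign`, and the source names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {

    /**
     * Splits partitions 0..partitionCount-1 into contiguous ranges, one per consumer.
     * The first (partitionCount % consumers.size()) consumers get one extra partition.
     */
    public static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = partitionCount / consumers.size();
        int extra = partitionCount % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sources = new ArrayList<>();
        sources.add("flume-source-a"); // hypothetical source names
        sources.add("flume-source-b");
        // Four partitions shared by two sources in the same group
        System.out.println(assign(sources, 4));
        // A single-partition topic: the second source has nothing to read
        System.out.println(assign(sources, 1));
    }
}
```

With a topic that has only one partition, adding a second source to the same group would not increase throughput, because no partition is left for it to own.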

File Channel(http://flume.apache.org/FlumeUserGuide.html#file-channel)

    

HBase Sink(http://flume.apache.org/FlumeUserGuide.html#hbasesink)

    The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.

The Kafka topic created for this test is myhbase

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --create --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --replication-factor 1 --partitions 1 --topic myhbase
Created topic "myhbase".
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --list --zookeeper nnode:2181,dnode1:2181,dnode2:2181
myhbase
mykafka
mytopic - marked for deletion
test - marked for deletion
[hadoop@nnode kafka0.8.2.1]$

HBase table structure

[hadoop@nnode kafka0.8.2.1]$ hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

Table name: t_inter_log
Column family: cf

Flume configuration file

vim conf/kafka-hbase.conf

# read from kafka and write to hbase

agent.sources = kafka-source
agent.channels = mem-channel
agent.sinks = hbase-sink

# source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.zookeeperConnect = nnode:2181,dnode1:2181,dnode2:2181
agent.sources.kafka-source.groupId = flume
agent.sources.kafka-source.topic = myhbase
agent.sources.kafka-source.kafka.consumer.timeout.ms = 100

# channel
agent.channels.mem-channel.type = memory

# sink
agent.sinks.hbase-sink.type = hbase
agent.sinks.hbase-sink.table = t_inter_log
agent.sinks.hbase-sink.columnFamily  = cf
# agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# assemble
agent.sources.kafka-source.channels = mem-channel
agent.sinks.hbase-sink.channel = mem-channel
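
The "assemble" lines only work if every channel they name is also declared in agent.channels. As a rough aid, the following sketch loads such a configuration with java.util.Properties and reports references to undeclared channels. It is not part of Flume; `FlumeWiringCheck` and `unknownChannels` are hypothetical names, and the check assumes names are separated by whitespace.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class FlumeWiringCheck {

    /** Returns every channel referenced by a source or sink that <agent>.channels does not declare. */
    public static List<String> unknownChannels(Properties props, String agent) {
        Set<String> declared = new HashSet<>(split(props.getProperty(agent + ".channels", "")));
        List<String> unknown = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            boolean sourceRef = key.startsWith(agent + ".sources.") && key.endsWith(".channels");
            boolean sinkRef = key.startsWith(agent + ".sinks.") && key.endsWith(".channel");
            if (sourceRef || sinkRef) {
                for (String name : split(props.getProperty(key))) {
                    if (!declared.contains(name)) {
                        unknown.add(key + " -> " + name);
                    }
                }
            }
        }
        return unknown;
    }

    /** Splits a whitespace-separated list of component names. */
    private static List<String> split(String value) {
        List<String> names = new ArrayList<>();
        for (String part : value.trim().split("\\s+")) {
            if (!part.isEmpty()) {
                names.add(part);
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        // The wiring lines from conf/kafka-hbase.conf
        String conf =
              "agent.sources = kafka-source\n"
            + "agent.channels = mem-channel\n"
            + "agent.sinks = hbase-sink\n"
            + "agent.sources.kafka-source.channels = mem-channel\n"
            + "agent.sinks.hbase-sink.channel = mem-channel\n";
        Properties props = new Properties();
        props.load(new StringReader(conf));
        System.out.println("unknown channel references: " + unknownChannels(props, "agent"));
    }
}
```

For the configuration above the list is empty, since both the source and the sink point at mem-channel.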

Start Kafka

[hadoop@nnode kafka0.8.2.1]# bin/kafka-server-start.sh config/server.properties

Start flume-ng

[hadoop@nnode flume1.6.0]$ bin/flume-ng agent --conf conf --name agent --conf-file conf/kafka-hbase.conf -Dflume.root.logger=INFO,console

Implementing the producer with the Java API

package com.lucl.kafka.simple;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.log4j.Logger;

/**
 * Copyright: Copyright (c) 2015
 * Date : 2015-11-17 21:42:50
 * Description : JavaApi for kafka producer
 *
 * @author luchunli
 * @version 1.0
 */
public class SimpleKafkaProducer {
    private static final Logger logger = Logger.getLogger(SimpleKafkaProducer.class);

    /**
     * Sends ten test events to the myhbase topic.
     */
    private void execMsgSend() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.137.117:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "0");

        ProducerConfig config = new ProducerConfig(props);

        logger.info("set config info(" + config + ") ok.");

        Producer<String, String> producer = new Producer<>(config);

        String topic = "myhbase";

        String columnFamily = "cf";
        String column = "count";
        for (int i = 1; i <= 10; i++) {
            String rowkey = "www.value_" + i + ".com";
            String value = "value_" + i;

            // Event body layout: "<rowkey>, <family>:<column>, <value>"
            String event = rowkey + ", " + columnFamily + ":" + column + ", " + value;
            logger.info(event);
            KeyedMessage<String, String> msg = new KeyedMessage<>(topic, event);
            producer.send(msg);
        }
        logger.info("send message over.");

        producer.close();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer();
        simpleProducer.execMsgSend();
    }
}
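
Each event the producer sends is a single string of the form "rowkey, cf:count, value". A consumer that wants the individual fields has to split that string again. The sketch below shows one way to do so with java.util.regex; the pattern is an assumption chosen for this message layout and is not a Flume default, and `EventBodyParser` is a hypothetical name.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EventBodyParser {

    // Assumed layout: "<rowkey>, <family>:<qualifier>, <value>", as built by the producer
    private static final Pattern EVENT =
            Pattern.compile("^([^,]+), ([^:,]+):([^,]+), (.*)$");

    /** Returns {rowkey, family, qualifier, value}, or null if the body does not match. */
    public static String[] parse(String body) {
        Matcher m = EVENT.matcher(body);
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2), m.group(3), m.group(4) };
    }

    public static void main(String[] args) {
        String[] parts = parse("www.value_1.com, cf:count, value_1");
        System.out.println("rowkey=" + parts[0] + " family=" + parts[1]
                + " qualifier=" + parts[2] + " value=" + parts[3]);
    }
}
```

Bodies that do not follow the layout return null, so a caller can decide whether to skip or log them.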

Observe the flume-ng console output

2015-11-21 23:09:47,466 (flume_nnode-1448118584558-54f0a1ba-leader-finder-thread) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] [ConsumerFetcherManager-1448118585060] Added fetcher for partitions ArrayBuffer([[myhbase,0], initOffset 70 to broker id:117,host:nnode,port:9092] )
2015-11-21 23:09:59,147 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: hbase-sink: Successfully registered new MBean.
2015-11-21 23:09:59,147 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: hbase-sink started
2015-11-21 23:15:30,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:351)] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
2015-11-21 23:15:30,716 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:354)] Failed to commit transaction.Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
^X^C2015-11-21 23:15:38,090 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:79)] Stopping lifecycle supervisor 10
2015-11-21 23:15:38,103 (PollableSourceRunner-KafkaSource-kafka-source) [INFO - org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:149)] Source runner interrupted. Exiting

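    The write failed: the HBase sink threw NoSuchMethodError for Put.setWriteToWAL(Z)V and rolled back each transaction.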
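
The (Z)V suffix in the error is a JVM method descriptor: one boolean parameter and a void return type. The JVM links a call by name plus the full descriptor, so a method with the same name and parameters but a different return type does not satisfy it. This suggests the sink was compiled against a different HBase client API than the 1.0.1 version installed here. The sketch below illustrates the descriptor rule with a JDK class so that it runs without HBase; `DescriptorCheck` and `hasMethod` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class DescriptorCheck {

    /** True if cls has a public method with exactly this name, return type and parameter list. */
    public static boolean hasMethod(Class<?> cls, String name,
                                    Class<?> returnType, Class<?>... params) {
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(name)
                    && m.getReturnType().equals(returnType)
                    && Arrays.equals(m.getParameterTypes(), params)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // StringBuilder.append(boolean) exists and returns StringBuilder ...
        System.out.println(hasMethod(StringBuilder.class, "append",
                StringBuilder.class, boolean.class));
        // ... so a caller compiled to expect "append(Z)V" would not find it
        System.out.println(hasMethod(StringBuilder.class, "append",
                void.class, boolean.class));
    }
}
```

With the HBase client jar on the classpath, the same check could be pointed at org.apache.hadoop.hbase.client.Put (loaded via Class.forName) with the name setWriteToWAL, void.class and boolean.class to see whether the signature the sink expects is present.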

Check the HBase table

hbase(main):004:0> scan 't_inter_log'
ROW                                                COLUMN+CELL                                                                                                                                      
0 row(s) in 0.0140 seconds

hbase(main):005:0>
