public static void consumer(){
创新互联公司于2013年成立,是专业互联网技术服务公司,拥有项目成都网站设计、网站建设、外贸网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元苏尼特左做网站,已为上家服务,为苏尼特左各地企业和个人服务,联系电话:028-86922220
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
MapString, Integer map = new HashMapString, Integer();
map.put("fans", 1);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);
ListKafkaStreamMessage streams = topicMessageStreams.get("fans");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStreamMessage stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIteratorMessage it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}
});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}
一般消息发不出去很大可能都是配置或环境的问题
1、排查环境是否有问题,zookeeper节点是否存活,kafka节点是否存活,通过命令行的方式能否发出去消息(使用kafka-console-producer.sh),如果通过命令行都发不出去那就是集群的问题了。
2、网络问题,调用机器和集群之间网络是否通畅
3、调用时配置的host、port和集群中配置的是否一致,是否需要使用主机名而不是ip
4、客户端api版本是否和服务端差别太大导致不兼容
5、防火墙问题,关闭集群的防火墙实时
诸如此类,可能性太多就不一 一列举了。
你这既然有打印堆栈,如果报错肯定有异常信息的,可能卡住的时间比较长,耐心等待吧,祝你早日解决bug。
我这里是使用的是,kafka自带的zookeeper。
以及关于kafka的日志文件啊,都放在默认里即/tmp下,我没修改。保存默认的
1、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ jps
2625 Jps
2、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/zookeeper-server-start.sh config/zookeeper.properties
此刻,这时,会一直停在这,因为是前端运行。
另开一窗口,
3、 [hadoop@sparksinglenode kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties
也是前端运行。