相关依赖(Maven):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>
生产者:
成都创新互联公司2013年至今,是专业互联网技术服务公司,拥有项目网站设计制作、成都网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元曲靖做网站,已为上家服务,为曲靖各地企业和个人服务,联系电话:028-86922220
packagecom.zy.kafka;
importjava.util.Properties;
importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.Producer;
importorg.apache.kafka.clients.producer.ProducerRecord;
publicclassKafkaTest {
publicstaticvoidmain(String[] args) {
//1.加载配置文件
//1.1封装配置文件对象
Properties prps=newProperties();
//配置broker地址
prps.put("bootstrap.servers", "hadoop02:9092");
//配置ack级别:0 1 -1(all)
prps.put("acks", "all");
//重试次数
prps.put("retries", 3);
prps.put("batch.size", 16384);
prps.put("linger.ms",1);
prps.put("buffer.memory", 33554432);
//指定(message的K-V)的序列化
prps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2.创建生产者对象(指定的key和value的泛型)
Producerproducer=new KafkaProducer<>(prps);
//生产者发送消息
for(inti=0;i<100;i++) {
/**
* ProducerRecord(topic, value)
* topic:主题名称
* key:
* value:
*/
//消息的封装对象
ProducerRecordpr=newProducerRecord("test_topic", "key"+i, "value"+i);
producer.send(pr);
}
producer.close();
}
}
消费者:
packagecom.zy.kafka;
importjava.util.Arrays;
importjava.util.Properties;
importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.apache.kafka.clients.consumer.ConsumerRecords;
importorg.apache.kafka.clients.consumer.KafkaConsumer;
importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.Producer;
importorg.apache.kafka.clients.producer.ProducerRecord;
publicclassKafkaTest {
publicstaticvoidmain(String[] args) {
//1.加载配置文件
//1.1封装配置文件对象
Properties prps=newProperties();
//配置broker地址
prps.put("bootstrap.servers", "hadoop02:9092");
//指定消费的组的ID
prps.put("group.id", "test");
//是否启动自动提交(是否自动提交反馈信息,向zookeeper提交)
prps.put("enable.auto.commit", "true");
//自动提交的时间间隔
prps.put("auto.commit.interval.ms", "1000");
//指定(message的K-V)的序列化
prps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建kafka的消费者
KafkaConsumerconsumer=newKafkaConsumer<>(prps);
//添加消费主题
consumer.subscribe(Arrays.asList("kafka_test"));
//开始消费
while(true) {
//设置从哪里开始消费,返回的是一个消费记录
ConsumerRecordspoll = consumer.poll(10);
for(ConsumerRecordp:poll) {
System.out.printf("offset=%d,key=%s,value=%s\n",p.offset(),p.key(),p.value());
}
}
}
}
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import kafka.admin.TopicCommand;
/**
 * Runs {@code kafka.admin.TopicCommand} (the class behind kafka-topics.sh) in-process.
 *
 * <p>With no arguments it lists all topics (equivalent to
 * {@code kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181}).
 * With {@code create} as the first argument it instead creates {@code zy_topic}, equivalent to:
 *
 * <pre>
 * kafka-topics.sh \
 *   --create \
 *   --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
 *   --replication-factor 3 \
 *   --partitions 5 \
 *   --topic zy_topic
 * </pre>
 *
 * <p>NOTE(review): TopicCommand.main may terminate the JVM when it finishes (confirm for the
 * Kafka version in use), so only one command is run per invocation.
 */
public class KafkaAPI {
    /** ZooKeeper quorum passed to every topic command via --zookeeper. */
    private static final String ZOOKEEPER = "hadoop01:2181,hadoop02:2181,hadoop03:2181";

    public static void main(String[] args) throws IOException {
        // Arguments for creating a topic.
        String[] ops = new String[] {
            "--create",
            "--zookeeper", ZOOKEEPER,
            "--replication-factor", "3",
            "--topic", "zy_topic",
            "--partitions", "5"
        };
        // Arguments for listing topics.
        String[] list = new String[] {
            "--list",
            "--zookeeper", ZOOKEEPER
        };
        // Default (no arguments) is the listing, as before; "create" selects topic creation.
        boolean create = args.length > 0 && "create".equals(args[0]);
        // Submit the command exactly as the CLI would.
        TopicCommand.main(create ? ops : list);
    }
}
shell中常用操作:
#!/usr/bin/env bash
# Kafka command-line cheat sheet. Each command is independent; run the ones you need.

# List Kafka topics
kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181

# Show the offsets of a Kafka topic
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic kafka_api_r1p1

# Create a topic
kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3 --replication-factor 1 --topic kafka_api_r1p3

# Delete a topic
kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic act_inventory_r1p1_test1

# Show the offsets of a specific consumer group
# (without --describe/--group the tool only prints its usage text)
kafka-consumer-groups.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --describe --group kafka_api_group_1
①简单实现 Kafka 消费者,偏移量由 Kafka 客户端自动提交(单分区消费)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
 * Simple Kafka consumer for the topic {@code kafka_api_r1p1} whose offsets are committed
 * automatically by the client ({@code enable.auto.commit=true}), single-partition consumption.
 *
 * <p>Original author: ZZY, 2019/9/9.
 */
public class MyConsumer01 {
    private static final Properties props = new Properties();

    static {
        props.put("group.id", "kafka_api_group_2");
        // Kafka cluster addresses.
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Let the client commit offsets automatically. For manual commits set this to "false"
        // and call consumer.commitSync()/commitAsync() after processing each batch.
        props.put("enable.auto.commit", "true");
        // Interval between automatic commits, in milliseconds.
        props.put("auto.commit.interval.ms", "100");
        // Where to start when the group has no committed offset for a partition:
        // "earliest" starts from the oldest available record.
        props.put("auto.offset.reset", "earliest");
        // Deserializers for the message key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) throws InterruptedException {
        String topic = "kafka_api_r1p1";
        // try-with-resources closes the consumer if the loop exits via an exception
        // (e.g. InterruptedException from the sleep).
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() accepts several topics; a single one is used here.
            consumer.subscribe(Arrays.asList(topic));
            // Keep pulling data from the brokers.
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(java.time.Duration.ofMillis(10));
                for (ConsumerRecord<String, String> record : records) {
                    // %n ends each record on its own line.
                    System.out.printf("offset=%d,key=%s,value=%s%n", record.offset(),
                            record.key(), record.value());
                }
                Thread.sleep(2000);
            }
        }
    }
}
②实现多分区消费
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
 * Consumes the multi-partition topic {@code kafka_api_r1p3}, processing records partition by
 * partition and committing each partition's offset manually with {@code commitSync}.
 *
 * <p>Runs until the thread is interrupted during the pause between polls.
 *
 * <p>Original author: ZZY, 2019/9/10.
 */
public class MyConsumer02 {
    /** poll() timeout: effectively "block until records are available". */
    private static final java.time.Duration POLL_TIMEOUT =
            java.time.Duration.ofMillis(Long.MAX_VALUE);

    private static final Properties props = new Properties();

    static {
        // Kafka cluster addresses.
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group id (user-defined); consumers with the same id belong to one group.
        props.put("group.id", "kafka_api_group_1");
        // Disable automatic offset commits: offsets are committed explicitly below.
        props.put("enable.auto.commit", "false");
        // Deserializers for the message key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        String topicName = "kafka_api_r1p3";
        // try-with-resources closes the consumer on every exit path.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() accepts several topics; a single one is used here.
            consumer.subscribe(Arrays.asList(topicName));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                // Handle the data of each partition separately.
                for (TopicPartition partition : records.partitions()) {
                    System.out.println("开始消费第" + partition.partition() + "分区数据!");
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    if (partitionRecords.isEmpty()) {
                        continue;
                    }
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        System.out.println("partition:" + partition.partition()
                                + ",key:" + partitionRecord.key()
                                + ",value:" + partitionRecord.value()
                                + ",offset:" + partitionRecord.offset());
                    }
                    // Commit the offset of the NEXT record to read: last processed offset + 1.
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(
                            Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop; the consumer is closed on the way out.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
③实现消费者从指定分区拉取数据
注意:
(1)kafka提供的消费者组内的协调功能就不再有效
(2)这样的写法可能导致不同消费者被分配到相同的分区;为了避免偏移量提交冲突,每个消费者实例的 group.id 应互不相同
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
/**
 * Reads from one explicitly assigned partition (partition 0 of {@code kafka_api_r1p3}) using
 * {@code assign()} instead of {@code subscribe()}, committing offsets manually.
 *
 * <p>Once specific partitions are assigned manually:
 * (1) the consumer-group coordination provided by Kafka no longer applies to this consumer;
 * (2) different consumers may end up with the same partition, so to avoid conflicting offset
 * commits every consumer instance should use its own group.id.
 *
 * <p>NOTE(review): group.id below is {@code kafka_api_group_1}, the same id used by the
 * subscribe()-based examples in this article; per point (2), consider a unique id.
 *
 * <p>Original author: ZZY, 2019/9/10.
 */
public class MyConsumer03 {
    /** poll() timeout: effectively "block until records are available". */
    private static final java.time.Duration POLL_TIMEOUT =
            java.time.Duration.ofMillis(Long.MAX_VALUE);

    private static Properties props = new Properties();
    // Consumer instance shared by this class, created in the static initializer.
    static KafkaConsumer<String, String> consumer;

    static {
        // Kafka cluster addresses.
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Consumer group id used for the manual offset commits.
        props.put("group.id", "kafka_api_group_1");
        // Disable automatic offset commits: offsets are committed explicitly below.
        props.put("enable.auto.commit", "false");
        // Deserializers for the message key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        String topic = "kafka_api_r1p3";
        int partitionNum = 0;
        // assign() pins this consumer to the given partition (no topic subscription).
        TopicPartition partition0 = new TopicPartition(topic, partitionNum);
        try {
            consumer.assign(Arrays.asList(partition0));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    if (partitionRecords.isEmpty()) {
                        continue;
                    }
                    for (ConsumerRecord<String, String> partitionRecord : partitionRecords) {
                        System.out.println("分区:" + partitionRecord.partition()
                                + ",key:" + partitionRecord.key()
                                + ",value:" + partitionRecord.value()
                                + ",offset:" + partitionRecord.offset());
                    }
                    // Commit the offset of the NEXT record to read: last processed offset + 1.
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(
                            Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            // Release the consumer if the loop exits via an exception.
            consumer.close();
        }
    }
}
④重置kafka组的offset
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
/**
* * Created with IntelliJ IDEA.
* * User: ZZY
* * Date: 2019/9/10
* * Time: 9:46
* * Description: 该API用于重置kafka组的offset
*/
public class ReSetOffset {
//用于重置的offset
final private static String group="kafka_api_group_1";
final private static Properties props = new Properties();
static KafkaConsumer consumer;
static{
props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
props.put("group.id",group);
props.put("enable.auto.commit", "true");
//props.put("auto.offset.reset","earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumer=new KafkaConsumer(props);
}
public static String resetOffset(String topic,long offset){
int partitionNums=getTopicPartitionNum(topic);
for(int i=0;i(props);
consumer_temp.assign(Arrays.asList(tp));
consumer_temp.seek(tp,offset);
consumer_temp.close();
}
consumer.close();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss ");
return dateFormat.format(new Date())+ group +" ResetOffset Succeed!!";
}
private static int getTopicPartitionNum(String topic){
int partitionNums=consumer.partitionsFor(topic).size();
return partitionNums;
}
public static void main(String[] args) {
String topic="kafka_api_r1p1";
System.out.println(ReSetOffset.resetOffset(topic,0));
}
}
⑤多线程版本的消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * A consumer task: subscribes its {@link KafkaConsumer} to {@code kafka_api_r1p1} and prints
 * records on the current thread until {@link #shutdown()} is called. It then closes the
 * consumer and counts down the latch.
 *
 * <p>Original author: ZZY, 2019/9/10.
 */
public class ConsumerRunner implements Runnable {
    /** Maximum time a single poll() may block. */
    private static final java.time.Duration POLL_TIMEOUT = java.time.Duration.ofMillis(150);

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<?, ?> consumer;
    private final CountDownLatch latch;

    /**
     * @param consumer consumer owned by this runner; it is closed when {@link #run()} ends
     * @param latch    counted down once when {@link #run()} ends
     */
    public ConsumerRunner(KafkaConsumer<?, ?> consumer, CountDownLatch latch) {
        this.consumer = consumer;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("threadName....." + Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("kafka_api_r1p1"));
            while (!closed.get()) {
                ConsumerRecords<?, ?> records = consumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<?, ?> record : records) {
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n",
                            Thread.currentThread().getName(), record.offset(), record.key(),
                            record.value());
                }
            }
        } catch (WakeupException e) {
            // shutdown() calls wakeup() to abort a blocking poll(); the exception is only
            // unexpected if no shutdown was requested.
            if (!closed.get()) {
                throw e;
            }
        } finally {
            try {
                consumer.close();
            } finally {
                // Always signal completion, even if close() throws.
                latch.countDown();
            }
        }
    }

    /** Requests this runner to stop; intended to be called from another thread. */
    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        consumer.wakeup();
    }
}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* * Created with IntelliJ IDEA.
* * User: ZZY
* * Date: 2019/9/10
* * Time: 10:52
* * Description: 这里主要测试多线程下的Consumer
*/
public class RunConsumer {
private static Properties props = new Properties();
static{
//设置kafka集群的地址
props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
//设置消费者组,组名字自定义,组名字相同的消费者在一个组
props.put("group.id", "kafka_api_group_1");
//开启offset自动提交
props.put("enable.auto.commit", "true");
//自动提交时间间隔
props.put("auto.commit.interval.ms", "1000");
//序列化器
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
public static void main(String[] args) {
//实例化一个消费者
final List consumers = new ArrayList<>();
final List> kafkaConsumers = new ArrayList<>();
for(int i=0;i<2;i++){
kafkaConsumers.add(new KafkaConsumer(props));
}
//倒计时,利用await方法使主线程阻塞,利用countDown递减,当递减到0时,唤醒主线程,功能类似于join
final CountDownLatch latch = new CountDownLatch(2);
ExecutorService executor = Executors.newFixedThreadPool(2);
for(int i=0;i<2;i++){
ConsumerRunner c= new ConsumerRunner(kafkaConsumers.get(i),latch);
consumers.add(c);
executor.submit(c);
}
/**
* 这个方法的意思就是在jvm中增加一个关闭的钩子,当JVM关闭时,会执行系统中已经设置的所有
* 方法addShutdownHook添加的钩子,当系统执行完成这些钩子后,jvm才会关闭,
* 所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁、关闭连接等操作。
*/
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run() {
System.out.println("....................");
for(ConsumerRunner consumer:consumers){
consumer.shutdown();
}
executor.shutdown();
try {
executor.awaitTermination(5000, TimeUnit.MICROSECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}