最近写了个kafka的接收消息的功能,需要使用回调处理收到的消息。
让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:主机域名、雅安服务器托管、营销软件、网站建设、沁县网站维护、网站推广。
一个是基本的回调,一个是使用接口功能实现回调,对接口是个很好的学习。
1.正常回调
kafka的接收消息处。收到消息后,使用传入的Onmessage进行处理。
调用kafka接收消息的单元,并在调用方写好回调
在调用方实现回调需要执行的方法
感觉还是使用基本回调相对简单点,接口就当学习了。
另外跨包的接口的方法要大写!定位了好久发现个入门的问题。
这10款工具如下:
AKHQ
Kowl
Kafdrop
UI for Apache Kafka
Lenses
CMAK
Confluent CC
Conduktor
LogiKM
kafka-console-ui
如果上面这个地址可以打开,可以直接去看介绍,下文也不再重复说明。
关于前8款的对比,可以看下面这张图片,图片也是于上面,我直接copy过来了(可能有好多同学打不开上面这个链接,就直接看这张图片了解了下吧)
关于这8款工具的介绍,人家说的很清晰了,这里就不再重复说明了,并且这些工具,大部分我也没用过,也没资格评价太多。
考虑到很多同学可能打开github太慢,我下面会把相关基本信息整理一下,供大家快速了解,方便选型。
概览
AKHQ (previously known as KafkaHQ)
开发语言:后端是java为主
Kowl - A Web UI for Apache Kafka
p.s. github上完整的动图这里上传失败,就只放一个静态的截图了,如果可以打开github,建议打开下面的地址直接看吧。
但是这个并不是所有功能都是免费,有部分功能是商业版才有:
开发语言:后端是go为主
Kafdrop – Kafka Web UI
开发语言:后端以java为主
要求jdk11或更高版本
UI for Apache Kafka – Free Web UI for Apache Kafka
开发语言:后端以java为主
要求jdk13或更高版本
Lenses.io
Apache Kafka 和 Kubernetes 的实时应用程序和数据操作 #DataOps 门户。
CMAK (Cluster Manager for Apache Kafka, previously known as Kafka Manager)
这个想必很多同学都知道,原来的名字就是kafka manager。
开发语言:后端以scala为主
Confluent Inc.
Apache
Conduktor
一个商业版本的桌面客户端
官网找到一个这样的图片,凑合看吧:
LogiKM
滴滴开源的一站式Apache Kafka集群指标监控与运维管控平台。
也是分社区版和商业版的。
这个建议直接看github说明吧,都是中文,内容清晰,相关的资料也都有。
我也简单的了解了下,有个逻辑集群的概念,对于规模比较大的kafka集群管理还是挺好的,不过,这里比较高端的特性都是不开源的,必须商业版才能用。
开发语言:后端以java为主
kafka-console-ui(kafka可视化管理平台)
一款轻量级的kafka可视化管理平台,安装配置快捷、简单易用。界面风格有点类似rocketmq-console。
这款权当是“王婆卖瓜,自卖自夸”吧,一个小工具,如果刚接触kafka的同学或者是中小型集群,想找个简单易用的,可以考虑一下。
开发语言:后端以java和scala为主
参考链接:
一、Kafka简述
1. 为什么需要用到消息队列
异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;
解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。
缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。
2.为什么选择kafka呢?
这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文
kafka的优点:
1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群
kafka的缺点:
1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高
3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件
3.2. 第三方库
github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroup consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{ fmt.Println("Failed to start consumer: %s", err)return} partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{ fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList { pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{ fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return} wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) }deferpc.AsyncClose() wg.Done() }(pc) } wg.Wait()}funcmain(){ consumer()}
消费组
funcconsumerCluster(){ groupID :="group-1"config := cluster.NewConfig() config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{ glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){ errors := c.Errors() noti := c.Notifications()for{select{caseerr := -errors: glog.Errorln(err)case-noti: } } }(c)formsg :=rangec.Messages() { fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){ config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{} msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!") client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{ fmt.Println("producer close err, ", err)return}deferclient.Close() pid, offset, err := client.SendMessage(msg)iferr !=nil{ fmt.Println("send message failed, ", err)return} fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){ config := sarama.NewConfig() config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){ errors := p.Errors() success := p.Successes()for{select{caseerr := -errors:iferr !=nil{ glog.Errorln(err) }case-success: } } }(p)for{ v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) fmt.Fprintln(os.Stdout, v) msg := sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(v), } p.Input() - msg time.Sleep(time.Second *1) }}funcmain(){goasyncProducer()select{ }}
3.5. 结果展示-
同步生产打印:
分区ID:0,offset:90
消费打印:
Partition:0,Offset:90,key:,value:Hello World!
异步生产打印:
async:7272async:7616async:998
消费打印:
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
本项目用于移动端的数据统计,项目地址: 。开源的数据统计countly做的很好,但是基础免费版的功能实在不够看,因此我就决定用go语言来写了这个项目,一来可以在实践中学习go语言,二来也可以开发功能完整的开源平台。该项目正在开发中,欢迎有兴趣的gopher一起参与。
数据存储方面使用的是mongodb。由于数据统计业务几乎不涉及到事务以及严格的一致性场景,而且mongodb的自动分片功能可以支撑较大的数据量。使用大数据的存储组件的话就太过于重了。因此选用mongodb。
业务逻辑整体基于事件的发布订阅。当收到客户端请求, frontend 会对请求数据进行处理,然后发布响应的事件。 backend 收到事件后进行统计处理。
后台展示基于Vue-Admin-Template开发,本人前端能力基本就是依葫芦画瓢,希望有前端大神来开发后台页面,项目地址:
目前客户端API仅有2个。一个是上报 openApp 打开APP时间,一个是上报 usageTime 一次启动使用时长事件。SDK方面也需要移动端的大神开发,感兴趣的大佬可以一起开发。
下面放一点后台页面的效果图:
GoAnalytics是基于go实现的一个数据统计平台,用于统计移动端的数据指标,比如启动次数、用户增长、活跃用户、留存等指标分析。前端数据展示项目是 goanalytics-web 。目前正在积极开发中,欢迎提交新的需求和pull request。
Go版本需要支持module,本地开发测试
cmd/goanalytics_kafka 和 goanalytics_rmq 是分别基于 kafka 和 rocketmq 的发布订阅功能做的数据发布
和订阅处理,横向扩展能力比 local 高。另外由于 rocketmq 还没有原生基于 go 的客户端(原生客户端正在开发中
2.0.0 road map ),可能会存在问题。
项目结构
├── README.md
├── api
│ ├── authentication 用户认证、管理API
│ ├── middlewares GIN 中间件
│ └── router API route
├── cmd
│ ├── account 生成admin账号命令
│ ├── analytic_local 不依赖消息系统的goanalytics
│ ├── goanalytics_kafka 基于kafak的goanalytics
│ ├── goanalytics_rmq 基于rocketmq的goanalytics
│ └── test_data 生成测试数据命令
├── common
│ └── data.go
├── conf 配置
│ └── conf.go
├── event
│ ├── codec 数据编解码
│ └── pubsub 消息发布订阅
├── go.mod
├── go.sum
├── metric 所有的统计指标在这里实现
│ ├── init.go
│ └── user 用户相关指标的实现
├── schedule
│ └── schedule.go 定时任务调度
├── storage 存储模块
│ ├── counter.go 计数器接口
│ ├── data.go
│ └── mongodb 基于mongodb实现的存储及计数器
└── utils
├── date.go
├── date_test.go
├── errors.go
└── key.go
Filebeat是elastic公司beats系列工具中的一个,主要用于收集本地日志。
在服务器上安装后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读),并且转发这些信息到配置文件中指定的输出端(例如:elasticsearch,logstarsh或kafka)。
Filebeat使用go语言开发,使用时没有其他依赖,比logstash-forworder轻量,不会占用部署服务器太多的资源。
filebeat的工作流程:当你开启filebeat程序的时候,它会启动一个或多个探测器(prospectors)去检测你指定的日志目录或文件,对于探测器找出的每一个日志文件,filebeat启动收割进程(harvester),每一个收割进程读取一个日志文件的新内容,并发送这些新的日志数据到处理程序(spooler),处理程序会集合这些事件,最后filebeat会发送集合的数据到你指定的地点。
2.配置filebeat
配置filebeat需要编辑filebeat的配置文件,不同安装方式,配置文件的存放路径有一些不同, 对于 rpm 和 deb的方式, 配置文件路径的是 /etc/filebeat/filebeat.yml,对于压缩包的方式,配置文件存在在解压目录下(例如:我是在home目录下进行的解压,那么配置文件的路径就应该是~/filebeat-6.2.4-linux-x86_64/filebeat.yml)。
由于我的预期目标是将filebeat收集的日志发送到kafka,所以配置output就选择了kafka。读者可根据自己的使用场景,配置output。
例子中的配置将对/var/log目录下所有以.log结尾的文件进行采集。
3.启动
本文中只是为满足需求对filebeat进行了最基本的配置。filebeat的很多重要的配置和特性并没有体现(例如:模块,多行消息),读者如果需要更深入的了解请参考: 。
欢迎大家在评论区讨论使用过程的心得和疑惑。
本文主要研究一下golang的zap的ZapKafkaWriter
WriteSyncer内嵌了io.Writer接口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。