这篇文章主要介绍了python如何实现每5分钟从kafka中提取数据功能,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
创新互联公司专注于汉阴企业网站建设,响应式网站开发,商城网站制作。汉阴网站建设公司,为汉阴等地区提供建站服务。全流程按需定制制作,专业设计,全程项目跟踪,创新互联公司专业和态度为您提供的服务代码
import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz_queue" HOSTS = "ip:9092,ip:9092" GROUP = "2001" def get_end_time(hour,minute,time_step): if (minute+time_step)%60<60: if (minute+time_step)%60<10: return str(hour+int((minute+time_step)/60))+":"+"0"+str((minute+time_step)%60) else: return str(hour+int((minute+time_step)/60))+":"+str((minute+time_step)%60) else: pass def kafkawritefile(time_step,time_num): start = datetime.now() downloader = KafkaDownloader(HOSTS, TOPIC, GROUP) i = 1 while(i<=time_num): end_time = get_end_time(start.hour, start.minute,i*time_step) end_time_file = end_time.replace(':', '_') outfile_path = "/data/tmp/" + end_time_file + ".csv" if os.path.exists(outfile_path): os.remove(outfile_path) writefile = open(outfile_path, 'a+', encoding='utf-8') for msg in downloader.message(): curr_time = datetime.now() curr_time = str(curr_time) split_curr_time = curr_time.split(' ') curr_time_str = split_curr_time[1][0:5] if curr_time_str >= str(end_time): break i += 1 if __name__=='__main__': time_step = 15 time_num = 1 kafkawritefile(time_step,time_num)
感谢你能够认真阅读完这篇文章,希望小编分享的“python如何实现每5分钟从kafka中提取数据功能”这篇文章对大家有帮助,同时也希望大家多多支持创新互联成都网站设计公司,关注创新互联成都网站设计公司行业资讯频道,更多相关知识等着你来学习!
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、网站设计器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。