这篇文章主要介绍“IBatchSpout API怎么使用”,在日常操作中,相信很多人在IBatchSpout API怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”IBatchSpout API怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
创新互联建站长期为1000+客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为鹿邑企业提供专业的成都网站设计、成都网站建设,鹿邑网站改版等技术服务。拥有十多年丰富建站经验和众多成功案例,为您定制开发。
IBatchSpout是storm trident推出的一种可以批量发射的Spout。非事务性,基本的spout
1:Map getComponentConfiguration();定义配置,可以用backtype.storm.Config。
2:void open(Map conf, TopologyContext context); Spout的初始化方法 ,参数conf即是getComponentConfiguration定义的配置
3:Fields getOutputFields(); 声明输出的fields
4:void emitBatch(long batchId, TridentCollector collector); 批量发射tuple,本次的批次号为batchId
5:void ack(long batchId);批次号为batchId的数据处理成功
6: void close();
一个例子
package storm.projectA; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import storm.trident.operation.TridentCollector; import storm.trident.spout.IBatchSpout; import backtype.storm.Config; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class MySpout implements IBatchSpout{ /** * */ private static final long serialVersionUID = 1L; private long maxBatchSize;//每批次最大的数量 private BufferedReader br;//源文件流 HashMap>> batches = new HashMap >>();//保存发送过的所有数据,以便于重复发送 /** * @param conf 配置 * @param context */ @Override public void open(Map conf, TopologyContext context) { String filePath = (String)conf.get("filePath"); maxBatchSize = (Long)conf.get("maxBatchSize"); try { br = new BufferedReader(new FileReader(filePath)); } catch (FileNotFoundException e) { e.printStackTrace(); } } /*** spout的发送方法 * @param batchId 批次id * @param collector 批量发射器 */ @Override public void emitBatch(long batchId, TridentCollector collector) { List > batch = batches.get(batchId); if (batch == null) { batch = new ArrayList
>(); for (int i = 0; i < maxBatchSize; i++) { try { String line = br.readLine(); if(line == null){ break; } batch.add(new Values(line)); } catch (IOException e) { e.printStackTrace(); } } } for(List
到此,关于“IBatchSpout API怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!