本篇文章为大家展示了在Spark Streaming job中如何读取Kafka messages及其offsetRange,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
创新互联是一家集网站建设,红安企业网站建设,红安品牌网站建设,网站定制,红安网站建设报价,网络营销,网络优化,红安网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
在Spark Streaming job中读取Kafka topic(s)中的messages时,有时我们会需要同步记录下每次读取的messages的offsetRange。要达到这一目的,下面这两段代码(代码1和代码2)都是正确的,而且是等价的。
代码1(正确):
-----------------------
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.foreachRDD(
new Function
@Override
public Void call(JavaPairRDD
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
JavaRDD
long msgNum = processEachRDD(valueRDD, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
代码2(正确):
-----------------------
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
final AtomicReference
lines = messages.transformToPair(new Function
@Override
public JavaPairRDD
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets);
return rdd;
}
}).map(new Function
@Override
public String call(Tuple2
return tuple2._2();
}
});
lines.foreachRDD(new Function
@Override
public Void call(JavaRDD
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
但是要注意,下面这两段代码(代码3和代码4)是错误的,它们都会抛出一个exception:java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
代码3(错误):
-----------------------
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.transform(new Function
@Override
public JavaRDD
return rdd.values();
}
}).foreachRDD(new Function
@Override
public Void call(JavaRDD
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
代码4(错误):
-----------------------
JavaPairInputDStream
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.map(new Function
@Override
public String call(Tuple2
return tuple2._2();
}
}).foreachRDD(new Function
@Override
public Void call(JavaRDD
long msgNum = processEachRDD(rdd, outputFolderPath, definedDuration);
if (msgNum > 0 && zkPathRoot!= null) {
OffsetRange[] offsets = offsetRanges.get();
writeOffsetToZookeeper(zkClient, zkPathRoot, offsets);
}
return null;
}
});
上述内容就是在Spark Streaming job中如何读取Kafka messages及其offsetRange,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。