实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。
创新互联建站-专业网站定制、快速模板网站建设、高性价比松阳网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式松阳网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖松阳地区。费用合理售后完善,10多年实体公司更值得信赖。使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer。最后调用flatMap将ArrayBuffer中的元素分拆。
比如下面的代码实现了对某个字段聚合时,按照时间条件进行选择性的聚合:
val rdd1 = sc.textFile(dayDayDir).union(sc.textFile(thisDayDir))
.map(line => line.split("\\|"))
.filter(arr => if(arr.length != 14 || !arr(3).substring(0, 8).equals(lastDay)) false else true)
.map(arr => (arr(0), arr))
.reduceByKey( (pure, after) => reduceSession(pure, after))
.map(tup => (tup._2(13), tup._2))
.combineByKey( x => ArrayBuffer(x),
(x:ArrayBuffer[Array[String]],y) => combineMergeValue(x, y),
(x:ArrayBuffer[Array[String]],y:ArrayBuffer[Array[String]]) => combineMergeCombiners(x, y))
.flatMap(tup => arrToStr(tup._2))
def combineMergeValue(x:ArrayBuffer[Array[String]], y:Array[String])
: ArrayBuffer[Array[String]] = {
var outList = x.clone()
var outarr = y.clone()
var flag = true
for(i <- 0 until outList.length){
if(checkTime(outList(i)(3), outList(i)(4), y(3), y(4))) {
outarr = reduceSession(outList(i), y)
outList(i) = outarr
flag = false
}
}
if(flag) {
outList += y
}
outList
}
def combineMergeCombiners(x:ArrayBuffer[Array[String]], y:ArrayBuffer[Array[String]])
: ArrayBuffer[Array[String]] = {
var outList = x.clone();
for(i <- 0 until y.length){
var outarr = y(i).clone()
var flag = true
for(j <- 0 until outList.length){
if(checkTime(outList(j)(3), outList(j)(4), y(i)(3), y(i)(4))) {
outarr = reduceSession(outList(j), y(i))
outList(j) = outarr
flag = false
}
}
if(flag) {
outList += y(i)
}
}
outList
}
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。