spark技巧有哪些呢,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
目前创新互联已为成百上千家的企业提供了网站建设、域名、雅安服务器托管、网站托管维护、企业网站设计、金山网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
1. 设置消息尺寸最大值
def main(args: Array[String]) { System.setProperty("spark.akka.frameSize", "1024") }
2.与yarn结合时设置队列
val conf=new SparkConf().setAppName("WriteParquet") conf.set("spark.yarn.queue","wz111") val sc=new SparkContext(conf)
3.运行时使用yarn分配资源,并设置--num-executors参数
nohup /home/SASadm/spark-1.4.1-bin-hadoop2.4/bin/spark-submit --name mergePartition --class main.scala.week2.mergePartition --num-executors 30 --master yarn mergePartition.jar >server.log 2>&1 &
4.读取impala的parquet,对String串的处理
sqlContext.setConf("spark.sql.parquet.binaryAsString","true")
5.parquetfile的写
case class ParquetFormat(usr_id:BigInt , install_ids:String ) val appRdd=sc.textFile("hdfs://").map(_.split("\t")).map(r=>ParquetFormat(r(0).toLong,r(1))) sqlContext.createDataFrame(appRdd).repartition(1).write.parquet("hdfs://")
6.parquetfile的读
val parquetFile=sqlContext.read.parquet("hdfs://") parquetFile.registerTempTable("install_running") val data=sqlContext.sql("select user_id,install_ids from install_running") data.map(t=>"user_id:"+t(0)+" install_ids:"+t(1)).collect().foreach(println)
7.写文件时,将所有结果汇集到一个文件
repartition(1)
8.如果重复使用的rdd,使用cache缓存
cache()
9.spark-shell 添加依赖包
spark-1.4.1-bin-hadoop2.4/bin/spark-shell local[4] --jars code.jar
10.spark-shell使用yarn模式,并使用队列
spark-1.4.1-bin-hadoop2.4/bin/spark-shell --master yarn-client --queue wz111
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。