这篇文章主要介绍“Executor容错安全性实例分析”,在日常操作中,相信很多人在Executor容错安全性实例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Executor容错安全性实例分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
公司主营业务:成都网站设计、成都网站建设、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。创新互联是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。创新互联推出邢台县免费做网站回馈大家。
sparkstreaming会不断的接收数据、不断的产生job、不断的提交job。所以有一个至关重要的问题就是数据安全性。由于sparkstreaming是基于sparkcore的,如果我们可以确保数据安全可靠的话(sparkstreaming生产job的时候里面是基于RDD),即使运行的时候出现错误或者故障,也可以基于RDD的容错的能力自动进行恢复。所以要确保数据的安全性。
对于executor的安全容错主要是数据的安全容错。Executor计算时候的安全容错是借助spark core的RDD的,所以天然是安全的。
数据安全性的一种方式是存储一份副本,另一种方式是不做副本,但是数据源支持重放(也就是可以反复的读取数据源的数据),如果之前读取的数据出现问题,可以重新读取数据。
做副本的方式可以借助blockmanager做备份。Blockmanager存储数据的时候有很多storagelevel,Receiver接收数据后,存储的时候指定storagelevel为MEMORY_AND_DISK_SER_2的方式。Blockmanager早存储的时候会先考虑memory,只有memory不够的时候才会考虑disk,一般memory都是够的。所以至少两个executor上都会有数据,假设一个executor挂掉,就会马上切换到另一个executor。
ReceiverSupervisorImpl在存储数据的时候会有两种方式,一种是WAL的方式,究竟是不是WAL得方式是通过配置修改的。默认是false。如果用WAL的方式必须有checkpoint的目录,因为WAL的数据是放在checkpoint的目录之下的。
def enableReceiverLog(conf: SparkConf): Boolean = {
conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}
Storagelevel是在构建inputDstream的时候传入的,默认就是MEMORY_AND_DISK_SER_2。
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
现在来看ReceiverSupervisorImpl在存储数据的另一种方式(副本方式)。注释中说的很清楚,根据指定的storagelevel把接收的blocks交给blockmanager。也就是通过blockmanager来存储。
/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks into a block manager with the specified storage level.
*/
private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
Blockmanager存储的时候会分为多种不同的数据类型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。
Blockmanager存储数据前面已经讲过了。Receiver在接收到数据后除了在自己这个executor上面存储,还会在另外一个executor上存储。如果一个executor出现问题会瞬间切换到另一个executor。
WAL的方式原理:在具体的目录下会做一份日志,假设后续处理的过程中出了问题,可以基于日志恢复,日志是写在checkpoint下。在生产环境下checkpoint是在HDFS上,这样日志就会有三份副本。
下面就是用WAL存储数据的类,先写日志再交给blockmanager存储。
/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks in both, a write ahead log and a block manager.
*/
private[streaming] class WriteAheadLogBasedBlockHandler(
如果采用WAL的方式,存储数据的时候就不需要有两份副本,这样太浪费内存,如果storagelevel.replication大于1就会打印警告日志。
private val effectiveStorageLevel = {
if (storageLevel.deserialized) {
logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
s" write ahead log is enabled, change to serialization false")
}
if (storageLevel.replication > 1) {
logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
s"write ahead log is enabled, change to replication 1")
}
StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}
这里采用两条线程的线程池,使得blockmanager存储数据和write ahead log可以并发的执行。
// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
这个是把日志写入WAL中
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
负责读写WAL的是WriteAheadLog,这是一个抽象类,负责写入、读取、清除数据的功能。在写入数据后会返回一个句柄,以供读取数据使用。
看一下具体写入数据的实现。如果失败并且失败次数小于最大的失败次数就会重试。确实是返回了一个句柄。
/**
* Write a byte buffer to the log file. This method synchronously writes the data in the
* ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
* to HDFS, and will be available for readers to read.
*/
def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
var fileSegment: FileBasedWriteAheadLogSegment = null
var failures = 0
var lastException: Exception = null
var succeeded = false
while (!succeeded && failures < maxFailures) {
try {
fileSegment = getLogWriter(time).write(byteBuffer)
if (closeFileAfterWrite) {
resetWriter()
}
succeeded = true
} catch {
case ex: Exception =>
lastException = ex
logWarning("Failed to write to write ahead log")
resetWriter()
failures += 1
}
}
if (fileSegment == null) {
logError(s"Failed to write to write ahead log after $failures failures")
throw lastException
}
fileSegment
}
下面就是把数据写入HDFS的代码
/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
assertOpen()
data.rewind() // Rewind to ensure all data in the buffer is retrieved
val lengthToWrite = data.remaining()
val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
stream.writeInt(lengthToWrite)
if (data.hasArray) {
stream.write(data.array())
} else {
// If the buffer is not backed by an array, we transfer using temp array
// Note that despite the extra array copy, this should be faster than byte-by-byte copy
while (data.hasRemaining) {
val array = new Array[Byte](data.remaining)
data.get(array)
stream.write(array)
}
}
flush()
nextOffset = stream.getPos()
segment
}
不管是WAL还是直接交给blockmanager都是采用副本的方式。还有一种是数据源支持数据存放,典型的就是kafka。Kafka已经成为了数据存储系统,它天然具有容错和数据副本。
Kafka有receiver和direct的方式。Receiver的方式其实是交给zookeper来管理matadata的(偏移量offset),如果数据处理失败后,kafka会基于offset重新读取数据。为什么可以重新读取?如果程序崩溃或者数据没处理完是不会给zookeper发ack。Zookeper就认为这个数据没有被消费。实际生产环境下越来越多的使用directAPI的方式,直接去操作kafka并且是自己管理offset。这就可以保证有且只有一次的容错处理。DirectKafkaInputDstream,它会去看最新的offset,并把这个内容放入batch中。
获取最新的offset,通过最新的offset减去上一个offset就可以确定读哪些数据,也就是一个batch中的数据。
@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
// Either.fold would confuse @tailrec, do it manually
if (o.isLeft) {
val err = o.left.get.toString
if (retries <= 0) {
throw new SparkException(err)
} else {
log.error(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)
}
} else {
o.right.get
}
}
容错的弊端就是消耗性能,占用时间。也不是所有情况都不能容忍数据丢失。有些情况下可以不进行容错来提高性能。
假如一次处理1000个block,但是有1个block出错,就需要把1000个block进行重新读取或者恢复,这也有性能问题。
到此,关于“Executor容错安全性实例分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!