```scala
def startup(): Unit = {
  /* Schedule the cleanup task to delete old logs */
  if (scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention",
                       cleanupLogs _,
                       delay = InitialTaskDelayMs,
                       period = retentionCheckMs,
                       TimeUnit.MILLISECONDS)
    // .....
    scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                       deleteLogs _,
                       delay = InitialTaskDelayMs,
                       unit = TimeUnit.MILLISECONDS)
  }
  if (cleanerConfig.enableCleaner)
    cleaner.startup()
}
```
```scala
/**
 * If topic deletion is enabled, delete any log segments that have either expired due to time based retention
 * or because the log size is > retentionSize.
 *
 * Whether or not deletion is enabled, delete any log segments that are before the log start offset
 */
def deleteOldSegments(): Int = {
  if (config.delete) {
    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
  } else {
    deleteLogStartOffsetBreachedSegments()
  }
}
```
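Both retention checks below hand a `(segment, nextSegment) => Boolean` predicate to an internal `deleteOldSegments(predicate, reason)` overload, which walks the segments from the oldest and stops at the first one the predicate rejects. A simplified stand-alone model of that contract (the `Segment` case class and the walk are illustrative, not Kafka's actual types):

```scala
// Simplified model: a segment is just its base offset, size in bytes,
// and the largest message timestamp it contains.
final case class Segment(baseOffset: Long, size: Int, largestTimestamp: Long)

// Walk from the oldest segment and collect the prefix the predicate accepts;
// stop at the first segment it rejects.
def deletableSegments(segments: List[Segment],
                      predicate: (Segment, Option[Segment]) => Boolean): List[Segment] =
  segments match {
    case s :: rest if predicate(s, rest.headOption) =>
      s :: deletableSegments(rest, predicate)
    case _ => Nil
  }
```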
Time dimension: deleteRetentionMsBreachedSegments
```scala
private def deleteRetentionMsBreachedSegments(): Int = {
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds
  // The deletion predicate is passed in as a closure: a segment is deletable
  // when the current time minus its largest message timestamp exceeds retentionMs.
  deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
    reason = s"retention time ${config.retentionMs}ms breach")
}
```
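Plugging a time predicate into the simplified model above shows which segments a 7-day retention would select (all values hypothetical):

```scala
val retentionMs = 7L * 24 * 60 * 60 * 1000 // 7 days
val now = 1700000000000L                   // hypothetical "current" epoch millis
val segs = List(
  Segment(0L,   1024, now - 10L * 24 * 60 * 60 * 1000), // 10 days old -> deletable
  Segment(100L, 1024, now -  8L * 24 * 60 * 60 * 1000), //  8 days old -> deletable
  Segment(200L, 1024, now -  1L * 24 * 60 * 60 * 1000)  //  1 day old  -> kept
)
val expired = deletableSegments(segs, (s, _) => now - s.largestTimestamp > retentionMs)
// expired == the segments at base offsets 0 and 100
```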
Size dimension: deleteRetentionSizeBreachedSegments
```scala
private def deleteRetentionSizeBreachedSegments(): Int = {
  if (config.retentionSize < 0 || size < config.retentionSize) return 0
  // diff is the number of bytes by which this log currently exceeds retentionSize
  var diff = size - config.retentionSize
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
}
```
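Because `diff` is captured and mutated by the closure, the predicate is stateful: each accepted segment shrinks the remaining overshoot, and the walk stops as soon as deleting one more segment would take the log below `retentionSize`. Reusing the simplified model:

```scala
val retentionSize = 2048L
val sized = List(
  Segment(0L,   1024, 0L),
  Segment(100L, 1024, 0L),
  Segment(200L, 1024, 0L),
  Segment(300L, 1024, 0L)
)
val totalSize = sized.map(_.size.toLong).sum // 4096 bytes in total
var diff = totalSize - retentionSize         // 2048 bytes over the limit
val bySize = deletableSegments(sized, (s, _) =>
  if (diff - s.size >= 0) { diff -= s.size; true } else false)
// bySize == the first two segments; removing them leaves exactly 2048 bytes
```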
```scala
private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
  maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
    val numToDelete = deletable.size
    if (numToDelete > 0) {
      // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
      var segmentsToDelete = deletable
      if (localLog.segments.numberOfSegments == numToDelete) {
        val newSegment = roll()
        if (deletable.last.baseOffset == newSegment.baseOffset) {
          warn(s"Empty active segment at ${deletable.last.baseOffset} was deleted and recreated due to $reason")
          segmentsToDelete = deletable.dropRight(1)
        }
      }
      // throws a KafkaStorageException if the log's memory-mapped buffers have already been closed
      localLog.checkIfMemoryMappedBufferClosed()
      // remove the segments from the in-memory map, then delete the files asynchronously
      localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
      deleteProducerSnapshots(deletable, asyncDelete = true)
    }
    numToDelete
  }
}
```
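Everything inside `deleteSegments` runs under `maybeHandleIOException`, which turns an `IOException` into a marked-offline log directory plus a storage-level exception. A minimal sketch of that wrapper pattern, with local stand-ins for Kafka's `KafkaStorageException` and failure channel:

```scala
import java.io.IOException

// Local stand-in for org.apache.kafka.common.errors.KafkaStorageException.
final class StorageException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)

// Simplified stand-in for the LogDirFailureChannel notification.
def markDirOffline(dir: String): Unit = println(s"marking log dir $dir offline")

def maybeHandleIOException[T](msg: String, dir: String)(body: => T): T =
  try body
  catch {
    case e: IOException =>
      markDirOffline(dir)                // take the whole log dir offline
      throw new StorageException(msg, e) // resurface as a storage-level error
  }
```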
```scala
private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment],
                                         asyncDelete: Boolean,
                                         reason: SegmentDeletionReason): Unit = {
  if (segmentsToDelete.nonEmpty) {
    // Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
    // removing the deleted segment, we should force materialization of the iterator here, so that results of the
    // iteration remain valid and deterministic. We should also pass only the materialized view of the
    // iterator to the logic that actually deletes the segments.
    val toDelete = segmentsToDelete.toList
    reason.logReason(toDelete)
    toDelete.foreach { segment =>
      // `segments` is a map keyed by base offset; drop the in-memory metadata first
      segments.remove(segment.baseOffset)
    }
    // the actual deletion happens here; this is the key function
    LocalLog.deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
  }
}
```
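The `toList` at the top is not cosmetic: callers frequently pass a lazy view backed by the very map that `segments.remove` mutates, and iterating such a view while removing entries is unsafe. A small demonstration of why materializing first matters (a plain `mutable.TreeMap` stands in for Kafka's segment map):

```scala
import scala.collection.mutable

val segments = mutable.TreeMap(0L -> "seg0", 100L -> "seg1", 200L -> "seg2")
// A lazy view: still backed by `segments`, nothing is copied yet.
val toDeleteView = segments.view.filterKeys(_ < 200L).keys

// Materialize before mutating; iterating toDeleteView directly while
// calling segments.remove would read and mutate the same underlying tree.
val toDelete = toDeleteView.toList
toDelete.foreach(segments.remove)
// segments now holds only 200L -> "seg2"
```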