diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index bc07df1ddc12a..424b635062156 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -95,6 +95,8 @@ class LogSegment private[log] (val log: FileRecords, private var created = time.milliseconds + def createdMs: Long = created + /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 @@ -123,6 +125,9 @@ class LogSegment private[log] (val log: FileRecords, /* Return the size in bytes of this log segment */ def size: Int = log.sizeInBytes() + /* Return the index size in bytes of this log segment */ + def indexSize: Int = offsetIndex.sizeInBytes + timeIndex.sizeInBytes + /** * checks that the argument offset can be represented as an integer offset relative to the baseOffset. */ diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala index a2aedf0a29035..38271d192fbfa 100644 --- a/core/src/main/scala/kafka/log/UnifiedLog.scala +++ b/core/src/main/scala/kafka/log/UnifiedLog.scala @@ -47,6 +47,7 @@ import java.nio.file.Files import java.util import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.{Collections, Optional, OptionalInt, OptionalLong} +import java.util.{List => JList, ArrayList => JArrayList} import scala.annotation.nowarn import scala.collection.mutable.ListBuffer import scala.collection.{Seq, immutable, mutable} @@ -435,10 +436,12 @@ class UnifiedLog(@volatile var logStartOffset: Long, val tags = (Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString) ++ (if (isFuture) Map("is-future" -> "true") else Map.empty)).asJava metricsGroup.newGauge(LogMetricNames.NumLogSegments, () => numberOfSegments, tags) + metricsGroup.newGauge(LogMetricNames.LogSegments, () => logSegmentsDetail, tags) metricsGroup.newGauge(LogMetricNames.LogStartOffset, () => logStartOffset, tags) metricsGroup.newGauge(LogMetricNames.LogEndOffset, () => logEndOffset, tags) metricsGroup.newGauge(LogMetricNames.Size, () => size, tags) metricNames = Map(LogMetricNames.NumLogSegments -> tags, + LogMetricNames.LogSegments -> tags, LogMetricNames.LogStartOffset -> tags, LogMetricNames.LogEndOffset -> tags, LogMetricNames.Size -> tags) @@ -588,6 +591,16 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ def numberOfSegments: Int = localLog.segments.numberOfSegments + /** + * The detailed metrics for log segments + */ + def logSegmentsDetail: JList[String] = { + val list = logSegments.toSeq.map { seg => + s"baseOffset=${seg.baseOffset}; created=${seg.createdMs}; logSize=${seg.size}; indexSize=${seg.indexSize}" + } + new JArrayList[String](list.asJava) + } + /** * Close this log. * The memory mapped buffer for index files of this log will be left open until the log is deleted. @@ -2114,12 +2127,13 @@ object UnifiedLog extends Logging { object LogMetricNames { val NumLogSegments: String = "NumLogSegments" + val LogSegments: String = "LogSegments" val LogStartOffset: String = "LogStartOffset" val LogEndOffset: String = "LogEndOffset" val Size: String = "Size" def allMetricNames: List[String] = { - List(NumLogSegments, LogStartOffset, LogEndOffset, Size) + List(NumLogSegments, LogSegments, LogStartOffset, LogEndOffset, Size) } } diff --git a/docs/ops.html b/docs/ops.html index b506b2860647e..05a8bb769fbe9 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -1847,6 +1847,11 @@