Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
language: scala
sudo: false
jdk: oraclejdk8
install: true
script: "./sbt clean test"
script: "./sbt clean coverage assembly"
scala:
- 2.11.5
- 2.11.5
#after_success:
# - sbt coverageReport coveralls
2 changes: 1 addition & 1 deletion app/GlobalKafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object GlobalKafkaManager extends GlobalSettings {

override def beforeStart(app: Application): Unit = {
Logger.info("Init kafka manager...")
KafkaManagerContext.getKafkaManger
KafkaManagerContext.getKafkaManager
Thread.sleep(5000)
}

Expand Down
2 changes: 1 addition & 1 deletion app/controllers/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object Application extends Controller {

import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
private[this] val kafkaManager = KafkaManagerContext.getKafkaManager

def index = Action.async {
kafkaManager.getClusterList.map { errorOrClusterList =>
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scalaz.{-\/, \/-}
object Cluster extends Controller {
import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
private[this] val kafkaManager = KafkaManagerContext.getKafkaManager

val validateName : Constraint[String] = Constraint("validate name") { name =>
Try {
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/KafkaManagerContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object KafkaManagerContext {
import play.api.Play.current

private[this] val kafkaManager : KafkaManager = new KafkaManager(play.api.Play.configuration.underlying)
def getKafkaManger : KafkaManager = kafkaManager
def getKafkaManager : KafkaManager = kafkaManager
def shutdown() : Unit = {
kafkaManager.shutdown()
}
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/PreferredReplicaElection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scalaz.-\/
object PreferredReplicaElection extends Controller{
import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
private[this] val kafkaManager = KafkaManagerContext.getKafkaManager


val validateOperation : Constraint[String] = Constraint("validate operation value") {
Expand Down
2 changes: 1 addition & 1 deletion app/controllers/ReassignPartitions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scalaz.{\/-, -\/}
object ReassignPartitions extends Controller{
import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
private[this] val kafkaManager = KafkaManagerContext.getKafkaManager

val validateOperation : Constraint[String] = Constraint("validate operation value") {
case "confirm" => Valid
Expand Down
11 changes: 7 additions & 4 deletions app/controllers/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.Properties

import kafka.manager.ActorModel.TopicIdentity
import kafka.manager.utils.TopicConfigs
import kafka.manager.{ApiError, Kafka_0_8_2_0, Kafka_0_8_1_1}
import kafka.manager.{Kafka_0_8_2_1, ApiError, Kafka_0_8_2_0, Kafka_0_8_1_1}
import models.FollowLink
import models.form._
import models.navigation.Menus
Expand All @@ -29,7 +29,7 @@ import scalaz.{\/-, -\/}
object Topic extends Controller{
import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManger
private[this] val kafkaManager = KafkaManagerContext.getKafkaManager

val validateName : Constraint[String] = Constraint("validate name") { name =>
Try {
Expand All @@ -42,7 +42,8 @@ object Topic extends Controller{

val kafka_0_8_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_1_1).map(n => TConfig(n,None)).toList)
val kafka_0_8_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_2_0).map(n => TConfig(n,None)).toList)

val kafka_0_8_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNames(Kafka_0_8_2_1).map(n => TConfig(n,None)).toList)

val defaultCreateForm = Form(
mapping(
"topic" -> nonEmptyText.verifying(maxLength(250), validateName),
Expand Down Expand Up @@ -97,13 +98,14 @@ object Topic extends Controller{
clusterConfig.version match {
case Kafka_0_8_1_1 => defaultCreateForm.fill(kafka_0_8_1_1_Default)
case Kafka_0_8_2_0 => defaultCreateForm.fill(kafka_0_8_2_0_Default)
case Kafka_0_8_2_1 => defaultCreateForm.fill(kafka_0_8_2_1_Default)
}
}
}
}

def topics(c: String) = Action.async {
kafkaManager.getTopicListWithMoreInfo(c).map { errorOrTopicList =>
kafkaManager.getTopicListExtended(c).map { errorOrTopicList =>
Ok(views.html.topic.topicList(c,errorOrTopicList))
}
}
Expand Down Expand Up @@ -201,6 +203,7 @@ object Topic extends Controller{
val defaultConfigMap = clusterConfig.version match {
case Kafka_0_8_1_1 => TopicConfigs.configNames(Kafka_0_8_1_1).map(n => (n,TConfig(n,None))).toMap
case Kafka_0_8_2_0 => TopicConfigs.configNames(Kafka_0_8_2_0).map(n => (n,TConfig(n,None))).toMap
case Kafka_0_8_2_1 => TopicConfigs.configNames(Kafka_0_8_2_1).map(n => (n,TConfig(n,None))).toMap
}
val combinedMap = defaultConfigMap ++ ti.config.toMap.map(tpl => tpl._1 -> TConfig(tpl._1,Option(tpl._2)))
defaultUpdateConfigForm.fill(UpdateTopicConfig(ti.topic,combinedMap.toList.map(_._2),ti.configReadVersion))
Expand Down
45 changes: 45 additions & 0 deletions app/controllers/api/KafkaHealthCheck.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
* See accompanying LICENSE file.
*/

package controllers.api

import controllers.KafkaManagerContext
import play.api.libs.json._
import play.api.mvc._

object KafkaHealthCheck extends Controller {

import play.api.libs.concurrent.Execution.Implicits.defaultContext

private[this] val kafkaManager = KafkaManagerContext.getKafkaManager

def availableBrokers(c: String) = Action.async { implicit request =>
kafkaManager.getBrokerList(c).map { errorOrBrokerList =>
errorOrBrokerList.fold(
error => BadRequest(Json.obj("msg" -> error.msg)),
brokerList => Ok(Json.obj("availableBrokers" -> brokerList.list.map(bi => bi.id)))
)
}
}

def underReplicatedPartitions(c: String, t: String) = Action.async { implicit request =>
kafkaManager.getTopicIdentity(c,t).map { errorOrTopicIdentity =>
errorOrTopicIdentity.fold(
error => BadRequest(Json.obj("msg" -> error.msg)),
topicIdentity => Ok(Json.obj("topic" -> t, "underReplicatedPartitions" -> topicIdentity.partitionsIdentity.filter(_._2.isUnderReplicated).map{case (num, pi) => pi.partNum}))
)
}
}

def unavailablePartitions(c: String, t: String) = Action.async { implicit request =>
kafkaManager.getTopicIdentity(c,t).map { errorOrTopicIdentity =>
errorOrTopicIdentity.fold(
error => BadRequest(Json.obj("msg" -> error.msg)),
topicIdentity => Ok(Json.obj("topic" -> t, "unavailablePartitions" -> topicIdentity.partitionsIdentity.filter(_._2.isr.isEmpty).map{case (num, pi) => pi.partNum}))
)
}
}
}

18 changes: 10 additions & 8 deletions app/kafka/manager/ActorModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ object ActorModel {
case class BVGetView(id: Int) extends BVRequest
case class BVGetTopicMetrics(topic: String) extends BVRequest
case object BVGetBrokerMetrics extends BVRequest
case class BVView(topicPartitions: Map[TopicIdentity, IndexedSeq[Int]],
metrics: Option[BrokerMetrics] = None,
case class BVView(topicPartitions: Map[TopicIdentity, IndexedSeq[Int]], clusterConfig: ClusterConfig,
metrics: Option[BrokerMetrics] = None,
stats: Option[BrokerClusterStats] = None) extends QueryResponse {
def numTopics : Int = topicPartitions.size
def numPartitions : Int = topicPartitions.values.foldLeft(0)((acc,i) => acc + i.size)
Expand Down Expand Up @@ -134,7 +134,7 @@ object ActorModel {
deleteSupported: Boolean) extends QueryResponse
case class TopicDescriptions(descriptions: IndexedSeq[TopicDescription], lastUpdateMillis: Long) extends QueryResponse

case class BrokerList(list: IndexedSeq[BrokerIdentity]) extends QueryResponse
case class BrokerList(list: IndexedSeq[BrokerIdentity], clusterConfig: ClusterConfig) extends QueryResponse

case class PreferredReplicaElection(startTime: DateTime, topicAndPartition: Set[TopicAndPartition], endTime: Option[DateTime]) extends QueryResponse
case class ReassignPartitions(startTime: DateTime, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]], endTime: Option[DateTime]) extends QueryResponse
Expand Down Expand Up @@ -200,7 +200,8 @@ object ActorModel {
numBrokers: Int,
configReadVersion: Int,
config: List[(String,String)],
deleteSupported: Boolean,
deleteSupported: Boolean,
clusterConfig: ClusterConfig,
metrics: Option[BrokerMetrics] = None) {

val replicationFactor : Int = partitionsIdentity.head._2.replicas.size
Expand Down Expand Up @@ -248,7 +249,7 @@ object ActorModel {
import org.json4s.scalaz.JsonScalaz._
import scala.language.reflectiveCalls

implicit def from(brokers: Int,td: TopicDescription, tm: Option[BrokerMetrics]) : TopicIdentity = {
implicit def from(brokers: Int,td: TopicDescription, tm: Option[BrokerMetrics], clusterConfig: ClusterConfig) : TopicIdentity = {
val descJson = parse(td.description._2)
//val partMap = (descJson \ "partitions").as[Map[String,Seq[Int]]]
val partMap = field[Map[String,List[Int]]]("partitions")(descJson).fold({ e =>
Expand Down Expand Up @@ -276,11 +277,11 @@ object ActorModel {
(-1,Map.empty[String, String])
}
}
TopicIdentity(td.topic,td.description._1,partMap.size,tpi,brokers,config._1,config._2.toList,td.deleteSupported, tm)
TopicIdentity(td.topic,td.description._1,partMap.size,tpi,brokers,config._1,config._2.toList,td.deleteSupported, clusterConfig, tm)
}

implicit def from(bl: BrokerList,td: TopicDescription, tm: Option[BrokerMetrics]) : TopicIdentity = {
from(bl.list.size, td, tm)
implicit def from(bl: BrokerList,td: TopicDescription, tm: Option[BrokerMetrics], clusterConfig: ClusterConfig) : TopicIdentity = {
from(bl.list.size, td, tm, clusterConfig)
}

implicit def reassignReplicas(currentTopicIdentity: TopicIdentity,
Expand All @@ -302,6 +303,7 @@ object ActorModel {
currentTopicIdentity.configReadVersion,
currentTopicIdentity.config,
currentTopicIdentity.deleteSupported,
currentTopicIdentity.clusterConfig,
currentTopicIdentity.metrics)
}
}
Expand Down
20 changes: 14 additions & 6 deletions app/kafka/manager/BrokerViewCacheActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
log.info("Stopped actor %s".format(self.path))
log.info("Cancelling updater...")
Try(cancellable.map(_.cancel()))
super.postStop()
}

override protected def longRunningPoolConfig: LongRunningPoolConfig = config.longRunningPoolConfig
Expand Down Expand Up @@ -146,11 +147,14 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
brokerList <- brokerListOption
topicDescriptions <- topicDescriptionsOption
} {
val topicIdentity : IndexedSeq[TopicIdentity] = topicDescriptions.descriptions.map(TopicIdentity.from(brokerList.list.size,_,None))
val topicIdentity : IndexedSeq[TopicIdentity] = topicDescriptions.descriptions.map(
TopicIdentity.from(brokerList.list.size,_,None, config.clusterConfig))
topicIdentities = topicIdentity.map(ti => (ti.topic, ti)).toMap
val topicPartitionByBroker = topicIdentity.flatMap(ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2)
val topicPartitionByBroker = topicIdentity.flatMap(
ti => ti.partitionsByBroker.map(btp => (ti,btp.id,btp.partitions))).groupBy(_._2)

if (config.clusterConfig.jmxEnabled) {
//check for 2*broker list size since we schedule 2 jmx calls for each broker
if (config.clusterConfig.jmxEnabled && hasCapacityFor(2*brokerListOption.size)) {
implicit val ec = longRunningExecutionContext
val brokerLookup = brokerList.list.map(bi => bi.id -> bi).toMap
topicPartitionByBroker.foreach {
Expand All @@ -164,7 +168,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
mbsc =>
topicPartitions.map {
case (topic, id, partitions) =>
(topic.topic, KafkaMetrics.getBrokerMetrics(mbsc, Option(topic.topic)))
(topic.topic,
KafkaMetrics.getBrokerMetrics(config.clusterConfig.version, mbsc, Option(topic.topic)))
}
}
val result = tryResult match {
Expand All @@ -188,7 +193,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
Future {
val tryResult = KafkaJMX.doWithConnection(broker.host, broker.jmxPort) {
mbsc =>
KafkaMetrics.getBrokerMetrics(mbsc)
KafkaMetrics.getBrokerMetrics(config.clusterConfig.version, mbsc)
}

val result = tryResult match {
Expand All @@ -201,6 +206,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}
}
}
} else if(config.clusterConfig.jmxEnabled) {
log.warning("Not scheduling update of JMX for all brokers, not enough capacity!")
}

topicPartitionByBroker.foreach {
Expand All @@ -209,7 +216,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
case (topic, id, partitions) =>
(topic, partitions)
}.toMap
brokerTopicPartitions.put(brokerId,BVView(topicPartitionsMap, brokerMetrics.get(brokerId)))
brokerTopicPartitions.put(
brokerId,BVView(topicPartitionsMap, config.clusterConfig, brokerMetrics.get(brokerId)))
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions app/kafka/manager/ClusterManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)

private[this] val adminUtils = new AdminUtils(cmConfig.clusterConfig.version)

private[this] val ksProps = Props(classOf[KafkaStateActor],sharedClusterCurator, adminUtils.isDeleteSupported)
private[this] val ksProps = Props(classOf[KafkaStateActor],sharedClusterCurator, adminUtils.isDeleteSupported, cmConfig.clusterConfig)
private[this] val kafkaStateActor : ActorPath = context.actorOf(ksProps.withDispatcher(cmConfig.pinnedDispatcherName),"kafka-state").path

private[this] val bvConfig = BrokerViewCacheActorConfig(
Expand Down Expand Up @@ -176,7 +176,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
bl <- eventualBrokerList
tm <- eventualTopicMetrics
tdO <- eventualTopicDescription
} yield tdO.map( td => CMTopicIdentity(Try(TopicIdentity.from(bl,td,tm))))
} yield tdO.map( td => CMTopicIdentity(Try(TopicIdentity.from(bl,td,tm,cmConfig.clusterConfig))))
result pipeTo sender

case any: Any => log.warning("cma : processQueryResponse : Received unknown message: {}", any)
Expand Down Expand Up @@ -263,7 +263,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
val generated: Future[IndexedSeq[(String, Map[Int, Seq[Int]])]] = for {
bl <- eventualBrokerList
tds <- eventualDescriptions
tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
tis = tds.descriptions.map(TopicIdentity.from(bl, _, None,cmConfig.clusterConfig))
} yield {
bl.list.map(_.id.toInt)
// check if any nonexistent broker got selected for reassignment
Expand Down Expand Up @@ -301,7 +301,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
val preferredLeaderElections = for {
bl <- eventualBrokerList
tds <- eventualDescriptions
tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, cmConfig.clusterConfig))
toElect = tis.map(ti => ti.partitionsIdentity.values.filter(!_.isPreferredLeader).map(tpi => TopicAndPartition(ti.topic, tpi.partNum))).flatten.toSet
} yield toElect
preferredLeaderElections.map { toElect =>
Expand All @@ -317,7 +317,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
val topicsAndReassignments = for {
bl <- eventualBrokerList
tds <- eventualDescriptions
tis = tds.descriptions.map(TopicIdentity.from(bl, _, None))
tis = tds.descriptions.map(TopicIdentity.from(bl, _, None, cmConfig.clusterConfig))
} yield {
val reassignments = tis.map { ti =>
val topicZkPath = zkPathFrom(baseTopicsZkPath, ti.topic)
Expand Down
4 changes: 1 addition & 3 deletions app/kafka/manager/KafkaCommandActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend

@scala.throws[Exception](classOf[Exception])
override def postStop(): Unit = {
log.info("Shutting down long running executor...")
Try(longRunningExecutor.shutdown())
super.postStop()
}

Expand All @@ -75,7 +73,7 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend
val result : KCCommandResult = KCCommandResult(Failure(new UnsupportedOperationException(
s"Delete topic not supported for kafka version ${kafkaCommandActorConfig.version}")))
sender ! result
case Kafka_0_8_2_0 =>
case Kafka_0_8_2_0 | Kafka_0_8_2_1 =>
longRunning {
Future {
KCCommandResult(Try {
Expand Down
Loading