Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion app/controllers/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.Properties

import kafka.manager.ActorModel.TopicIdentity
import kafka.manager.utils.TopicConfigs
import kafka.manager.{Kafka_0_8_2_1, ApiError, Kafka_0_8_2_0, Kafka_0_8_1_1}
import kafka.manager.{Kafka_0_8_2_1, ApiError, Kafka_0_8_2_0, Kafka_0_8_1_1, TopicListExtended}
import models.FollowLink
import models.form._
import models.navigation.Menus
Expand Down Expand Up @@ -79,6 +79,31 @@ object Topic extends Controller{
)(AddTopicPartitions.apply)(AddTopicPartitions.unapply)
)

val defaultAddMultipleTopicsPartitionsForm = Form(
mapping(
"topics" -> seq {
mapping(
"name" -> nonEmptyText,
"selected" -> boolean
)(TopicSelect.apply)(TopicSelect.unapply)
},
"brokers" -> seq {
mapping(
"id" -> number(min = 0),
"host" -> nonEmptyText,
"selected" -> boolean
)(BrokerSelect.apply)(BrokerSelect.unapply)
},
"partitions" -> number(min = 1, max = 10000),
"readVersions" -> seq {
mapping(
"topic" -> nonEmptyText,
"version" -> number(min = 0)
)(ReadVersion.apply)(ReadVersion.unapply)
}
)(AddMultipleTopicsPartitions.apply)(AddMultipleTopicsPartitions.unapply)
)

val defaultUpdateConfigForm = Form(
mapping(
"topic" -> nonEmptyText.verifying(maxLength(250), validateName),
Expand Down Expand Up @@ -179,6 +204,26 @@ object Topic extends Controller{
}
}

def addPartitionsToMultipleTopics(clusterName: String) = Action.async { implicit request =>
val errorOrFormFuture = kafkaManager.getTopicListExtended(clusterName).flatMap { errorOrTle =>
errorOrTle.fold( e => Future.successful(-\/(e)),{ topicListExtended =>
kafkaManager.getBrokerList(clusterName).map { errorOrBrokerList =>
errorOrBrokerList.map { bl =>
val tl = kafkaManager.topicListSortedByNumPartitions(topicListExtended)
val topics = tl.map(t => t._1).map(t => TopicSelect.from(t))
// default value is the largest number of partitions among existing topics with topic identity
val partitions = tl.head._2.map(_.partitions).getOrElse(0)
val readVersions = tl.map(t => t._2).flatMap(t => t).map(ti => ReadVersion(ti.topic, ti.readVersion))
defaultAddMultipleTopicsPartitionsForm.fill(AddMultipleTopicsPartitions(topics,bl.list.map(bi => BrokerSelect.from(bi)),partitions,readVersions))
}
}
})
}
errorOrFormFuture.map { errorOrForm =>
Ok(views.html.topic.addPartitionsToMultipleTopics(clusterName, errorOrForm))
}
}

def handleAddPartitions(clusterName: String, topic: String) = Action.async { implicit request =>
defaultAddPartitionsForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.topic.addPartitions(clusterName, topic,\/-(formWithErrors)))),
Expand All @@ -197,6 +242,27 @@ object Topic extends Controller{
)
}

def handleAddPartitionsToMultipleTopics(clusterName: String) = Action.async { implicit request =>
defaultAddMultipleTopicsPartitionsForm.bindFromRequest.fold(
formWithErrors => Future.successful(BadRequest(views.html.topic.addPartitionsToMultipleTopics(clusterName, \/-(formWithErrors)))),
addMultipleTopicsPartitions => {
val topics = addMultipleTopicsPartitions.topics.filter(_.selected).map(_.name)
val brokers = addMultipleTopicsPartitions.brokers.filter(_.selected).map(_.id)
val readVersions = addMultipleTopicsPartitions.readVersions.map{ rv => (rv.topic, rv.version) }.toMap
kafkaManager.addMultipleTopicsPartitions(clusterName, topics, brokers, addMultipleTopicsPartitions.partitions, readVersions).map { errorOrSuccess =>
Ok(views.html.common.resultOfCommand(
views.html.navigation.clusterMenu(clusterName,"Topics","Add Partitions to Multiple Topics",Menus.clusterMenus(clusterName)),
models.navigation.BreadCrumbs.withNamedViewAndCluster("Topics",clusterName,"Add Partitions to Multiple Topics"),
errorOrSuccess,
"Add Partitions to All Topics",
FollowLink("Go to topic list.",routes.Topic.topics(clusterName).toString()),
FollowLink("Try again.",routes.Topic.addPartitionsToMultipleTopics(clusterName).toString())
))
}
}
)
}

private def updateConfigForm(clusterName: String, ti: TopicIdentity) = {
kafkaManager.getClusterConfig(clusterName).map { errorOrConfig =>
errorOrConfig.map { clusterConfig =>
Expand Down
8 changes: 8 additions & 0 deletions app/kafka/manager/ActorModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ object ActorModel {
partitions: Int,
partitionReplicaList: Map[Int, Seq[Int]],
readVersion: Int) extends CommandRequest
case class CMAddMultipleTopicsPartitions(topicsAndReplicas: Seq[(String, Map[Int, Seq[Int]])],
brokers: Seq[Int],
partitions: Int,
readVersions: Map[String,Int]) extends CommandRequest
case class CMUpdateTopicConfig(topic: String, config: Properties, readVersion: Int) extends CommandRequest
case class CMDeleteTopic(topic: String) extends CommandRequest
case class CMRunPreferredLeaderElection(topics: Set[String]) extends CommandRequest
Expand All @@ -79,6 +83,10 @@ object ActorModel {
partitions: Int,
partitionReplicaList: Map[Int, Seq[Int]],
readVersion: Int) extends CommandRequest
case class KCAddMultipleTopicsPartitions(topicsAndReplicas: Seq[(String, Map[Int, Seq[Int]])],
brokers: Seq[Int],
partitions: Int,
readVersions: Map[String, Int]) extends CommandRequest
case class KCUpdateTopicConfig(topic: String, config: Properties, readVersion: Int) extends CommandRequest
case class KCDeleteTopic(topic: String) extends CommandRequest
case class KCPreferredReplicaLeaderElection(topicAndPartition: Set[TopicAndPartition]) extends CommandRequest
Expand Down
21 changes: 21 additions & 0 deletions app/kafka/manager/ClusterManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,27 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
}
} pipeTo sender()

case CMAddMultipleTopicsPartitions(topicsAndReplicas, brokers, partitions, readVersions) =>
implicit val ec = longRunningExecutionContext
val eventualBrokerList = withKafkaStateActor(KSGetBrokers)(identity[BrokerList])
val eventualDescriptions = withKafkaStateActor(KSGetTopicDescriptions(topicsAndReplicas.map(x=>x._1).toSet))(identity[TopicDescriptions])
eventualDescriptions.map { topicDescriptions =>
val topicsWithoutDescription = topicsAndReplicas.map(x=>x._1).filter{t => !topicDescriptions.descriptions.map(td => td.topic).contains(t) }
require(topicsWithoutDescription.isEmpty, "Topic(s) don't exist: [%s]".format(topicsWithoutDescription.mkString(", ")))
eventualBrokerList.flatMap {
bl => {
val brokerSet = bl.list.map(_.id).toSet
val nonExistentBrokers = getNonExistentBrokers(bl, brokers)
require(nonExistentBrokers.isEmpty, "Nonexistent broker(s) selected: [%s]".format(nonExistentBrokers.mkString(", ")))
withKafkaCommandActor(KCAddMultipleTopicsPartitions(topicsAndReplicas, brokers.filter(brokerSet.apply), partitions, readVersions))
{
kcResponse: KCCommandResult =>
CMCommandResult(kcResponse.result)
}
}
}
} pipeTo sender()

case CMUpdateTopicConfig(topic, config, readVersion) =>
implicit val ec = longRunningExecutionContext
val eventualTopicDescription = withKafkaStateActor(KSGetTopicDescription(topic))(identity[Option[TopicDescription]])
Expand Down
8 changes: 8 additions & 0 deletions app/kafka/manager/KafkaCommandActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ class KafkaCommandActor(kafkaCommandActorConfig: KafkaCommandActorConfig) extend
})
}
}
case KCAddMultipleTopicsPartitions(topicsAndReplicas, brokers, partitions, readVersion) =>
longRunning {
Future {
KCCommandResult(Try {
adminUtils.addPartitionsToTopics(kafkaCommandActorConfig.curator, topicsAndReplicas, partitions, brokers, readVersion)
})
}
}
case KCUpdateTopicConfig(topic, config, readVersion) =>
longRunning {
Future {
Expand Down
42 changes: 42 additions & 0 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, ThreadPoolExecutor}
import akka.actor.{ActorPath, ActorSystem, Props}
import akka.util.Timeout
import com.typesafe.config.{ConfigFactory, Config}
import controllers.Topic
import kafka.manager.ActorModel._
import org.slf4j.{LoggerFactory, Logger}

Expand Down Expand Up @@ -302,6 +303,34 @@ class KafkaManager(akkaConfig: Config)
}
}

def addMultipleTopicsPartitions(
clusterName: String,
topics: Seq[String],
brokers: Seq[Int],
partitions: Int,
readVersions: Map[String, Int]
): Future[ApiError \/ Unit] =
{
implicit val ec = apiExecutionContext
getTopicListExtended(clusterName).flatMap { tleOrError =>
tleOrError.fold(
e => Future.successful(-\/(e)), { tle =>
// add partitions to only topics with topic identity
val topicsAndReplicas = topicListSortedByNumPartitions(tle).filter(t => topics.contains(t._1) && t._2.nonEmpty).map{ case (t,i) => (t, i.get.partitionsIdentity.mapValues(_.replicas)) }
withKafkaManagerActor(
KMClusterCommandRequest(
clusterName,
CMAddMultipleTopicsPartitions(topicsAndReplicas, brokers, partitions, readVersions)
)
) {
result: Future[CMCommandResult] =>
result.map(cmr => toDisjunction(cmr.result))
}
}
)
}
}

def updateTopicConfig(
clusterName: String,
topic: String,
Expand Down Expand Up @@ -488,4 +517,17 @@ class KafkaManager(akkaConfig: Config)
)
)(identity[Option[ReassignPartitions]])
}

def topicListSortedByNumPartitions(tle: TopicListExtended): Seq[(String, Option[TopicIdentity])] = {
def partition(tiOption: Option[TopicIdentity]): Int = {
tiOption match {
case Some(ti) => ti.partitions
case None => 0
}
}
val sortedByNumPartition = tle.list.sortWith{ (leftE, rightE) =>
partition(leftE._2) > partition(rightE._2)
}
sortedByNumPartition
}
}
23 changes: 20 additions & 3 deletions app/kafka/manager/utils/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ class AdminUtils(version: KafkaVersion) {
newNumPartitions: Int,
partitionReplicaList : Map[Int, Seq[Int]],
brokerList: Seq[Int],
readVersion: Int
) {
readVersion: Int) {

/*
import collection.JavaConverters._
Expand All @@ -207,7 +206,7 @@ class AdminUtils(version: KafkaVersion) {

val brokerListSorted: Seq[Int] = brokerList.sorted
val currentNumPartitions: Int = partitionReplicaList.size

checkCondition(currentNumPartitions > 0,
TopicErrors.PartitionsGreaterThanZero)

Expand Down Expand Up @@ -238,6 +237,24 @@ class AdminUtils(version: KafkaVersion) {
createOrUpdateTopicPartitionAssignmentPathInZK(curator, topic, newPartitionsReplicaList, update=true, readVersion=readVersion)
}

/* Add partitions to multiple topics. After this operation, all topics will have the same number of partitions */
def addPartitionsToTopics(curator: CuratorFramework,
topicAndReplicaList: Seq[(String, Map[Int, Seq[Int]])],
newNumPartitions: Int,
brokerList: Seq[Int],
readVersions: Map[String,Int]) {
val topicsWithoutReadVersion = topicAndReplicaList.map(x=>x._1).filter{t => !readVersions.contains(t)}
checkCondition(topicsWithoutReadVersion.isEmpty, TopicErrors.NoReadVersionFound(topicsWithoutReadVersion.mkString(", ")))

// topicAndReplicaList is sorted by number of partitions each topic has in order not to start adding partitions if any of requests doesn't work with newNumPartitions
for {
(topic, replicaList) <- topicAndReplicaList
readVersion = readVersions(topic)
} {
addPartitions(curator, topic, newNumPartitions, replicaList, brokerList, readVersion)
}
}

/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
* @param curator: The zk client handle used to write the new config to zookeeper
Expand Down
4 changes: 4 additions & 0 deletions app/kafka/manager/utils/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ object TopicErrors {
class FailedToAddNewPartitions private[TopicErrors] (topic: String, newPartitions: Int, found: Int) extends UtilError(
s"Failed to add new partitions topic=$topic, newPartitions=$newPartitions, after adding new partitions to assignment found=$found"
)
class NoReadVersionFound private[TopicErrors] (topics: String) extends UtilError(
s"Cannot find read version for topics: $topics while adding new partitions"
)
class TopicDoesNotExist private[TopicErrors] (topic: String) extends UtilError(s"Topic does not exist : $topic")

val TopicNameEmpty = new TopicNameEmpty
Expand All @@ -80,6 +83,7 @@ object TopicErrors {
def DuplicateReplicAssignment(topic: String, part: Int, replicas: Seq[Int]) = new DuplicateReplicaAssignment(topic,part,replicas)
def CannotAddZeroPartitions(topic: String, currentPartitions: Int, newPartitions:Int) = new CannotAddZeroPartitions(topic,currentPartitions,newPartitions)
def FailedToAddNewPartitions(topic: String, newPartitions:Int, found: Int) = new FailedToAddNewPartitions(topic,newPartitions,found)
def NoReadVersionFound(topics: String) = new NoReadVersionFound(topics)
def TopicDoesNotExist(topic: String) = new TopicDoesNotExist(topic)
}

2 changes: 2 additions & 0 deletions app/models/form/ReassignPartitionOperation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ object TopicSelect {
}
}

case class ReadVersion(topic: String, version: Int)

case class GenerateAssignment(brokers: Seq[BrokerSelect])
case class GenerateMultipleAssignments(topics: Seq[TopicSelect], brokers: Seq[BrokerSelect])
case class RunMultipleAssignments(topics: Seq[TopicSelect])
Expand Down
1 change: 1 addition & 0 deletions app/models/form/TopicOperation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ case class TConfig(name: String, value: Option[String])
case class CreateTopic(topic: String, partitions: Int, replication: Int, configs: List[TConfig]) extends TopicOperation
case class DeleteTopic(topic: String) extends TopicOperation
case class AddTopicPartitions(topic: String, brokers: Seq[BrokerSelect], partitions: Int, readVersion: Int) extends TopicOperation
case class AddMultipleTopicsPartitions(topics: Seq[TopicSelect],brokers: Seq[BrokerSelect], partitions: Int, readVersions: Seq[ReadVersion]) extends TopicOperation
case class UpdateTopicConfig(topic: String, configs: List[TConfig], readVersion: Int) extends TopicOperation
case class UnknownTO(op: String) extends TopicOperation

89 changes: 89 additions & 0 deletions app/views/topic/addPartitionsToMultipleTopics.scala.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
@*
* Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
* See accompanying LICENSE file.
*@
@import scalaz.{\/}
@(cluster: String, errorOrForm: kafka.manager.ApiError \/ Form[models.form.AddMultipleTopicsPartitions])(implicit request: RequestHeader)

@import helper._
@import b3.vertical.fieldConstructor
@import controllers.routes

@theMenu = {
@views.html.navigation.clusterMenu(cluster,"Topics","Add Partitions to Topics",models.navigation.Menus.clusterMenus(cluster))
}

@checkboxWithLink(field: play.api.data.Field, topic: String) = {
@b3.inputFormGroup(field, withFeedback = false, withLabelFor = false, b3.Args.withDefault(Seq(), 'disabled -> false)) { fieldInfo =>
<div class="checkbox">
<label for="@fieldInfo.id">
<input type="checkbox" id="@fieldInfo.id" name="@fieldInfo.name" value="true" @if(fieldInfo.value == Some("true")){checked} @toHtmlArgs(fieldInfo.innerArgsMap)>
<a href="@routes.Topic.topic(cluster,topic)">@topic</a>
</label>
</div>
}
}

@renderForm(addMultipleTopicsPartitionsForm: Form[models.form.AddMultipleTopicsPartitions]) = {
@b3.form(routes.Topic.handleAddPartitionsToMultipleTopics(cluster)) {
<table class="table">
<thead>
<tr><th>Add Partitions</th><th>Brokers</th></tr>
</thead>
<tbody>
<tr>
&#9888; Selected topics will have the same number of partitions after the operation.
</tr>
<tr>
<td>
@b3.text(addMultipleTopicsPartitionsForm("partitions"), '_label -> "Partitions", 'placeholder -> "8")
@helper.repeat(addMultipleTopicsPartitionsForm("readVersions"), min = 1) { readVersionForm =>
@b3.hidden(readVersionForm("topic").name,readVersionForm("topic").value.getOrElse(""))
@b3.hidden(readVersionForm("version").name,readVersionForm("version").value.getOrElse(-1))
}
<button type="button" class="btn btn-default" onClick="checkBoxSelect('topics',true);">
<b>Select All</b>
</button>
<button type="button" class="btn btn-default" onClick="checkBoxSelect('topics',false);">
<b>Select None</b>
</button>
@helper.repeat(addMultipleTopicsPartitionsForm("topics"), min = 1) { topicSelectForm =>
@b3.hidden(topicSelectForm("name").name,topicSelectForm("name").value.getOrElse(""))
@checkboxWithLink(topicSelectForm("selected"),topicSelectForm("name").value.getOrElse(""))
}
</td>
<td>
<button type="button" class="btn btn-default" onClick="checkBoxSelect('brokers',true);">
<b>Select All</b>
</button>
<button type="button" class="btn btn-default" onClick="checkBoxSelect('brokers',false);">
<b>Select None</b>
</button>
@helper.repeat(addMultipleTopicsPartitionsForm("brokers"), min = 1) { brokerSelectForm =>
@b3.hidden(brokerSelectForm("id").name,brokerSelectForm("id").value.getOrElse(-1))
@b3.hidden(brokerSelectForm("host").name,brokerSelectForm("host").value.getOrElse(""))
@b3.checkbox(brokerSelectForm("selected"), '_text -> s"${brokerSelectForm("id").value.getOrElse(-1)} - ${brokerSelectForm("host").value.getOrElse("")}")
}
</td>
</tr>
</tbody>
</table>
@b3.submit('class -> "submit-button btn btn-primary"){ Add Partitions }
<a href="@routes.Topic.topics(cluster)" class="cancel-button btn btn-default" role="button">Cancel</a>
}
}

@main(
"Add Partitions to Multiple Topics",
menu = theMenu,
breadcrumbs=views.html.navigation.breadCrumbs(models.navigation.BreadCrumbs.withViewAndCluster("Topics",cluster))) {
<div class="col-md-6 un-pad-me">
<div class="panel panel-default">
<div class="panel-heading"><h3><button type="button" class="btn btn-link" onclick="goBack()"><span class="glyphicon glyphicon-arrow-left" aria-hidden="true"></span></button>Add Partitions</h3></div>
<div class="panel-body">
@errorOrForm.fold( views.html.errors.onApiError(_), renderForm(_))
</div>
</div>
</div>
}

Loading