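Because I kept running into errors when using Spark with Kafka, I decided to install a Kafka management tool to monitor the cluster. I came across an introduction to Yahoo's kafka-manager and decided to give it a try.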

Introduction

Main features of kafka-manager:

  1. Manage multiple clusters
  2. Monitor cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)
  3. Run preferred replica election
  4. Generate partition assignments with option to select brokers to use
  5. Run reassignment of partitions (based on the generated assignments)
  6. Create a topic with optional topic configs
  7. Delete a topic (supported on 0.8.2 and later; requires delete.topic.enable=true in the broker config)
  8. The topic list shows topics that are marked for deletion
  9. Batch generate partition assignments for multiple topics with option to select brokers to use
  10. Batch run reassignment of partitions for multiple topics
  11. Add partitions to an existing topic
  12. Update the config of an existing topic
  13. Optionally enable JMX polling for broker level and topic level metrics.
  14. Optionally filter out consumers that do not have ids/ owners/ & offsets/ directories in zookeeper.

Download: https://github.com/yahoo/kafka-manager

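Installation

Requirements: JDK 1.8+ and Kafka 0.8.1.1, 0.8.2.*, or 0.9.0.*.

Unpack the source archive and build it with sbt (sbt is already installed here). The build takes a while and needs network access to download dependencies.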
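Since the build fails on anything older than JDK 1.8, it can help to check the JDK before running sbt. The following is only a sketch: the helper names java_major and java_ok_for_kafka_manager are made up for illustration and are not part of kafka-manager or sbt.

```shell
# Hypothetical helper: turn a Java version string into its major release number.
# "1.8.0_91" -> 8 (legacy 1.x scheme), "11.0.2" -> 11 (newer scheme).
java_major() {
  version="$1"
  case "$version" in
    1.*) version="${version#1.}" ;;   # drop the legacy "1." prefix
  esac
  echo "${version%%[._-]*}"           # keep what comes before the first ., _ or -
}

# Hypothetical helper: succeed only if the version string is Java 8 or newer.
java_ok_for_kafka_manager() {
  [ "$(java_major "$1")" -ge 8 ]
}

# Example use against the local JDK (java -version prints to stderr):
#   v=$(java -version 2>&1 | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
#   java_ok_for_kafka_manager "$v" || echo "JDK $v is too old; install JDK 1.8+"
```

Parsing the version string instead of comparing it literally keeps the check working for both the old 1.x numbering and the newer single-number scheme.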

[cmcc@new3 soft]$ cd kafka-manager-master/
[cmcc@new3 kafka-manager-master]$ ll
total 72
drwxrwxr-x 9 cmcc cmcc 4096 Apr 26 07:11 app
-rw-rw-r-- 1 cmcc cmcc 3023 Apr 26 07:11 build.sbt
drwxrwxr-x 2 cmcc cmcc 4096 Apr 26 07:11 conf
drwxrwxr-x 2 cmcc cmcc 4096 Apr 26 07:11 img
-rw-rw-r-- 1 cmcc cmcc 11323 Apr 26 07:11 LICENCE
drwxrwxr-x 2 cmcc cmcc 4096 Apr 26 07:11 project
drwxrwxr-x 5 cmcc cmcc 4096 Apr 26 07:11 public
-rw-rw-r-- 1 cmcc cmcc 6323 Apr 26 07:11 README.md
-rwxr-xr-x 1 cmcc cmcc 19460 Apr 26 07:11 sbt
drwxrwxr-x 4 cmcc cmcc 4096 Apr 26 07:11 src
drwxrwxr-x 4 cmcc cmcc 4096 Apr 26 07:11 test
[cmcc@new3 kafka-manager-master]$ sbt clean dist
[info] Loading project definition from /home/cmcc/soft/kafka-manager-master/project
[info] Updating {file:/home/cmcc/soft/kafka-manager-master/project/}kafka-manager-master-build…
[info] Resolving org.apache.httpcomponents#project;7 …



[info] Done packaging.
[info] Compilation completed in 6.836 s
model contains 659 documentable templates
[info] Main Scala API documentation successful.
[info] Packaging /home/cmcc/soft/kafka-manager-master/target/scala-2.11/kafka-manager_2.11-1.3.0.8-javadoc.jar …
[info] Done packaging.
[info] Packaging /home/cmcc/soft/kafka-manager-master/target/scala-2.11/kafka-manager_2.11-1.3.0.8.jar …
[info] Done packaging.
[info] Packaging /home/cmcc/soft/kafka-manager-master/target/scala-2.11/kafka-manager_2.11-1.3.0.8-sans-externalized.jar …
[info] Done packaging.
[info]
[info] Your package is ready in /home/cmcc/soft/kafka-manager-master/target/universal/kafka-manager-1.3.0.8.zip
[info]
[success] Total time: 422 s, completed 2016-5-25 2:00:52
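The build leaves a zip under target/universal, as the last [info] line reports, and that zip has to be unpacked before anything can be started. A small sketch for locating it follows; find_dist_zip is a hypothetical helper name, and the destination directory in the usage comment is an assumption.

```shell
# Hypothetical helper: print the newest kafka-manager zip under
# <project>/target/universal, or nothing if no zip has been built yet.
find_dist_zip() {
  project_dir="$1"
  ls -t "$project_dir"/target/universal/kafka-manager-*.zip 2>/dev/null | head -n 1
}

# Example use (the destination directory is an assumption):
#   zip_path=$(find_dist_zip /home/cmcc/soft/kafka-manager-master)
#   unzip "$zip_path" -d /home/cmcc/soft
#   cd /home/cmcc/soft/kafka-manager-1.3.0.8
```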

Startup

After unzipping the package, the kafka-manager-1.3.0.8 directory looks like this:

[cmcc@new3 kafka-manager-1.3.0.8]$ ll
total 24
drwxrwxr-x 2 cmcc cmcc 4096 May 25 02:10 bin
drwxrwxr-x 2 cmcc cmcc 4096 May 25 02:10 conf
drwxrwxr-x 2 cmcc cmcc 4096 May 25 02:10 lib
-rw-r--r-- 1 cmcc cmcc 6323 Apr 26 07:11 README.md
drwxrwxr-x 3 cmcc cmcc 4096 May 25 02:10 share

Configuration

Edit the configuration file:

[cmcc@new3 conf]$ vim application.conf

# Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
# See accompanying LICENSE file.

# This is the main configuration file for the application.
# ~~~~~

# Secret key
# ~~~~~
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=${?APPLICATION_SECRET}

# The application languages
# ~~~~~
play.i18n.langs=["en"]

play.http.requestHandler = "play.http.DefaultHttpRequestHandler"

play.application.loader=loader.KafkaManagerLoader

kafka-manager.zkhosts="new-cdh12:2181,new-cdh13:2181,new-cdh15:2181,new-cdh16:2181,new-cdh17:2181"
kafka-manager.zkhosts=${?ZK_HOSTS}
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}

basicAuthentication.enabled=false
basicAuthentication.username="admin"
basicAuthentication.password="!@cmcc1234"
basicAuthentication.realm="Kafka-Manager"
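In this file, a line such as kafka-manager.zkhosts=${?ZK_HOSTS} is an optional substitution: it only takes effect when the ZK_HOSTS environment variable is set, in which case it overrides the literal value on the line above it (play.crypto.secret and APPLICATION_SECRET work the same way). As a rough sketch, the connect string could be assembled in the shell instead of being edited into the file; join_zk_hosts is a hypothetical helper, not something kafka-manager provides.

```shell
# Hypothetical helper: join hostnames into a ZooKeeper connect string,
# e.g. "zk1:2181,zk2:2181". The first argument is the client port.
join_zk_hosts() {
  port="$1"; shift
  result=""
  for host in "$@"; do
    # Prefix a comma only when something has already been appended.
    result="${result:+$result,}$host:$port"
  done
  echo "$result"
}

# Example use: export the value before starting kafka-manager so that
# the ${?ZK_HOSTS} line in application.conf picks it up.
#   export ZK_HOSTS=$(join_zk_hosts 2181 new-cdh12 new-cdh13 new-cdh15)
```

Keeping the host list in the environment means the same application.conf can be reused across clusters.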

Start the service

bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080
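The command above runs in the foreground. For a longer-lived instance, one option is to launch it with nohup in the background. The sketch below only builds the two -D options shown above; km_start_args is a hypothetical helper, the log file name is an assumption, and the fallback of 9000 is Play's default HTTP port.

```shell
# Hypothetical helper: build the -D options passed to bin/kafka-manager.
# The port falls back to 9000, Play's default HTTP port, when omitted.
km_start_args() {
  conf_file="$1"
  http_port="${2:-9000}"
  echo "-Dconfig.file=$conf_file -Dhttp.port=$http_port"
}

# Example use (the log file name is an assumption):
#   nohup bin/kafka-manager $(km_start_args conf/application.conf 8080) > kafka-manager.log 2>&1 &
```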

Common errors

1. JDK version mismatch

java.lang.UnsupportedClassVersionError: com/typesafe/config/ConfigException : Unsupported major.minor version 52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at com.typesafe.sbt.web.SbtWeb$$anonfun$com$typesafe$sbt$web$SbtWeb$$load$1.apply(SbtWeb.scala:535)
at com.typesafe.sbt.web.SbtWeb$$anonfun$com$typesafe$sbt$web$SbtWeb$$load$1.apply(SbtWeb.scala:535)
at scala.Option.fold(Option.scala:157)
at com.typesafe.sbt.web.SbtWeb$.com$typesafe$sbt$web$SbtWeb$$load(SbtWeb.scala:549)
at com.typesafe.sbt.web.SbtWeb$$anonfun$globalSettings$1$$anonfun$apply$1.apply(SbtWeb.scala:143)
at com.typesafe.sbt.web.SbtWeb$$anonfun$globalSettings$1$$anonfun$apply$1.apply(SbtWeb.scala:143)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
at sbt.Project$.setProject(Project.scala:319)
at sbt.BuiltinCommands$.doLoadProject(Main.scala:484)
at sbt.BuiltinCommands$$anonfun$loadProjectImpl$2.apply(Main.scala:475)
at sbt.BuiltinCommands$$anonfun$loadProjectImpl$2.apply(Main.scala:475)
at sbt.Command$$anonfun$applyEffect$1$$anonfun$apply$2.apply(Command.scala:59)
at sbt.Command$$anonfun$applyEffect$1$$anonfun$apply$2.apply(Command.scala:59)
at sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:61)
at sbt.Command$$anonfun$applyEffect$2$$anonfun$apply$3.apply(Command.scala:61)
at sbt.Command$.process(Command.scala:93)
at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:98)
at sbt.MainLoop$$anonfun$1$$anonfun$apply$1.apply(MainLoop.scala:98)
at sbt.State$$anon$1.process(State.scala:184)
at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:98)
at sbt.MainLoop$$anonfun$1.apply(MainLoop.scala:98)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.MainLoop$.next(MainLoop.scala:98)
at sbt.MainLoop$.run(MainLoop.scala:91)
at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:70)
at sbt.MainLoop$$anonfun$runWithNewLog$1.apply(MainLoop.scala:65)
at sbt.Using.apply(Using.scala:24)
at sbt.MainLoop$.runWithNewLog(MainLoop.scala:65)
at sbt.MainLoop$.runAndClearLast(MainLoop.scala:48)
at sbt.MainLoop$.runLoggedLoop(MainLoop.scala:32)
at sbt.MainLoop$.runLogged(MainLoop.scala:24)
at sbt.StandardMain$.runManaged(Main.scala:53)
at sbt.xMain.run(Main.scala:28)
at xsbt.boot.Launch$$anonfun$run$1.apply(Launch.scala:109)
at xsbt.boot.Launch$.withContextLoader(Launch.scala:128)
at xsbt.boot.Launch$.run(Launch.scala:109)
at xsbt.boot.Launch$$anonfun$apply$1.apply(Launch.scala:35)
at xsbt.boot.Launch$.launch(Launch.scala:117)
at xsbt.boot.Launch$.apply(Launch.scala:18)
at xsbt.boot.Boot$.runImpl(Boot.scala:41)
at xsbt.boot.Boot$.main(Boot.scala:17)
at xsbt.boot.Boot.main(Boot.scala)
[error] java.lang.UnsupportedClassVersionError: com/typesafe/config/ConfigException : Unsupported major.minor version 52.0
[error] Use 'last' for the full log.
Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? q

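This error means the build is running on a JDK older than JDK 8: class file version 52.0 corresponds to Java 8, so an older JVM cannot load the class. Switching to JDK 1.8 resolves it.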
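The number in the message can be decoded directly: the class file major version is the Java release plus 44, so 50 is Java 6, 51 is Java 7, and 52 is Java 8. A small sketch of that arithmetic follows; both function names are hypothetical.

```shell
# Hypothetical helper: map a class file major version to the Java release
# that produces it (50 -> 6, 51 -> 7, 52 -> 8).
class_major_to_java() {
  echo $(( $1 - 44 ))
}

# Hypothetical helper: extract the major version from an
# UnsupportedClassVersionError message; prints nothing if the text does not match.
major_from_error() {
  echo "$1" | sed -n 's/.*Unsupported major\.minor version \([0-9][0-9]*\)\.[0-9][0-9]*.*/\1/p'
}

# Example use:
#   msg="java.lang.UnsupportedClassVersionError: com/typesafe/config/ConfigException : Unsupported major.minor version 52.0"
#   echo "Needs Java $(class_major_to_java "$(major_from_error "$msg")") or newer"
```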