CDH 错误集

Posted by danner on March 2, 2019

kafka

console-consumer 无法消费 msg

安装好 kafka 后,命令行测试创建主题,生产数据都没有问题,但consumer 始终无法接收到数据。kafka log(/var/local/kafka/data) 下主题文件存在且有数据,--desctibe 查看主题状态读正常。在log 目录 /var/log/kafka 下查看日志发现这么一行

2019-11-01 11:12:17,506 ERROR kafka.server.KafkaApis: 
[KafkaApi-43] Number of alive brokers '2' does not meet the required replication factor '3' 
for the offsets topic (configured via 'offsets.topic.replication.factor'). 
This error can be ignored if the cluster is starting up and not all brokers are up yet.

意思是 offsets topic 副本数是3,而当前运行的 kafka broker 只有 2;broker 必须要大于等于 offset副本数,可以配置 offsets.topic.replication.factor 。知道解决方案了,直接去 CDH web 界面的 kafka 配置里修改

保存配置然后重启 kafka 即可。

问题解决后,我们来说说为什么会有这个问题:刚开始安装 kafka 是有三台的,但有台机器内存吃不消就删除了,导致offset 副本默认是3。

Spark

kafka version

spark + Kafka 处理时,必须上传 kafka-clientsspark-streaming-kafka jar。在运行过程中可能提示

19/11/04 19:30:20 INFO utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
19/11/04 19:30:20 INFO utils.AppInfoParser: Kafka commitId : unknown
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

提示找不到方法,但我们已经上传 kafka-clients,为什么还是找不到?看日志的第一行,kafka version 是0.9.0,但 CDH 的kafka 版本是2.2.1,为何会这样? 需要在 cdh 的spark 中设置 kafka 版本,默认是 0.9:

选择 None 就是自动选择 cdh 中安装的 Kafka 版本

19/11/05 09:48:26 INFO utils.AppInfoParser: Kafka version: 2.2.1
19/11/05 09:48:26 INFO utils.AppInfoParser: Kafka commitId: 55783d3133a5a49a

NoSuchMethodError

出现这个错误的原因是类中没有该方法(注意哦,不是找不到类),本质上是与kafka version 的错误一样,jar 版本冲突。以 guava 为例,平台环境(spark + yarn) 用的版本是11.0.1,代码使用的版本是 20.0,很显然20版本中有些函数在 11版本是不存在的。通常情况下,spark 作业执行时肯定是加载环境的jar 然后加载作业的jar,则在运行过程中guava 版本为 11,所以就出现 NoSuchMethodError 。解决错误也很简单,只需让运行过程中加载的 guava 版本是20 即可。

  • local 模式:
    • 所有代码都在 Driver(在本地),只不过跑了多线程而已,spark-submit 启动是添加 --driver-class-path /.../guava-20.0.jar 即可,这样就会预先加载 20版本的 guava
  • yarn-client
    • 增加了 Executor,那么就需要对该节点机器设置,增加环境变量 --conf spark.executor.extraClassPath=/.../guava-20.0.jar --conf spark.executor.userClassPathFirst=true;指定在Executor 中优先使用用户的 jar ,那么就会只加载 20版本的 guava
  • yarn-cluster
    • Driver 也在远程,设置 --driver-class-path 已经没有用了,要按照 Executor 一样设置: --conf spark.driver.extraClassPath --conf spark.driver.userClassPathFirst=true

以上做了这么多,无非只想做到一点:作业的jar 和平台已有的jar 版本冲突时,先加载作业的jar 包。上面涉及到的参数,看官网的详细描述。