Kafka 消费Lag监控

内容分享2个月前发布
19 0 0

需求描述:lag(滞后)是kafka消费队列性能监控的重大指标,lag的值越大,表明kafka的堆积越严重。本篇文章将使用python脚本+influxdb+grafana的方式对kafka的offset、logsiz和lag这三个参数进行监控,并以图形化的方式进行展现。

 架构描述:使用python收集kafka的相关信息并存储到influxdb里;配置grafana,将influxdb里的数据以图形化的方式展现出来。

一,准备工作


1,kafka,influxdb,grafana的安装(在此不详细描述,默认为阅读文章的各位对这三样工具的使用是熟悉的)

2,查询kafka消费状态的命令/kafka_2.11-0.10.1.0/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –group group1 –topic topicname1 –zookeeper zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181,zoo5:2181。本篇文章也将以此条命令输出的信息作为基础编写脚本。

#/kafka_2.11-0.10.1.0/bin/kafka-run-class.shkafka.tools.ConsumerOffsetChecker –group group1 –topic topicname1 –zookeeper zoo1:2181,zoo2:2181,zoo3:2181,zoo4:2181,zoo5:2181Group Topic Pid Offset logSize Lag Owner

group1 topicname1 097833780697839022852422 none

group1 topicname1 197833784097839029552455 none

group1 topicname1 297826355797831605252495 none

group1 topicname1 397830707597835959752522 none

group1 topicname1 497833780397839035852555 none

group1 topicname1 597833781297839039452582none

说明:

group1 组名

topicname1 topic名

我们要用脚本取的,就是输出的这段内容的Offset logSize Lag这三个值,并将所有分片的这些值相加,从而获取单个topic的Offset logSize Lag的值,并将值输出到一个txt文件暂存。我这里使用一个shell脚本来取数据和一个python脚本来讲数据存储到influxdb中的方式来实现。

重大:kafka 0.9版本之后,不支持ConsumerOffsetChecker了,脚本可以改为如下:

./kafka-consumer-groups.sh –group group1 –bootstrap-server broker:9092 –describe | awk {print $1″ “$5} |grep -v ^ $ |grep -v LAG

二,编写脚本提取Offset logSize Lag这三个值


上述命令即可提取group 对应的topic的CURRENT-OFFSET LOG-END-OFFSET LAG,提取到对应的lag即可获取数据堆积情况,然后发送到influxdb即可。

也可以设置一个阈值,当积压超过对应阈值时告警,发送到微信或者钉钉群。

#!/usr/bin/python

# -*- coding: UTF-8 -*-

import time

import urllib2

import urllib

import json

#Read the file

f = open( /usr/monitor/tmp/topic-group1-topicname1-Offset.txt )

Offset_sum = f.read()

f.close()

f = open( /usr/monitor/tmp/topic-group1-topicname1-logSize.txt )

logSize_sum = f.read()

f.close()

f = open( /usr/monitor/tmp/topic-group1-topicname1-Lag.txt )

Lag_sum = f.read()

f.close()

dbreqdata = “group1,topic=topicname1,type=Offset value=”+str(Offset_sum)+


group1,topic=topicname1,type=logSize value=”+str(logSize_sum)+

group1,topic=topicname1,type=Lag value=”+str(Lag_sum)

print dbreqdata

dbrequrl = “http://127.0.0.1:8086/write?db=elkDB”

dbreq= urllib2.Request(url = dbrequrl,data =dbreqdata)

print dbreq

urllib2.urlopen(dbreq)

© 版权声明

相关文章

暂无评论

none
暂无评论...