python通过内置模块监控磁盘、内存、CPU、负载
01. 概述
闲来无事干,说干就干。主要是通过python函数以及python内置模块来实现对磁盘、内存、CPU、负载的数据采集,然后发送到企业号,并到达微信端,方便查看信息。 脚本存放于gitlab上:python监控 原文地址:https://www.mairoot.com/?p=1708
02. 脚本附件
#!/usr/bin/env python3
# coding=utf-8
# Create for mai
# Copyright https://www.mairoot.com
# Create date 2018-10-21
# 引入模块
import os, time, socket, requests, json, urllib.request, re
from collections import namedtuple
from collections import OrderedDict
# ------------------------通用信息--------------------------------
# 业务信息
run_project="test-all"
# 获取日期时间
get_date=time.strftime("%Y-%m-%d", time.localtime())
get_date_time=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# IP地址
def get_host_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('8.8.8.8', 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
inside_net=get_host_ip()
outside_net="1.1.1.1"
# 主机名
host_name=socket.gethostname()
# 发生事件
hint_content="None"
# ------------------------磁盘信息--------------------------------
# 获取磁盘信息
disk_ntuple = namedtuple('partition', 'device mountpoint fstype')
usage_ntuple = namedtuple('usage', 'total used free percent')
# 获取所有磁盘设备
def disk_partitions(all=False):
"""Return all mountd partitions as a nametuple.
If all == False return phyisical partitions only.
"""
phydevs = []
f = open("/proc/filesystems", "r")
for line in f:
if not line.startswith('none'):
phydevs.append(line.strip())
retlist = []
f = open('/etc/mtab', "r")
for line in f:
if not all and line.startswith('none'):
continue
fields = line.split()
device = fields[0]
mountpoint = fields[1]
fstype = fields[2]
if not all and fstype not in phydevs:
continue
if device == 'none':
device = ''
ntuple = disk_ntuple(device, mountpoint, fstype)
retlist.append(ntuple)
return retlist
# 统计磁盘使用情况
def disk_state(path):
"""Return disk usage associated with path."""
st = os.statvfs(path)
free = (st.f_bavail * st.f_frsize)
total = (st.f_blocks * st.f_frsize)
used = (st.f_blocks - st.f_bfree) * st.f_frsize
try:
percent = ret = (float(used) / total) * 100
except ZeroDivisionError:
percent = 0
return usage_ntuple(round(total/1024/1024/1024, 3), round(used/1024/1024/1024, 3), round(free/1024/1024/1024, 3), round(percent, 3))
# ------------------------内存信息--------------------------------
def get_meminfo():
''' Return the information in /proc/meminfo as a dictionary '''
meminfo=OrderedDict()
with open('/proc/meminfo') as f:
for line in f:
meminfo[line.split(':')[0]] = line.split(':')[1].strip()
return meminfo
# ------------------------负载信息--------------------------------
def get_load():
f = open("/proc/loadavg")
loadstate=f.read().split()
return loadstate
# ------------------------CPU信息--------------------------------
def get_cpu_use():
last_worktime=0
last_idletime=0
f=open("/proc/stat","r")
line=""
while not "cpu " in line: line=f.readline()
f.close()
spl=line.split(" ")
worktime=int(spl[2])+int(spl[3])+int(spl[4])
idletime=int(spl[5])
dworktime=(worktime-last_worktime)
didletime=(idletime-last_idletime)
rate=float(dworktime)/(didletime+dworktime)
cpu_t = rate*100
last_worktime=worktime
last_idletime=idletime
if(last_worktime==0): return 0
return cpu_t
# ------------------------发送信息--------------------------------
# 定义验证信息
ID="ww4730ead71a1818a6"
Secret="y4CCI-4LoiWmuqs6A5kYRpEyzhUlCveKeQ_Ik_WeEW4"
UserID = "mai"
PartyID=1
AppID = 1000002
# 获取token
def get_token():
gurl = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}".format(ID, Secret)
r=requests.get(gurl)
dict_result= (r.json())
return dict_result['access_token']
def get_media_ID(path):
Gtoken = get_token()
img_url = "https://qyapi.weixin.qq.com/cgi-bin/media/upload?access_token={}&type=image".format(Gtoken)
files = {'image': open(path, 'rb')}
r = requests.post(img_url, files=files)
re = json.loads(r.text)
return re['media_id']
# 发送文本
def send_text(text):
post_data = {}
msg_content = {}
msg_content['content'] = text
post_data['touser'] = UserID
post_data['toparty'] = PartyID
post_data['msgtype'] = 'text'
post_data['agentid'] = AppID
post_data['text'] = msg_content
post_data['safe'] = '0'
Gtoken = get_token()
purl1="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(Gtoken)
json_post_data = json.dumps(post_data,False,False)
request_post = urllib.request.urlopen(purl1,json_post_data.encode(encoding='UTF8'))
return request_post
# 发送图片
def send_tu(path):
img_id = get_media_ID(path)
post_data1 = {}
msg_content1 = {}
msg_content1['media_id'] = img_id
post_data1['touser'] = UserID
post_data1['toparty'] = PartyID
post_data1['msgtype'] = 'image'
post_data1['agentid'] = AppID
post_data1['image'] = msg_content1
post_data1['safe'] = '0'
Gtoken = get_token()
purl2="https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}".format(Gtoken)
json_post_data1 = json.dumps(post_data1,False,False)
request_post = urllib.request.urlopen(purl2,json_post_data1.encode(encoding='UTF8'))
return request_post
# ------------------------修改内容--------------------------------
# 磁盘 -------------------
def disk_send_mgs():
disk_paths=['/', '/boot', '/data']
for disk_path in disk_paths:
disk_new_state=disk_state(disk_path)
disk_percent=float(disk_state(disk_path)[3])
#print(disk_new_state)
#print(disk_percent)
if disk_percent > 80:
run_events="目录{}使用率为{}%".format(disk_path, disk_new_state[3])
disk_send_state="获取时间:{};\n运行业务:{};\n远程地址:{};\n内网地址:{};\n主机名称:{};\n发生事件:{};\n提示内容:{}.".format(get_date_time, run_project, outside_net, inside_net, host_name, run_events, hint_content)
#print (disk_new_state)
send_text(disk_send_state)
# 内存 -------------------
def mem_send_mgs():
get_run_meminfo=get_meminfo() #数据转换为list
memtotal=format(get_run_meminfo['MemTotal'])
memtotal=float(memtotal.split()[0])
memfree=format(get_run_meminfo['MemFree'])
memfree=float(memfree.split()[0])
memswap=format(get_run_meminfo['SwapTotal'])
memswap=float(memswap.split()[0])
memswapfree=format(get_run_meminfo['SwapFree'])
memswapfree=float(memswapfree.split()[0])
mem_percent=round(((memtotal - memfree) / memtotal)*100, 1)
memswap_percent=round(((memswap - memswapfree) / memswap)*100, 1)
if mem_percent > 80:
run_events="内存使用率超过{}%".format(mem_percent)
mem_send_state="获取时间:{};\n运行业务:{};\n远程地址:{};\n内网地址:{};\n主机名称:{};\n发生事件:{};\n提示内容:{}.".format(get_date_time, run_project, outside_net, inside_net, host_name, run_events, hint_content)
send_text(mem_send_state)
#print (mem_percent)
if memswap_percent > 80:
run_events="交换内存使用率超过{}%".format(memswap_percent)
memswap_send_state="获取时间:{};\n运行业务:{};\n远程地址:{};\n内网地址:{};\n主机名称:{};\n发生事件:{};\n提示内容:{}.".format(get_date_time, run_project, outside_net, inside_net, host_name, run_events, hint_content)
send_text(memswap_send_state)
#print (memswap_percent)
# 负载 -------------------
def load_send_mgs():
loadstate=get_load()
loadstate1=loadstate[0]
loadstate15=float(loadstate[2])
if loadstate15 > 5:
run_events="系统负载过高{}%".format(loadstate15)
load_send_state="获取时间:{};\n运行业务:{};\n远程地址:{};\n内网地址:{};\n主机名称:{};\n发生事件:{};\n提示内容:{}.".format(get_date_time, run_project, outside_net, inside_net, host_name, run_events, hint_content)
send_text(load_send_state)
#print(loadstate15)
# CPU -------------------
def cpu_send_mgs():
cpuuse_percent=round(get_cpu_use(),3)
if cpuuse_percent > 80:
run_events="CPU使用过高{}%".format(cpuuse_percent)
cpu_send_state="获取时间:{};\n运行业务:{};\n远程地址:{};\n内网地址:{};\n主机名称:{};\n发生事件:{};\n提示内容:{}.".format(get_date_time, run_project, outside_net, inside_net, host_name, run_events, hint_content)
send_text(cpu_send_state)
# ------------------------执行内容--------------------------------
# 运行main函数
if __name__=='__main__':
disk_send_mgs()
mem_send_mgs()
load_send_mgs()
cpu_send_mgs()
03. 微信端接收信息
