12345678910111213141516171819202122232425262728 |
- threshold = 60 # 阈值s内不重复报警
- info_ram = {}
- for c in kafka_consumer:
- log_data = json.loads(str(c.value,'utf-8'))
- file_path = log_data['log']['file']['path']
- hostname = log_data['fields']['hostname']
- message = log_data['message']
- if 'algo-schedule' in log_data['log']['file']['path']:
- target = "调度"
- elif 'algos' in log_data['log']['file']['path']:
- target = "运行"
- else:
- target = "其他"
- msg = f'''算法{target}报错
- 主机名: {hostname}
- error文件路径: {file_path}
- error内容: {message}
- '''
- hash_res = hashlib.sha256(str(hostname+file_path).encode()).hexdigest()
- if last_time:=(info_ram.get(hash_res, None)):
- if (datetime.datetime.now() - last_time).total_seconds() < threshold: # 跳过
- pass
- else:
- print(msg)
- info_ram.update({hash_res:datetime.datetime.now()})
- else:
- print(msg)
- info_ram.update({hash_res:datetime.datetime.now()})
|