# monitor.py
  1. threshold = 60 # 阈值s内不重复报警
  2. info_ram = {}
  3. for c in kafka_consumer:
  4. log_data = json.loads(str(c.value,'utf-8'))
  5. file_path = log_data['log']['file']['path']
  6. hostname = log_data['fields']['hostname']
  7. message = log_data['message']
  8. if 'algo-schedule' in log_data['log']['file']['path']:
  9. target = "调度"
  10. elif 'algos' in log_data['log']['file']['path']:
  11. target = "运行"
  12. else:
  13. target = "其他"
  14. msg = f'''算法{target}报错
  15. 主机名: {hostname}
  16. error文件路径: {file_path}
  17. error内容: {message}
  18. '''
  19. hash_res = hashlib.sha256(str(hostname+file_path).encode()).hexdigest()
  20. if last_time:=(info_ram.get(hash_res, None)):
  21. if (datetime.datetime.now() - last_time).total_seconds() < threshold: # 跳过
  22. pass
  23. else:
  24. print(msg)
  25. info_ram.update({hash_res:datetime.datetime.now()})
  26. else:
  27. print(msg)
  28. info_ram.update({hash_res:datetime.datetime.now()})