# Minimum number of seconds between two alerts for the same host + log file.
threshold = 60
# Throttle state: sha256(hostname + file_path) hex digest -> UTC time at which
# the most recent alert for that digest was printed.
info_ram = {}


def _alert_target(file_path):
    """Return the alert category label for the log file at *file_path*.

    'algo-schedule' is tested before 'algos', so a path containing both
    substrings is classified as a scheduling error.
    """
    if 'algo-schedule' in file_path:
        return "调度"
    if 'algos' in file_path:
        return "运行"
    return "其他"


def _report_bad_record(exc):
    """Write a one-line notice about a skipped record to stderr."""
    # Function-scope import: the file's import block is not visible from here.
    import sys

    # stderr keeps diagnostics out of stdout, where the alerts are printed.
    print(f"skipping malformed log record: {exc!r}", file=sys.stderr)


# `kafka_consumer` is defined outside this block; each record's `.value` is
# decoded as UTF-8 JSON carrying log.file.path, fields.hostname and message.
for c in kafka_consumer:
    try:
        log_data = json.loads(str(c.value, 'utf-8'))
        file_path = log_data['log']['file']['path']
        hostname = log_data['fields']['hostname']
        message = log_data['message']
        target = _alert_target(file_path)
        hash_res = hashlib.sha256(str(hostname + file_path).encode()).hexdigest()
    except (ValueError, KeyError, TypeError) as exc:
        # ValueError covers invalid JSON and invalid UTF-8, KeyError a missing
        # field, TypeError a None value or a field of unexpected type. One bad
        # record must not stop alerting for every record that follows it.
        _report_bad_record(exc)
        continue

    # Timezone-aware UTC: naive local time can jump backwards (e.g. when DST
    # ends), which would make the elapsed time negative and mute alerts.
    now = datetime.datetime.now(datetime.timezone.utc)
    last_time = info_ram.get(hash_res)
    if last_time is not None and (now - last_time).total_seconds() < threshold:
        # Already alerted for this host + file within the window: skip.
        continue

    # NOTE(review): the line breaks below are reconstructed from a
    # whitespace-collapsed source — confirm against the deployed format.
    msg = (
        f"算法{target}报错\n"
        f"主机名: {hostname}\n"
        f"error文件路径: {file_path}\n"
        f"error内容: {message}\n"
    )
    print(msg)
    info_ram[hash_res] = now