import datetime
import json
import os
import pickle
import random
import traceback

from apscheduler.schedulers.blocking import BlockingScheduler
import pandas as pd
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from train2 import *


# Read parameters and download data for one sn over [start_time, end_time]
def download(sn, start_time, end_time, df_snpk_list, df_algo_pack_param):
    snindex = list(df_snpk_list['sn']).index(sn)
    # NOTE: df_tags_dataset, iotp_datafactory_service and columns are not
    # defined at this file's top level; this loop relies on them existing as
    # globals, and it overwrites the sn_list/start_time/end_time arguments.
    dataset = pd.DataFrame()  # accumulated here but not used after the loop
    for idx in df_tags_dataset.index:
        d = df_tags_dataset.loc[idx]
        factory_id = d['factory_id']
        user = d['create_by']
        sn_list = [d['sn']]
        start_time = str(d['start_time'])
        end_time = str(d['end_time'])
        # print(sn_list)
        df_data = iotp_datafactory_service.get_data(sn_list=sn_list, columns=columns,
                                                    start_time=start_time, end_time=end_time,
                                                    factory=factory_id, user=user)
        # print(df_data)
        # break
        dataset = pd.concat([dataset, df_data])  # DataFrame.append was removed in pandas 2.x
    df_data['sn'] = sn
    df_data['imei'] = df_snpk_list['imei'][snindex]
    df_data2 = pd.DataFrame()
    if len(df_data) > 0:
        # if df_data.loc[0,'sn'][:5]=='PK504':
        if df_data['imei'].isnull().any():
            df_data['imei'] = sn
        pack_code = df_snpk_list['pack_model'][snindex]
        df_pack_param = df_algo_pack_param[df_algo_pack_param['pack_code'] == pack_code]
        if len(df_pack_param) > 0:
            celpack_param = json.loads(df_pack_param.iloc[0]['param'])
            cellnum = celpack_param['CellVoltTotalCount']
            tempnum = celpack_param['CellTempTotalCount']
            capacity = celpack_param['capacity']
            df_data, df_table, cellvolt_name, celltemp_name = DataClean.datacleaning(df_data, cellnum, tempnum)
            df_data2 = features_total(df_data, capacity)
    return df_data2


# Download all fault data listed in df (one row per fault window)
def makedf(df, df_snpk_list, df_algo_pack_param):
    dataset = pd.DataFrame()
    df.reset_index(drop=True, inplace=True)
    split = 0
    for k in range(len(df)):
        # for k in range(50):
        try:
            sn = df.loc[k, 'sn']
            start_time = str(df.loc[k, 'start_time'])
            end_time = str(df.loc[k, 'end_time'])
            if end_time == 'NaT':
                end_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # type: str
            df_data2 = download(sn, start_time, end_time, df_snpk_list, df_algo_pack_param)
            split = split + 1
            df_data2['split'] = split
            dataset = pd.concat([dataset, df_data2])
        except Exception as e:
            logger.error('fault ' + sn + ' ' + start_time + ' ' + str(e))
            logger.error(traceback.format_exc())
    return dataset


# Randomly download normal (non-fault) data windows
def makedf_nor(df_snpk_list, df_algo_pack_param):
    SNnums = list(df_snpk_list['sn'])
    dataset = pd.DataFrame()
    split = 0
    for sn in list(set(SNnums))[:100]:
        try:
            snindex = list(df_snpk_list['sn']).index(sn)
            now_time = datetime.datetime.now()
            start_time = now_time - datetime.timedelta(hours=random.randint(1, 365 * 24))
            end_time = str(start_time + datetime.timedelta(hours=12))[:19]
            start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
            df_data2 = download(sn, start_time, end_time, df_snpk_list, df_algo_pack_param)
            split = split + 1
            df_data2['split'] = split
            dataset = pd.concat([dataset, df_data2])
        except Exception as e:
            logger.error('normal ' + sn + ' ' + start_time + ' ' + str(e))
            logger.error(traceback.format_exc())
    return dataset
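
# The core of makedf_nor above is its sampling scheme: a random 12-hour window
# anywhere within the past year. A standalone sketch of just that step (the
# helper name _random_window is illustrative, not from the original):
def _random_window(hours_back=365 * 24, window_hours=12):
    start = datetime.datetime.now() - datetime.timedelta(hours=random.randint(1, hours_back))
    end = start + datetime.timedelta(hours=window_hours)
    # same timestamp format the download helpers expect
    return start.strftime('%Y-%m-%d %H:%M:%S'), end.strftime('%Y-%m-%d %H:%M:%S')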
cur_env = 'dev'  # runtime environment
app_path = "/home/zhuxi/project/zlwl-algos/"  # absolute app path
app_name = ""
log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log path
sysUtils = SysUtils(cur_env, app_path)
logger = sysUtils.get_logger(app_name, log_base_path)

# mysql
mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
mysqlUtils = MysqlUtils()
mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
db_engine = mysql_algo_engine.connect()

# redis
redis_params = sysUtils.get_cf_param('redis')
redisUtils = RedisUtils()
rc = redisUtils.get_redis_conncect(redis_params)

# hbase
hbase_params = sysUtils.get_cf_param('hbase')
iotp_service = IotpAlgoService(hbase_params=hbase_params)

# dao=Dao()
# sys_utils = SysUtils()
# mysql_user, mysql_password, mysql_host, mysql_port, mysql_db = sys_utils.get_mysql()
# rc = sys_utils.get_redis()
# db_engine = create_engine("mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(mysql_user, mysql_password, mysql_host, mysql_port, mysql_db), pool_recycle=7200, pool_size=2)

Session = sessionmaker(bind=db_engine)

df_algo_adjustable_param, df_algo_list, df_algo_pack_param, df_snpk_list, df_snpk_list_scrap = update_param(db_engine, rc)
df_snpk_list = pd.concat([df_snpk_list, df_snpk_list_scrap])
df_snpk_list.reset_index(drop=True, inplace=True)
t_tag_child, r_battery_tag, t_algo_alarm_data_tag = update_lable(db_engine, rc)
list_fault = list(t_tag_child[t_tag_child['tag_type'] == 3]['name'])

# sel_columns = [packageInfo.Time, batteryStatus.PackCrnt, batteryStatus.PackVolt, batteryStatus.PackSoc, batteryStatus.PackSoh,
#                batteryStatus.InsulationRssPos, batteryStatus.InsulationRssNeg, batteryStatus.BMSSta, batteryStatus.AccumChrgWh,
#                batteryStatus.CellVoltage, batteryStatus.CellTemp, batteryStatus.OtherTempName, batteryStatus.OtherTempValue, batteryStatus.AccumChrgAh, batteryStatus.AccumDsChgAh]
sel_columns = [DataField.error_level, DataField.error_code, DataField.pack_crnt, DataField.pack_volt,
               DataField.bms_sta, DataField.cell_voltage_count, DataField.cell_temp_count,
               DataField.cell_voltage, DataField.cell_temp, DataField.pack_soc,
               DataField.other_temp_value, DataField.cell_balance, DataField.pack_soh, DataField.charge_sta]
# The two lines below reference sn_list/columns/start_time/end_time, which are
# undefined at module level; kept commented out as leftover debug code.
# df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
# iotp_service.data_clean(df_data)


def diag_cal():
    global logger
    for fault in list_fault:
        list_models = os.listdir('Resources/AI_Fault_Class/V_auto/models')
        if 'model_' + fault + '.h5' not in list_models:
            parent_id = int(t_tag_child[t_tag_child['name'] == fault]['parent_id'].values[0])
            child_id = str(t_tag_child[t_tag_child['name'] == fault]['id'].values[0])
            object_id = list(r_battery_tag[(r_battery_tag['tag_id'] == parent_id) & (r_battery_tag['child_tag_list'] == child_id)]['object_id'])
            df_diag_ram = t_algo_alarm_data_tag[t_algo_alarm_data_tag['id'].isin(object_id)]
            # proceed only if there are enough samples
            if len(df_diag_ram) > 50:
                # collect samples
                logger.info("downloading samples")
                # download all fault data
                datatest = makedf(df_diag_ram, df_snpk_list, df_algo_pack_param)
                # deltatime is in seconds
                datatest.fillna(datatest.median(), inplace=True)  # fill NaNs with column medians
                datatest.to_csv('datatest' + fault + '.csv')
                # datatest=pd.read_csv('datatest'+fault+'.csv')
                # datatest=datatest.drop(['Unnamed: 0'],axis=1)
                # randomly download normal data
                dataset_nor = makedf_nor(df_snpk_list, df_algo_pack_param)
                dataset_nor.to_csv('datatestnor.csv')
                # dataset_nor=pd.read_csv('datatestnor.csv')
                # dataset_nor=dataset_nor.drop(['Unnamed: 0'],axis=1)
                median = dataset_nor.median()
                median2 = str(median[['PackVolt', 'BMSSta', 'PackSoc', 'temp_max', 'temp_min', 'temp_mean', 'temp_diff',
                                      'temp2_max', 'temp2_min', 'temp2_mean', 'temp2_diff']].to_dict())
                # print(median2)
                dataset_nor.fillna(median, inplace=True)  # fill NaNs with column medians
                logger.info("training model")
                # automatic training
                model, scaler, loss_th_max, loss_th_sum, time_steps, key_col = train(datatest, dataset_nor)
                logger.info("saving parameters and model")
                # save parameters: update the algo_list table
                list_fault_hist = list(df_algo_list[(df_algo_list['algo_id'] > 100) & (df_algo_list['algo_id'] < 200)]['algo_name'])
                list_pack_code = list(df_algo_adjustable_param['pack_code'].drop_duplicates())
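                # For reference, the payloads persisted below look roughly like this
                # (illustrative values; only the keys come from the code):
                #   param    -> {'key_feature': [...], 'loss_max': '0.12', 'loss_sum': '3.4'}
                #   param_ai -> {'time_steps': '30', 'median': "{'PackVolt': 350.0, ...}"}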
                if fault + '_AI' not in list_fault_hist:
                    # register a new algo_list row for this fault
                    id = max(list(set(df_algo_list['id']))) + 1
                    algo_id = max(list(df_algo_list[(df_algo_list['algo_id'] > 100) & (df_algo_list['algo_id'] < 200)]['algo_id'])) + 1
                    create_time = str(datetime.datetime.now())
                    fault_code = 'C' + str(int(max(list(set(df_algo_list[(df_algo_list['fault_code'] > 'C250') & (df_algo_list['fault_code'] < 'C300')]['fault_code'])))[1:]) + 1)
                    input_param2 = pd.DataFrame({'id': [id], 'create_time': [create_time], 'create_by': ['zhuxi'],
                                                 'algo_id': [algo_id], 'algo_name': [fault + '_AI'], 'is_activate': [1],
                                                 'fault_level': [2], 'fault_code': [fault_code],
                                                 'fault_influence': ['存在安全风险'],  # "safety risk exists"
                                                 'model_type': [0], 'configurable_flag': [str(1100111)],
                                                 'is_delete': [0], 'model_alarm_type': [1]})
                    input_param2.to_sql("algo_list", con=db_engine, if_exists="append", index=False)
                    id2 = list(df_algo_adjustable_param[df_algo_adjustable_param['algo_id'] == algo_id]['id'])
                else:
                    algo_id = df_algo_list[df_algo_list['algo_name'] == fault + '_AI']['algo_id'].values[0]
                    id2 = list(range(max(list(set(df_algo_adjustable_param['id']))) + 1,
                                     max(list(set(df_algo_adjustable_param['id']))) + 1 + len(list_pack_code)))
                # update the algo_adjustable_param table
                param_ai = {"time_steps": str(time_steps), "median": median2}
                param = {"key_feature": key_col, "loss_max": str(loss_th_max), "loss_sum": str(loss_th_sum)}
                input_param = pd.DataFrame({'id': id2, 'algo_id': algo_id, 'pack_code': list_pack_code,
                                            'param': str(param), 'param_ai': str(param_ai)})
                session = Session()
                session.execute(text("DELETE FROM algo_adjustable_param WHERE algo_id ='{}'".format(algo_id)))
                session.commit()
                input_param.to_sql("algo_adjustable_param", con=db_engine, if_exists="append", index=False)
                # save model and scaler
                pickle.dump(scaler, open('Resources/AI_Fault_Class/V_auto/scalers/scaler_' + fault + '.pkl', 'wb'))
                model.save('Resources/AI_Fault_Class/V_auto/models/model_' + fault + '.h5')
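
# BlockingScheduler is imported at the top of this file but never started; a
# minimal sketch of how diag_cal might be wired to it (an assumption, not the
# original flow; kept commented out so import behaviour is unchanged):
# scheduler = BlockingScheduler()
# scheduler.add_job(diag_cal, 'interval', days=1)  # hypothetical interval
# scheduler.start()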

# ---- Kafka-driven training task (second part of this file) ----
import datetime
import gc
import re
from multiprocessing import Pool
import json
import logging
import logging.handlers
import os
import time
import traceback
import warnings

from sqlalchemy import text, delete, and_, or_, update
import pandas as pd

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *


def invoke_algo1(logger, mysql_algo_conn, mysql_algo_Session, start_time, df_data):
    pass


def invoke_algo2(logger, mysql_algo_conn, mysql_algo_Session, start_time, df_data):
    pass


def main(process_num):
    # the process must keep running
    while True:
        warnings.filterwarnings("ignore")
        try:
            # preparation before invoking the algorithms
            kafka_topic_key = 'topic_task_month_1'
            kafka_groupid_key = 'group_task_month_1'
            algo_list = ['FaultClass_Train']  # algorithms handled by this scheduler
            loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # one logger per algorithm
            logger_main.info(f"process-{process_num}: configuring middleware")
            # mysql
            mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
            mysqlUtils = MysqlUtils()
            mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
            mysql_algo_conn = mysql_algo_engine.connect()
            # redis
            redis_params = sysUtils.get_cf_param('redis')
            redisUtils = RedisUtils()
            redis_conn = redisUtils.get_redis_conncect(redis_params)
            # hbase
            hbase_params = sysUtils.get_cf_param('hbase')
            iotp_service = IotpAlgoService(hbase_params=hbase_params)
            # kafka
            kafka_params = sysUtils.get_cf_param('kafka')
            kafkaUtils = KafkaUtils()
            kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
            logger_main.info(f"process-{process_num}: fetching algorithm and pack parameters")
        except Exception as e:
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')

        # start listening for dispatches
        try:
            logger_main.info(f"process-{process_num}: listening on topic {kafka_params[kafka_topic_key]}, waiting for kafka dispatch")
            param_update_timer = time.time()
            for message in kafka_consumer:
                try:
                    logger_main.info(f'process-{process_num}: received dispatch {message.value}')
                    if mysql_algo_conn.closed:  # was `.close`, a method object that is always truthy
                        mysql_algo_conn = mysql_algo_engine.connect()  # get a mysql connection from the pool
                    schedule_params = json.loads(message.value)
                    if (schedule_params is None) or (schedule_params == ''):
                        logger_main.info(f'process-{process_num}: {message.value} malformed kafka message, skipping this run')
                        continue
                    # parse the kafka dispatch parameters
                    df_snlist = pd.DataFrame(schedule_params['snlist'])
                    df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'], d['param_ai']) for d in schedule_params['adjustable_param']],
                                                            columns=['algo_id', 'param', 'param_ai'])
                    df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
                    df_algo_list = pd.DataFrame(schedule_params['algo_list'])
                    start_time = schedule_params['start_time']
                    end_time = schedule_params['end_time']
                    pack_code = schedule_params['pack_code']
                    cell_type = schedule_params['cell_type']
                    sn_list = df_snlist['sn'].tolist()
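                    # Illustrative shape of the dispatch payload parsed above
                    # (keys come from the code; values are placeholders):
                    # {
                    #   "snlist": [{"sn": "..."}, ...],
                    #   "adjustable_param": [{"algo_id": ..., "param": "...", "param_ai": "..."}],
                    #   "pack_param": [{"param": "{...}"}],
                    #   "algo_list": [...],
                    #   "start_time": "...", "end_time": "...",
                    #   "pack_code": "...", "cell_type": "..."
                    # }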
                    # fetch the tag-set data
                    hbase_params = sysUtils.get_cf_param('hbase')
                    hbase_datafactory_params = sysUtils.get_cf_param('hbase-datafactory')
                    iotp_service = IotpAlgoService(hbase_params=hbase_params)
                    iotp_datafactory_service = IotpAlgoService(hbase_params=hbase_datafactory_params)
                    mysql_datafactory_params = sysUtils.get_cf_param('mysql-datafactory')
                    mysqlUtils = MysqlUtils()
                    mysql_datafactory_engine, mysql_datafactory_Session = mysqlUtils.get_mysql_engine(mysql_datafactory_params)
                    mysql_datafactory_conn = mysql_datafactory_engine.connect()
                    df_tags_dataset = iotp_datafactory_service.get_dataset_tags(mysql_datafactory_conn)

                    # fetch data
                    columns = [DataField.sn, DataField.time, DataField.error_level, DataField.error_code,
                               DataField.pack_crnt, DataField.pack_volt, DataField.bms_sta, DataField.pack_soh,
                               DataField.cell_voltage, DataField.cell_temp, DataField.pack_soc, DataField.charge_sta,
                               DataField.other_temp_value, DataField.cell_voltage_count, DataField.cell_temp_count]
                    data = rc.get("algo_param_from_mysql:t_device")
                    if pd.isnull(data):
                        df_snpk_list = pd.read_sql("select sn, imei, pack_model from t_device", db_engine)
                        df_snpk_list = df_snpk_list.rename(columns={'pack_model': 'pack_code'})  # rename() returns a copy; the result was previously discarded
                    else:
                        df_snpk_list = pd.DataFrame(json.loads(data))
                    logger_main.info(f"process-{process_num}: start fetching data")
                    columns = [DataField.error_level, DataField.error_code, DataField.pack_crnt, DataField.pack_volt,
                               DataField.bms_sta, DataField.cell_voltage_count, DataField.cell_temp_count,
                               DataField.cell_voltage, DataField.cell_temp, DataField.pack_soc,
                               DataField.other_temp_value, DataField.cell_balance, DataField.pack_soh, DataField.charge_sta]  # overwrites the column list above
                    df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
                    logger_main.info(f"process-{process_num}: fetched {len(df_data)} rows for {sn_list}")
                except Exception as e:
                    logger_main.error(f'process-{process_num}: {pack_code} run failed')
                    logger_main.error(f'process-{process_num}: {e}')
                    logger_main.error(f'process-{process_num}: {traceback.format_exc()}')

                try:
                    # data cleaning
                    if len(df_data) == 0:
                        logger_main.info(f"process-{process_num}: no data, skipping this run")
                        continue
                    df_data, df_table, cellvolt_name, celltemp_name = iotp_service.data_clean(df_data, df_algo_pack_param)  # run data cleaning
                    if len(df_data) == 0:
                        logger_main.info(f"process-{process_num}: data cleaning finished, no valid rows, skipping this run")
                        continue
                    else:
                        logger_main.info(f"process-{process_num}: {pack_code}, time range: {df_data.loc[0, 'time']} ~ {df_data.iloc[-1]['time']}, data cleaning finished")
                except Exception as e:
                    logger_main.error(f"process-{process_num}:{pack_code} data cleaning failed")
                    logger_main.error(f"process-{process_num}:{e}")
                    logger_main.error(f"process-{process_num}:{traceback.format_exc()}")

                # invoke the first algorithm
                try:
                    invoke_algo1(loggers, mysql_algo_conn, mysql_algo_Session, start_time, df_data)
                except Exception as e:
                    loggers['FaultWarning'].error('{} run failed'.format(pack_code))
                    loggers['FaultWarning'].error(str(e))
                    loggers['FaultWarning'].error(traceback.format_exc())

                # invoke the second algorithm
                try:
                    invoke_algo2(loggers, mysql_algo_conn, mysql_algo_Session, start_time, df_data)
                except Exception as e:
                    loggers['FaultWarning'].error('{} run failed'.format(pack_code))
                    loggers['FaultWarning'].error(str(e))
                    loggers['FaultWarning'].error(traceback.format_exc())
        except Exception as e:
            logger_main.error(f'process-{process_num}: {pack_code} run failed')
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
        finally:
            iotp_service.close()


if __name__ == '__main__':
    while True:
        try:
            # configuration
            cur_env = 'dev'  # runtime environment
            app_path = "/home/zhuxi/project/zlwl-algos/"  # absolute app path
            log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log path
            app_name = "task_second_1"  # app name
            sysUtils = SysUtils(cur_env, app_path)
            logger_main = sysUtils.get_logger(app_name, log_base_path)
            logger_main.info(f"main process pid: {os.getpid()}")

            # read config (please do not modify this part)
            processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # defaults to 1 process
            pool = Pool(processes=int(processes))
            logger_main.info("spawning worker processes")
            for i in range(int(processes)):
                pool.apply_async(main, (i,))
            pool.close()
            logger_main.info("workers spawned, blocking main process")
            pool.join()
        except Exception as e:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
        finally:
            handlers = logger_main.handlers.copy()
            for h in handlers:
                logger_main.removeHandler(h)
            pool.terminate()
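
# Launch note (assumption, inferred from the PROCESS_NUM_PER_NODE lookup above):
#   PROCESS_NUM_PER_NODE=4 python <this_script>.py
# spawns four workers, each running main() with its own kafka consumer loop.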