deploy.py

# coding=utf-8
__author__ = 'lmstack'
import os
import time
import datetime
import traceback
import pdb
import pandas as pd
import datacompy
import dateutil.relativedelta
import pymysql
from urllib import parse
from pandas.core.frame import DataFrame
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from apscheduler.schedulers.blocking import BlockingScheduler
from LIB.BACKEND import DBManager, Log
from LIB.MIDDLE.CellStateEstimation.Common import log
from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import DBDownload
from LIB.MIDDLE.SaftyCenter.Common import FeiShuData
from LIB.MIDDLE.SaftyCenter.Common import QX_BatteryParam
from LIB.MIDDLE.SaftyCenter.diagfault import CBMSBatDiag
from LIB.MIDDLE.SaftyCenter.diagfault.SC_SamplingSafty import SamplingSafty
def fun():
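    """Run one diagnosis cycle over every SN in df_sn.

    For each battery SN this pulls the last ~130 s of raw BMS data plus the stored SOH and
    cell-uniformity results, runs the SamplingSafty and CBMSBatDiag fault checks, reconciles
    the outcome against the Feishu O&M records and the in-memory fault table df_Diag_Ram,
    then appends new faults to all_fault_info and updates faults whose status changed.
    """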
    global df_sn
    global db_res_engine
    global logger
    global df_Diag_Ram
    # Results database (safety_platform) connection settings
    host2 = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
    port2 = 3306
    db2 = 'safety_platform'
    user2 = 'qx_read'
    password2 = 'Qx@123456'
    logger.info("cycle start !!!!!!!!!!!!!!!!!!!!")
    start = time.time()
    # Data window for this cycle: the last 130 seconds
    end_time = datetime.datetime.now()
    start_time = end_time - datetime.timedelta(seconds=130)
    start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
    end_time = end_time.strftime('%Y-%m-%d %H:%M:%S')
    df_read_Yunw = FeiShuData.getFeiShuDATA()  # operations & maintenance records from the Feishu sheet
    df_read_Yunw.rename(columns={'电池编码': 'product_id'}, inplace=True)
    df_read_Yunw.rename(columns={'内容描述': 'info'}, inplace=True)
    df_read_Yunw.rename(columns={'发生时间': 'start_time'}, inplace=True)
    df_read_Yunw.rename(columns={'维修信息': 'advice'}, inplace=True)
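    # The loop below maps each device's imei segment to the cell-type code expected by the
    # diagnosis modules:
    #   imei[5:9] == 'N640'                        -> celltype 1  (6040 ternary cell)
    #   imei[5:9] == 'N440'                        -> celltype 2  (4840 ternary cell)
    #   imei[5:9] == 'L660'                        -> celltype 99 (6060 lithium cell)
    #   imei[3:5] == 'LX' and imei[5:9] == 'N750'  -> celltype 3  (Lishen 50 Ah ternary cell)
    #   imei[3:5] == 'CL' and imei[5:9] == 'N750'  -> celltype 4  (CATL 50 Ah ternary cell)
    # Devices with any other prefix are skipped.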
    for i in range(0, len(df_sn)):
        try:
            sn = df_sn.loc[i, 'sn']
            if df_sn.loc[i, 'imei'][5:9] == 'N640':
                celltype = 1   # 6040 ternary cell
            elif df_sn.loc[i, 'imei'][5:9] == 'N440':
                celltype = 2   # 4840 ternary cell
            elif df_sn.loc[i, 'imei'][5:9] == 'L660':
                celltype = 99  # 6060 lithium cell
            elif df_sn.loc[i, 'imei'][3:5] == 'LX' and df_sn.loc[i, 'imei'][5:9] == 'N750':
                celltype = 3   # Lishen 50 Ah ternary cell
            elif df_sn.loc[i, 'imei'][3:5] == 'CL' and df_sn.loc[i, 'imei'][5:9] == 'N750':
                celltype = 4   # CATL 50 Ah ternary cell
            else:
                logger.info("pid-{} celltype-{} SN: {} SKIP!".format(os.getpid(), "unknown", sn))
                continue
            logger.info("pid-{} celltype-{} SN: {} START!".format(os.getpid(), celltype, sn))
            param = QX_BatteryParam.BatteryInfo(celltype)
            # Read raw BMS data from the source database ...........................................
            dbManager = DBManager.DBManager()
            df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
            df_bms = df_data['bms']
            # Read intermediate results (SOH and cell uniformity) from the results database ........
            host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
            port = 3306
            db = 'qx_cas'
            user = 'qx_read'
            password = 'Qx@123456'
            mode = 1
            tablename1 = 'cellstateestimation_soh'
            tablename2 = 'cellstateestimation_uniform_socvoltdiff'
            DBRead = DBDownload.DBDownload(host, port, db, user, password, mode)
            with DBRead as DBRead:
                df_soh = DBRead.getdata('time_st,sn,soh,cellsoh', tablename=tablename1, sn=sn, timename='time_sp', st=start_time, sp=end_time)
                df_uniform = DBRead.getdata('time,sn,cellsoc_diff,cellmin_num,cellmax_num', tablename=tablename2, sn=sn, timename='time', st=start_time, sp=end_time)
            # Battery fault diagnosis ..............................................................
            CellFltInfo = DataFrame(columns=['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice'])
            df_Diag_Ram = df_Diag_Ram.drop_duplicates(subset=['product_id', 'code', 'start_time', 'Batpos', 'info'], keep='first')
            df_Diag_Ram_sn = df_Diag_Ram.loc[df_Diag_Ram['product_id'] == sn]  # historical faults of this SN
            df_Diag_Ram_sn_else = pd.concat([df_Diag_Ram, df_Diag_Ram_sn, df_Diag_Ram_sn]).drop_duplicates(subset=['product_id', 'code', 'start_time', 'Batpos', 'info'], keep=False)  # faults of all other SNs
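            # Note: pd.concat([A, B, B]).drop_duplicates(subset=..., keep=False) is used throughout
            # this script as a DataFrame set difference: every row of B occurs at least twice in the
            # concatenation and is therefore dropped, so only rows of A that do not appear in B (per
            # the subset columns) survive. E.g. A = {x, y}, B = {y} -> concat {x, y, y, y} -> {x}.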
            CellFltInfo = df_Diag_Ram_sn.drop('Batpos', axis=1)
            df_Diag_Ram_add = pd.DataFrame()
            df_Diag_Ram_Update_change = pd.DataFrame()
            if not df_bms.empty:
                df_Diag_Batdiag_update_xq = SamplingSafty.main(sn, param, df_bms, CellFltInfo)  # faults computed by SamplingSafty (Xueqi)
                BatDiag = CBMSBatDiag.BatDiag(sn, celltype, df_bms, df_soh, df_uniform, CellFltInfo)  # faults computed by CBMSBatDiag (Pengfei)
                df_Diag_Batdiag_update = BatDiag.diag()
                df_Diag_Cal_Update_add = pd.concat([df_Diag_Batdiag_update_xq, df_Diag_Batdiag_update])  # freshly computed faults for this SN
                df_Diag_Cal_Update_temp = df_Diag_Cal_Update_add.drop_duplicates(subset=['product_id', 'start_time', 'end_time', 'code', 'info'], keep='first', inplace=False, ignore_index=False)  # drop identical faults
                df_Diag_cal_early_unfix = pd.DataFrame()
                df_Diag_Cal_finish = pd.DataFrame()
                df_Diag_cal_early_fix = pd.DataFrame()
                if not df_Diag_Cal_Update_temp.empty:
                    # Merge faults from both sources and group the faults of the same SN together
                    df_Diag_Cal_Update = df_Diag_Cal_Update_temp  # replaces the previous line
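                    # Faults that carry a real end_time are treated as closed (Batpos = 1); faults whose
                    # end_time is still the placeholder '0000-00-00 00:00:00' are treated as open
                    # (Batpos = 0). This convention appears to drive the status updates written back to
                    # all_fault_info further below.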
                    df_Diag_Cal_finish = df_Diag_Cal_Update.loc[df_Diag_Cal_Update['end_time'] != '0000-00-00 00:00:00']
                    df_Diag_Cal_new = df_Diag_Cal_Update.loc[df_Diag_Cal_Update['end_time'] == '0000-00-00 00:00:00']
                    df_Diag_Cal_finish['Batpos'] = 1
                    df_Diag_Cal_new['Batpos'] = 0
                    df_feishu_sta = df_read_Yunw.loc[(df_read_Yunw['product_id'] == sn)]  # Feishu status records for this SN
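                    # Reconciliation with the Feishu O&M sheet: if Feishu has no record for this SN,
                    # every open fault is kept as new/unfixed. Otherwise, faults that started after the
                    # latest Feishu record stay unfixed, and faults that started earlier are only marked
                    # fixed when the latest Feishu advice is neither '需正常返仓' nor '需紧急返仓'
                    # (i.e. the battery is not still waiting to be returned to the warehouse).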
                    if df_feishu_sta.empty:  # no Feishu record for this SN: record the new faults directly
                        df_Diag_cal_early_unfix = df_Diag_Cal_new
                    else:
                        df_Diag_cal_later = df_Diag_Cal_new.loc[pd.to_datetime(df_Diag_Cal_new['start_time']) > max(pd.to_datetime(df_feishu_sta['start_time']))]  # faults that started after the latest Feishu record
                        df_Diag_cal_early = pd.concat([df_Diag_Cal_new, df_Diag_cal_later, df_Diag_cal_later]).drop_duplicates(subset=['product_id', 'code', 'start_time'], keep=False)  # faults that started before the latest Feishu record
                        df_feishu_sta_latest = df_feishu_sta.loc[pd.to_datetime(df_feishu_sta['start_time']) == max(pd.to_datetime(df_feishu_sta['start_time']))]  # latest Feishu fault for this SN
                        df_feishu_diag_unfix = (df_feishu_sta_latest['advice'] == '需正常返仓') | (df_feishu_sta_latest['advice'] == '需紧急返仓')
                        if any(df_feishu_diag_unfix):
                            df_Diag_cal_early_unfix = df_Diag_Cal_new
                        else:
                            df_Diag_cal_early_fix = df_Diag_cal_early
                            df_Diag_cal_early_unfix = df_Diag_cal_later
                    if not df_Diag_cal_early_fix.empty:
                        df_Diag_cal_early_fix['Batpos'] = 1
                    df_Diag_Ram_Update = pd.concat([df_Diag_cal_early_unfix, df_Diag_cal_early_fix, df_Diag_Cal_finish])
                    df_Diag_Ram_Update['start_time'] = pd.to_datetime(df_Diag_Ram_Update['start_time'])
                    df_Diag_Ram_Update.sort_values(by=['start_time'], axis=0, ascending=True, inplace=True)  # fault status of this SN for the current run
                    df_Diag_Ram_add = pd.concat([df_Diag_Ram_Update, df_Diag_Ram_sn, df_Diag_Ram_sn]).drop_duplicates(subset=['start_time', 'code'], keep=False)  # faults newly found in this run
                    df_Diag_Ram_Update_old = pd.concat([df_Diag_Ram_Update, df_Diag_Ram_add, df_Diag_Ram_add]).drop_duplicates(subset=['start_time', 'code'], keep=False)  # previously known faults
                    df_Diag_Ram_Update_change = pd.concat([df_Diag_Ram_Update_old, df_Diag_Ram_sn, df_Diag_Ram_sn]).drop_duplicates(subset=['start_time', 'code', 'Batpos'], keep=False)  # known faults whose status changed
                    df_Diag_Ram_sav = df_Diag_Ram_Update.loc[df_Diag_Ram_Update['Batpos'] == 0]
                    df_Diag_Ram = pd.concat([df_Diag_Ram_sn_else, df_Diag_Ram_sav])
            # if (len(df_Diag_Ram_add) > 0) | (len(df_Diag_Ram_Update_change) > 0):  # historical and current faults
            #     df_Diag_Ram.to_csv(r'D:\Work\Code_write\data_analyze_platform\USER\01Screen_Problem\result.csv', index=False, encoding='GB18030')
            if len(df_Diag_Ram_add) > 0:  # newly added faults
                df_Diag_Ram_add.columns = ['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice', 'Batpos']
                df_Diag_Ram_add['factory'] = '骑享'
                df_Diag_Ram_add.to_sql("all_fault_info", con=db_res_engine, if_exists="append", index=False)
                # df_Diag_Ram_add.to_csv(r'D:\Work\Code_write\data_analyze_platform\USER\01Screen_Problem\result_add.csv', index=False, encoding='GB18030')
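                # New fault rows are appended to all_fault_info in the safety_platform results database
                # through db_res_engine (created in the __main__ block); the hard-coded factory tag
                # '骑享' matches the filter used when df_Diag_Ram was first loaded.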
            if len(df_Diag_Ram_Update_change) > 0:  # faults whose status changed
                df_Diag_Ram_Update_change = df_Diag_Ram_Update_change.reset_index(drop=True)
                try:
                    conn = pymysql.connect(host=host2, port=port2, user=user2, password=password2, database=db2)
                    cursor = conn.cursor()
                    logger.info(df_Diag_Ram_Update_change)
                    for j in range(0, len(df_Diag_Ram_Update_change)):
                        sql = '''
                            update all_fault_info set end_time='{}', Batpos={} where product_id='{}' and code={} and start_time='{}'
                            '''.format(df_Diag_Ram_Update_change.loc[j, 'end_time'], df_Diag_Ram_Update_change.loc[j, 'Batpos'],
                                       df_Diag_Ram_Update_change.loc[j, 'product_id'], df_Diag_Ram_Update_change.loc[j, 'code'],
                                       df_Diag_Ram_Update_change.loc[j, 'start_time'])
                        logger.info(sql)
                        cursor.execute(sql)
                        conn.commit()
                    conn.close()
                except Exception:
                    logger.error(traceback.format_exc())
                    logger.error(u"{} :{},{} task run error\n".format(sn, start_time, end_time), exc_info=True)
                    conn.close()
                # df_Diag_Ram_Update_change.to_csv(r'D:\Work\Code_write\data_analyze_platform\USER\01Screen_Problemm\result_change.csv', index=False, encoding='GB18030')
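                # A parameterized form of the UPDATE above would avoid building SQL with str.format,
                # e.g. (sketch only, same columns assumed):
                #   cursor.execute(
                #       "update all_fault_info set end_time=%s, Batpos=%s "
                #       "where product_id=%s and code=%s and start_time=%s",
                #       (row['end_time'], row['Batpos'], row['product_id'], row['code'], row['start_time']))
                # pymysql then performs the escaping and substitution of the %s placeholders itself.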
            end = time.time()
            logger.info("pid-{} celltype-{} SN: {} DONE!".format(os.getpid(), celltype, sn))
        except Exception:
            logger.error(traceback.format_exc())
            logger.error(u"{} :{},{} task run error\n".format(sn, start_time, end_time), exc_info=True)
    logger.info("cycle DONE !!!!!!!!!!!!!!!!!!!!")
if __name__ == "__main__":
    # Time window setup
    # now_time = datetime.datetime.now()
    # pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)  # previous day
    # end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d 00:00:00")
    # start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d 00:00:00")
    history_run_flag = False  # flag for running on historical data
    # Refresh the SN list
    host = 'rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
    port = 3306
    db = 'qixiang_oss'
    user = 'qixiang_oss'
    password = 'Qixiang2021'
    conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
    cursor = conn.cursor()
    cursor.execute("select sn, imei, add_time from app_device")
    res = cursor.fetchall()
    df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
    df_sn = df_sn.reset_index(drop=True)
    conn.close()
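    # df_sn now holds one row per registered device (columns: sn, imei, add_time); fun() maps each
    # device's imei prefix to a cell type and iterates over the sn values.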
    # Database configuration (qx_cas)
    host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
    port = 3306
    user = 'qx_cas'
    password = parse.quote_plus('Qx@123456')
    database = 'qx_cas'
    db_engine = create_engine(
        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
            user, password, host, port, database
        ))
    DbSession = sessionmaker(bind=db_engine)
    # Historical-data run configuration
    df_first_data_time = pd.read_sql("select * from bat_first_data_time", db_engine)
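    # Note: DbSession and df_first_data_time are prepared here but are not referenced again in this
    # script; together with history_run_flag they appear to belong to the (commented-out)
    # historical-data run path.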
    # Logging configuration
    now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":", "_")
    log_path = 'log/' + now_str
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    log = Log.Mylog(log_name='saftyCenter_diagfault', log_level='info')
    log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024 * 1024 * 100)
    log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024 * 1024 * 100)
    logger = log.get_logger()
    logger.info("pid is {}".format(os.getpid()))
    # Algorithm parameters: results database (safety_platform)
    host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
    port = 3306
    db = 'safety_platform'
    user = 'qx_read'
    password = parse.quote_plus('Qx@123456')
    tablename = 'all_fault_info'
    db_res_engine = create_engine(
        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
            user, password, host, port, db
        ))
    # Before the module starts running, first load every fault whose end time is still 0 from the results database
    # print("select start_time, end_time, product_id, code, level, info, advice, factory from {}".format(tablename))
    df_Diag_Ram = pd.read_sql("select start_time, end_time, product_id, code, level, info, advice, Batpos from all_fault_info where factory = '{}'".format('骑享'), db_res_engine)
    # result = result[['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']]
    # df_Diag_Ram = result[result['end_time'] == '0000-00-00 00:00:00']
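    # Scheduling: fun() is executed once immediately, then every 600 s by APScheduler's
    # BlockingScheduler, which blocks this main thread. Each run only queries the most recent
    # 130 s of raw data, which is narrower than the 600 s interval between runs.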
    # Scheduled job ................................................................................
    fun()
    scheduler = BlockingScheduler()
    scheduler.add_job(fun, 'interval', seconds=600, id='diag_job')
    try:
        scheduler.start()
    except Exception as e:
        scheduler.shutdown()
        logger.error(str(e))