deploy.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. __author__ = 'lmstack'
  2. #coding=utf-8
import datetime
import logging
import logging.handlers
import multiprocessing
import os
import pdb
import time
import traceback
from urllib import parse

import datacompy
import pandas as pd
import pymysql
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from LIB.BACKEND import DBManager, Log
from LIB.MIDDLE.CellStateEstimation.BatSafetyAlarm.V1_0_1 import CBMSSafetyAlarm
from LIB.MIDDLE.CellStateEstimation.Common import DBDownload
from LIB.MIDDLE.CellStateEstimation.Common import log
# ---------------- Battery pack cell safety diagnosis ----------------
  22. def diag_cal(df_sn, df_bms_ram, df_alarm_ram, log_name):
  23. # 日志
  24. logger = logging.getLogger()
  25. fh = logging.handlers.RotatingFileHandler(log_name + ".log", encoding="utf-8",maxBytes=1024*1024*1024, backupCount=5, mode="a")
  26. # fh = logging.handlers.db_res_engine(log_name + ".log", encoding="utf-8",maxBytes=1024*1024*1024, backupCount=5, mode="a")
  27. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  28. fh.setFormatter(formatter)
  29. logger.addHandler(fh)
  30. logger.setLevel(logging.INFO)
  31. logger.info("pid is {}".format(os.getpid()))
  32. # 读取结果数据库
  33. host2='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  34. port2=3306
  35. db2='safety_platform'
  36. user2='qx_algo_rw'
  37. password2='qx@123456'
  38. db_res_engine = create_engine(
  39. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  40. user2, parse.quote_plus(password2), host2, port2, db2
  41. ))
  42. conn = pymysql.connect(host=host2, port=port2, user=user2, password=password2, database=db2)
  43. cursor = conn.cursor()
  44. result=pd.read_sql("select start_time, end_time, product_id, code, level, info, advice from all_fault_info", db_res_engine)
  45. result = result[['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']]
  46. df_diag_ram=result[(result['end_time']=='0000-00-00 00:00:00') & (result['code']=='C599')]
  47. start=time.time()
  48. now_time=datetime.datetime.now()
  49. start_time=now_time-datetime.timedelta(seconds=70)
  50. start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
  51. end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
  52. for i in range(0, len(df_sn)):
  53. factory = "骑享"
  54. try:
  55. if df_sn.loc[i, 'imei'][5:9] == 'N640':
  56. celltype=1 #6040三元电芯
  57. elif df_sn.loc[i, 'imei'][5:9] == 'N440':
  58. celltype=2 #4840三元电芯
  59. elif df_sn.loc[i, 'imei'][5:9] == 'L660':
  60. celltype=99 # 6060锂电芯
  61. elif df_sn.loc[i, 'imei'][3:5] == 'LX' and df_sn.loc[i, 'imei'][5:9] == 'N750':
  62. celltype=3 #力信 50ah三元电芯
  63. elif df_sn.loc[i, 'imei'][3:5] == 'CL' and df_sn.loc[i, 'imei'][5:9] == 'N750':
  64. celltype=4 #CATL 50ah三元电芯
  65. elif df_sn.loc[i, 'imei'][3:9] == 'CLL128':
  66. celltype=100 # 重卡
  67. factory = "金茂换电"
  68. elif df_sn.loc[i, 'imei'][5:9] == 'L420':
  69. celltype=101 #20ah磷酸铁锂电芯
  70. elif df_sn.loc[i, 'imei'][5:9] == 'L264':
  71. celltype=102 #20ah磷酸铁锂电芯
  72. else:
  73. logger.info("pid-{} celltype-{} SN: {} SKIP!".format(os.getpid(), "未知", sn))
  74. continue
  75. sn = df_sn.loc[i, 'sn']
  76. logger.info("pid-{} celltype-{} SN: {} START!".format(os.getpid(), celltype, sn))
  77. #读取原始数据库数据........................................................................................................................................................
  78. dbManager = DBManager.DBManager()
  79. df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
  80. df_bms = df_data['bms']
  81. # print(df_bms)
  82. #电池诊断................................................................................................................................................................
  83. df_diag_ram_sn=df_diag_ram[df_diag_ram['product_id']==sn]
  84. df_bms_ram_sn=df_bms_ram[df_bms_ram['sn']==sn]
  85. df_alarm_ram_sn=df_alarm_ram[df_alarm_ram['sn']==sn]
  86. if df_diag_ram_sn.empty:
  87. SafetyAlarm=CBMSSafetyAlarm.SafetyAlarm(sn,celltype,df_bms, df_bms_ram_sn,df_alarm_ram_sn)
  88. df_diag_res, df_bms_res,df_ram_res=SafetyAlarm.safety_alarm_diag()
  89. #更新bms的ram数据 和 diag的Ram数据
  90. sn_index=df_bms_ram.loc[df_bms_ram['sn']==sn].index
  91. df_bms_ram=df_bms_ram.drop(index=sn_index)
  92. df_bms_ram=df_bms_ram.append(df_bms_res)
  93. sn_index=df_alarm_ram.loc[df_alarm_ram['sn']==sn].index
  94. df_alarm_ram=df_alarm_ram.drop(index=sn_index)
  95. df_alarm_ram=df_alarm_ram.append(df_ram_res)
  96. # sn_index=df_diag_ram.loc[df_diag_ram['product_id']==sn].index
  97. # df_diag_ram=df_diag_ram.drop(index=sn_index)
  98. # df_diag_ram=df_diag_ram.append(df_diag_res)
  99. # df_diag_ram.reset_index(inplace=True,drop=True) #重置索引
  100. #当前热失控故障写入数据库
  101. if not df_diag_res.empty:
  102. df_diag_res.columns = ['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']
  103. df_diag_res['add_time'] = datetime.datetime.now()
  104. df_diag_res['factory'] = factory
  105. df_diag_res.to_sql("all_fault_info",con=db_res_engine, if_exists="append",index=False)
  106. logger.info(u"{} 写入成功!!!\n".format(sn), exc_info=True)
  107. #当前热失控已超过三天变为历史故障并写入数据库,并删除原有数据库中的当前故障和ram中的当前故障
  108. else:
  109. fault_time=datetime.datetime.strptime(df_diag_ram_sn.iloc[-1]['start_time'], '%Y-%m-%d %H:%M:%S')
  110. if (now_time-fault_time).total_seconds()>24*3600:
  111. df_diag_ram_sn['end_time']=end_time
  112. df_diag_ram_sn['Batpos']=1
  113. try:
  114. cursor.execute('''
  115. update all_fault_info set update_time='{}',end_time='{}', Batpos={} where product_id='{}' and end_time='0000-00-00 00:00:00' and code='{}'
  116. '''.format(datetime.datetime.now(), end_time, 1,sn, "C599"))
  117. conn.commit()
  118. logger.info(u"{} 更新成功\n".format(sn), exc_info=True)
  119. except:
  120. logger.error(traceback.format_exc)
  121. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  122. except:
  123. logger.error(traceback.format_exc)
  124. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  125. break
  126. cursor.close()
  127. conn.close()
  128. db_res_engine.dispose()
  129. logger.info("pid-{} Done!".format(os.getpid()))
  130. return df_bms_ram,df_alarm_ram
# ---------------- Main process ----------------
  132. def mainprocess():
  133. global SNnums
  134. global df_bms_ram1, df_bms_ram2, df_alarm_ram1, df_alarm_ram2
  135. global log_path
  136. # 更新sn列表
  137. host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
  138. port=3306
  139. db='qixiang_oss'
  140. user='qixiang_oss'
  141. password='Qixiang2021'
  142. conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
  143. cursor = conn.cursor()
  144. cursor.execute("select sn, imei, add_time from app_device where status in (1,2,3)")
  145. res = cursor.fetchall()
  146. df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
  147. df_sn = df_sn.reset_index(drop=True)
  148. cursor.close()
  149. conn.close();
  150. process = 2
  151. pool = multiprocessing.Pool(processes = process)
  152. res_list=[]
  153. ram_list1=[df_bms_ram1, df_bms_ram2]
  154. ram_list2=[df_alarm_ram1, df_alarm_ram1]
  155. for i in range(process):
  156. sn_list = df_sn[int(len(df_sn)*i/process):int(len(df_sn)*(i+1)/process)]
  157. sn_list = sn_list.reset_index(drop=True)
  158. log_name = log_path + '/log_' + str(i)
  159. df_bms_ram=ram_list1[i]
  160. df_alarm_ram=ram_list2[i]
  161. df_res = pool.apply_async(diag_cal, (sn_list,df_bms_ram,df_alarm_ram,log_name)).get()
  162. res_list.append(df_res)
  163. pool.close()
  164. pool.join()
  165. df_bms_ram1=res_list[0][0]
  166. df_bms_ram2=res_list[1][0]
  167. df_alarm_ram1=res_list[0][1]
  168. df_alarm_ram2=res_list[1][1]
  169. if __name__ == "__main__":
  170. # 时间设置
  171. # now_time = datetime.datetime.now()
  172. # pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)# 前一日
  173. # end_time=datetime.datetime.strftime(now_time,"%Y-%m-%d 00:00:00")
  174. # start_time=datetime.datetime.strftime(pre_time,"%Y-%m-%d 00:00:00")
  175. history_run_flag = False # 历史数据运行标志
  176. # # 更新sn列表
  177. # host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
  178. # port=3306
  179. # db='qixiang_oss'
  180. # user='qixiang_oss'
  181. # password='Qixiang2021'
  182. # conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
  183. # cursor = conn.cursor()
  184. # cursor.execute("select sn, imei, add_time from app_device")
  185. # res = cursor.fetchall()
  186. # df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
  187. # df_sn = df_sn.reset_index(drop=True)
  188. # conn.close();
  189. # 数据库配置
  190. host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  191. port = 3306
  192. user = 'qx_cas'
  193. password = parse.quote_plus('Qx@123456')
  194. database = 'qx_cas'
  195. db_engine = create_engine(
  196. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  197. user, password, host, port, database
  198. ))
  199. DbSession = sessionmaker(bind=db_engine)
  200. # 运行历史数据配置
  201. df_first_data_time = pd.read_sql("select * from bat_first_data_time", db_engine)
  202. # 日志配置
  203. now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":","_")
  204. log_path = 'log/' + now_str
  205. if not os.path.exists(log_path):
  206. os.makedirs(log_path)
  207. log = Log.Mylog(log_name='batsafetyAlarm', log_level = 'info')
  208. log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024* 1024 * 100)
  209. log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024* 1024 * 100)
  210. logger = log.get_logger()
  211. logger.info("pid is {}".format(os.getpid()))
  212. # 算法参数
  213. host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  214. port=3306
  215. db='safety_platform'
  216. user='qx_read'
  217. password=parse.quote_plus('Qx@123456')
  218. tablename='all_fault_info'
  219. db_res_engine = create_engine(
  220. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  221. user, password, host, port, db
  222. ))
  223. #............................模块运行前,先读取数据库中所有结束时间为0的数据,需要从数据库中读取................
  224. # print("select start_time, end_time, product_id, code, level, info, advice, factory from {}".format(tablename))
  225. # result=pd.read_sql("select start_time, end_time, product_id, code, level, info, advice from all_fault_info where factory = '{}'".format('骑享'), db_res_engine)
  226. # result = result[['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']]
  227. # df_diag_ram=result[(result['end_time']=='0000-00-00 00:00:00') & (result['code']==119)]
  228. df_bms_ram1=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp'])
  229. df_bms_ram2=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp'])
  230. df_alarm_ram1=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2'])
  231. df_alarm_ram2=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2'])
  232. mainprocess()
  233. #定时任务.......................................................................................................................................................................
  234. scheduler = BlockingScheduler()
  235. scheduler.add_job(mainprocess, 'interval', seconds=60, id='diag_job')
  236. try:
  237. scheduler.start()
  238. except Exception as e:
  239. scheduler.shutdown()
  240. logger.error(str(e))