HandleSql.py
6.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/8/19 上午10:23
# @Author : 付孟奇
from util.HandleJenkins import config
from util.HandleDB import dataBaseList
from util.log import logger
import pymysql
class DBSql:
def data_base(self, sqls, fe):
env = config['env']
pro = config['pro']
da_list = dataBaseList(pro + '-' + env)
try:
conn = pymysql.connect(host=da_list['host'],
port=da_list['port'],
user=da_list['user'],
passwd=da_list['password'],
db=da_list['db'],
cursorclass=pymysql.cursors.DictCursor)
with conn.cursor() as cur:
logger.info(sqls)
cur.execute(sqls)
if fe == 'SELECT':
result = cur.fetchall()
return result
elif fe == 'DELETE' or fe == 'UPDATE':
conn.commit()
cur.close()
conn.close()
except pymysql.MySQLError as e:
logger.error(str(e))
# 查询产品ID
def select_product(self, args):
fe = 'SELECT'
sql = '''SELECT * FROM product_library where product_code='%s';''' % (args)
result = self.data_base(sql, fe)
logger.info(result)
if result == ():
return None
else:
productId = result[0]['id']
return productId
# 删除产品
def delete_product(self, args):
fe = 'DELETE'
sql = '''DELETE from product_library where product_code='%s';''' % (args)
self.data_base(sql, fe)
# 查询产品附加属性Id
def select_attributes(self, args):
fe = 'SELECT'
sql = '''SELECT * FROM product_additional_attributes_key where keyword='%s';''' % (args)
result = self.data_base(sql, fe)
logger.info(result)
if result == ():
return None
else:
attributes_id = result[0]['id']
return attributes_id
# 删除产品附加属性
def delete_attributes(self, args):
fe = 'DELETE'
sql = '''DELETE from product_additional_attributes_key where keyword='%s';''' % (args)
self.data_base(sql, fe)
# 查询上传文件Id
def select_file(self, args):
fe = 'SELECT'
sql = '''select * from order_product_code_file where deleted=0 and file_name='%s';''' % (args)
result = self.data_base(sql, fe)
logger.info(result)
if result == ():
return None
else:
file_id = result[0]['id']
return file_id
# 查询年龄值
def select_age_value(self, args):
fe = 'SELECT'
sql = '''select * from quanlity_audit_proportion_conf where id=%s;''' % (args)
result = self.data_base(sql, fe)
logger.info(result)
if result == ():
return None
else:
file_id = result[0]['option_value']
return file_id
# 查询账号ID
def select_user_id(self, args):
fe = 'SELECT'
sql = '''select * from citic_user where username='%s';''' % (args)
result = self.data_base(sql, fe)
logger.info(result)
if result == ():
return None
else:
file_id = result[0]['id']
return file_id
# 查询账号绑定角色ID
def select_user_role_id(self, args):
fe = 'SELECT'
sql = '''select * from citic_user_role where user_id=%s;''' % (args)
result = self.data_base(sql, fe)
logger.info(result)
if result == ():
return None
else:
file_id = result[0]['id']
return file_id
# 删除账号
def delete_user(self, args):
fe = 'DELETE'
account_id = self.select_user_id(args)
user_sql = '''DELETE from citic_user where id=%s;''' % (account_id)
user_role_sql = '''DELETE from citic_user_role where user_id=%s;''' % (account_id)
self.data_base(user_sql, fe)
self.data_base(user_role_sql, fe)
# 查询话术ID
def select_pahse(self, args):
fe = 'SELECT'
sql = '''SELECT * FROM speech_template_library WHERE speech_name='%s';''' % (args)
result = self.data_base(sql, fe)
logger.info(result)
if result == ():
return None
else:
file_id = result[0]['id']
return file_id
# 删除话术内容
def delete_phase(self, args):
fe = 'DELETE'
sql_1 = '''DELETE FROM speech_template_library where id=%s;''' % (args)
sql_2 = '''DELETE FROM speech_big_phase where stl_id=%s;''' % (args)
sql_3 = '''DELETE FROM speech_little_phase where stl_id=%s;''' % (args)
sql_4 = '''DELETE FROM speech_take_effect_rule where stl_id=%s;''' % (args)
sql_5 = '''DELETE FROM speech_take_effect_age_condition where stl_id=%s;''' % (args)
sql_6 = '''DELETE FROM speech_take_effect_channel_condition where stl_id=%s;''' % (args)
sql_7 = '''DELETE FROM speech_take_effect_organization_condition where stl_id=%s;''' % (args)
self.data_base(sql_1, fe)
self.data_base(sql_2, fe)
self.data_base(sql_3, fe)
self.data_base(sql_4, fe)
self.data_base(sql_5, fe)
self.data_base(sql_6, fe)
self.data_base(sql_7, fe)
# 更新订单锁状态
def update_order_status(self, args):
fe = 'UPDATE'
sql = '''update order_base_info set lock_order=2 where id='%s';''' % (args)
self.data_base(sql, fe)
# 查询订单入件信息
def select_extra_info(self, args):
fe = 'SELECT'
sql = '''SELECT * FROM order_base_info WHERE id=%s;''' % (args)
result = self.data_base(sql, fe)
if result == ():
return None
else:
res = result[0]['extra_info']
return res
db_sql = DBSql()
if __name__ == '__main__':
# db_sql.delete_user('apiappuser')
# db_sql.delete_user('apiwebuser')
# db_sql.delete_phase(1030)
# db_sql.select_user_id('apiwebuser')
db_sql.delete_product('apitest')