# HandleSql.py 6.15 KB
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/8/19 上午10:23
# @Author : 付孟奇

from util.HandleJenkins import config
from util.HandleDB import dataBaseList
from util.log import logger
import pymysql


class DBSql:
    """Small pymysql helper for looking up and cleaning up test data.

    Every public method builds one or more SQL statements and runs them
    through :meth:`data_base`, which opens a fresh connection per statement.
    Values are always handed to the driver as bound parameters instead of
    being interpolated into the SQL text, so quotes in the input cannot break
    (or inject into) the statement.
    """

    # Statements executed, in this order, by delete_phase(); each takes the
    # speech template id as its single bound parameter.
    _PHASE_DELETE_SQLS = (
        '''DELETE FROM speech_template_library where id=%s;''',
        '''DELETE FROM speech_big_phase where stl_id=%s;''',
        '''DELETE FROM speech_little_phase where stl_id=%s;''',
        '''DELETE FROM speech_take_effect_rule where stl_id=%s;''',
        '''DELETE FROM speech_take_effect_age_condition where stl_id=%s;''',
        '''DELETE FROM speech_take_effect_channel_condition where stl_id=%s;''',
        '''DELETE FROM speech_take_effect_organization_condition where stl_id=%s;''',
    )

    def data_base(self, sqls, fe, params=None):
        """Run a single SQL statement on a new connection.

        Args:
            sqls: SQL text. If ``params`` is given, ``%s`` placeholders in the
                text are filled in (and escaped) by the driver.
            fe: Statement kind. ``'SELECT'`` returns the fetched rows;
                ``'DELETE'`` and ``'UPDATE'`` commit the change. Any other
                value executes the statement without committing.
            params: Optional tuple/list/dict of values to bind. ``None``
                (the default) sends ``sqls`` exactly as written.

        Returns:
            The ``fetchall()`` result (rows as dicts) for ``'SELECT'``;
            ``None`` for every other kind, and also when a
            ``pymysql.MySQLError`` occurred (the error is logged, not raised).
        """
        env = config['env']
        pro = config['pro']
        da_list = dataBaseList(pro + '-' + env)
        conn = None
        try:
            conn = pymysql.connect(host=da_list['host'],
                                   port=da_list['port'],
                                   user=da_list['user'],
                                   passwd=da_list['password'],
                                   db=da_list['db'],
                                   cursorclass=pymysql.cursors.DictCursor)
            with conn.cursor() as cur:
                # mogrify() renders the statement exactly as it will be sent,
                # so the log still shows the concrete values.
                logger.info(cur.mogrify(sqls, params))
                cur.execute(sqls, params)
                if fe == 'SELECT':
                    return cur.fetchall()
                if fe in ('DELETE', 'UPDATE'):
                    conn.commit()
        except pymysql.MySQLError as e:
            logger.error(str(e))
        finally:
            # Runs on the SELECT return path and on errors too, so the
            # connection is never left open.
            if conn is not None:
                conn.close()
        return None

    @staticmethod
    def _first_value(rows, column):
        """Return ``rows[0][column]``, or ``None`` when there are no rows.

        ``rows`` may be an empty tuple/list (no match) or ``None`` (the query
        failed and data_base() logged the error).
        """
        if not rows:
            return None
        return rows[0][column]

    def _select_value(self, sql, args, column, log_rows=True):
        """Run a one-parameter SELECT and return ``column`` of the first row."""
        rows = self.data_base(sql, 'SELECT', (args,))
        if log_rows:
            logger.info(rows)
        return self._first_value(rows, column)

    # Look up a product id
    def select_product(self, args):
        """Return the ``id`` of the product whose ``product_code`` is ``args``, or None."""
        sql = '''SELECT * FROM product_library where product_code=%s;'''
        return self._select_value(sql, args, 'id')

    # Delete a product
    def delete_product(self, args):
        """Delete the product(s) whose ``product_code`` is ``args``."""
        sql = '''DELETE from product_library where product_code=%s;'''
        self.data_base(sql, 'DELETE', (args,))

    # Look up a product additional-attribute id
    def select_attributes(self, args):
        """Return the ``id`` of the additional attribute with ``keyword`` == ``args``, or None."""
        sql = '''SELECT * FROM product_additional_attributes_key where keyword=%s;'''
        return self._select_value(sql, args, 'id')

    # Delete a product additional attribute
    def delete_attributes(self, args):
        """Delete the additional attribute(s) with ``keyword`` == ``args``."""
        sql = '''DELETE from product_additional_attributes_key where keyword=%s;'''
        self.data_base(sql, 'DELETE', (args,))

    # Look up an uploaded file id
    def select_file(self, args):
        """Return the ``id`` of the non-deleted uploaded file named ``args``, or None."""
        sql = '''select * from order_product_code_file where deleted=0 and file_name=%s;'''
        return self._select_value(sql, args, 'id')

    # Look up the age value
    def select_age_value(self, args):
        """Return ``option_value`` of the audit-proportion config row with id ``args``, or None."""
        sql = '''select * from quanlity_audit_proportion_conf where id=%s;'''
        return self._select_value(sql, args, 'option_value')

    # Look up an account id
    def select_user_id(self, args):
        """Return the ``id`` of the user whose ``username`` is ``args``, or None."""
        sql = '''select * from citic_user where username=%s;'''
        return self._select_value(sql, args, 'id')

    # Look up the role binding id of an account
    def select_user_role_id(self, args):
        """Return the ``id`` of the first role binding for user id ``args``, or None."""
        sql = '''select * from citic_user_role where user_id=%s;'''
        return self._select_value(sql, args, 'id')

    # Delete an account
    def delete_user(self, args):
        """Delete the user named ``args`` together with its role bindings.

        Does nothing (beyond logging) when no such user exists or the lookup
        failed, instead of issuing a DELETE with an invalid id.
        """
        account_id = self.select_user_id(args)
        if account_id is None:
            logger.info('delete_user: no user found for username %s' % (args,))
            return
        self.data_base('''DELETE from citic_user where id=%s;''', 'DELETE', (account_id,))
        self.data_base('''DELETE from citic_user_role where user_id=%s;''', 'DELETE', (account_id,))

    # Look up a speech template id
    def select_pahse(self, args):
        """Return the ``id`` of the speech template whose ``speech_name`` is ``args``, or None."""
        sql = '''SELECT * FROM speech_template_library WHERE speech_name=%s;'''
        return self._select_value(sql, args, 'id')

    # Correctly spelled alias; the misspelled name is kept for existing callers.
    select_phase = select_pahse

    # Delete speech template content
    def delete_phase(self, args):
        """Delete the speech template with id ``args`` and its dependent rows.

        Each statement runs (and commits) on its own connection, in the order
        listed in ``_PHASE_DELETE_SQLS``.
        """
        for sql in self._PHASE_DELETE_SQLS:
            self.data_base(sql, 'DELETE', (args,))

    # Update the order lock status
    def update_order_status(self, args):
        """Set ``lock_order`` to 2 on the order with id ``args``."""
        sql = '''update order_base_info set lock_order=2 where id=%s;'''
        self.data_base(sql, 'UPDATE', (args,))

    # Look up the order intake (extra) information
    def select_extra_info(self, args):
        """Return ``extra_info`` of the order with id ``args``, or None."""
        sql = '''SELECT * FROM order_base_info WHERE id=%s;'''
        # The fetched rows are not logged for this query (unchanged from before).
        return self._select_value(sql, args, 'extra_info', log_rows=False)


# Shared module-level instance, created when this module is imported.
db_sql = DBSql()

if __name__ == '__main__':
    # Manual run only (not executed on import). The commented-out calls are
    # alternative ad-hoc invocations; the active call deletes the product
    # whose product_code is 'apitest'.
    # db_sql.delete_user('apiappuser')
    # db_sql.delete_user('apiwebuser')
    # db_sql.delete_phase(1030)
    # db_sql.select_user_id('apiwebuser')
    db_sql.delete_product('apitest')