import logging

import requests
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from rest_framework.generics import GenericAPIView
from rest_framework_jwt.utils import jwt_payload_handler, jwt_encode_handler

from common.exceptions import (
    NeedLoginException, InvalidParamsException, InternalErrorException,
    ObjectNotExitException, AsyncWaitException, NoPermissionException,
    IllegalOperationException)

# Seconds to wait for the IWA endpoints before giving up. Without a timeout
# `requests` waits forever, so one stalled IWA call would pin a worker.
IWA_REQUEST_TIMEOUT = 30


class GenericExceptionMixin:
    """Shortcut methods that raise the project's API exceptions.

    Each method raises unconditionally; the optional ``msg`` is passed to the
    exception constructor.
    """

    def need_login(self, msg='need login'):
        """Raise NeedLoginException with ``msg``."""
        raise NeedLoginException(msg)

    def invalid_params(self, msg='invalid params'):
        """Raise InvalidParamsException with ``msg``."""
        raise InvalidParamsException(msg)

    def internal_error(self, msg='internal error'):
        """Raise InternalErrorException with ``msg``."""
        raise InternalErrorException(msg)

    def object_not_exit(self, msg='object not exit'):
        """Raise ObjectNotExitException with ``msg``."""
        raise ObjectNotExitException(msg)

    def async_wait(self, msg='async wait'):
        """Raise AsyncWaitException with ``msg``."""
        raise AsyncWaitException(msg)

    def no_permission(self, msg='no permission'):
        """Raise NoPermissionException with ``msg``."""
        raise NoPermissionException(msg)

    def illegal_operation(self, msg='illegal operation'):
        """Raise IllegalOperationException with ``msg``."""
        raise IllegalOperationException(msg)


class LoggerMixin:
    """Expose the project's named loggers as class attributes."""

    running_log = logging.getLogger('running')
    exception_log = logging.getLogger('exception')
    online_log = logging.getLogger('online')
    folder_log = logging.getLogger('folder')
    # Currently disabled loggers:
    # bs_log = logging.getLogger('bs')
    # license_log = logging.getLogger('license')
    idcard_log = logging.getLogger('idcard')


class GenericView(LoggerMixin, GenericExceptionMixin, GenericAPIView):
    """Base API view that logs every request before calling its handler."""

    # Set to False on a subclass to skip the per-request parameter log.
    need_print_logger = True

    def print_logger(self, request):
        """Write one line to the ``running`` log describing the request.

        The line contains the view class name, the HTTP method, the id of the
        authenticated user (0 when anonymous) and every request parameter and
        URL keyword argument rendered as ``[key=value] ``.

        NOTE(review): parameter values are logged verbatim, so credentials
        submitted to a view end up in the log unless that view disables
        ``need_print_logger`` - confirm this is acceptable.
        """
        # Look for an attribute named after the HTTP method (e.g. request.GET
        # or request.POST); fall back to the parsed body in request.data.
        parameters = getattr(request, request.method, {})
        if not parameters:
            parameters = getattr(request, 'data', {})
        if not parameters:
            parameters = {}

        if hasattr(parameters, 'items'):
            pairs = list(parameters.items())
        else:
            # A non-mapping payload (e.g. a JSON array body) has no .items();
            # log it as a single value instead of failing the request.
            pairs = [('data', parameters)]
        pairs.extend(self.kwargs.items())
        parameters_string = ''.join(
            '[%s=%s] ' % (key, value) for key, value in pairs)

        user = request.user
        if user and not isinstance(user, AnonymousUser):
            user_id = user.id
        else:
            user_id = 0

        # Arguments are passed separately so the message is only formatted
        # when the record is actually emitted.
        self.running_log.info(
            '[%s_%s_request] with parameters [user_id=%s] %s',
            self.__class__.__name__, request.method, user_id,
            parameters_string)

    def dispatch(self, request, *args, **kwargs):
        """
        `.dispatch()` is pretty much the same as Django's regular dispatch,
        but with extra hooks for startup, finalize, and exception handling.

        In addition, the request is logged through `print_logger()` right
        before the handler runs when `need_print_logger` is true.
        """
        self.args = args
        self.kwargs = kwargs
        request = self.initialize_request(request, *args, **kwargs)
        self.request = request
        self.headers = self.default_response_headers  # deprecate?

        try:
            self.initial(request, *args, **kwargs)

            # Get the appropriate handler method
            if request.method.lower() in self.http_method_names:
                handler = getattr(self, request.method.lower(),
                                  self.http_method_not_allowed)
            else:
                handler = self.http_method_not_allowed

            if self.need_print_logger:
                self.print_logger(request)

            response = handler(request, *args, **kwargs)

        except Exception as exc:
            # Top-level boundary: every failure is turned into a response.
            response = self.handle_exception(exc)

        self.response = self.finalize_response(
            request, response, *args, **kwargs)
        return self.response

    def get_object(self):
        """Always return None instead of performing an object lookup."""
        return None


def _lookup_active_user(q_number):
    """Find the active user whose username equals ``q_number``.

    Returns:
        ``(user, None)`` on success, otherwise ``(None, message)`` where
        ``message`` explains why no usable account was found.
    """
    if not q_number:
        return None, 'get q_number empty'
    user = get_user_model().objects.filter(username=q_number).first()
    if not user:
        return None, 'q_number user not found'
    if not user.is_active:
        return None, 'User account is disabled.'
    return user, None


class IWABaseView:
    """Helpers for signing a user in through the IWA OAuth endpoints.

    NOTE(review): the empty permission/authentication tuples presumably make
    views that mix this in reachable without prior authentication - confirm.
    """

    permission_classes = ()
    authentication_classes = ()

    @staticmethod
    def get_token(iwa_url_base, code, redirect_uri, client_id_base64):
        """Exchange an authorization ``code`` for an IWA access token.

        Args:
            iwa_url_base: Base URL of the IWA service. The endpoint path is
                appended directly, so it must already end with ``/``.
            code: Authorization code to exchange.
            redirect_uri: Redirect URI sent with the token request.
            client_id_base64: Base64-encoded ``client_id:secret`` placed in
                the Basic ``Authorization`` header.

        Returns:
            The ``access_token`` from the JSON response, or ``''`` when the
            response has no such key.

        Raises:
            requests.RequestException: on network failure or timeout.
            ValueError: if the response body is not valid JSON.
        """
        headers = {
            # base64-encoded "client_id:secret"
            'Authorization': 'Basic {0}'.format(client_id_base64),
            'Content-type': 'application/x-www-form-urlencoded',
        }
        # Passed as `data`, so requests form-encodes the fields itself.
        form_fields = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': redirect_uri,
        }
        iwa_token_url = '{0}intranetb2x/access_token'.format(iwa_url_base)
        res = requests.post(
            url=iwa_token_url, headers=headers, data=form_fields,
            timeout=IWA_REQUEST_TIMEOUT)
        return res.json().get('access_token', '')

    def get_q_number(self, iwa_url_base, code, redirect_uri, client_id_base64):
        """Resolve an authorization ``code`` to the user's q_number.

        Obtains an access token with `get_token()` and then reads the ``sub``
        field from the IWA userinfo endpoint.

        Returns:
            The ``sub`` value, or ``''`` when no token could be obtained or
            the userinfo response has no ``sub`` key.

        Raises:
            requests.RequestException: on network failure or timeout.
            ValueError: if a response body is not valid JSON.
        """
        access_token = self.get_token(
            iwa_url_base, code, redirect_uri, client_id_base64)
        if not access_token:
            # Nothing to authenticate with; skip sending an empty Bearer
            # credential and report "no q_number" like a missing `sub` would.
            return ''
        headers = {
            'Authorization': 'Bearer {0}'.format(access_token)
        }
        iwa_user_url = '{0}intranetb2x/userinfo'.format(iwa_url_base)
        res = requests.get(
            iwa_user_url, headers=headers, timeout=IWA_REQUEST_TIMEOUT)
        return res.json().get('sub', '')

    @staticmethod
    def validate(q_number):
        """Check that ``q_number`` maps to an active user and issue a JWT.

        Returns:
            ``(True, info)`` where ``info`` holds ``user_id``, ``user_name``
            and ``token``, or ``(False, message)`` describing the failure.
        """
        user, error = _lookup_active_user(q_number)
        if user is None:
            return False, error
        payload = jwt_payload_handler(user)
        user_info = {
            'user_id': user.id,
            'user_name': user.username,
            'token': jwt_encode_handler(payload),
        }
        return True, user_info

    @staticmethod
    def validate_admin(q_number):
        """Check that ``q_number`` maps to an active superuser.

        Returns:
            ``(True, user)`` on success, or ``(False, message)`` describing
            the failure.
        """
        user, error = _lookup_active_user(q_number)
        if user is None:
            return False, error
        if not user.is_superuser:
            return False, 'User account is not admin user'
        return True, user