handler.py 7.66 KB
import json
import time
import datetime


class DateTimeJSONEncoder(json.JSONEncoder):
    """
    JSONEncoder subclass that knows how to encode date/time, decimal types, and
    UUIDs.
    """
    def default(self, o):
        # See "Date Time String Format" in the ECMA-262 specification.
        if isinstance(o, datetime.datetime):
            r = o.isoformat()
            if o.microsecond:
                r = r[:23] + r[26:]
            if r.endswith('+00:00'):
                r = r[:-6] + 'Z'
            return r
        else:
            return super().default(o)


def dict_str_to_int(res):
    for k, v in res.items():
        res[k] = int(v)


class RedisHandler:

    def __init__(self, redis):
        self.redis = redis
        self.time_expires = datetime.timedelta(hours=24)
        self.time_format = '%a %b %d %H:%M:%S %Y'
        self.prefix = 'automl'
        self.training_time_key = '{0}:training_time'.format(self.prefix)
        self.queue_key = '{0}:queue'.format(self.prefix)
        self.prefix_training = '{0}:training'.format(self.prefix)
        self.prefix_models = '{0}:models'.format(self.prefix)
        self.prefix_img_info = '{0}:img_info'.format(self.prefix)

    def get_training_model_key(self, user_id, model_type):
        return '{0}:{1}:{2}'.format(self.prefix_training, user_id, model_type)

    def get_models_list_key(self, user_id, model_type):
        return '{0}:{1}:{2}'.format(self.prefix_models, user_id, model_type)

    def set_training_model(self, user_id, model_type, model_id, status):
        # True
        key = self.get_training_model_key(user_id, model_type)
        mapping = {
            'model_id': model_id,
            'model_status': status
        }
        return self.redis.hmset(key, mapping)

    def get_training_model(self, user_id, model_type):
        # {}
        # {'id': '1', 'status': '1'}
        key = self.get_training_model_key(user_id, model_type)
        res = self.redis.hgetall(key)
        dict_str_to_int(res)
        return res

    def set_models_list(self, user_id, model_type, models_list):
        key = self.get_models_list_key(user_id, model_type)
        value = json.dumps(models_list, cls=DateTimeJSONEncoder)
        return self.redis.set(key, value, expires=self.time_expires)

    def get_models_list(self, user_id, model_type):
        # list or None
        key = self.get_models_list_key(user_id, model_type)
        res_str = self.redis.get(key)
        res = None if res_str is None else json.loads(res_str)
        return res

    def del_models_list(self, user_id, model_type):
        # None
        key = self.get_models_list_key(user_id, model_type)
        return self.redis.delete(key)

    def set_training_finish_time(self, finish_time):
        # True
        finish_time_str = datetime.datetime.strftime(finish_time, self.time_format)
        return self.redis.set(self.training_time_key, finish_time_str)

    def get_training_finish_time(self):
        # datetime.datetime or None
        res = self.redis.get(self.training_time_key)
        finish_time = None if res is None else datetime.datetime.strptime(res, self.time_format)
        return finish_time

    def del_training_finish_time(self):
        # None
        return self.redis.delete(self.training_time_key)

    def enqueue(self, model_id):
        # 1
        mapping = {model_id: time.time()}
        return self.redis.zadd(self.queue_key, mapping)

    def dequeue(self):
        # model_id:int or None
        res_list = self.redis.zremrangebyrank(self.queue_key, 0, 0)
        pop_item_list = res_list[0]
        pop_item = int(pop_item_list[0]) if pop_item_list else None
        return pop_item

    def get_queue_end(self):
        # model_id:int or None
        res_list = self.redis.zrange(self.queue_key, -1, -1)
        end_id = int(res_list[0]) if res_list else None
        return end_id

    def get_queue_rank(self, model_id):
        # rank:int or None
        rank = self.redis.zrank(self.queue_key, model_id)
        if rank is None:
            return 0
        return rank + 1

    def set_img_info(self, user_id, model_id, count_sum, count_marked):
        # True
        key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        mapping = {
            'count_sum': count_sum,
            'count_marked': count_marked
        }
        return self.redis.hmset(key, mapping)

    def get_img_info(self, user_id, model_id):
        # {}
        # {'count_sum': '70', 'count_marked': '0'}
        key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        res = self.redis.hgetall(key)
        dict_str_to_int(res)
        return res

    def update_img_info(self, user_id, model_id, del_img=False):
        # res_count:int
        key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        if del_img:
            return self.redis.hincrby(key, 'count_sum', amount=-1)
        else:
            return self.redis.hincrby(key, 'count_marked')

    def del_img_info(self, user_id, model_id):
        # None
        key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)
        return self.redis.delete(key)

    def pipe_trained(self, user_id, model_type, model_id, status, success=True):
        # redis.set_training_model(user_id, model_type, model_id, model_status)
        # redis.del_training_finish_time()
        # redis.del_models_list(user_id, model_type)

        # redis.set_training_model(user_id, model_type, model_id, model_status)
        # redis.del_training_finish_time()

        training_model_key = self.get_training_model_key(user_id, model_type)
        models_list_key = self.get_models_list_key(user_id, model_type)
        mapping = {
            'model_id': model_id,
            'model_status': status
        }

        with self.redis.client.pipeline() as pipe:
            pipe.hmset(training_model_key, mapping)
            pipe.delete(self.training_time_key)
            if success is True:
                pipe.delete(models_list_key)
            item = pipe.execute()
        return item

    def pipe_training(self, user_id, model_type, model_id, status, finish_time):
        # redis.dequeue()
        # redis.set_training_model(user_id, model_type, model_id, model_status)
        # redis.set_training_finish_time(proleptic_finish_time)

        training_model_key = self.get_training_model_key(user_id, model_type)
        mapping = {
            'model_id': model_id,
            'model_status': status
        }
        finish_time_str = datetime.datetime.strftime(finish_time, self.time_format)

        with self.redis.client.pipeline() as pipe:
            pipe.zremrangebyrank(self.queue_key, 0, 0)
            pipe.hmset(training_model_key, mapping)
            pipe.set(self.training_time_key, finish_time_str)
            item = pipe.execute()
        return item

    def pipe_enqueue(self, model_id, user_id, model_type, status, section=True):
        # redis.enqueue(model_id)
        # redis.set_training_model(user_id, model_type,
        #                          model_id, ModelStatus.DATA_PRETREATMENT_DONE.value)
        # if model_type == ModelType.SECTION.value:
        #     redis.del_img_info(user_id, model_id)

        queue_mapping = {model_id: time.time()}
        training_model_key = self.get_training_model_key(user_id, model_type)
        mapping = {
            'model_id': model_id,
            'model_status': status
        }
        img_info_key = '{0}:{1}:{2}'.format(self.prefix_img_info, user_id, model_id)

        with self.redis.client.pipeline() as pipe:
            pipe.zadd(self.queue_key, queue_mapping)
            pipe.hmset(training_model_key, mapping)
            if section is True:
                pipe.delete(img_info_key)
            item = pipe.execute()
        return item