base.py
2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from typing import NamedTuple
from urllib.parse import parse_qsl, unquote, urlparse
from redis import StrictRedis, ConnectionPool
try:
from collections.abc import Mapping
except ImportError:
from collections import Mapping
url_parts = NamedTuple('url_parts', [
('scheme', str),
('hostname', str),
('port', int),
('username', str),
('password', str),
('path', str),
('query', Mapping),
])
def url_to_parts(url):
# type: (str) -> urlparts
"""Parse URL into :class:`urlparts` tuple of components."""
scheme = urlparse(url).scheme
schemeless = url[len(scheme) + 3:]
# parse with HTTP URL semantics
parts = urlparse('http://' + schemeless)
path = parts.path or ''
path = path[1:] if path and path[0] == '/' else path
return url_parts(
scheme,
unquote(parts.hostname or '') or None,
parts.port,
unquote(parts.username or '') or None,
unquote(parts.password or '') or None,
unquote(path or '') or None,
dict(parse_qsl(parts.query)),
)
class Redis:
def __init__(self, url, connection_pool=None, max_connections=20, socket_timeout=120,
retry_on_timeout=None, socket_connect_timeout=None):
self._ConnectionPool = connection_pool
scheme, host, port, _, password, path, query = url_to_parts(url)
self.conn_params = {
'host': host,
'port': port,
'db': int(path),
'password': password,
'max_connections': max_connections,
'socket_timeout': socket_timeout and float(socket_timeout),
'retry_on_timeout': retry_on_timeout or False,
'socket_connect_timeout':
socket_connect_timeout and float(socket_connect_timeout),
'decode_responses': True
}
self.client = StrictRedis(
connection_pool=self._get_pool(**self.conn_params),
)
@property
def ConnectionPool(self):
if self._ConnectionPool is None:
self._ConnectionPool = ConnectionPool
return self._ConnectionPool
def _get_pool(self, **params):
return self.ConnectionPool(**params)
def get(self, key):
return self.client.get(key)
def mget(self, keys):
return self.client.mget(keys)
def set(self, key, value, expires=None):
if expires:
return self.client.setex(key, expires, value)
else:
return self.client.set(key, value)
def delete(self, key):
self.client.delete(key)
def incr(self, key):
return self.client.incr(key)
def expire(self, key, value):
return self.client.expire(key, value)
def lpush(self, key, values):
return self.client.lpush(key, *values) # int
def lrange(self, key, start, end):
return self.client.lrange(key, start, end) # list
def rpop(self, key):
return self.client.rpop(key) # str or None