# server3.py (2.84 KB)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import os
import cv2
import grpc
import numpy as np
import tensorflow as tf
from tensorflow_serving.apis import prediction_service_pb2_grpc, predict_pb2
from sanic import Sanic
from sanic.response import json
from classification import classifier
# Expose no CUDA devices to this process.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

app = Sanic("async_test")

# TODO: read these settings from a configuration file.
# Upper bound, in bytes, applied to both outgoing and incoming gRPC messages.
_GRPC_MESSAGE_LIMIT = 1000 * 1024 * 1024

tf_serving_settings = dict(
    options=[
        ('grpc.max_send_message_length', _GRPC_MESSAGE_LIMIT),
        ('grpc.max_receive_message_length', _GRPC_MESSAGE_LIMIT),
    ],
    host_port='localhost:8500',
)
# Make the settings reachable from handlers through app.config[...].
app.config.update(tf_serving_settings)
# Synchronous variant 01: blocking gRPC calls made from inside the handler.
@app.post("/sync_classification")
async def sync_handler(request):
    """Classify an uploaded image using blocking gRPC calls to TF Serving.

    Expects a multipart upload with a file field named ``image``. The image is
    decoded with OpenCV, preprocessed by ``classifier.preprocess_input`` and
    sent to the prediction service five times, each time over a freshly
    opened channel.

    Returns:
        A JSON response holding the list of the five values produced by
        ``classifier.reprocess_output``; or a 400 JSON error when the file
        field is missing, empty, or cannot be decoded as an image.
    """
    upload = request.files.get("image")
    # Without this guard a missing/empty upload surfaces as an opaque 500.
    if upload is None or not upload.body:
        return json({"error": "missing or empty 'image' file field"}, status=400)

    img_array = np.frombuffer(upload.body, np.uint8)
    image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    # cv2.imdecode signals an undecodable buffer by returning None.
    if image is None:
        return json({"error": "'image' could not be decoded"}, status=400)

    input_images = classifier.preprocess_input(image)

    # The request is identical for every round, so build it once. It gets its
    # own name so it no longer shadows the HTTP ``request`` parameter.
    # See prediction_service.proto for gRPC request/response details.
    predict_request = predict_pb2.PredictRequest()
    predict_request.model_spec.name = classifier.model_name
    predict_request.model_spec.signature_name = classifier.signature_name
    predict_request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))

    res_list = []
    for _ in range(5):
        # NOTE(review): the channel is deliberately still opened per round and
        # the call below is blocking (it stalls this worker's event loop while
        # it runs) -- presumably the intended contrast with the async route.
        with grpc.insecure_channel(app.config['host_port'],
                                   options=app.config['options']) as channel:
            stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
            result = stub.Predict(predict_request, timeout=100.0)  # 100 secs timeout
        outputs = tf.make_ndarray(result.outputs['output'])
        res_list.append(classifier.reprocess_output(outputs))
    return json(res_list)
# Asynchronous variant 01: awaited gRPC calls through grpc.aio.
@app.post("/async_classification")
async def async_handler(request):
    """Classify an uploaded image using awaited grpc.aio calls to TF Serving.

    Expects a multipart upload with a file field named ``image``. The image is
    decoded with OpenCV, preprocessed by ``classifier.preprocess_input`` and
    sent to the prediction service five times, each time over a freshly
    opened asyncio channel that is closed again once the call returns.

    Returns:
        A JSON response holding the list of the five values produced by
        ``classifier.reprocess_output``; or a 400 JSON error when the file
        field is missing, empty, or cannot be decoded as an image.
    """
    upload = request.files.get("image")
    # Without this guard a missing/empty upload surfaces as an opaque 500.
    if upload is None or not upload.body:
        return json({"error": "missing or empty 'image' file field"}, status=400)

    img_array = np.frombuffer(upload.body, np.uint8)
    image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    # cv2.imdecode signals an undecodable buffer by returning None.
    if image is None:
        return json({"error": "'image' could not be decoded"}, status=400)

    input_images = classifier.preprocess_input(image)

    # The request is identical for every round, so build it once. It gets its
    # own name so it no longer shadows the HTTP ``request`` parameter.
    # See prediction_service.proto for gRPC request/response details.
    predict_request = predict_pb2.PredictRequest()
    predict_request.model_spec.name = classifier.model_name
    predict_request.model_spec.signature_name = classifier.signature_name
    predict_request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))

    res_list = []
    for _ in range(5):
        # ``async with`` closes the channel after each round; previously every
        # request left five open channels behind.
        async with grpc.aio.insecure_channel(app.config['host_port'],
                                             options=app.config['options']) as channel:
            stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
            result = await stub.Predict(predict_request, timeout=100.0)  # 100 secs timeout
        outputs = tf.make_ndarray(result.outputs['output'])
        res_list.append(classifier.reprocess_output(outputs))
    return json(res_list)
# Script entry point: start the Sanic server only when run directly.
if __name__ == '__main__':
    # Bind to every interface on port 6699 and ask Sanic for 10 workers.
    app.run(
        workers=10,
        port=6699,
        host='0.0.0.0',
    )