server2.py
4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import cv2
import grpc
import numpy as np
import tensorflow as tf
from tensorflow_serving.apis import prediction_service_pb2_grpc, predict_pb2
from sanic import Sanic
from sanic.response import json
from classification import classifier
# Set CUDA_VISIBLE_DEVICES to "-1" for this process.
# NOTE(review): presumably meant to keep TensorFlow off the GPU; this runs
# after tensorflow has already been imported -- confirm the ordering is intended.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

app = Sanic("async_test")

# TODO: read these settings from a configuration file.
# Every entry under 'servers' describes one gRPC endpoint: its host, its port
# and the channel options. Both message-size options use the same limit of
# 1000 * 1024 * 1024 bytes.
tf_serving_settings = dict(
    servers=dict(
        server_1=dict(
            host="localhost",
            port="8500",
            options=[
                (option_name, 1000 * 1024 * 1024)
                for option_name in (
                    "grpc.max_send_message_length",
                    "grpc.max_receive_message_length",
                )
            ],
        ),
    ),
)
app.config.update(tf_serving_settings)
# Synchronous version 01
# @app.post("/sync_classification")
# async def sync_handler(request):
# image = request.files.get("image")
# img_array = np.frombuffer(image.body, np.uint8)
# image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
# input_images = classifier.preprocess_input(image)
#
# options = [('grpc.max_send_message_length', 1000 * 1024 * 1024),
# ('grpc.max_receive_message_length', 1000 * 1024 * 1024)]
# with grpc.insecure_channel('localhost:8500', options=options) as channel:
# stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
# # See prediction_service.proto for gRPC request/response details.
# request = predict_pb2.PredictRequest()
# request.model_spec.name = classifier.model_name
# request.model_spec.signature_name = classifier.signature_name
#
# request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))
# result = stub.Predict(request, timeout=100.0) # 100 secs timeout
# outputs = tf.make_ndarray(result.outputs['output'])
#
# res = classifier.reprocess_output(outputs)
# return json(res)
# Synchronous version 02
# @app.post("/sync_classification")
# async def sync_handler(request):
# image = request.files.get("image")
# img_array = np.frombuffer(image.body, np.uint8)
# image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
# input_images = classifier.preprocess_input(image)
#
# # See prediction_service.proto for gRPC request/response details.
# request = predict_pb2.PredictRequest()
# request.model_spec.name = classifier.model_name
# request.model_spec.signature_name = classifier.signature_name
# stub = getattr(app, classifier.server_name)
#
# request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))
# result = stub.Predict(request, timeout=100.0) # 100 secs timeout
# outputs = tf.make_ndarray(result.outputs['output'])
#
# res = classifier.reprocess_output(outputs)
# return json(res)
#
# @app.listener("before_server_start")
# async def set_grpc_channel(app, loop):
# for server_name, server_settings in app.config['servers'].items():
# channel = grpc.insecure_channel(
# '{0}:{1}'.format(server_settings['host'], server_settings['port']),
# options=server_settings.get('options'))
# stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
# setattr(app, server_name, stub)
# Asynchronous version
@app.post("/async_classification")
async def async_handler(request):
    """Classify an uploaded image through the TensorFlow Serving gRPC stub.

    Reads the uploaded file field ``image``, decodes it with OpenCV, runs
    ``classifier.preprocess_input`` on it, sends a ``PredictRequest`` to the
    stub stored on ``app`` under ``classifier.server_name`` and returns
    ``classifier.reprocess_output`` of the model output as JSON.

    Args:
        request: The incoming Sanic request; the image is expected in
            ``request.files`` under the key ``image``.

    Returns:
        A JSON response with the classifier result, or a JSON error with
        status 400 when the ``image`` field is missing/empty or its bytes
        cannot be decoded as an image.
    """
    upload = request.files.get("image")
    # Without this guard a missing field raises AttributeError on `.body`
    # and the client receives an opaque 500.
    if upload is None or not upload.body:
        return json({"error": "missing or empty 'image' file field"}, status=400)

    encoded = np.frombuffer(upload.body, np.uint8)
    image = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
    # cv2.imdecode signals undecodable data by returning None.
    if image is None:
        return json({"error": "'image' could not be decoded as an image"}, status=400)
    input_images = classifier.preprocess_input(image)

    # See prediction_service.proto for gRPC request/response details.
    # Named separately so the HTTP `request` argument is not shadowed.
    predict_request = predict_pb2.PredictRequest()
    predict_request.model_spec.name = classifier.model_name
    predict_request.model_spec.signature_name = classifier.signature_name
    predict_request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))

    stub = getattr(app, classifier.server_name)
    result = await stub.Predict(predict_request, timeout=100.0)  # 100 secs timeout
    outputs = tf.make_ndarray(result.outputs['output'])

    return json(classifier.reprocess_output(outputs))
# gRPC channels opened at startup, keyed by server name, kept so that they can
# be closed again when the server stops.
_grpc_channels = {}


@app.listener("before_server_start")
async def set_grpc_channel(app, loop):
    """Open one asyncio gRPC channel per configured server and attach its stub.

    For every entry in ``app.config['servers']`` an insecure ``grpc.aio``
    channel to ``host:port`` is created with the entry's optional ``options``,
    and a ``PredictionServiceStub`` bound to it is stored on ``app`` under the
    server's name.

    Args:
        app: The Sanic application being started.
        loop: The event loop passed in by Sanic (unused here).
    """
    for server_name, server_settings in app.config['servers'].items():
        target = f"{server_settings['host']}:{server_settings['port']}"
        channel = grpc.aio.insecure_channel(
            target, options=server_settings.get('options'))
        # Remember the channel; the stub alone gives no handle for closing it.
        _grpc_channels[server_name] = channel
        stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
        setattr(app, server_name, stub)


@app.listener("after_server_stop")
async def close_grpc_channel(app, loop):
    """Close every gRPC channel that ``set_grpc_channel`` opened."""
    while _grpc_channels:
        _, channel = _grpc_channels.popitem()
        await channel.close()
if __name__ == "__main__":
    # Start the app only when this file is executed as a script.
    run_options = {"host": "0.0.0.0", "port": 6699, "workers": 10}
    app.run(**run_options)