d8cec4a0 by Lyu Kui

add new async

1 parent b2dde0a9
...@@ -29,30 +29,6 @@ tf_serving_settings = { ...@@ -29,30 +29,6 @@ tf_serving_settings = {
29 app.config.update(tf_serving_settings) 29 app.config.update(tf_serving_settings)
30 30
31 31
32 # 同步写法01
33 # @app.post("/sync_classification")
34 # async def sync_handler(request):
35 # image = request.files.get("image")
36 # img_array = np.frombuffer(image.body, np.uint8)
37 # image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
38 # input_images = classifier.preprocess_input(image)
39 #
40 # options = [('grpc.max_send_message_length', 1000 * 1024 * 1024),
41 # ('grpc.max_receive_message_length', 1000 * 1024 * 1024)]
42 # with grpc.insecure_channel('localhost:8500', options=options) as channel:
43 # stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
44 # # See prediction_service.proto for gRPC request/response details.
45 # request = predict_pb2.PredictRequest()
46 # request.model_spec.name = classifier.model_name
47 # request.model_spec.signature_name = classifier.signature_name
48 #
49 # request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))
50 # result = stub.Predict(request, timeout=100.0) # 100 secs timeout
51 # outputs = tf.make_ndarray(result.outputs['output'])
52 #
53 # res = classifier.reprocess_output(outputs)
54 # return json(res)
55
56 # 同步写法02 32 # 同步写法02
57 # @app.post("/sync_classification") 33 # @app.post("/sync_classification")
58 # async def sync_handler(request): 34 # async def sync_handler(request):
...@@ -67,12 +43,15 @@ app.config.update(tf_serving_settings) ...@@ -67,12 +43,15 @@ app.config.update(tf_serving_settings)
67 # request.model_spec.signature_name = classifier.signature_name 43 # request.model_spec.signature_name = classifier.signature_name
68 # stub = getattr(app, classifier.server_name) 44 # stub = getattr(app, classifier.server_name)
69 # 45 #
70 # request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images)) 46 # res_list = []
71 # result = stub.Predict(request, timeout=100.0) # 100 secs timeout 47 # for _ in range(5):
72 # outputs = tf.make_ndarray(result.outputs['output']) 48 # request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))
49 # result = stub.Predict(request, timeout=100.0) # 100 secs timeout
50 # outputs = tf.make_ndarray(result.outputs['output'])
73 # 51 #
74 # res = classifier.reprocess_output(outputs) 52 # res = classifier.reprocess_output(outputs)
75 # return json(res) 53 # res_list.append(res)
54 # return json(res_list)
76 # 55 #
77 # @app.listener("before_server_start") 56 # @app.listener("before_server_start")
78 # async def set_grpc_channel(app, loop): 57 # async def set_grpc_channel(app, loop):
...@@ -83,7 +62,7 @@ app.config.update(tf_serving_settings) ...@@ -83,7 +62,7 @@ app.config.update(tf_serving_settings)
83 # stub = prediction_service_pb2_grpc.PredictionServiceStub(channel) 62 # stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
84 # setattr(app, server_name, stub) 63 # setattr(app, server_name, stub)
85 64
86 # 异步写法 65 # 异步写法02
87 @app.post("/async_classification") 66 @app.post("/async_classification")
88 async def async_handler(request): 67 async def async_handler(request):
89 image = request.files.get("image") 68 image = request.files.get("image")
...@@ -97,12 +76,15 @@ async def async_handler(request): ...@@ -97,12 +76,15 @@ async def async_handler(request):
97 request.model_spec.signature_name = classifier.signature_name 76 request.model_spec.signature_name = classifier.signature_name
98 stub = getattr(app, classifier.server_name) 77 stub = getattr(app, classifier.server_name)
99 78
100 request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images)) 79 res_list = []
101 result = await stub.Predict(request, timeout=100.0) # 100 secs timeout 80 for _ in range(5):
102 outputs = tf.make_ndarray(result.outputs['output']) 81 request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))
82 result = await stub.Predict(request, timeout=100.0) # 100 secs timeout
83 outputs = tf.make_ndarray(result.outputs['output'])
103 84
104 res = classifier.reprocess_output(outputs) 85 res = classifier.reprocess_output(outputs)
105 return json(res) 86 res_list.append(res)
87 return json(res_list)
106 88
107 89
108 @app.listener("before_server_start") 90 @app.listener("before_server_start")
......
1 import os
2 import cv2
3 import grpc
4 import numpy as np
5 import tensorflow as tf
6 from tensorflow_serving.apis import prediction_service_pb2_grpc, predict_pb2
7
8 from sanic import Sanic
9 from sanic.response import json
10
11 from classification import classifier
12 os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
13
14 app = Sanic("async_test")
15
16 # TODO 从配置文件读取
17 tf_serving_settings = {
18 'options': [('grpc.max_send_message_length', 1000 * 1024 * 1024),
19 ('grpc.max_receive_message_length', 1000 * 1024 * 1024)],
20 'host_port': 'localhost:8500',
21 }
22 app.config.update(tf_serving_settings)
23
24
25 # 同步写法01
26 @app.post("/sync_classification")
27 async def sync_handler(request):
28 image = request.files.get("image")
29 img_array = np.frombuffer(image.body, np.uint8)
30 image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
31 input_images = classifier.preprocess_input(image)
32
33 res_list = []
34 for _ in range(5):
35 # for _ in range(1):
36 with grpc.insecure_channel(app.config['host_port'], options=app.config['options']) as channel:
37 stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
38 # See prediction_service.proto for gRPC request/response details.
39 request = predict_pb2.PredictRequest()
40 request.model_spec.name = classifier.model_name
41 request.model_spec.signature_name = classifier.signature_name
42
43 request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))
44 result = stub.Predict(request, timeout=100.0) # 100 secs timeout
45 outputs = tf.make_ndarray(result.outputs['output'])
46
47 res = classifier.reprocess_output(outputs)
48 res_list.append(res)
49 return json(res_list)
50
51 # 异步写法01
52 @app.post("/async_classification")
53 async def async_handler(request):
54 image = request.files.get("image")
55 img_array = np.frombuffer(image.body, np.uint8)
56 image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
57 input_images = classifier.preprocess_input(image)
58
59 res_list = []
60 for _ in range(5):
61 # for _ in range(1):
62 channel = grpc.aio.insecure_channel(app.config['host_port'], options=app.config['options'])
63 stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
64 # See prediction_service.proto for gRPC request/response details.
65 request = predict_pb2.PredictRequest()
66 request.model_spec.name = classifier.model_name
67 request.model_spec.signature_name = classifier.signature_name
68
69 request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))
70 result = await stub.Predict(request, timeout=100.0) # 100 secs timeout
71 outputs = tf.make_ndarray(result.outputs['output'])
72
73 res = classifier.reprocess_output(outputs)
74 res_list.append(res)
75 return json(res_list)
76
77
78 if __name__ == '__main__':
79 app.run(host='0.0.0.0', port=6699, workers=10)
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!