add async
Showing
2 changed files
with
75 additions
and
14 deletions
1 | import os | ||
1 | import time | 2 | import time |
3 | import random | ||
2 | from locust import HttpUser, task, between, constant, tag | 4 | from locust import HttpUser, task, between, constant, tag |
3 | 5 | ||
4 | 6 | ||
7 | base_dir = '/home/lk/MyProject/BMW_F3OCR/数据集/文件分类/营业执照' | ||
8 | file_path_list = [os.path.join(base_dir, file_name) for file_name in os.listdir(base_dir)] | ||
9 | |||
10 | |||
5 | class QuickstartUser(HttpUser): | 11 | class QuickstartUser(HttpUser): |
12 | |||
6 | # wait_time = between(1, 5) | 13 | # wait_time = between(1, 5) |
7 | 14 | ||
8 | @tag('sync') | 15 | @tag('sync') |
... | @@ -18,6 +25,13 @@ class QuickstartUser(HttpUser): | ... | @@ -18,6 +25,13 @@ class QuickstartUser(HttpUser): |
18 | @tag('sync_classification') | 25 | @tag('sync_classification') |
19 | @task | 26 | @task |
20 | def sync_classification(self): | 27 | def sync_classification(self): |
21 | img_path = '/home/lk/MyProject/BMW_F3OCR/数据集/文件分类/营业执照/授信资料-43.jpg' | 28 | img_path = random.choice(file_path_list) |
22 | files=[('image', ('', open(img_path,'rb'), ''))] | 29 | files=[('image', ('', open(img_path,'rb'), ''))] |
23 | self.client.post("/sync_classification", files=files) | 30 | self.client.post("/sync_classification", files=files) |
31 | |||
32 | @tag('async_classification') | ||
33 | @task | ||
34 | def async_classification(self): | ||
35 | img_path = random.choice(file_path_list) | ||
36 | files=[('image', ('', open(img_path,'rb'), ''))] | ||
37 | self.client.post("/async_classification", files=files) | ... | ... |
... | @@ -29,15 +29,68 @@ tf_serving_settings = { | ... | @@ -29,15 +29,68 @@ tf_serving_settings = { |
29 | app.config.update(tf_serving_settings) | 29 | app.config.update(tf_serving_settings) |
30 | 30 | ||
31 | 31 | ||
32 | @app.post("/sync_classification") | 32 | # 同步写法01 |
33 | async def sync_handler(request): | 33 | # @app.post("/sync_classification") |
34 | # async def sync_handler(request): | ||
35 | # image = request.files.get("image") | ||
36 | # img_array = np.frombuffer(image.body, np.uint8) | ||
37 | # image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) | ||
38 | # input_images = classifier.preprocess_input(image) | ||
39 | # | ||
40 | # options = [('grpc.max_send_message_length', 1000 * 1024 * 1024), | ||
41 | # ('grpc.max_receive_message_length', 1000 * 1024 * 1024)] | ||
42 | # with grpc.insecure_channel('localhost:8500', options=options) as channel: | ||
43 | # stub = prediction_service_pb2_grpc.PredictionServiceStub(channel) | ||
44 | # # See prediction_service.proto for gRPC request/response details. | ||
45 | # request = predict_pb2.PredictRequest() | ||
46 | # request.model_spec.name = classifier.model_name | ||
47 | # request.model_spec.signature_name = classifier.signature_name | ||
48 | # | ||
49 | # request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images)) | ||
50 | # result = stub.Predict(request, timeout=100.0) # 100 secs timeout | ||
51 | # outputs = tf.make_ndarray(result.outputs['output']) | ||
52 | # | ||
53 | # res = classifier.reprocess_output(outputs) | ||
54 | # return json(res) | ||
55 | |||
56 | # 同步写法02 | ||
57 | # @app.post("/sync_classification") | ||
58 | # async def sync_handler(request): | ||
59 | # image = request.files.get("image") | ||
60 | # img_array = np.frombuffer(image.body, np.uint8) | ||
61 | # image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) | ||
62 | # input_images = classifier.preprocess_input(image) | ||
63 | # | ||
64 | # # See prediction_service.proto for gRPC request/response details. | ||
65 | # request = predict_pb2.PredictRequest() | ||
66 | # request.model_spec.name = classifier.model_name | ||
67 | # request.model_spec.signature_name = classifier.signature_name | ||
68 | # stub = getattr(app, classifier.server_name) | ||
69 | # | ||
70 | # request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images)) | ||
71 | # result = stub.Predict(request, timeout=100.0) # 100 secs timeout | ||
72 | # outputs = tf.make_ndarray(result.outputs['output']) | ||
73 | # | ||
74 | # res = classifier.reprocess_output(outputs) | ||
75 | # return json(res) | ||
76 | # | ||
77 | # @app.listener("before_server_start") | ||
78 | # async def set_grpc_channel(app, loop): | ||
79 | # for server_name, server_settings in app.config['servers'].items(): | ||
80 | # channel = grpc.insecure_channel( | ||
81 | # '{0}:{1}'.format(server_settings['host'], server_settings['port']), | ||
82 | # options=server_settings.get('options')) | ||
83 | # stub = prediction_service_pb2_grpc.PredictionServiceStub(channel) | ||
84 | # setattr(app, server_name, stub) | ||
85 | |||
86 | # 异步写法 | ||
87 | @app.post("/async_classification") | ||
88 | async def async_handler(request): | ||
34 | image = request.files.get("image") | 89 | image = request.files.get("image") |
35 | img_array = np.frombuffer(image.body, np.uint8) | 90 | img_array = np.frombuffer(image.body, np.uint8) |
36 | image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) | 91 | image = cv2.imdecode(img_array, cv2.IMREAD_COLOR) |
37 | input_images = classifier.preprocess_input(image) | 92 | input_images = classifier.preprocess_input(image) |
38 | 93 | ||
39 | # print(type(image)) | ||
40 | |||
41 | # See prediction_service.proto for gRPC request/response details. | 94 | # See prediction_service.proto for gRPC request/response details. |
42 | request = predict_pb2.PredictRequest() | 95 | request = predict_pb2.PredictRequest() |
43 | request.model_spec.name = classifier.model_name | 96 | request.model_spec.name = classifier.model_name |
... | @@ -45,23 +98,17 @@ async def sync_handler(request): | ... | @@ -45,23 +98,17 @@ async def sync_handler(request): |
45 | stub = getattr(app, classifier.server_name) | 98 | stub = getattr(app, classifier.server_name) |
46 | 99 | ||
47 | request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images)) | 100 | request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images)) |
48 | result = stub.Predict(request, 100.0) # 100 secs timeout | 101 | result = await stub.Predict(request, timeout=100.0) # 100 secs timeout |
49 | outputs = tf.make_ndarray(result.outputs['output']) | 102 | outputs = tf.make_ndarray(result.outputs['output']) |
50 | 103 | ||
51 | res = classifier.reprocess_output(outputs) | 104 | res = classifier.reprocess_output(outputs) |
52 | return json(res) | 105 | return json(res) |
53 | 106 | ||
54 | 107 | ||
55 | # @app.get("/async") | ||
56 | # async def async_handler(request): | ||
57 | # await asyncio.sleep(2) | ||
58 | # return json({'code': 1}) | ||
59 | |||
60 | |||
61 | @app.listener("before_server_start") | 108 | @app.listener("before_server_start") |
62 | async def set_grpc_channel(app, loop): | 109 | async def set_grpc_channel(app, loop): |
63 | for server_name, server_settings in app.config['servers'].items(): | 110 | for server_name, server_settings in app.config['servers'].items(): |
64 | channel = grpc.insecure_channel( | 111 | channel = grpc.aio.insecure_channel( |
65 | '{0}:{1}'.format(server_settings['host'], server_settings['port']), | 112 | '{0}:{1}'.format(server_settings['host'], server_settings['port']), |
66 | options=server_settings.get('options')) | 113 | options=server_settings.get('options')) |
67 | stub = prediction_service_pb2_grpc.PredictionServiceStub(channel) | 114 | stub = prediction_service_pb2_grpc.PredictionServiceStub(channel) |
... | @@ -69,4 +116,4 @@ async def set_grpc_channel(app, loop): | ... | @@ -69,4 +116,4 @@ async def set_grpc_channel(app, loop): |
69 | 116 | ||
70 | 117 | ||
71 | if __name__ == '__main__': | 118 | if __name__ == '__main__': |
72 | app.run(host='0.0.0.0', port=6699, workers=5) | 119 | app.run(host='0.0.0.0', port=6699, workers=10) | ... | ... |
-
Please register or sign in to post a comment