# server3.py 2.84 KB
import os
import cv2
import grpc
import numpy as np
import tensorflow as tf
from tensorflow_serving.apis import prediction_service_pb2_grpc, predict_pb2

from sanic import Sanic
from sanic.response import json

from classification import classifier
# Hide all CUDA devices from libraries running in this process.
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'

app = Sanic("async_test")

# TODO: read these settings from a configuration file.
# Message size limit (1000 MiB) used for both gRPC directions.
_GRPC_MESSAGE_LIMIT = 1000 * 1024 * 1024
tf_serving_settings = dict(
    options=[
        ('grpc.max_send_message_length', _GRPC_MESSAGE_LIMIT),
        ('grpc.max_receive_message_length', _GRPC_MESSAGE_LIMIT),
    ],
    host_port='localhost:8500',
)
# Expose the settings to the handlers through the Sanic app config.
app.config.update(tf_serving_settings)


# Synchronous variant 01
@app.post("/sync_classification")
async def sync_handler(request):
    """Classify an uploaded image using blocking gRPC calls.

    Reads the multipart file field ``image``, decodes it with OpenCV, passes
    it through ``classifier.preprocess_input`` and sends the same prediction
    request five times over one synchronous gRPC channel to the server
    configured in ``app.config['host_port']``.

    The gRPC stub used here is the synchronous one, so each ``Predict`` call
    blocks until the result arrives (or the 100 second timeout expires).

    Args:
        request: The incoming Sanic request; must carry an ``image`` file.

    Returns:
        A JSON response with the list of five post-processed results, or a
        JSON error with status 400 when the upload is missing, empty or not
        decodable as an image.
    """
    upload = request.files.get("image")
    if upload is None or not upload.body:
        return json({"error": "missing or empty 'image' file"}, status=400)

    image = cv2.imdecode(np.frombuffer(upload.body, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        # cv2.imdecode reports an undecodable buffer by returning None.
        return json({"error": "'image' could not be decoded"}, status=400)
    input_images = classifier.preprocess_input(image)

    # The prediction request is identical for every call, so build it once.
    # It gets its own name so the handler's ``request`` is not shadowed.
    # See prediction_service.proto for gRPC request/response details.
    predict_request = predict_pb2.PredictRequest()
    predict_request.model_spec.name = classifier.model_name
    predict_request.model_spec.signature_name = classifier.signature_name
    predict_request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))

    res_list = []
    # One channel serves all five calls; the ``with`` block closes it.
    with grpc.insecure_channel(app.config['host_port'], options=app.config['options']) as channel:
        stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
        for _ in range(5):
            result = stub.Predict(predict_request, timeout=100.0)  # 100 secs timeout
            outputs = tf.make_ndarray(result.outputs['output'])
            res_list.append(classifier.reprocess_output(outputs))
    return json(res_list)

# Asynchronous variant 01
@app.post("/async_classification")
async def async_handler(request):
    """Classify an uploaded image using awaited ``grpc.aio`` calls.

    Reads the multipart file field ``image``, decodes it with OpenCV, passes
    it through ``classifier.preprocess_input`` and sends the same prediction
    request five times, one after another, over one asyncio gRPC channel to
    the server configured in ``app.config['host_port']``.

    Args:
        request: The incoming Sanic request; must carry an ``image`` file.

    Returns:
        A JSON response with the list of five post-processed results, or a
        JSON error with status 400 when the upload is missing, empty or not
        decodable as an image.
    """
    upload = request.files.get("image")
    if upload is None or not upload.body:
        return json({"error": "missing or empty 'image' file"}, status=400)

    image = cv2.imdecode(np.frombuffer(upload.body, np.uint8), cv2.IMREAD_COLOR)
    if image is None:
        # cv2.imdecode reports an undecodable buffer by returning None.
        return json({"error": "'image' could not be decoded"}, status=400)
    input_images = classifier.preprocess_input(image)

    # The prediction request is identical for every call, so build it once.
    # It gets its own name so the handler's ``request`` is not shadowed.
    # See prediction_service.proto for gRPC request/response details.
    predict_request = predict_pb2.PredictRequest()
    predict_request.model_spec.name = classifier.model_name
    predict_request.model_spec.signature_name = classifier.signature_name
    predict_request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))

    res_list = []
    # ``async with`` closes the channel on exit; previously a new channel was
    # opened on every iteration and never closed.
    async with grpc.aio.insecure_channel(app.config['host_port'], options=app.config['options']) as channel:
        stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
        for _ in range(5):
            result = await stub.Predict(predict_request, timeout=100.0)  # 100 secs timeout
            outputs = tf.make_ndarray(result.outputs['output'])
            res_list.append(classifier.reprocess_output(outputs))
    return json(res_list)


if __name__ == '__main__':
    # Script entry point: listen on every interface, port 6699, 10 workers.
    app.run(
        host='0.0.0.0',
        port=6699,
        workers=10,
    )