a33165ce by 周伟奇

add async test

1 parent 158a5293
from .model import F3Classification
from .const import CLASS_CN_LIST, CLASS_OTHER_FIRST
classifier = F3Classification(
class_name_list=CLASS_CN_LIST,
class_other_first=CLASS_OTHER_FIRST
)
CLASS_OTHER_CN = '其他'
CLASS_OTHER_FIRST = True
# CLASS_CN_LIST = [CLASS_OTHER_CN, '身份证', '营业执照', '经销商授权书', '个人授权书']
CLASS_CN_LIST = [CLASS_OTHER_CN, '营业执照', '经销商授权书', '个人授权书']
OTHER_THRESHOLDS = 0.5
import tensorflow as tf
class F3Classification:
def __init__(self, class_name_list, class_other_first, *args, **kwargs):
self.class_name_list = class_name_list
self.class_count = len(class_name_list) if not class_other_first else len(class_name_list) - 1
self.model_name = 'classification_model'
self.signature_name = 'serving_default'
self.server_name = 'server_1'
@staticmethod
def preprocess_input(image):
image = tf.image.resize(image, [224, 224])
image = tf.keras.applications.mobilenet_v2.preprocess_input(image)
input_images = tf.expand_dims(image, axis=0)
return input_images
def reprocess_output(self, outputs, thresholds=0.5):
for output in outputs:
idx = tf.math.argmax(output)
confidence = output[idx]
if confidence < thresholds:
idx = -1
label = self.class_name_list[idx + 1]
break
res = {
'label': label,
'confidence': confidence
}
return res
import time
from locust import HttpUser, task, between, constant, tag
class QuickstartUser(HttpUser):
# wait_time = between(1, 5)
@tag('sync')
@task
def sync_test(self):
self.client.get("/sync")
@tag('async')
@task
def async_test(self):
self.client.get("/async")
\ No newline at end of file
sanic==22.6.0
locust==2.10.1
import asyncio
import time
from sanic import Sanic
from sanic.response import json
app = Sanic("async_test")
@app.get("/sync")
async def sync_handler(request):
time.sleep(2)
return json({'code': 1})
@app.get("/async")
async def async_handler(request):
await asyncio.sleep(2)
return json({'code': 1})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=1337, workers=5)
import grpc
import cv2
import numpy as np
import tensorflow as tf
from tensorflow_serving.apis import prediction_service_pb2_grpc, predict_pb2
from sanic import Sanic
from sanic.response import json
from classification import classifier
app = Sanic("async_test")
# TODO 从配置文件读取
tf_serving_settings = {
'servers': {
'server_1': {
'host': '192.168.10.191',
'port': '8500',
'options': [
('grpc.max_send_message_length', 1000 * 1024 * 1024),
('grpc.max_receive_message_length', 1000 * 1024 * 1024),
],
}
},
}
app.config.update(tf_serving_settings)
@app.post("/sync_classification")
async def sync_handler(request):
image = request.files.get("image")
img_array = np.frombuffer(image.body, np.uint8)
image = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
input_images = classifier.preprocess_input(image)
# print(type(image))
# See prediction_service.proto for gRPC request/response details.
request = predict_pb2.PredictRequest()
request.model_spec.name = classifier.model_name
request.model_spec.signature_name = classifier.signature_name
stub = getattr(app.ctx, classifier.server_name)
request.inputs['input_1'].CopyFrom(tf.make_tensor_proto(input_images))
result = stub.Predict(request, 100.0) # 100 secs timeout
outputs = tf.make_ndarray(result.outputs['output'])
res = classifier.reprocess_output(outputs)
return json(res)
# @app.get("/async")
# async def async_handler(request):
# await asyncio.sleep(2)
# return json({'code': 1})
@app.listener("before_server_start")
async def set_grpc_channel(app, loop):
for server_name, server_settings in app.config['servers'].items():
channel = grpc.insecure_channel(
'{0}:{1}'.format(server_settings['host'], server_settings['port']),
options=server_settings.get('options'))
stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)
setattr(app.ctx, server_name, stub)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=6699, workers=5)
......@@ -9,5 +9,5 @@ classifier = F3Classification(
)
classifier.load_model(load_weights_path=os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'ckpt_prod.h5'))
os.path.dirname(os.path.abspath(__file__)), 'ckpt_best_0.5.h5'))
......
import os
import random
import cv2
import tensorflow as tf
import tensorflow_addons as tfa
......@@ -23,10 +24,9 @@ class F3Classification(BaseModel):
self.model = None
@staticmethod
def gpu_config():
def gpu_config(gpu_idx=0):
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
# print(gpus)
tf.config.set_visible_devices(devices=gpus[1], device_type='GPU')
tf.config.set_visible_devices(devices=gpus[gpu_idx], device_type='GPU')
@staticmethod
def get_class_label_map(class_name_list, class_other_first=False):
......@@ -53,15 +53,20 @@ class F3Classification(BaseModel):
@staticmethod
# @tf.function
def random_rgb_2_bgr(image, label):
# 1/5
if random.random() < 0.1:
# 1/2
if random.random() < 0.5:
image = image[:, :, ::-1]
return image, label
@staticmethod
# @tf.function
def rgb_2_bgr(image, label):
image = image[:, :, ::-1]
return image, label
@staticmethod
# @tf.function
def random_grayscale_expand(image, label):
# 1/10
if random.random() < 0.1:
image = tf.image.rgb_to_grayscale(image)
image = tf.image.grayscale_to_rgb(image)
......@@ -69,22 +74,19 @@ class F3Classification(BaseModel):
@staticmethod
def random_flip_left_right(image, label):
# 1/10
if random.random() < 0.2:
image = tf.image.random_flip_left_right(image)
# if random.random() < 0.2:
image = tf.image.random_flip_left_right(image)
return image, label
@staticmethod
def random_flip_up_down(image, label):
# 1/10
if random.random() < 0.2:
image = tf.image.random_flip_up_down(image)
# if random.random() < 0.2:
image = tf.image.random_flip_up_down(image)
return image, label
@staticmethod
def random_rot90(image, label):
# 1/10
if random.random() < 0.1:
if random.random() < 0.3:
image = tf.image.rot90(image, k=random.randint(1, 3))
return image, label
......@@ -93,13 +95,15 @@ class F3Classification(BaseModel):
def load_image(image_path, label):
image = tf.io.read_file(image_path)
# image = tf.image.decode_image(image, channels=3) # TODO ?
image = tf.image.decode_png(image, channels=3)
# image = tf.image.decode_png(image, channels=3)
image = tf.image.decode_jpeg(image, channels=3, dct_method='INTEGER_ACCURATE')
image = tf.image.resize(image, [224, 224])
return image, label
@staticmethod
# @tf.function
def preprocess_input(image, label):
image = tf.image.resize(image, [224, 224])
# image = tf.image.resize(image, [224, 224])
image = applications.mobilenet_v2.preprocess_input(image)
return image, label
......@@ -131,7 +135,9 @@ class F3Classification(BaseModel):
base_model = MobileNetV2(
input_shape=(224, 224, 3),
alpha=0.35,
# alpha=0.35,
alpha=0.5,
# alpha=1,
include_top=False,
weights='imagenet',
pooling='avg',
......@@ -143,18 +149,15 @@ class F3Classification(BaseModel):
x = layers.Dense(self.class_count, activation='sigmoid', name='output')(x)
self.model = models.Model(inputs=base_model.input, outputs=x)
if for_training:
if isinstance(load_weights_path, str) and os.path.isfile(load_weights_path):
self.model.load_weights(load_weights_path, by_name=True, skip_mismatch=True)
elif for_training:
freeze = True
for layer in self.model.layers:
layer.trainable = not freeze
if freeze and layer.name == 'block_16_project_BN':
freeze = False
if isinstance(load_weights_path, str):
if not os.path.isfile(load_weights_path):
raise Exception('load_weights_path can not find')
self.model.load_weights(load_weights_path, by_name=True, skip_mismatch=True)
def train(self,
dataset_dir,
epoch,
......@@ -167,13 +170,14 @@ class F3Classification(BaseModel):
thresholds=0.5,
metrics_name='accuracy'):
self.gpu_config()
self.gpu_config(1)
self.load_model(for_training=True, load_weights_path=load_weights_path)
self.model.summary()
self.model.compile(
optimizer=optimizers.Adam(learning_rate=3e-4),
# optimizer=optimizers.Adam(learning_rate=3e-4),
optimizer=optimizers.Adam(learning_rate=1e-4),
loss=tfa.losses.SigmoidFocalCrossEntropy(), # TODO ?
metrics=[CustomMetric(thresholds, name=metrics_name), ],
......@@ -193,7 +197,8 @@ class F3Classification(BaseModel):
'random_flip_left_right',
'random_flip_up_down',
'random_rot90',
'random_rgb_2_bgr',
# 'random_rgb_2_bgr',
# 'rgb_2_bgr',
'random_grayscale_expand'
],
)
......@@ -201,16 +206,21 @@ class F3Classification(BaseModel):
dataset_dir=os.path.join(dataset_dir, validate_dir_name),
name=validate_dir_name,
batch_size=batch_size,
augmentation_methods=[]
augmentation_methods=[
'rgb_2_bgr'
]
)
ckpt_callback = callbacks.ModelCheckpoint(ckpt_path, save_best_only=True)
es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
history = self.model.fit(
train_dataset,
epochs=epoch,
validation_data=validate_dataset,
callbacks=[ckpt_callback, ],
callbacks=[ckpt_callback,
# es_callback
],
)
history_save(history, history_save_path, metrics_name)
......@@ -222,7 +232,7 @@ class F3Classification(BaseModel):
batch_size,
validate_dir_name='test',
thresholds=0.5):
self.gpu_config()
self.gpu_config(3)
self.load_model(load_weights_path=load_weights_path)
self.model.summary()
......@@ -231,7 +241,9 @@ class F3Classification(BaseModel):
dataset_dir=os.path.join(dataset_dir, validate_dir_name),
name=validate_dir_name,
batch_size=batch_size,
augmentation_methods=[]
augmentation_methods=[
'rgb_2_bgr'
]
)
label_true_list = []
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!