# utils.py 5.84 KB
# -*- coding: utf-8 -*-
# @Author        : lk
# @Email         : 9428.al@gmail.com
# @Create Date   : 2022-04-19 17:24:28
# @Last Modified : 2022-04-23 15:07:48
# @Description   : 

import os
import cv2
import time
import numpy as np
import tensorflow as tf


def MobileUNet(input_shape=(1024, 1024, 3)):
    """Build a MobileNetV2-backed segmentation network with a top-down decoder.

    Three MobileNetV2 feature maps ("block_5_add", "block_12_add",
    "block_15_add") are extended by two extra stride-2 depthwise stages.
    The coarsest map is then repeatedly upsampled with stride-2 transposed
    convolutions and added to the next finer map (the pathway marked "FPN"
    below). Three more stride-2 transposed convolutions follow, and two 1x1
    sigmoid convolutions produce the probability map and the threshold map.

    Args:
        input_shape: (height, width, channels) of the network input. Defaults
            to (1024, 1024, 3), the value that used to be hard-coded. Height
            and width should be multiples of 128 so that every upsampled map
            has the same size as the skip connection it is added to and the
            output matches the input resolution (Keras MobileNetV2 emits the
            three tapped layers at 1/8, 1/16 and 1/32 scale; the two extra
            stages go down to 1/128).

    Returns:
        A `tf.keras.Model` taking the MobileNetV2 input and returning a
        2-channel tensor: channel 0 is the probability map, channel 1 the
        threshold map, concatenated along the last axis.
    """

    def dw_conv_bn_relu(inputs, strides):
        """Apply a 3x3 depthwise convolution, batch normalization and ReLU6."""
        kernel_init = tf.initializers.RandomNormal(0.0, 0.01)
        x = tf.keras.layers.DepthwiseConv2D(kernel_size=3,
                                            strides=strides,
                                            padding="same",
                                            kernel_initializer=kernel_init)(inputs)
        x = tf.keras.layers.BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001)(x)
        x = tf.keras.layers.ReLU(6.)(x)
        return x

    def deconv_bn_relu(inputs, filters, strides):
        """Apply a 3x3 transposed convolution, batch normalization and ReLU6."""
        kernel_init = tf.initializers.RandomNormal(0.0, 0.01)
        x = tf.keras.layers.Conv2DTranspose(filters,
                                            kernel_size=3,
                                            strides=strides,
                                            padding="same",
                                            kernel_initializer=kernel_init)(inputs)
        x = tf.keras.layers.BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001)(x)
        x = tf.keras.layers.ReLU(6.)(x)
        return x

    mobilenetv2 = tf.keras.applications.MobileNetV2(
        include_top=False, input_shape=list(input_shape), alpha=1.0
    )

    # Backbone taps, from finest (c3) to coarsest (c5).
    c3_output, c4_output, c5_output = [
        mobilenetv2.get_layer(layer_name).output
        for layer_name in ["block_5_add", "block_12_add", "block_15_add"]
    ]

    # Two additional stride-2 stages on top of the backbone.
    c6_output = dw_conv_bn_relu(c5_output, strides=2)
    c7_output = dw_conv_bn_relu(c6_output, strides=2)

    # FPN: upsample the coarser map to the channel count of the finer one,
    # then merge the two by element-wise addition.
    c7_output_up = deconv_bn_relu(c7_output, filters=c6_output.shape[-1], strides=2)
    c6_output = tf.keras.layers.Add()([c6_output, c7_output_up])

    c6_output_up = deconv_bn_relu(c6_output, filters=c5_output.shape[-1], strides=2)
    c5_output = tf.keras.layers.Add()([c5_output, c6_output_up])

    c5_output_up = deconv_bn_relu(c5_output, filters=c4_output.shape[-1], strides=2)
    c4_output = tf.keras.layers.Add()([c4_output, c5_output_up])

    c4_output_up = deconv_bn_relu(c4_output, filters=c3_output.shape[-1], strides=2)
    c3_output = tf.keras.layers.Add()([c3_output, c4_output_up])

    # Three more x2 upsampling steps after the merged c3 map.
    quarter_maps = deconv_bn_relu(c3_output, filters=256, strides=2)
    a_half_maps = deconv_bn_relu(quarter_maps, filters=128, strides=2)
    one_one_maps = deconv_bn_relu(a_half_maps, filters=128, strides=2)

    prob_maps = tf.keras.layers.Conv2D(filters=1, kernel_size=1, strides=1, activation='sigmoid', name='probability_maps_layer')(one_one_maps)
    thre_maps = tf.keras.layers.Conv2D(filters=1, kernel_size=1, strides=1, activation='sigmoid', name='threshold_maps_layer')(one_one_maps)

    out = tf.keras.layers.Concatenate(axis=-1)([prob_maps, thre_maps])

    # The approximate binary map 1 / (1 + exp(-50 * (prob - thre))) is not
    # part of the graph; DBNetLoss derives it from these two channels.

    model = tf.keras.Model(inputs=mobilenetv2.inputs, outputs=out)
    return model


class DBNetLoss(tf.losses.Loss):
    """Weighted sum of a probability-map loss and an approximate-binary-map loss.

    `y_true` is split along axis 3 into three equal parts (probability label,
    threshold label, binary label) and `y_pred` into two (probability and
    threshold predictions). The threshold label is split off but not used:
    no threshold-map term is computed here.
    """

    def __init__(self):
        super().__init__(reduction="auto", name="DBNetLoss")
        # Inner BCE works on probabilities (not logits) and is left unreduced;
        # the final reduction is the one configured on this wrapper.
        self._bce_loss = tf.keras.losses.BinaryCrossentropy(
            from_logits=False,
            reduction=tf.keras.losses.Reduction.NONE,
        )

    def call(self, y_true, y_pred):
        """Return `10 * BCE(prob) + BCE(binary)` for one batch.

        Args:
            y_true: labels whose axis 3 holds the probability, threshold and
                binary label maps, in that order.
            y_pred: predictions whose axis 3 holds the probability and
                threshold maps, in that order.

        Returns:
            The unreduced combined loss tensor.
        """
        gt_prob, _gt_thre, gt_binary = tf.split(y_true, num_or_size_splits=3, axis=3)
        pred_prob, pred_thre = tf.split(y_pred, num_or_size_splits=2, axis=3)

        # Steep sigmoid (factor 50) of the prob/threshold margin: a
        # differentiable stand-in for hard thresholding.
        margin = pred_prob - pred_thre
        pred_binary = 1 / (1 + tf.math.exp(-50 * margin))

        prob_term = self._bce_loss(gt_prob, pred_prob)
        binary_term = self._bce_loss(gt_binary, pred_binary)

        return 10 * prob_term + binary_term


def resize_with_padding(src, limit_max=1024):
    """Fit an image into a zero-padded ``limit_max`` x ``limit_max`` canvas.

    If the longer side exceeds ``limit_max`` the image is scaled down with
    its aspect ratio preserved so that the longer side fits; smaller images
    are left at their original size. The result is placed in the top-left
    corner of a float32 canvas filled with zeros.

    Args:
        src: image array of shape (H, W, 3), or (H, W) for grayscale, in
            which case the single channel is replicated across the three
            canvas channels. The input is not modified.
        limit_max: side length of the square output canvas.

    Returns:
        A tuple ``(canvas, ratio)``: ``canvas`` is a float32 array of shape
        (limit_max, limit_max, 3); ``ratio`` is the scale factor that was
        applied (``1`` when no scaling was needed).
    """
    h, w = src.shape[:2]
    max_side = max(h, w)

    if max_side > limit_max:
        ratio = limit_max / max_side
        # Clamp to 1 px: a very elongated image could otherwise truncate to
        # a zero-sized side, which cv2.resize rejects.
        h, w = max(1, int(h * ratio)), max(1, int(w * ratio))
        proc = cv2.resize(src, (w, h))
    else:
        # Same-size resize would only copy the pixels; the canvas
        # assignment below copies them anyway.
        ratio = 1
        proc = src

    if proc.ndim == 2:
        # Add a channel axis so a grayscale image broadcasts over the
        # three canvas channels.
        proc = proc[:, :, np.newaxis]

    canvas = np.zeros((limit_max, limit_max, 3), dtype=np.float32)
    canvas[0:h, 0:w, :] = proc
    return canvas, ratio


if __name__ == '__main__':
    # Smoke test / benchmark: build and save the model, then time the
    # preprocessing and inference steps on every image in `file_dir`.

    model = MobileUNet()
    model.summary()

    model.save('model.h5')

    file_dir = '/home/lk/MyProject/BMW/数据集/文件分类/其他/其他'

    mean_resize_time = []
    mean_infer_time = []

    for fn in os.listdir(file_dir):
        file_path = os.path.join(file_dir, fn)

        image = cv2.imread(file_path)
        if image is None:
            # cv2.imread returns None for anything it cannot decode
            # (sub-directories, non-image files); skip instead of crashing.
            print('skip unreadable file:', file_path)
            continue

        # perf_counter is monotonic, so the intervals cannot be distorted by
        # system clock adjustments.
        t0 = time.perf_counter()
        image, ratio = resize_with_padding(image)
        inputs = np.expand_dims(np.float32(image / 255.), axis=0)
        t1 = time.perf_counter()
        mean_resize_time.append(t1 - t0)

        t2 = time.perf_counter()
        predictions = model.predict(inputs)
        t3 = time.perf_counter()
        mean_infer_time.append(t3 - t2)

        # Per-image inference time in milliseconds.
        print(int((t3 - t2) * 1000))

    # Report the accumulated timings (np.mean of an empty list would warn
    # and yield nan, so only print when at least one image was processed).
    if mean_infer_time:
        print('mean resize time (ms):', int(np.mean(mean_resize_time) * 1000))
        print('mean infer time (ms):', int(np.mean(mean_infer_time) * 1000))