Preface

I couldn't follow other people's code, so I reimplemented the classic networks myself to get familiar with the Keras API (and to pad out a blog post).

None of these networks have been trained and evaluated, so using them as-is may cause problems!!!

The goal is to understand how to build networks from scratch, so that future model migrations don't require learning everything again. Don't ask how I know relearning happens.

The networks may contain mistakes, but that's no big deal; knowing how to build them is what matters. The classic networks can always be imported directly later anyway.

Before that, though, it's worth understanding the differences between the model-saving formats, so that a finished model can actually be saved.

Model saving

TF official documentation

SavedModel format

This is the most straightforward way to save a model.

The saved model includes:

  • the model's architecture/configuration
  • the model's weight values (learned during training)
  • the model's compile information (if compile() was called)
  • the optimizer and its state, if any (so you can resume training where you left off)
# Saves the model as a directory named dirname under the path dirname_path

model.save(dirname_path)
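Loading it back is symmetric: keras.models.load_model restores architecture, weights, and compile state in one call. A minimal round-trip sketch, assuming TF 2.x with the bundled tf.keras (where a path without an extension means SavedModel); the tiny Dense model and the /tmp path are made up for illustration:

```python
import numpy as np
from tensorflow import keras

# Build and save a tiny model in the SavedModel format (a directory on disk)
model = keras.Sequential([keras.layers.Dense(1, input_shape=(4,))])
model.save('/tmp/demo_savedmodel')

# load_model restores architecture, weights, and (if present) compile state
restored = keras.models.load_model('/tmp/demo_savedmodel')

x = np.ones((1, 4), dtype='float32')
print(np.allclose(model.predict(x), restored.predict(x)))  # True
```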

H5 format

Keras also supports saving a single HDF5 file containing the model's architecture, weight values, and compile() information. It is a lightweight alternative to SavedModel.

However, because everything lives in a single .h5 file, the H5 format does not include two things that SavedModel does:

  • External losses and metrics added via model.add_loss() or model.add_metric() are not saved (unlike with SavedModel). If your model has such losses and metrics and you want to resume training, you need to re-add them yourself after loading. Note that this does not apply to losses/metrics created inside layers via self.add_loss() or self.add_metric(): as long as the layer is loaded, these are preserved, because they are part of the layer's call method.
  • The computation graph of custom objects (such as custom layers) is not included in the saved file.
# Just add the .h5 suffix to the file name
model.save('name.h5')
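Because the H5 file does not store the code of custom objects, a subclassed layer must be supplied again via custom_objects at load time. A small sketch, assuming TF 2.x with h5py installed; the Double layer is made up for illustration:

```python
import numpy as np
from tensorflow import keras

class Double(keras.layers.Layer):
    # Toy custom layer: H5 saves its config and weights, but not its code
    def call(self, x):
        return 2 * x

model = keras.Sequential([keras.layers.InputLayer(input_shape=(3,)), Double()])
model.save('demo_custom.h5')

# Loading would fail with an unknown-layer error without custom_objects
restored = keras.models.load_model('demo_custom.h5',
                                   custom_objects={'Double': Double})
print(restored.predict(np.ones((1, 3), dtype='float32')))  # [[2. 2. 2.]]
```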

Checkpoints

Saving with a signature

class Model(tf.keras.Model):

    @tf.function
    def call(self, x):
      ...

m = Model()
tf.saved_model.save(
    m, '/tmp/saved_model/',
    signatures=m.call.get_concrete_function(
        tf.TensorSpec(shape=[None, 3], dtype=tf.float32, name="inp")))
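The saved signature can then be called back through tf.saved_model.load; a single signature saved without an explicit name is exposed under the default key 'serving_default'. A sketch of the round trip, where the reduce_sum body merely stands in for the elided "..." above:

```python
import tensorflow as tf

class Model(tf.keras.Model):
    @tf.function
    def call(self, x):
        return tf.reduce_sum(x, axis=1)  # toy body standing in for the "..." above

m = Model()
tf.saved_model.save(
    m, '/tmp/saved_model_sig/',
    signatures=m.call.get_concrete_function(
        tf.TensorSpec(shape=[None, 3], dtype=tf.float32, name="inp")))

loaded = tf.saved_model.load('/tmp/saved_model_sig/')
fn = loaded.signatures['serving_default']
# Signature functions take and return named tensors (a dict of outputs)
print(fn(inp=tf.ones([2, 3])))
```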

ResNet

Hand-written implementation

The different ResNets differ only in overall structure; the basic unit is the same, so changing layer_dims is all it takes to implement each variant.

Here the network is built by subclassing. Writing the forward pass yourself is more tedious, but the degree of freedom is higher.

import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'

import numpy as np
import tensorflow as tf
from  tensorflow import keras
import tensorboard
from tensorflow.keras import layers
import datetime
from matplotlib import pyplot as plt


BATCHSIZE=64

class BasicBlock(layers.Layer):
    def __init__(self,filter_num,stride=1):
        super(BasicBlock,self).__init__()
        self.conv1=layers.Conv2D(kernel_size=(3,3) ,filters=filter_num,strides=stride,padding='same')
        self.bn1=layers.BatchNormalization()
        self.relu=layers.Activation('relu')

        self.conv2=layers.Conv2D(kernel_size=(3,3) ,filters=filter_num,strides=1,padding='same')
        self.bn2 = layers.BatchNormalization()

        if(stride!=1):
            self.downsample = keras.Sequential()
            self.downsample.add(layers.Conv2D(filters=filter_num,kernel_size=(1,1),strides=stride))
        else:
            self.downsample=lambda x:x

    def call(self,inputs,training=None):

        out=self.conv1(inputs)
        out=self.bn1(out)
        out=self.relu(out)
        out=self.conv2(out)
        out=self.bn2(out)

        identity=self.downsample(inputs)
        output=layers.add([out,identity])
        output=self.relu(output)
        return output

class ResNet(keras.Model):
    def __init__(self,layer_dims,num_classes=100):
        super(ResNet, self).__init__()
        # self.flatten=layers.Flatten(input_shape=(32,32,3))
        self.stem=keras.Sequential([
            layers.Conv2D(64,(3,3),strides=(1,1)),
            layers.BatchNormalization(),
            layers.Activation('relu'),
            layers.MaxPool2D(pool_size=(2,2),strides=(1,1),padding='same')
        ])
        self.layer1=self.build_resblock(filter_num=64,blocks=layer_dims[0])
        self.layer2=self.build_resblock(filter_num=128,blocks=layer_dims[1],stride=2)
        self.layer3=self.build_resblock(filter_num=256,blocks=layer_dims[2],stride=2)
        self.layer4=self.build_resblock(filter_num=512,blocks=layer_dims[3],stride=2)

        self.avgpool=layers.GlobalAveragePooling2D()
        self.fc=layers.Dense(num_classes)

    def call(self,inputs,training=None):
        # x = tf.reshape(inputs, [-1, 32 * 32*3])
        out=self.stem(inputs)
        out=self.layer1(out)
        out=self.layer2(out)
        out=self.layer3(out)
        out=self.layer4(out)
        out=self.avgpool(out)
        out=self.fc(out)

        return out
    def build_resblock(self,filter_num,blocks,stride=1):
        res_block=keras.Sequential()
        res_block.add(BasicBlock(filter_num,stride))

        for _ in range(1,blocks):
            res_block.add(BasicBlock(filter_num,stride=1))
        return  res_block

def resnet18():
    return ResNet(layer_dims=[2,2,2,2])

def preprocess(x,y):
    print('pre:', x.shape, y.shape)
    x=tf.cast(x,dtype=tf.float32)/255.0
    y=tf.cast(y,dtype=tf.int32)
    y = tf.squeeze(y)
    y=tf.one_hot(y,depth=100)

    print('after', x.shape, y.shape)

    return x,y

def data2tensor(x,y):

    db=tf.data.Dataset.from_tensor_slices((x,y))
    db=db.map(preprocess)
    db=db.shuffle(5000).batch(BATCHSIZE)

    return db

def train_model(train_db,val_db,is_train=False):
    model = resnet18()

    model.build(input_shape=(None, 32, 32, 3))
    model.summary()
    x=tf.random.normal([4,32,32,3])
    out=model(x)
    print(out.shape)

    model.compile(
        optimizer=tf.optimizers.Adam(),
        loss=tf.losses.CategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )


    path=os.path.abspath('./')
    log_dir = path + '\\logs\\' + datetime.datetime.now().strftime("%Y%m%d-%H%M")
    print(log_dir)
    tensorboard=keras.callbacks.TensorBoard(log_dir=log_dir,histogram_freq=1)
    model.summary()
    if is_train:

        model.fit(train_db,validation_data=val_db,validation_freq=1,epochs=5,callbacks=[tensorboard])

        model.save_weights('./resnet18.h5')


def main():
    (x,y),(x_test,y_test)=keras.datasets.cifar100.load_data()

    l=int(len(x)*0.8)
    train_db=data2tensor(x[:l],y[:l])
    val_db=data2tensor(x[l:],y[l:])
    test_db=data2tensor(x_test,y_test)

    # sample=next(iter(train_db))
    # print(sample[0].shape,sample[1].shape)
    # plt.imshow(sample[0])
    # plt.show()
    train_model(train_db,val_db,is_train=False)

main()

Implementation with the built-in API

keras.applications already contains all of these and they work the same way, so this won't be repeated for the later networks.

tf.keras.applications.resnet50.ResNet50 and tf.keras.applications.ResNet50 do exactly the same thing.
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import decode_predictions

model=ResNet50(weights='./resnet50_weights_tf_dim_ordering_tf_kernels.h5')
path='./cat.jpg'
# Read in the image (image_preprocess.img_decoder is the author's own helper)
image=image_preprocess.img_decoder(path)

pre1=model.predict(image)
# decode_predictions maps the prediction vector back to readable labels
pre=decode_predictions(pre1)
print(pre)

VGG

Here the network is built with Keras's high-level functional API; the same thing could of course be done with Sequential.

# -*- coding:utf-8 -*-
# @Author : Dummerfu
# @Contact : https://github.com/dummerchen 
# @Time : 2021/2/19 20:48

import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import datetime
import  tensorflow as tf
from  tensorflow import  keras


BATCHSIZE=32


def preprocess(x,y):
    print('pre',x.shape,y.shape)

    x=2*tf.cast(x,dtype=tf.float32)/255.0 -1
    y = tf.squeeze(y)
    y=tf.cast(y,dtype=tf.int32)
    y=tf.one_hot(y,depth=100)
    print('after:',x.shape,y.shape)
    return  x,y

def data2tensor(x,y):

    db=tf.data.Dataset.from_tensor_slices((x,y))
    db=db.map(preprocess)
    db=db.shuffle(5000).batch(BATCHSIZE)

    return db

def VGG(image_shape,n_class):

    print(image_shape[0],image_shape[1],image_shape[2])
    inputs = keras.Input(shape=[image_shape[0],image_shape[1],image_shape[2]])

    x=keras.layers.Conv2D(64, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(inputs)
    x=keras.layers.Conv2D(64, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x=keras.layers.MaxPooling2D(pool_size=[2, 2], strides=[2, 2], padding='same')(x)

    x= keras.layers.Conv2D(128, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(128, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.MaxPooling2D(pool_size=[2, 2], strides=[2, 2], padding='same')(x)

    x= keras.layers.Conv2D(256, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(256, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(256, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(256, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.MaxPooling2D(pool_size=[2, 2], strides=[2, 2], padding='same')(x)

    x= keras.layers.Conv2D(512, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(512, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(512, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(512, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.MaxPooling2D(pool_size=[2, 2], strides=[2, 2], padding='same')(x)

    x= keras.layers.Conv2D(512, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(512, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(512, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.Conv2D(512, kernel_size=[3, 3], strides=[1, 1], activation=keras.activations.relu,padding='same')(x)
    x= keras.layers.MaxPooling2D(pool_size=[2, 2], strides=[2, 2], padding='same')(x)

    x= keras.layers.Flatten()(x)
    x= keras.layers.Dense(4096, activation=keras.activations.relu, use_bias=True)(x)
    x= keras.layers.Dense(4096, activation=keras.activations.relu, use_bias=True)(x)

    outputs= keras.layers.Dense(n_class, activation=keras.activations.softmax, use_bias=True)(x)
    # Build the model from inputs and outputs with the functional Model API
    model = keras.Model(inputs=inputs, outputs=outputs)

    return model

def train(train_db,val_db,is_train=False):
    model = VGG([32,32,3],n_class=100)

    model.compile(
        optimizer=tf.optimizers.Adam(),
        loss=tf.losses.CategoricalCrossentropy(),
        metrics=['accuracy'],
    )

    path = os.path.abspath('./')
    log_dir = path + '\\logs\\' + datetime.datetime.now().strftime("%Y%m%d-%H%M")
    print(log_dir)
    tensorboard = keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)
    model.summary()
    if is_train:

        model.fit(train_db, validation_data=val_db, validation_freq=1, epochs=5, callbacks=[tensorboard])

        model.save_weights('./vgg16.h5')


(x,y),(x_test,y_test)=keras.datasets.cifar100.load_data()
print('pre',x.shape,y.shape)

train_db=data2tensor(x,y)
test_db=data2tensor(x_test,y_test)

train(train_db=train_db,val_db=test_db,is_train=False)

LSTM

layers.LSTMCell and layers.LSTM take different arguments.

The former requires updating the state ($h_{t-1}$, $c_{t-1}$) by hand, while the latter does it automatically. To stack several layers, every layer except the last needs return_sequences=True; unroll=True additionally unrolls the time loop (faster, at the cost of memory).
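The difference can be sketched like this (shapes and unit counts are arbitrary): layers.LSTM consumes the whole sequence and manages state internally, while layers.LSTMCell handles one timestep and you carry the [h, c] state yourself.

```python
import tensorflow as tf
from tensorflow import keras

x = tf.random.normal([4, 10, 8])   # [batch, time, features]

# layers.LSTM: consumes the whole sequence and manages state internally
out_seq = keras.layers.LSTM(units=16)(x)
print(out_seq.shape)               # (4, 16)

# layers.LSTMCell: one timestep at a time, [h, c] carried by hand
cell = keras.layers.LSTMCell(units=16)
state = [tf.zeros([4, 16]), tf.zeros([4, 16])]   # [h_{t-1}, c_{t-1}]
for t in range(x.shape[1]):
    out, state = cell(x[:, t, :], state)
print(out.shape)                   # (4, 16)
```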

# -*- coding:utf-8 -*-
# @Author : Dummerfu
# @Contact : https://github.com/dummerchen 
# @Time : 2021/2/21 17:46

import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'

import numpy as np
import matplotlib as mpl
from matplotlib import pyplot as plt

mpl.rcParams['font.sans-serif'] = 'SimHei'
mpl.rcParams['axes.unicode_minus'] = False

import tensorflow as tf
from tensorflow import keras

# Keep only the 20000 most frequent words
max_features=20000

# Maximum sentence length
max_len=100
batchsize=64

class Mylstm(keras.Model):
    def __init__(self,units):
        super(Mylstm,self).__init__()

        # [b,100] => [b,100,100]

        self.embeding=keras.layers.Embedding(input_dim=max_features,input_length=max_len,output_dim=100)
        self.rnn=keras.Sequential([
            keras.layers.LSTM(units=units,dropout=0.5,return_sequences=True,unroll=True),
            keras.layers.LSTM(units=units,dropout=0.5,unroll=True)
        ])

        self.fc=keras.layers.Dense(1,activation=keras.activations.sigmoid)

    def call(self,inputs,training=None):

        # [b,100] => [b,100,100]
        x=self.embeding(inputs)
        print(x.shape)
        # [b,100,100] => [b,64]
        x=self.rnn(x)
        x=self.fc(x)

        return x


def data2tensor(x,y):
    x=keras.preprocessing.sequence.pad_sequences(sequences=x,maxlen=max_len)
    x=tf.cast(x,dtype=tf.int32)
    y=tf.cast(y,dtype=tf.int32)

    print(x.shape,y.shape)

    db=tf.data.Dataset.from_tensor_slices((x,y)).shuffle(10000).batch(batchsize,drop_remainder=True)
    return db

def train(db_train,db_val,db_test):

    model=Mylstm(64)
    model.compile(
        optimizer=tf.optimizers.Adam(),
        loss=tf.losses.BinaryCrossentropy(),
        metrics=['accuracy'],
    )
    model.fit(db_train,epochs=5,validation_data=db_val,validation_freq=1)

    model.evaluate(db_test)
    return

def main():
    (x,y),(x_test,y_test)=keras.datasets.imdb.load_data(num_words=max_features)

    l=int(len(x)*0.8)
    db_train=data2tensor(x[:l],y[:l])
    db_val=data2tensor(x[l:],y[l:])
    db_test=data2tensor(x_test,y_test)


    train(db_train,db_val,db_test)


if __name__ == "__main__":
    main()

AutoEncoder | VAE

This one uses custom training, which is of course more complex, but the degree of freedom is higher too.

An autoencoder is just two sub-networks: reduce the input to a feature vector h, then map it back up to the original dimension. There's nothing technically demanding about it, so it isn't written out here; the key point is that the idea is very instructive.

One thing to watch out for: the mean and variance layers must be two separate Dense layers. Even though they compute the same way, reusing a single Dense twice would give identical results because the weights are shared, and that directly makes the generated images darker and darker.

First, the differences between the several tf.nn loss functions, then the code.
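The comparison referred to above isn't reproduced here, but the distinction that matters for this code can be shown numerically: the tf.nn *_with_logits variants apply the squashing function themselves, so they are fed raw logits. sigmoid_cross_entropy_with_logits treats each element as an independent binary problem (what the pixel-wise reconstruction loss below relies on), whereas softmax_cross_entropy_with_logits assumes mutually exclusive classes. A minimal check:

```python
import tensorflow as tf

labels = tf.constant([[1.0, 0.0, 1.0]])
logits = tf.constant([[2.0, -1.0, 0.5]])

# Independent per-element binary cross entropy; sigmoid is applied internally
a = tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)

# The same thing written out by hand: -y*log(p) - (1-y)*log(1-p), p = sigmoid(z)
p = tf.sigmoid(logits)
b = -(labels * tf.math.log(p) + (1.0 - labels) * tf.math.log(1.0 - p))

print(tf.reduce_max(tf.abs(a - b)).numpy())  # ~0: the two agree elementwise
```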

# -*- coding:utf-8 -*-
# @Author : Dummerfu
# @Contact : https://github.com/dummerchen 
# @Time : 2021/2/22 21:55

import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import matplotlib as mpl
from matplotlib import pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from PIL import Image
import numpy as np
mpl.rcParams['font.sans-serif'] = 'SimHei'
mpl.rcParams['axes.unicode_minus'] = False

tf.random.set_seed(2345)
# The autoencoder is cheap to compute, so the batch size can be larger
batch_size=512
# Dimension of the latent feature vector
z_dims=20


class VAE(keras.Model):
    def __init__(self):
        super(VAE,self).__init__()

        #encoder
        self.encoder=keras.Sequential([
            keras.layers.InputLayer(input_shape=(28*28)),
            keras.layers.Dense(128),
        ])

        self.meanfc=keras.layers.Dense(z_dims)
        self.varfc=keras.layers.Dense(z_dims)
        #decoder
        self.decoder=keras.Sequential([
            keras.layers.Dense(128, activation=tf.nn.relu),
            keras.layers.Dense(784),
        ])

    def reparamize(self,mean,log_var):

        eps=tf.random.normal(log_var.shape)
        z=mean+eps*tf.exp(log_var*0.5)
        return z

    def call(self,inputs,training=None):
        h=self.encoder(inputs)

        mean=self.meanfc(h)
        log_var=self.varfc(h)

        z=self.reparamize(mean,log_var)

        outputs=self.decoder(z)

        return outputs,mean,log_var

def data2tensor(x,y):

    x=tf.cast(x,dtype=tf.float32)/255.0
    db=tf.data.Dataset.from_tensor_slices(x)
    db=db.shuffle(batch_size*5).batch(batch_size)
    return db

def save_images(imgs,name):
    new_im = Image.new('L', (280, 280))

    index = 0
    for i in range(0, 280, 28):
        for j in range(0, 280, 28):
            im = imgs[index]
            im = Image.fromarray(im, mode='L')
            new_im.paste(im, (i, j))
            index += 1
    new_im.save(name)

def train_and_test(db_train,db_test):

    os.makedirs('vae_images', exist_ok=True)  # save_images below writes into this folder
    model=VAE()
    # model.build(input_shape=(4,784))
    optimizer=tf.optimizers.Adam()
    for epoch in range(100):
        for step,x in enumerate(db_train):
            # print(x.shape)
            x=tf.reshape(x,[-1,784])

            with tf.GradientTape() as tape:
                x_hat,mean,log_var=model(x)
                # This loss is chosen because it converges more easily; other losses work too but need longer training
                redu_loss=tf.nn.sigmoid_cross_entropy_with_logits(labels=x, logits=x_hat)

                # The reduction is fairly arbitrary: reduce_mean() or reduce_sum() should both just minimize the loss,
                # yet reduce_mean() trains completely differently from reduce_sum() or reduce_sum()/x.shape[0]
                # (the latter two behave similarly)
                redu_loss=tf.reduce_sum(redu_loss)/x.shape[0]

                kl=-0.5*(log_var+1-mean**2-tf.exp(log_var))
                # prekl=tf.reduce_mean(kl)
                kl=tf.reduce_sum(kl)/x.shape[0]

                loss=redu_loss+kl*1.0
            grads=tape.gradient(loss,model.trainable_variables)

            optimizer.apply_gradients(zip(grads,model.trainable_variables))
            if step%50==0:

                print(epoch,step,"kl_loss:",kl,'loss:',loss,'x_shape0',x.shape[0])
        # evaluation
        z=tf.random.normal((batch_size,z_dims))
        sample_x=model.decoder(z)
        sample_x=tf.nn.sigmoid(sample_x)
        sample_x = tf.reshape(sample_x, [-1, 28, 28]).numpy() * 255.
        sample_x= sample_x.astype(np.uint8)

        save_images(sample_x, 'vae_images/sample_epoch_%d.png' % epoch)


        test_x = next(iter(db_test))
        test_x,_,_= model(tf.reshape(test_x, [-1, 784]))
        # [b, 784] => [b, 28, 28]
        test_x=tf.nn.sigmoid(test_x)
        test_x = tf.reshape(test_x, [-1, 28, 28])

        # [b, 28, 28] => [2b, 28, 28]
        test_x= test_x.numpy() * 255.
        test_x = test_x.astype(np.uint8)
        save_images(test_x, 'vae_images/test_epoch_%d.png' % epoch)

    model.save_weights('./vae.h5')

if __name__ == "__main__":
    (x,y),(x_test,y_test)=keras.datasets.mnist.load_data()

    l=int(len(x)*0.8)
    print(x.shape, y.shape,l,28*28)
    db_train=data2tensor(x[:l],y[:l])
    db_val=data2tensor(x[l:],y[l:])
    db_test=data2tensor(x_test,y_test)

    train_and_test(db_train,db_val)

GAN

WGAN theory

GANs have always faced problems such as G and D being hard to train, and the G/D loss values saying nothing about how well training is going (because of the JS divergence, the loss often just sits at log 2). WGAN was proposed to address this; compared to the traditional GAN it makes only a few changes, yet works remarkably well:

  • remove the sigmoid from D's last layer
  • don't take the log in the G and D losses
  • constrain D after every parameter update (the original WGAN clips D's weights; improved WGAN replaces clipping with a gradient penalty)

The original GAN loss is

$\min_G \max_D \; \mathbb{E}_{x\sim p_r}[\log D(x)] + \mathbb{E}_{z\sim p_z}[\log(1-D(G(z)))]$

but this means that when D becomes too good, G no longer receives a useful gradient, and when G becomes too good, neither does D.

So WGAN changes the loss to

$\min_G \max_{\lVert D\rVert_L\le 1} \; \mathbb{E}_{x\sim p_r}[D(x)] - \mathbb{E}_{z\sim p_z}[D(G(z))]$

and improved WGAN additionally adds the gradient penalty $\lambda\,\mathbb{E}_{\hat{x}}[(\lVert \nabla_{\hat{x}} D(\hat{x}) \rVert_2 - 1)^2]$ (the 10. * gp term in the code below).

WGAN gives a theoretical account of why GAN training is unstable: cross entropy (the JS divergence) is ill-suited to measuring the distance between distributions whose supports barely overlap. WGAN instead measures the distance between the generated and real data distributions with the Wasserstein distance, which in theory removes the instability.

Compared to DCGAN, WGAN takes longer to converge but is more stable, so it is preferred for more complex networks; a ResNet backbone, for example, can reach even better results.

code

The transposed-convolution parameters in the generator must be chosen so that the final output matches the Discriminator's input_shape.

Because the activation has to come after batch normalization, it can't be folded into the convolution layer as in the earlier networks.

This is the result of my own WGAN after 3000 epochs; you can clearly see it has learned hair and eyes (the hyperparameters other people tuned are amazing by comparison):

wg_img_3400

For a dataset, just search on kaggle; kaggle is great, in every sense.

The dataset mentioned in the dragon book (龙书) is here: https://pan.baidu.com/s/1Yn53uxFLCbja13_6Ay44MA

That is also where the dataset originally comes from; I didn't read the issues at first and only found it when my crawler was halfway done 😓.

The data I crawled is at https://pan.baidu.com/s/1JsUHx_1blY6pGx0DQfE0nQ (extraction code: 3621) (only about thirty thousand images…)

My own GAN hyperparameters were too weak; the dragon book says around 30,000 iterations seem to give decent results? Never mind, here is someone else's already-tuned WGAN code instead. That tuned DCGAN on Zhihu is incredible: it reportedly takes shape after only 300 epochs orz (though I haven't run it).

The train function

import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import glob
import numpy as np
from PIL import Image
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from dataset import make_anime_dataset

class Generator(keras.Model):

    def __init__(self):
        super(Generator, self).__init__()

        # z: [b, 100] => [b, 3*3*512] => [b, 3, 3, 512] => [b, 64, 64, 3]
        self.fc = layers.Dense(3*3*512)

        self.conv1 = layers.Conv2DTranspose(256, 3, 3, 'valid')
        self.bn1 = layers.BatchNormalization()

        self.conv2 = layers.Conv2DTranspose(128, 5, 2, 'valid')
        self.bn2 = layers.BatchNormalization()

        self.conv3 = layers.Conv2DTranspose(3, 4, 3, 'valid')

    def call(self, inputs, training=None):
        # [z, 100] => [z, 3*3*512]
        x = self.fc(inputs)
        x = tf.reshape(x, [-1, 3, 3, 512])
        x = tf.nn.leaky_relu(x)

        #
        x = tf.nn.leaky_relu(self.bn1(self.conv1(x), training=training))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        x = self.conv3(x)
        x = tf.tanh(x)

        return x


class Discriminator(keras.Model):

    def __init__(self):
        super(Discriminator, self).__init__()

        # [b, 64, 64, 3] => [b, 1]
        self.conv1 = layers.Conv2D(64, 5, 3, 'valid')

        self.conv2 = layers.Conv2D(128, 5, 3, 'valid')
        self.bn2 = layers.BatchNormalization()

        self.conv3 = layers.Conv2D(256, 5, 3, 'valid')
        self.bn3 = layers.BatchNormalization()

        # [b, h, w ,c] => [b, -1]
        self.flatten = layers.Flatten()
        self.fc = layers.Dense(1)


    def call(self, inputs, training=None):

        x = tf.nn.leaky_relu(self.conv1(inputs))
        x = tf.nn.leaky_relu(self.bn2(self.conv2(x), training=training))
        x = tf.nn.leaky_relu(self.bn3(self.conv3(x), training=training))

        # [b, h, w, c] => [b, -1]
        x = self.flatten(x)
        # [b, -1] => [b, 1]
        logits = self.fc(x)

        return logits
def save_result(val_out, val_block_size, image_path, color_mode):
    def preprocess(img):
        img = ((img + 1.0) * 127.5).astype(np.uint8)
        # img = img.astype(np.uint8)
        return img

    preprocesed = preprocess(val_out)
    final_image = np.array([])
    single_row = np.array([])
    for b in range(val_out.shape[0]):
        # concat image into a row
        if single_row.size == 0:
            single_row = preprocesed[b, :, :, :]
        else:
            single_row = np.concatenate((single_row, preprocesed[b, :, :, :]), axis=1)

        # concat image row to final_image
        if (b+1) % val_block_size == 0:
            if final_image.size == 0:
                final_image = single_row
            else:
                final_image = np.concatenate((final_image, single_row), axis=0)

            # reset single row
            single_row = np.array([])

    if final_image.shape[2] == 1:
        final_image = np.squeeze(final_image, axis=2) 
    Image.fromarray(final_image).save(image_path)


def celoss_ones(logits):
    # WGAN critic: real samples should score high, so minimize -mean(logits)
    return -tf.reduce_mean(logits)


def celoss_zeros(logits):
    # Fake samples should score low, so minimize +mean(logits)
    return tf.reduce_mean(logits)


def gradient_penalty(discriminator, batch_x, fake_image):

    batchsz = batch_x.shape[0]

    # [b, h, w, c]
    t = tf.random.uniform([batchsz, 1, 1, 1])
    # [b, 1, 1, 1] => [b, h, w, c]
    t = tf.broadcast_to(t, batch_x.shape)

    interplate = t * batch_x + (1 - t) * fake_image

    with tf.GradientTape() as tape:
        tape.watch([interplate])
        d_interplote_logits = discriminator(interplate, training=True)
    grads = tape.gradient(d_interplote_logits, interplate)

    # grads:[b, h, w, c] => [b, -1]
    grads = tf.reshape(grads, [grads.shape[0], -1])
    gp = tf.norm(grads, axis=1) #[b]
    gp = tf.reduce_mean( (gp-1)**2 )

    return gp



def d_loss_fn(generator, discriminator, batch_z, batch_x, is_training):
    # 1. treat real image as real
    # 2. treat generated image as fake
    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    d_real_logits = discriminator(batch_x, is_training)

    d_loss_real = celoss_ones(d_real_logits)
    d_loss_fake = celoss_zeros(d_fake_logits)
    gp = gradient_penalty(discriminator, batch_x, fake_image)

    loss = d_loss_real + d_loss_fake + 10. * gp

    return loss, gp


def g_loss_fn(generator, discriminator, batch_z, is_training):

    fake_image = generator(batch_z, is_training)
    d_fake_logits = discriminator(fake_image, is_training)
    loss = celoss_ones(d_fake_logits)

    return loss


def main():

    tf.random.set_seed(233)
    np.random.seed(233)

    # hyper parameters
    z_dim = 100
    epochs = 3000000
    batch_size = 512
    learning_rate = 0.0005
    is_training = True


    img_path = glob.glob('./animefacedataset/images/*.jpg')
    assert len(img_path) > 0
    os.makedirs('images', exist_ok=True)  # generated sample grids are saved here


    dataset, img_shape, _ = make_anime_dataset(img_path, batch_size)
    print(dataset, img_shape)
    sample = next(iter(dataset))
    print(sample.shape, tf.reduce_max(sample).numpy(),
          tf.reduce_min(sample).numpy())
    dataset = dataset.repeat()
    db_iter = iter(dataset)


    generator = Generator() 
    generator.build(input_shape = (None, z_dim))
    discriminator = Discriminator()
    discriminator.build(input_shape=(None, 64, 64, 3))
    z_sample = tf.random.normal([100, z_dim])


    g_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)
    d_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate, beta_1=0.5)


    for epoch in range(epochs):
        # Train the discriminator 5 times for every generator update!!! Otherwise you get images like mine
        for _ in range(5):
            batch_z = tf.random.normal([batch_size, z_dim])
            batch_x = next(db_iter)

            # train D
            with tf.GradientTape() as tape:
                d_loss, gp = d_loss_fn(generator, discriminator, batch_z, batch_x, is_training)
            grads = tape.gradient(d_loss, discriminator.trainable_variables)
            d_optimizer.apply_gradients(zip(grads, discriminator.trainable_variables))

        batch_z = tf.random.normal([batch_size, z_dim])

        with tf.GradientTape() as tape:
            g_loss = g_loss_fn(generator, discriminator, batch_z, is_training)
        grads = tape.gradient(g_loss, generator.trainable_variables)
        g_optimizer.apply_gradients(zip(grads, generator.trainable_variables))

        if epoch % 100 == 0:
            print(epoch, 'd-loss:',float(d_loss), 'g-loss:', float(g_loss),
                  'gp:', float(gp))

            z = tf.random.normal([100, z_dim])
            fake_image = generator(z, training=False)
            img_path = os.path.join('images', 'wgan-%d.png'%epoch)
            save_result(fake_image.numpy(), 10, img_path, color_mode='P')

if __name__ == '__main__':
    main()

The dataset-loading functions

import multiprocessing

import tensorflow as tf


def make_anime_dataset(img_paths, batch_size, resize=64, drop_remainder=True, shuffle=True, repeat=1):

    # @tf.function
    def _map_fn(img):
        img = tf.image.resize(img, [resize, resize])
        # img = tf.image.random_crop(img,[resize, resize])
        # img = tf.image.random_flip_left_right(img)
        # img = tf.image.random_flip_up_down(img)
        img = tf.clip_by_value(img, 0, 255)
        img = img / 127.5 - 1 #-1~1
        return img

    dataset = disk_image_batch_dataset(img_paths,
                                          batch_size,
                                          drop_remainder=drop_remainder,
                                          map_fn=_map_fn,
                                          shuffle=shuffle,
                                          repeat=repeat)
    img_shape = (resize, resize, 3)
    len_dataset = len(img_paths) // batch_size

    return dataset, img_shape, len_dataset


def batch_dataset(dataset,
                  batch_size,
                  drop_remainder=True,
                  n_prefetch_batch=1,
                  filter_fn=None,
                  map_fn=None,
                  n_map_threads=None,
                  filter_after_map=False,
                  shuffle=True,
                  shuffle_buffer_size=None,
                  repeat=None):
    # set defaults
    if n_map_threads is None:
        n_map_threads = multiprocessing.cpu_count()
    if shuffle and shuffle_buffer_size is None:
        shuffle_buffer_size = max(batch_size * 128, 2048)  # set the minimum buffer size as 2048

    # [*] it is efficient to conduct `shuffle` before `map`/`filter` because `map`/`filter` is sometimes costly
    if shuffle:
        dataset = dataset.shuffle(shuffle_buffer_size)

    if not filter_after_map:
        if filter_fn:
            dataset = dataset.filter(filter_fn)

        if map_fn:
            dataset = dataset.map(map_fn, num_parallel_calls=n_map_threads)

    else:  # [*] this is slower
        if map_fn:
            dataset = dataset.map(map_fn, num_parallel_calls=n_map_threads)

        if filter_fn:
            dataset = dataset.filter(filter_fn)

    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)

    dataset = dataset.repeat(repeat).prefetch(n_prefetch_batch)

    return dataset


def memory_data_batch_dataset(memory_data,
                              batch_size,
                              drop_remainder=True,
                              n_prefetch_batch=1,
                              filter_fn=None,
                              map_fn=None,
                              n_map_threads=None,
                              filter_after_map=False,
                              shuffle=True,
                              shuffle_buffer_size=None,
                              repeat=None):
    """Batch dataset of memory data.
    Parameters
    ----------
    memory_data : nested structure of tensors/ndarrays/lists
    """
    dataset = tf.data.Dataset.from_tensor_slices(memory_data)
    dataset = batch_dataset(dataset,
                            batch_size,
                            drop_remainder=drop_remainder,
                            n_prefetch_batch=n_prefetch_batch,
                            filter_fn=filter_fn,
                            map_fn=map_fn,
                            n_map_threads=n_map_threads,
                            filter_after_map=filter_after_map,
                            shuffle=shuffle,
                            shuffle_buffer_size=shuffle_buffer_size,
                            repeat=repeat)
    return dataset


def disk_image_batch_dataset(img_paths,
                             batch_size,
                             labels=None,
                             drop_remainder=True,
                             n_prefetch_batch=1,
                             filter_fn=None,
                             map_fn=None,
                             n_map_threads=None,
                             filter_after_map=False,
                             shuffle=True,
                             shuffle_buffer_size=None,
                             repeat=None):
    """Batch dataset of disk image for PNG and JPEG.
    Parameters
    ----------
        img_paths : 1d-tensor/ndarray/list of str
        labels : nested structure of tensors/ndarrays/lists
    """
    if labels is None:
        memory_data = img_paths
    else:
        memory_data = (img_paths, labels)

    def parse_fn(path, *label):
        img = tf.io.read_file(path)
        img = tf.image.decode_jpeg(img, channels=3)  # fix channels to 3
        return (img,) + label

    if map_fn:  # fuse `map_fn` and `parse_fn`
        def map_fn_(*args):
            return map_fn(*parse_fn(*args))
    else:
        map_fn_ = parse_fn

    dataset = memory_data_batch_dataset(memory_data,
                                        batch_size,
                                        drop_remainder=drop_remainder,
                                        n_prefetch_batch=n_prefetch_batch,
                                        filter_fn=filter_fn,
                                        map_fn=map_fn_,
                                        n_map_threads=n_map_threads,
                                        filter_after_map=filter_after_map,
                                        shuffle=shuffle,
                                        shuffle_buffer_size=shuffle_buffer_size,
                                        repeat=repeat)

    return dataset

After 1000 epochs it looks like this:

wgansample-1000

Postscript

I originally wanted to write up each classic network in detail, but I felt that would get too specialized, all formulas, and nobody would read it carefully (really, I just can't). So it turned into this template-like thing. Padding out blog posts is the real motivation.

I've finally felt my computer's pain: 99% CPU while also running a multi-threaded image crawler… (even the text stopped rendering).

A quick summary of lessons learned:

On the code side

  • model.build(input_shape): this is best given as a tuple, otherwise strange errors appear; TensorFlow is stricter about this than PyTorch.
  • tf.losses: in this module, names differing only in case behave differently (see the official docs). With compile(), prefer the uppercase class form; when computing losses yourself, use the lowercase function form.
  • sigmoid and softmax: when the 'classes' are not fully mutually exclusive use sigmoid, otherwise softmax; softmax targets must be one-hot.
  • model.save: because it saves the network structure, it only works for models assembled purely from built-in layers; subclassed models can't use it.
  • layers.BatchNormalization: this layer has a training flag, train=True|None, test=False|0 (see the source docs). Be sure to set it correctly; for the reason, see here.
  • layers.Flatten vs Dense: Flatten is just a reshape with a fixed output size; Dense additionally applies a full connection.
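The tf.losses point above can be made concrete: the uppercase names are classes whose instances reduce to a single scalar (what compile() expects), while the lowercase names are plain functions returning one value per sample (handy in custom training loops):

```python
import tensorflow as tf

y_true = tf.constant([[0.0, 1.0], [1.0, 0.0]])
y_pred = tf.constant([[0.1, 0.9], [0.8, 0.2]])

# Class form: instantiate, then call; the result is one averaged scalar
scalar = tf.losses.CategoricalCrossentropy()(y_true, y_pred)

# Function form: one loss value per sample, no reduction applied
per_sample = tf.losses.categorical_crossentropy(y_true, y_pred)

print(scalar.numpy(), per_sample.numpy())  # scalar equals the mean of per_sample
```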

On the network side

The code above shows several styles of building networks. I agonized over the choice too, and in the end settled on the style used for the GAN. A framework is convenient, but the convenience is bought with 'freedom', which can backfire later when you need to build networks on your own. Everyone's taste differs, so don't take my advice too seriously.

Tricks like k-fold validation are not covered in the videos (see the learning resources below); you can look those tricks up yourself.
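For reference, the k-fold idea mentioned above can be sketched in a few lines of numpy (kfold_indices is a hypothetical helper, not part of any code above):

```python
import numpy as np

def kfold_indices(n, k=5, seed=0):
    # Shuffle indices once, then split into k nearly equal folds;
    # fold i serves as validation, the remaining folds as training.
    rng = np.random.default_rng(seed)
    folds = np.array_split(rng.permutation(n), k)
    for i in range(k):
        val = folds[i]
        train = np.concatenate([folds[j] for j in range(k) if j != i])
        yield train, val

for train_idx, val_idx in kfold_indices(10, k=5):
    print(len(train_idx), len(val_idx))  # 8 2 on every round
```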

Learning resources

It's definitely not that I wanted to learn GANs because they can randomly generate waifus.

I couldn't get hold of the resources for 日月光华's "tensorflow入门学习与实战" course; a pity, since the free lectures really were good, it's just too expensive.

So I followed the dragon book (龙书) to learn GANs and reviewed the classic networks along the way. Also attaching 李宏毅's GAN lectures (every time I watch videos like these I feel my probability courses were wasted); I'd suggest watching the first half of 李宏毅's lectures, then the dragon book.

Emmm, and finally someone else's curated deep-learning roadmap, which probably nobody will finish.