#! /usr/bin/env python
"""
Reads Darknet config and weights and creates Keras model with TF backend.
"""

import argparse
import configparser
import io
import os
from collections import defaultdict

import numpy as np
from keras import backend as K
from keras.layers import (Conv2D, Input, ZeroPadding2D, Add,
                          UpSampling2D, MaxPooling2D, Concatenate)
# LeakyReLU and BatchNormalization live in keras.layers in newer Keras
# releases (older versions exposed them under
# keras.layers.advanced_activations and keras.layers.normalization).
from keras.layers import LeakyReLU
from keras.layers import BatchNormalization
from keras.models import Model
from keras.regularizers import l2
from keras.utils.vis_utils import plot_model as plot


parser = argparse.ArgumentParser(description='Darknet To Keras Converter.')
parser.add_argument('config_path', help='Path to Darknet cfg file.')
parser.add_argument('weights_path', help='Path to Darknet weights file.')
parser.add_argument('output_path', help='Path to output Keras model file.')
parser.add_argument(
    '-p',
    '--plot_model',
    help='Plot generated Keras model and save as image.',
    action='store_true')
parser.add_argument(
    '-w',
    '--weights_only',
    help='Save as Keras weights file instead of model file.',
    action='store_true')
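
# A typical invocation looks like the following (script and file names are
# illustrative; substitute your own cfg/weights/output paths):
#   python convert.py yolov3.cfg yolov3.weights model_data/yolo.h5
# Pass -w to save weights only, or -p to also write a PNG of the model.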


def unique_config_sections(config_file):
    """Convert all config sections to have unique names.

    Adds unique suffixes to config sections for compatibility with
    configparser.
    """
    section_counters = defaultdict(int)
    output_stream = io.StringIO()
    with open(config_file) as fin:
        for line in fin:
            if line.startswith('['):
                section = line.strip().strip('[]')
                _section = section + '_' + str(section_counters[section])
                section_counters[section] += 1
                line = line.replace(section, _section)
            output_stream.write(line)
    output_stream.seek(0)
    return output_stream
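
# For example, a cfg whose headers read [convolutional], [convolutional],
# [route] comes back as [convolutional_0], [convolutional_1], [route_0]:
# configparser rejects duplicate section names, which Darknet cfgs use freely.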


# %%
def _main(args):
    config_path = os.path.expanduser(args.config_path)
    weights_path = os.path.expanduser(args.weights_path)
    assert config_path.endswith('.cfg'), '{} is not a .cfg file'.format(
        config_path)
    assert weights_path.endswith(
        '.weights'), '{} is not a .weights file'.format(weights_path)

    output_path = os.path.expanduser(args.output_path)
    assert output_path.endswith(
        '.h5'), 'output path {} is not a .h5 file'.format(output_path)
    output_root = os.path.splitext(output_path)[0]

    # Load weights and config.
    print('Loading weights.')
    weights_file = open(weights_path, 'rb')
    # The .weights file opens with three int32s (major, minor, revision),
    # followed by a "seen" image counter: int64 when major*10 + minor >= 2
    # (with sanity bounds on major/minor), int32 in older versions.
    major, minor, revision = np.ndarray(
        shape=(3, ), dtype='int32', buffer=weights_file.read(12))
    if (major * 10 + minor) >= 2 and major < 1000 and minor < 1000:
        seen = np.ndarray(
            shape=(1, ), dtype='int64', buffer=weights_file.read(8))
    else:
        seen = np.ndarray(
            shape=(1, ), dtype='int32', buffer=weights_file.read(4))
    print('Weights Header: ', major, minor, revision, seen)
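    # For instance, a header of (0, 2, 0) takes the int64 branch, since
    # 0*10 + 2 >= 2; the values here are illustrative.
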
    print('Parsing Darknet config.')
    unique_config_file = unique_config_sections(config_path)
    cfg_parser = configparser.ConfigParser()
    cfg_parser.read_file(unique_config_file)

    print('Creating Keras model.')
    input_layer = Input(shape=(None, None, 3))
    prev_layer = input_layer
    all_layers = []

    weight_decay = (float(cfg_parser['net_0']['decay'])
                    if 'net_0' in cfg_parser.sections() else 5e-4)
    count = 0
    out_index = []
    for section in cfg_parser.sections():
        print('Parsing section {}'.format(section))
        if section.startswith('convolutional'):
            filters = int(cfg_parser[section]['filters'])
            size = int(cfg_parser[section]['size'])
            stride = int(cfg_parser[section]['stride'])
            pad = int(cfg_parser[section]['pad'])
            activation = cfg_parser[section]['activation']
            batch_normalize = 'batch_normalize' in cfg_parser[section]

            padding = 'same' if pad == 1 and stride == 1 else 'valid'

            # Setting weights.
            # Darknet serializes convolutional weights as:
            # [bias/beta, [gamma, mean, variance], conv_weights]
            prev_layer_shape = K.int_shape(prev_layer)

            weights_shape = (size, size, prev_layer_shape[-1], filters)
            darknet_w_shape = (filters, weights_shape[2], size, size)
            weights_size = np.prod(weights_shape)

            print('conv2d', 'bn' if batch_normalize else ' ',
                  activation, weights_shape)

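            # e.g. size=3, 32 input channels, filters=64 (illustrative) gives
            # weights_shape (3, 3, 32, 64) and darknet_w_shape (64, 32, 3, 3).
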
            conv_bias = np.ndarray(
                shape=(filters, ),
                dtype='float32',
                buffer=weights_file.read(filters * 4))
            count += filters

            if batch_normalize:
                bn_weights = np.ndarray(
                    shape=(3, filters),
                    dtype='float32',
                    buffer=weights_file.read(filters * 12))
                count += 3 * filters

                bn_weight_list = [
                    bn_weights[0],  # scale gamma
                    conv_bias,  # shift beta
                    bn_weights[1],  # running mean
                    bn_weights[2]  # running var
                ]

            conv_weights = np.ndarray(
                shape=darknet_w_shape,
                dtype='float32',
                buffer=weights_file.read(weights_size * 4))
            count += weights_size

            # DarkNet conv_weights are serialized Caffe-style:
            # (out_dim, in_dim, height, width)
            # We would like to set these to Tensorflow order:
            # (height, width, in_dim, out_dim)
            conv_weights = np.transpose(conv_weights, [2, 3, 1, 0])
            conv_weights = [conv_weights] if batch_normalize else [
                conv_weights, conv_bias
            ]

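            # With the illustrative 3x3, 32-in, 64-filter layer above, this
            # step has consumed 64 bias/beta floats, 3*64 = 192 BN floats
            # (when batch_normalize is set), and 3*3*32*64 = 18432 kernel
            # floats, all read as float32 straight off the file.
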
            # Handle activation.
            act_fn = None
            if activation == 'leaky':
                pass  # Add advanced activation later.
            elif activation != 'linear':
                raise ValueError(
                    'Unknown activation function `{}` in section {}'.format(
                        activation, section))

            # Create Conv2D layer.
            if stride > 1:
                # Darknet uses left and top padding instead of 'same' mode.
                prev_layer = ZeroPadding2D(((1, 0), (1, 0)))(prev_layer)
            conv_layer = (Conv2D(
                filters, (size, size),
                strides=(stride, stride),
                kernel_regularizer=l2(weight_decay),
                use_bias=not batch_normalize,
                weights=conv_weights,
                activation=act_fn,
                padding=padding))(prev_layer)

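            # When batch_normalize is set, the conv bias was loaded as the BN
            # beta above, so Conv2D is built bias-free (use_bias=False) and
            # the shift is applied by BatchNormalization instead.
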
            if batch_normalize:
                conv_layer = (BatchNormalization(
                    weights=bn_weight_list))(conv_layer)
            prev_layer = conv_layer

            if activation == 'linear':
                all_layers.append(prev_layer)
            elif activation == 'leaky':
                act_layer = LeakyReLU(alpha=0.1)(prev_layer)
                prev_layer = act_layer
                all_layers.append(act_layer)

        elif section.startswith('route'):
            # 'layers' holds indices into all_layers; negative values count
            # back from the current layer, matching Python list indexing.
            ids = [int(i) for i in cfg_parser[section]['layers'].split(',')]
            layers = [all_layers[i] for i in ids]
            if len(layers) > 1:
                print('Concatenating route layers:', layers)
                concatenate_layer = Concatenate()(layers)
                all_layers.append(concatenate_layer)
                prev_layer = concatenate_layer
            else:
                skip_layer = layers[0]  # only one layer to route
                all_layers.append(skip_layer)
                prev_layer = skip_layer

        elif section.startswith('maxpool'):
            size = int(cfg_parser[section]['size'])
            stride = int(cfg_parser[section]['stride'])
            all_layers.append(
                MaxPooling2D(
                    pool_size=(size, size),
                    strides=(stride, stride),
                    padding='same')(prev_layer))
            prev_layer = all_layers[-1]

        elif section.startswith('shortcut'):
            index = int(cfg_parser[section]['from'])
            activation = cfg_parser[section]['activation']
            assert activation == 'linear', 'Only linear activation supported.'
            all_layers.append(Add()([all_layers[index], prev_layer]))
            prev_layer = all_layers[-1]

        elif section.startswith('upsample'):
            stride = int(cfg_parser[section]['stride'])
            assert stride == 2, 'Only stride=2 supported.'
            all_layers.append(UpSampling2D(stride)(prev_layer))
            prev_layer = all_layers[-1]

        elif section.startswith('yolo'):
            # The [yolo] head carries no weights; record the previous layer
            # as a model output and append a None placeholder so list indices
            # stay aligned with Darknet's layer numbering.
            out_index.append(len(all_layers) - 1)
            all_layers.append(None)
            prev_layer = all_layers[-1]

        elif section.startswith('net'):
            pass

        else:
            raise ValueError(
                'Unsupported section header type: {}'.format(section))

    # Create and save model.
    if len(out_index) == 0:
        out_index.append(len(all_layers) - 1)
    model = Model(inputs=input_layer,
                  outputs=[all_layers[i] for i in out_index])
    model.summary()
    if args.weights_only:
        model.save_weights(output_path)
        print('Saved Keras weights to {}'.format(output_path))
    else:
        model.save(output_path)
        print('Saved Keras model to {}'.format(output_path))
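
    # When the full model is saved (no -w), it can be reloaded later, e.g.
    # (path is illustrative):
    #   from keras.models import load_model
    #   model = load_model('model_data/yolo.h5')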

    # Check to see if all weights have been read.
    remaining_weights = len(weights_file.read()) / 4
    weights_file.close()
    print('Read {} of {} from Darknet weights.'.format(
        count, count + remaining_weights))
    if remaining_weights > 0:
        print('Warning: {} unused weights'.format(remaining_weights))

    if args.plot_model:
        plot(model, to_file='{}.png'.format(output_root), show_shapes=True)
        print('Saved model plot to {}.png'.format(output_root))


if __name__ == '__main__':
    _main(parser.parse_args())