import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict
from typing import Dict
import math


def conv_bn(inp, oup, stride=1, leaky=0):
    # 3x3 conv -> batch norm -> leaky ReLU
    return nn.Sequential(
        nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
        nn.BatchNorm2d(oup),
        nn.LeakyReLU(negative_slope=leaky, inplace=True)
    )


def conv_bn_no_relu(inp, oup, stride):
    # 3x3 conv -> batch norm, no activation
    return nn.Sequential(
        nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
        nn.BatchNorm2d(oup),
    )


def conv_bn1X1(inp, oup, stride, leaky=0):
    # 1x1 conv -> batch norm -> leaky ReLU
    return nn.Sequential(
        nn.Conv2d(inp, oup, 1, stride, padding=0, bias=False),
        nn.BatchNorm2d(oup),
        nn.LeakyReLU(negative_slope=leaky, inplace=True)
    )


def conv_dw(inp, oup, stride, leaky=0.1):
    # Depthwise separable block: depthwise 3x3 conv followed by pointwise 1x1 conv
    return nn.Sequential(
        nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
        nn.BatchNorm2d(inp),
        nn.LeakyReLU(negative_slope=leaky, inplace=True),

        nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
        nn.BatchNorm2d(oup),
        nn.LeakyReLU(negative_slope=leaky, inplace=True),
    )


class SSH(nn.Module):
    """SSH context module: concatenates a 3x3 branch with 5x5 and 7x7
    receptive-field branches built from stacked 3x3 convolutions."""

    def __init__(self, in_channel, out_channel):
        super(SSH, self).__init__()
        assert out_channel % 4 == 0
        leaky = 0
        if out_channel <= 64:
            leaky = 0.1
        self.conv3X3 = conv_bn_no_relu(in_channel, out_channel // 2, stride=1)

        self.conv5X5_1 = conv_bn(in_channel, out_channel // 4, stride=1, leaky=leaky)
        self.conv5X5_2 = conv_bn_no_relu(out_channel // 4, out_channel // 4, stride=1)

        self.conv7X7_2 = conv_bn(out_channel // 4, out_channel // 4, stride=1, leaky=leaky)
        self.conv7x7_3 = conv_bn_no_relu(out_channel // 4, out_channel // 4, stride=1)

    def forward(self, input):
        conv3X3 = self.conv3X3(input)

        conv5X5_1 = self.conv5X5_1(input)
        conv5X5 = self.conv5X5_2(conv5X5_1)

        conv7X7_2 = self.conv7X7_2(conv5X5_1)
        conv7X7 = self.conv7x7_3(conv7X7_2)

        out = torch.cat([conv3X3, conv5X5, conv7X7], dim=1)
        out = F.relu(out)
        return out


class FPN(nn.Module):
    """Feature pyramid: 1x1 lateral projections to a common channel count,
    merged top-down with nearest-neighbour upsampling."""

    def __init__(self, in_channels_list, out_channels):
        super(FPN, self).__init__()
        leaky = 0
        if out_channels <= 64:
            leaky = 0.1
        self.output1 = conv_bn1X1(in_channels_list[0], out_channels, stride=1, leaky=leaky)
        self.output2 = conv_bn1X1(in_channels_list[1], out_channels, stride=1, leaky=leaky)
        self.output3 = conv_bn1X1(in_channels_list[2], out_channels, stride=1, leaky=leaky)

        self.merge1 = conv_bn(out_channels, out_channels, leaky=leaky)
        self.merge2 = conv_bn(out_channels, out_channels, leaky=leaky)

    def forward(self, input):
        # input is an ordered dict of backbone feature maps, fine-to-coarse
        input = list(input.values())

        output1 = self.output1(input[0])
        output2 = self.output2(input[1])
        output3 = self.output3(input[2])

        up3 = F.interpolate(output3, size=[output2.size(2), output2.size(3)], mode="nearest")
        output2 = output2 + up3
        output2 = self.merge2(output2)

        up2 = F.interpolate(output2, size=[output1.size(2), output1.size(3)], mode="nearest")
        output1 = output1 + up2
        output1 = self.merge1(output1)

        out = [output1, output2, output3]
        return out


class MobileNetV1(nn.Module):
    """MobileNetV1 (0.25x width) backbone; stage1/2/3 emit 64/128/256 channels."""

    def __init__(self):
        super(MobileNetV1, self).__init__()
        self.stage1 = nn.Sequential(
            conv_bn(3, 8, 2, leaky=0.1),    # 1/2 resolution
            conv_dw(8, 16, 1),
            conv_dw(16, 32, 2),             # 1/4
            conv_dw(32, 32, 1),
            conv_dw(32, 64, 2),             # 1/8
            conv_dw(64, 64, 1),
        )
        self.stage2 = nn.Sequential(
            conv_dw(64, 128, 2),            # 1/16
            conv_dw(128, 128, 1),
            conv_dw(128, 128, 1),
            conv_dw(128, 128, 1),
            conv_dw(128, 128, 1),
            conv_dw(128, 128, 1),
        )
        self.stage3 = nn.Sequential(
            conv_dw(128, 256, 2),           # 1/32
            conv_dw(256, 256, 1),
        )
        self.avg = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, 1000)

    def forward(self, x):
        x = self.stage1(x)
        x = self.stage2(x)
        x = self.stage3(x)
        x = self.avg(x)

        x = x.view(-1, 256)
        x = self.fc(x)
        return x


class ClassHead(nn.Module):
    """Predicts a 2-way (face / background) score per anchor."""

    def __init__(self, inchannels=512, num_anchors=3):
        super(ClassHead, self).__init__()
        self.num_anchors = num_anchors
        self.conv1x1 = nn.Conv2d(inchannels, self.num_anchors * 2, kernel_size=(1, 1), stride=1, padding=0)

    def forward(self, x):
        out = self.conv1x1(x)
        out = out.permute(0, 2, 3, 1).contiguous()

        return out.view(out.shape[0], -1, 2)


class BboxHead(nn.Module):
    """Predicts 4 box-regression offsets per anchor."""

    def __init__(self, inchannels=512, num_anchors=3):
        super(BboxHead, self).__init__()
        self.conv1x1 = nn.Conv2d(inchannels, num_anchors * 4, kernel_size=(1, 1), stride=1, padding=0)

    def forward(self, x):
        out = self.conv1x1(x)
        out = out.permute(0, 2, 3, 1).contiguous()

        return out.view(out.shape[0], -1, 4)


class LandmarkHead(nn.Module):
    """Predicts 10 landmark offsets (5 points, x/y) per anchor."""

    def __init__(self, inchannels=512, num_anchors=3):
        super(LandmarkHead, self).__init__()
        self.conv1x1 = nn.Conv2d(inchannels, num_anchors * 10, kernel_size=(1, 1), stride=1, padding=0)

    def forward(self, x):
        out = self.conv1x1(x)
        out = out.permute(0, 2, 3, 1).contiguous()

        return out.view(out.shape[0], -1, 10)


class RetinaFace(nn.Module):
    def __init__(self, cfg=None, phase='train'):
        """
        :param cfg: Network related settings.
        :param phase: train or test.
        """
        super(RetinaFace, self).__init__()
        self.phase = phase
        backbone = None
        if cfg['name'] == 'mobilenet0.25':
            backbone = MobileNetV1()
            if cfg['pretrain']:
                checkpoint = torch.load("./weights/mobilenetV1X0.25_pretrain.tar", map_location=torch.device('cpu'))
                # strip the 'module.' prefix left over from DataParallel training
                new_state_dict = OrderedDict()
                for k, v in checkpoint['state_dict'].items():
                    name = k[7:]
                    new_state_dict[name] = v
                backbone.load_state_dict(new_state_dict)
        elif cfg['name'] == 'Resnet50':
            import torchvision.models as models
            backbone = models.resnet50(pretrained=cfg['pretrain'])

        # Wrap either backbone so that self.body returns an OrderedDict of the
        # intermediate stages named in cfg['return_layers'], which is what the
        # FPN expects (the bare MobileNetV1 forward would return logits).
        from torchvision.models._utils import IntermediateLayerGetter
        self.body = IntermediateLayerGetter(backbone, cfg['return_layers'])

        in_channels_stage2 = cfg['in_channel']
        in_channels_list = [
            in_channels_stage2 * 2,
            in_channels_stage2 * 4,
            in_channels_stage2 * 8,
        ]
        out_channels = cfg['out_channel']
        self.fpn = FPN(in_channels_list, out_channels)
        self.ssh1 = SSH(out_channels, out_channels)
        self.ssh2 = SSH(out_channels, out_channels)
        self.ssh3 = SSH(out_channels, out_channels)

        self.ClassHead = self._make_class_head(fpn_num=3, inchannels=cfg['out_channel'])
        self.BboxHead = self._make_bbox_head(fpn_num=3, inchannels=cfg['out_channel'])
        self.LandmarkHead = self._make_landmark_head(fpn_num=3, inchannels=cfg['out_channel'])

    def _make_class_head(self, fpn_num=3, inchannels=64, anchor_num=2):
        classhead = nn.ModuleList()
        for i in range(fpn_num):
            classhead.append(ClassHead(inchannels, anchor_num))
        return classhead

    def _make_bbox_head(self, fpn_num=3, inchannels=64, anchor_num=2):
        bboxhead = nn.ModuleList()
        for i in range(fpn_num):
            bboxhead.append(BboxHead(inchannels, anchor_num))
        return bboxhead

    def _make_landmark_head(self, fpn_num=3, inchannels=64, anchor_num=2):
        landmarkhead = nn.ModuleList()
        for i in range(fpn_num):
            landmarkhead.append(LandmarkHead(inchannels, anchor_num))
        return landmarkhead

    def forward(self, inputs):
        out = self.body(inputs)

        fpn = self.fpn(out)

        feature1 = self.ssh1(fpn[0])
        feature2 = self.ssh2(fpn[1])
        feature3 = self.ssh3(fpn[2])
        features = [feature1, feature2, feature3]

        bbox_regressions = torch.cat([self.BboxHead[i](feature) for i, feature in enumerate(features)], dim=1)
        classifications = torch.cat([self.ClassHead[i](feature) for i, feature in enumerate(features)], dim=1)
        ldm_regressions = torch.cat([self.LandmarkHead[i](feature) for i, feature in enumerate(features)], dim=1)

        if self.phase == 'train':
            output = (bbox_regressions, classifications, ldm_regressions)
        else:
            output = (bbox_regressions, F.softmax(classifications, dim=-1), ldm_regressions)
        return output


class _utils_resnet:
    # Local copy of torchvision's IntermediateLayerGetter (see
    # torchvision.models._utils); RetinaFace above imports the torchvision
    # version directly, so this class is kept here for reference.
    class IntermediateLayerGetter(nn.ModuleDict):
        """
        Module wrapper that returns intermediate layers from a model

        It has a strong assumption that the modules have been registered
        into the model in the same order as they are used.
        This means that one should **not** reuse the same nn.Module
        twice in the forward if you want this to work.

        Additionally, it is only able to query submodules that are directly
        assigned to the model. So if `model` is passed, `model.feature1` can
        be returned, but not `model.feature1.layer2`.

        Arguments:
            model (nn.Module): model on which we will extract the features
            return_layers (Dict[name, new_name]): a dict containing the names
                of the modules for which the activations will be returned as
                the key of the dict, and the value of the dict is the name
                of the returned activation (which the user can specify).

        Examples::

            >>> m = torchvision.models.resnet18(pretrained=True)
            >>> # extract layer1 and layer3, giving as names `feat1` and `feat2`
            >>> new_m = torchvision.models._utils.IntermediateLayerGetter(m,
            >>>     {'layer1': 'feat1', 'layer3': 'feat2'})
            >>> out = new_m(x)
            >>> print([(k, v.shape) for k, v in out.items()])
            >>> [('feat1', torch.Size([1, 64, 56, 56])),
            >>>  ('feat2', torch.Size([1, 256, 14, 14]))]
        """
        _version = 2
        __annotations__ = {
            "return_layers": Dict[str, str],
        }

        def __init__(self, model, return_layers):
            if not set(return_layers).issubset([name for name, _ in model.named_children()]):
                raise ValueError("return_layers are not present in model")
            orig_return_layers = return_layers
            return_layers = {str(k): str(v) for k, v in return_layers.items()}
            layers = OrderedDict()
            for name, module in model.named_children():
                layers[name] = module
                if name in return_layers:
                    del return_layers[name]
                if not return_layers:
                    break

            super(_utils_resnet.IntermediateLayerGetter, self).__init__(layers)
            self.return_layers = orig_return_layers

        def forward(self, x):
            result = OrderedDict()
            for name, module in self.items():
                x = module(x)
                if name in self.return_layers:
                    out_name = self.return_layers[name]
                    result[out_name] = x
            return result
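

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the original module). The
# cfg keys mirror what RetinaFace.__init__ reads ('name', 'pretrain',
# 'return_layers', 'in_channel', 'out_channel'); the particular values below
# are assumptions for the mobilenet0.25 backbone and may differ from the
# configs shipped with the training code.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    cfg_example = {
        'name': 'mobilenet0.25',
        'pretrain': False,  # skip loading ./weights/mobilenetV1X0.25_pretrain.tar
        'return_layers': {'stage1': 1, 'stage2': 2, 'stage3': 3},  # assumed stage names
        'in_channel': 32,   # FPN laterals then expect 64/128/256 input channels
        'out_channel': 64,
    }
    net = RetinaFace(cfg=cfg_example, phase='test')
    net.eval()
    with torch.no_grad():
        dummy = torch.randn(1, 3, 640, 640)
        boxes, scores, landmarks = net(dummy)
    # One row per anchor across the three pyramid levels:
    # boxes [1, N, 4], scores [1, N, 2], landmarks [1, N, 10]
    print(boxes.shape, scores.shape, landmarks.shape)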