deep_sort tracking

This commit is contained in:
bednarco 2021-01-11 09:08:31 +01:00
parent 4d842583f0
commit 954619b0a5
34 changed files with 2884 additions and 16 deletions

13
yolov5/deep_sort_pytorch/.gitignore vendored Normal file
View File

@ -0,0 +1,13 @@
# Folders
__pycache__/
build/
*.egg-info
# Files
*.weights
*.t7
*.mp4
*.avi
*.so
*.txt

View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 Ziqiang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,137 @@
# Deep Sort with PyTorch
![](demo/demo.gif)
## Update(1-1-2020)
Changes
- fix bugs
- refactor code
- accerate detection by adding nms on gpu
## Latest Update(07-22)
Changes
- bug fix (Thanks @JieChen91 and @yingsen1 for bug reporting).
- using batch for feature extracting for each frame, which lead to a small speed up.
- code improvement.
Futher improvement direction
- Train detector on specific dataset rather than the official one.
- Retrain REID model on pedestrain dataset for better performance.
- Replace YOLOv3 detector with advanced ones.
**Any contributions to this repository is welcome!**
## Introduction
This is an implement of MOT tracking algorithm deep sort. Deep sort is basicly the same with sort but added a CNN model to extract features in image of human part bounded by a detector. This CNN model is indeed a RE-ID model and the detector used in [PAPER](https://arxiv.org/abs/1703.07402) is FasterRCNN , and the original source code is [HERE](https://github.com/nwojke/deep_sort).
However in original code, the CNN model is implemented with tensorflow, which I'm not familier with. SO I re-implemented the CNN feature extraction model with PyTorch, and changed the CNN model a little bit. Also, I use **YOLOv3** to generate bboxes instead of FasterRCNN.
## Dependencies
- python 3 (python2 not sure)
- numpy
- scipy
- opencv-python
- sklearn
- torch >= 0.4
- torchvision >= 0.1
- pillow
- vizer
- edict
## Quick Start
0. Check all dependencies installed
```bash
pip install -r requirements.txt
```
for user in china, you can specify pypi source to accelerate install like:
```bash
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
```
1. Clone this repository
```
git clone git@github.com:ZQPei/deep_sort_pytorch.git
```
2. Download YOLOv3 parameters
```
cd detector/YOLOv3/weight/
wget https://pjreddie.com/media/files/yolov3.weights
wget https://pjreddie.com/media/files/yolov3-tiny.weights
cd ../../../
```
3. Download deepsort parameters ckpt.t7
```
cd deep_sort/deep/checkpoint
# download ckpt.t7 from
https://drive.google.com/drive/folders/1xhG0kRH1EX5B9_Iz8gQJb7UNnn_riXi6 to this folder
cd ../../../
```
4. Compile nms module
```bash
cd detector/YOLOv3/nms
sh build.sh
cd ../../..
```
Notice:
If compiling failed, the simplist way is to **Upgrade your pytorch >= 1.1 and torchvision >= 0.3" and you can avoid the troublesome compiling problems which are most likely caused by either `gcc version too low` or `libraries missing`.
5. Run demo
```
usage: python yolov3_deepsort.py VIDEO_PATH
[--help]
[--frame_interval FRAME_INTERVAL]
[--config_detection CONFIG_DETECTION]
[--config_deepsort CONFIG_DEEPSORT]
[--display]
[--display_width DISPLAY_WIDTH]
[--display_height DISPLAY_HEIGHT]
[--save_path SAVE_PATH]
[--cpu]
# yolov3 + deepsort
python yolov3_deepsort.py [VIDEO_PATH]
# yolov3_tiny + deepsort
python yolov3_deepsort.py [VIDEO_PATH] --config_detection ./configs/yolov3_tiny.yaml
# yolov3 + deepsort on webcam
python3 yolov3_deepsort.py /dev/video0 --camera 0
# yolov3_tiny + deepsort on webcam
python3 yolov3_deepsort.py /dev/video0 --config_detection ./configs/yolov3_tiny.yaml --camera 0
```
Use `--display` to enable display.
Results will be saved to `./output/results.avi` and `./output/results.txt`.
All files above can also be accessed from BaiduDisk!
linker[BaiduDisk](https://pan.baidu.com/s/1YJ1iPpdFTlUyLFoonYvozg)
passwdfbuw
## Training the RE-ID model
The original model used in paper is in original_model.py, and its parameter here [original_ckpt.t7](https://drive.google.com/drive/folders/1xhG0kRH1EX5B9_Iz8gQJb7UNnn_riXi6).
To train the model, first you need download [Market1501](http://www.liangzheng.com.cn/Project/project_reid.html) dataset or [Mars](http://www.liangzheng.com.cn/Project/project_mars.html) dataset.
Then you can try [train.py](deep_sort/deep/train.py) to train your own parameter and evaluate it using [test.py](deep_sort/deep/test.py) and [evaluate.py](deep_sort/deep/evalute.py).
![train.jpg](deep_sort/deep/train.jpg)
## Demo videos and images
[demo.avi](https://drive.google.com/drive/folders/1xhG0kRH1EX5B9_Iz8gQJb7UNnn_riXi6)
[demo2.avi](https://drive.google.com/drive/folders/1xhG0kRH1EX5B9_Iz8gQJb7UNnn_riXi6)
![1.jpg](demo/1.jpg)
![2.jpg](demo/2.jpg)
## References
- paper: [Simple Online and Realtime Tracking with a Deep Association Metric](https://arxiv.org/abs/1703.07402)
- code: [nwojke/deep_sort](https://github.com/nwojke/deep_sort)
- paper: [YOLOv3](https://pjreddie.com/media/files/papers/YOLOv3.pdf)
- code: [Joseph Redmon/yolov3](https://pjreddie.com/darknet/yolo/)

View File

@ -0,0 +1,10 @@
DEEPSORT:
REID_CKPT: "deep_sort_pytorch/deep_sort/deep/checkpoint/ckpt.t7"
MAX_DIST: 0.3
MIN_CONFIDENCE: 0.3
NMS_MAX_OVERLAP: 0.5
MAX_IOU_DISTANCE: 0.7
MAX_AGE: 70
N_INIT: 3
NN_BUDGET: 100

View File

@ -0,0 +1,3 @@
# Deep Sort
This is the implemention of deep sort with pytorch.

View File

@ -0,0 +1,21 @@
from .deep_sort import DeepSort
__all__ = ['DeepSort', 'build_tracker']
def build_tracker(cfg, use_cuda):
return DeepSort(cfg.DEEPSORT.REID_CKPT,
max_dist=cfg.DEEPSORT.MAX_DIST, min_confidence=cfg.DEEPSORT.MIN_CONFIDENCE,
nms_max_overlap=cfg.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg.DEEPSORT.MAX_IOU_DISTANCE,
max_age=cfg.DEEPSORT.MAX_AGE, n_init=cfg.DEEPSORT.N_INIT, nn_budget=cfg.DEEPSORT.NN_BUDGET, use_cuda=use_cuda)

View File

@ -0,0 +1,13 @@
import torch
features = torch.load("features.pth")
qf = features["qf"]
ql = features["ql"]
gf = features["gf"]
gl = features["gl"]
scores = qf.mm(gf.t())
res = scores.topk(5, dim=1)[1][:, 0]
top1correct = gl[res].eq(ql).sum().item()
print("Acc top1:{:.3f}".format(top1correct / ql.size(0)))

View File

@ -0,0 +1,54 @@
import torch
import torchvision.transforms as transforms
import numpy as np
import cv2
import logging
from .model import Net
class Extractor(object):
def __init__(self, model_path, use_cuda=True):
self.net = Net(reid=True)
self.device = "cuda" if torch.cuda.is_available() and use_cuda else "cpu"
state_dict = torch.load(model_path, map_location=lambda storage, loc: storage)[
'net_dict']
self.net.load_state_dict(state_dict)
logger = logging.getLogger("root.tracker")
logger.info("Loading weights from {}... Done!".format(model_path))
self.net.to(self.device)
self.size = (64, 128)
self.norm = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])
def _preprocess(self, im_crops):
"""
TODO:
1. to float with scale from 0 to 1
2. resize to (64, 128) as Market1501 dataset did
3. concatenate to a numpy array
3. to torch Tensor
4. normalize
"""
def _resize(im, size):
return cv2.resize(im.astype(np.float32)/255., size)
im_batch = torch.cat([self.norm(_resize(im, self.size)).unsqueeze(
0) for im in im_crops], dim=0).float()
return im_batch
def __call__(self, im_crops):
im_batch = self._preprocess(im_crops)
with torch.no_grad():
im_batch = im_batch.to(self.device)
features = self.net(im_batch)
return features.cpu().numpy()
if __name__ == '__main__':
img = cv2.imread("demo.jpg")[:, :, (2, 1, 0)]
extr = Extractor("checkpoint/ckpt.t7")
feature = extr(img)
print(feature.shape)

View File

@ -0,0 +1,109 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
class BasicBlock(nn.Module):
def __init__(self, c_in, c_out, is_downsample=False):
super(BasicBlock, self).__init__()
self.is_downsample = is_downsample
if is_downsample:
self.conv1 = nn.Conv2d(
c_in, c_out, 3, stride=2, padding=1, bias=False)
else:
self.conv1 = nn.Conv2d(
c_in, c_out, 3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(c_out)
self.relu = nn.ReLU(True)
self.conv2 = nn.Conv2d(c_out, c_out, 3, stride=1,
padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(c_out)
if is_downsample:
self.downsample = nn.Sequential(
nn.Conv2d(c_in, c_out, 1, stride=2, bias=False),
nn.BatchNorm2d(c_out)
)
elif c_in != c_out:
self.downsample = nn.Sequential(
nn.Conv2d(c_in, c_out, 1, stride=1, bias=False),
nn.BatchNorm2d(c_out)
)
self.is_downsample = True
def forward(self, x):
y = self.conv1(x)
y = self.bn1(y)
y = self.relu(y)
y = self.conv2(y)
y = self.bn2(y)
if self.is_downsample:
x = self.downsample(x)
return F.relu(x.add(y), True)
def make_layers(c_in, c_out, repeat_times, is_downsample=False):
blocks = []
for i in range(repeat_times):
if i == 0:
blocks += [BasicBlock(c_in, c_out, is_downsample=is_downsample), ]
else:
blocks += [BasicBlock(c_out, c_out), ]
return nn.Sequential(*blocks)
class Net(nn.Module):
def __init__(self, num_classes=751, reid=False):
super(Net, self).__init__()
# 3 128 64
self.conv = nn.Sequential(
nn.Conv2d(3, 64, 3, stride=1, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
# nn.Conv2d(32,32,3,stride=1,padding=1),
# nn.BatchNorm2d(32),
# nn.ReLU(inplace=True),
nn.MaxPool2d(3, 2, padding=1),
)
# 32 64 32
self.layer1 = make_layers(64, 64, 2, False)
# 32 64 32
self.layer2 = make_layers(64, 128, 2, True)
# 64 32 16
self.layer3 = make_layers(128, 256, 2, True)
# 128 16 8
self.layer4 = make_layers(256, 512, 2, True)
# 256 8 4
self.avgpool = nn.AvgPool2d((8, 4), 1)
# 256 1 1
self.reid = reid
self.classifier = nn.Sequential(
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Linear(256, num_classes),
)
def forward(self, x):
x = self.conv(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = x.view(x.size(0), -1)
# B x 128
if self.reid:
x = x.div(x.norm(p=2, dim=1, keepdim=True))
return x
# classifier
x = self.classifier(x)
return x
if __name__ == '__main__':
net = Net()
x = torch.randn(4, 3, 128, 64)
y = net(x)
import ipdb
ipdb.set_trace()

View File

@ -0,0 +1,111 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
class BasicBlock(nn.Module):
def __init__(self, c_in, c_out, is_downsample=False):
super(BasicBlock, self).__init__()
self.is_downsample = is_downsample
if is_downsample:
self.conv1 = nn.Conv2d(
c_in, c_out, 3, stride=2, padding=1, bias=False)
else:
self.conv1 = nn.Conv2d(
c_in, c_out, 3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(c_out)
self.relu = nn.ReLU(True)
self.conv2 = nn.Conv2d(c_out, c_out, 3, stride=1,
padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(c_out)
if is_downsample:
self.downsample = nn.Sequential(
nn.Conv2d(c_in, c_out, 1, stride=2, bias=False),
nn.BatchNorm2d(c_out)
)
elif c_in != c_out:
self.downsample = nn.Sequential(
nn.Conv2d(c_in, c_out, 1, stride=1, bias=False),
nn.BatchNorm2d(c_out)
)
self.is_downsample = True
def forward(self, x):
y = self.conv1(x)
y = self.bn1(y)
y = self.relu(y)
y = self.conv2(y)
y = self.bn2(y)
if self.is_downsample:
x = self.downsample(x)
return F.relu(x.add(y), True)
def make_layers(c_in, c_out, repeat_times, is_downsample=False):
blocks = []
for i in range(repeat_times):
if i == 0:
blocks += [BasicBlock(c_in, c_out, is_downsample=is_downsample), ]
else:
blocks += [BasicBlock(c_out, c_out), ]
return nn.Sequential(*blocks)
class Net(nn.Module):
def __init__(self, num_classes=625, reid=False):
super(Net, self).__init__()
# 3 128 64
self.conv = nn.Sequential(
nn.Conv2d(3, 32, 3, stride=1, padding=1),
nn.BatchNorm2d(32),
nn.ELU(inplace=True),
nn.Conv2d(32, 32, 3, stride=1, padding=1),
nn.BatchNorm2d(32),
nn.ELU(inplace=True),
nn.MaxPool2d(3, 2, padding=1),
)
# 32 64 32
self.layer1 = make_layers(32, 32, 2, False)
# 32 64 32
self.layer2 = make_layers(32, 64, 2, True)
# 64 32 16
self.layer3 = make_layers(64, 128, 2, True)
# 128 16 8
self.dense = nn.Sequential(
nn.Dropout(p=0.6),
nn.Linear(128*16*8, 128),
nn.BatchNorm1d(128),
nn.ELU(inplace=True)
)
# 256 1 1
self.reid = reid
self.batch_norm = nn.BatchNorm1d(128)
self.classifier = nn.Sequential(
nn.Linear(128, num_classes),
)
def forward(self, x):
x = self.conv(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = x.view(x.size(0), -1)
if self.reid:
x = self.dense[0](x)
x = self.dense[1](x)
x = x.div(x.norm(p=2, dim=1, keepdim=True))
return x
x = self.dense(x)
# B x 128
# classifier
x = self.classifier(x)
return x
if __name__ == '__main__':
net = Net(reid=True)
x = torch.randn(4, 3, 128, 64)
y = net(x)
import ipdb
ipdb.set_trace()

View File

@ -0,0 +1,80 @@
import torch
import torch.backends.cudnn as cudnn
import torchvision
import argparse
import os
from model import Net
parser = argparse.ArgumentParser(description="Train on market1501")
parser.add_argument("--data-dir", default='data', type=str)
parser.add_argument("--no-cuda", action="store_true")
parser.add_argument("--gpu-id", default=0, type=int)
args = parser.parse_args()
# device
device = "cuda:{}".format(
args.gpu_id) if torch.cuda.is_available() and not args.no_cuda else "cpu"
if torch.cuda.is_available() and not args.no_cuda:
cudnn.benchmark = True
# data loader
root = args.data_dir
query_dir = os.path.join(root, "query")
gallery_dir = os.path.join(root, "gallery")
transform = torchvision.transforms.Compose([
torchvision.transforms.Resize((128, 64)),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize(
[0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
queryloader = torch.utils.data.DataLoader(
torchvision.datasets.ImageFolder(query_dir, transform=transform),
batch_size=64, shuffle=False
)
galleryloader = torch.utils.data.DataLoader(
torchvision.datasets.ImageFolder(gallery_dir, transform=transform),
batch_size=64, shuffle=False
)
# net definition
net = Net(reid=True)
assert os.path.isfile(
"./checkpoint/ckpt.t7"), "Error: no checkpoint file found!"
print('Loading from checkpoint/ckpt.t7')
checkpoint = torch.load("./checkpoint/ckpt.t7")
net_dict = checkpoint['net_dict']
net.load_state_dict(net_dict, strict=False)
net.eval()
net.to(device)
# compute features
query_features = torch.tensor([]).float()
query_labels = torch.tensor([]).long()
gallery_features = torch.tensor([]).float()
gallery_labels = torch.tensor([]).long()
with torch.no_grad():
for idx, (inputs, labels) in enumerate(queryloader):
inputs = inputs.to(device)
features = net(inputs).cpu()
query_features = torch.cat((query_features, features), dim=0)
query_labels = torch.cat((query_labels, labels))
for idx, (inputs, labels) in enumerate(galleryloader):
inputs = inputs.to(device)
features = net(inputs).cpu()
gallery_features = torch.cat((gallery_features, features), dim=0)
gallery_labels = torch.cat((gallery_labels, labels))
gallery_labels -= 2
# save features
features = {
"qf": query_features,
"ql": query_labels,
"gf": gallery_features,
"gl": gallery_labels
}
torch.save(features, "features.pth")

View File

@ -0,0 +1,206 @@
import argparse
import os
import time
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.backends.cudnn as cudnn
import torchvision
from model import Net
parser = argparse.ArgumentParser(description="Train on market1501")
parser.add_argument("--data-dir", default='data', type=str)
parser.add_argument("--no-cuda", action="store_true")
parser.add_argument("--gpu-id", default=0, type=int)
parser.add_argument("--lr", default=0.1, type=float)
parser.add_argument("--interval", '-i', default=20, type=int)
parser.add_argument('--resume', '-r', action='store_true')
args = parser.parse_args()
# device
device = "cuda:{}".format(
args.gpu_id) if torch.cuda.is_available() and not args.no_cuda else "cpu"
if torch.cuda.is_available() and not args.no_cuda:
cudnn.benchmark = True
# data loading
root = args.data_dir
train_dir = os.path.join(root, "train")
test_dir = os.path.join(root, "test")
transform_train = torchvision.transforms.Compose([
torchvision.transforms.RandomCrop((128, 64), padding=4),
torchvision.transforms.RandomHorizontalFlip(),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize(
[0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
transform_test = torchvision.transforms.Compose([
torchvision.transforms.Resize((128, 64)),
torchvision.transforms.ToTensor(),
torchvision.transforms.Normalize(
[0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
trainloader = torch.utils.data.DataLoader(
torchvision.datasets.ImageFolder(train_dir, transform=transform_train),
batch_size=64, shuffle=True
)
testloader = torch.utils.data.DataLoader(
torchvision.datasets.ImageFolder(test_dir, transform=transform_test),
batch_size=64, shuffle=True
)
num_classes = max(len(trainloader.dataset.classes),
len(testloader.dataset.classes))
# net definition
start_epoch = 0
net = Net(num_classes=num_classes)
if args.resume:
assert os.path.isfile(
"./checkpoint/ckpt.t7"), "Error: no checkpoint file found!"
print('Loading from checkpoint/ckpt.t7')
checkpoint = torch.load("./checkpoint/ckpt.t7")
# import ipdb; ipdb.set_trace()
net_dict = checkpoint['net_dict']
net.load_state_dict(net_dict)
best_acc = checkpoint['acc']
start_epoch = checkpoint['epoch']
net.to(device)
# loss and optimizer
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
net.parameters(), args.lr, momentum=0.9, weight_decay=5e-4)
best_acc = 0.
# train function for each epoch
def train(epoch):
print("\nEpoch : %d" % (epoch+1))
net.train()
training_loss = 0.
train_loss = 0.
correct = 0
total = 0
interval = args.interval
start = time.time()
for idx, (inputs, labels) in enumerate(trainloader):
# forward
inputs, labels = inputs.to(device), labels.to(device)
outputs = net(inputs)
loss = criterion(outputs, labels)
# backward
optimizer.zero_grad()
loss.backward()
optimizer.step()
# accumurating
training_loss += loss.item()
train_loss += loss.item()
correct += outputs.max(dim=1)[1].eq(labels).sum().item()
total += labels.size(0)
# print
if (idx+1) % interval == 0:
end = time.time()
print("[progress:{:.1f}%]time:{:.2f}s Loss:{:.5f} Correct:{}/{} Acc:{:.3f}%".format(
100.*(idx+1)/len(trainloader), end-start, training_loss /
interval, correct, total, 100.*correct/total
))
training_loss = 0.
start = time.time()
return train_loss/len(trainloader), 1. - correct/total
def test(epoch):
global best_acc
net.eval()
test_loss = 0.
correct = 0
total = 0
start = time.time()
with torch.no_grad():
for idx, (inputs, labels) in enumerate(testloader):
inputs, labels = inputs.to(device), labels.to(device)
outputs = net(inputs)
loss = criterion(outputs, labels)
test_loss += loss.item()
correct += outputs.max(dim=1)[1].eq(labels).sum().item()
total += labels.size(0)
print("Testing ...")
end = time.time()
print("[progress:{:.1f}%]time:{:.2f}s Loss:{:.5f} Correct:{}/{} Acc:{:.3f}%".format(
100.*(idx+1)/len(testloader), end-start, test_loss /
len(testloader), correct, total, 100.*correct/total
))
# saving checkpoint
acc = 100.*correct/total
if acc > best_acc:
best_acc = acc
print("Saving parameters to checkpoint/ckpt.t7")
checkpoint = {
'net_dict': net.state_dict(),
'acc': acc,
'epoch': epoch,
}
if not os.path.isdir('checkpoint'):
os.mkdir('checkpoint')
torch.save(checkpoint, './checkpoint/ckpt.t7')
return test_loss/len(testloader), 1. - correct/total
# plot figure
x_epoch = []
record = {'train_loss': [], 'train_err': [], 'test_loss': [], 'test_err': []}
fig = plt.figure()
ax0 = fig.add_subplot(121, title="loss")
ax1 = fig.add_subplot(122, title="top1err")
def draw_curve(epoch, train_loss, train_err, test_loss, test_err):
global record
record['train_loss'].append(train_loss)
record['train_err'].append(train_err)
record['test_loss'].append(test_loss)
record['test_err'].append(test_err)
x_epoch.append(epoch)
ax0.plot(x_epoch, record['train_loss'], 'bo-', label='train')
ax0.plot(x_epoch, record['test_loss'], 'ro-', label='val')
ax1.plot(x_epoch, record['train_err'], 'bo-', label='train')
ax1.plot(x_epoch, record['test_err'], 'ro-', label='val')
if epoch == 0:
ax0.legend()
ax1.legend()
fig.savefig("train.jpg")
# lr decay
def lr_decay():
global optimizer
for params in optimizer.param_groups:
params['lr'] *= 0.1
lr = params['lr']
print("Learning rate adjusted to {}".format(lr))
def main():
for epoch in range(start_epoch, start_epoch+40):
train_loss, train_err = train(epoch)
test_loss, test_err = test(epoch)
draw_curve(epoch, train_loss, train_err, test_loss, test_err)
if (epoch+1) % 20 == 0:
lr_decay()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,117 @@
import numpy as np
import torch
from .deep.feature_extractor import Extractor
from .sort.nn_matching import NearestNeighborDistanceMetric
from .sort.preprocessing import non_max_suppression
from .sort.detection import Detection
from .sort.tracker import Tracker
__all__ = ['DeepSort']
class DeepSort(object):
def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
self.min_confidence = min_confidence
self.nms_max_overlap = nms_max_overlap
self.extractor = Extractor(model_path, use_cuda=use_cuda)
max_cosine_distance = max_dist
nn_budget = 100
metric = NearestNeighborDistanceMetric(
"cosine", max_cosine_distance, nn_budget)
self.tracker = Tracker(
metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)
def update(self, bbox_xywh, confidences, ori_img):
self.height, self.width = ori_img.shape[:2]
# generate detections
features = self._get_features(bbox_xywh, ori_img)
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh)
detections = [Detection(bbox_tlwh[i], conf, features[i]) for i, conf in enumerate(
confidences) if conf > self.min_confidence]
# run on non-maximum supression
boxes = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
indices = non_max_suppression(boxes, self.nms_max_overlap, scores)
detections = [detections[i] for i in indices]
# update tracker
self.tracker.predict()
self.tracker.update(detections)
# output bbox identities
outputs = []
for track in self.tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
box = track.to_tlwh()
x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
track_id = track.track_id
outputs.append(np.array([x1, y1, x2, y2, track_id], dtype=np.int))
if len(outputs) > 0:
outputs = np.stack(outputs, axis=0)
return outputs
"""
TODO:
Convert bbox from xc_yc_w_h to xtl_ytl_w_h
Thanks JieChen91@github.com for reporting this bug!
"""
@staticmethod
def _xywh_to_tlwh(bbox_xywh):
if isinstance(bbox_xywh, np.ndarray):
bbox_tlwh = bbox_xywh.copy()
elif isinstance(bbox_xywh, torch.Tensor):
bbox_tlwh = bbox_xywh.clone()
bbox_tlwh[:, 0] = bbox_xywh[:, 0] - bbox_xywh[:, 2] / 2.
bbox_tlwh[:, 1] = bbox_xywh[:, 1] - bbox_xywh[:, 3] / 2.
return bbox_tlwh
def _xywh_to_xyxy(self, bbox_xywh):
x, y, w, h = bbox_xywh
x1 = max(int(x - w / 2), 0)
x2 = min(int(x + w / 2), self.width - 1)
y1 = max(int(y - h / 2), 0)
y2 = min(int(y + h / 2), self.height - 1)
return x1, y1, x2, y2
def _tlwh_to_xyxy(self, bbox_tlwh):
"""
TODO:
Convert bbox from xtl_ytl_w_h to xc_yc_w_h
Thanks JieChen91@github.com for reporting this bug!
"""
x, y, w, h = bbox_tlwh
x1 = max(int(x), 0)
x2 = min(int(x+w), self.width - 1)
y1 = max(int(y), 0)
y2 = min(int(y+h), self.height - 1)
return x1, y1, x2, y2
def increment_ages(self):
self.tracker.increment_ages()
def _xyxy_to_tlwh(self, bbox_xyxy):
x1, y1, x2, y2 = bbox_xyxy
t = x1
l = y1
w = int(x2 - x1)
h = int(y2 - y1)
return t, l, w, h
def _get_features(self, bbox_xywh, ori_img):
im_crops = []
for box in bbox_xywh:
x1, y1, x2, y2 = self._xywh_to_xyxy(box)
im = ori_img[y1:y2, x1:x2]
im_crops.append(im)
if im_crops:
features = self.extractor(im_crops)
else:
features = np.array([])
return features

View File

@ -0,0 +1,49 @@
# vim: expandtab:ts=4:sw=4
import numpy as np
class Detection(object):
"""
This class represents a bounding box detection in a single image.
Parameters
----------
tlwh : array_like
Bounding box in format `(x, y, w, h)`.
confidence : float
Detector confidence score.
feature : array_like
A feature vector that describes the object contained in this image.
Attributes
----------
tlwh : ndarray
Bounding box in format `(top left x, top left y, width, height)`.
confidence : ndarray
Detector confidence score.
feature : ndarray | NoneType
A feature vector that describes the object contained in this image.
"""
def __init__(self, tlwh, confidence, feature):
self.tlwh = np.asarray(tlwh, dtype=np.float)
self.confidence = float(confidence)
self.feature = np.asarray(feature, dtype=np.float32)
def to_tlbr(self):
"""Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
`(top left, bottom right)`.
"""
ret = self.tlwh.copy()
ret[2:] += ret[:2]
return ret
def to_xyah(self):
"""Convert bounding box to format `(center x, center y, aspect ratio,
height)`, where the aspect ratio is `width / height`.
"""
ret = self.tlwh.copy()
ret[:2] += ret[2:] / 2
ret[2] /= ret[3]
return ret

View File

@ -0,0 +1,82 @@
# vim: expandtab:ts=4:sw=4
from __future__ import absolute_import
import numpy as np
from . import linear_assignment
def iou(bbox, candidates):
"""Computer intersection over union.
Parameters
----------
bbox : ndarray
A bounding box in format `(top left x, top left y, width, height)`.
candidates : ndarray
A matrix of candidate bounding boxes (one per row) in the same format
as `bbox`.
Returns
-------
ndarray
The intersection over union in [0, 1] between the `bbox` and each
candidate. A higher score means a larger fraction of the `bbox` is
occluded by the candidate.
"""
bbox_tl, bbox_br = bbox[:2], bbox[:2] + bbox[2:]
candidates_tl = candidates[:, :2]
candidates_br = candidates[:, :2] + candidates[:, 2:]
tl = np.c_[np.maximum(bbox_tl[0], candidates_tl[:, 0])[:, np.newaxis],
np.maximum(bbox_tl[1], candidates_tl[:, 1])[:, np.newaxis]]
br = np.c_[np.minimum(bbox_br[0], candidates_br[:, 0])[:, np.newaxis],
np.minimum(bbox_br[1], candidates_br[:, 1])[:, np.newaxis]]
wh = np.maximum(0., br - tl)
area_intersection = wh.prod(axis=1)
area_bbox = bbox[2:].prod()
area_candidates = candidates[:, 2:].prod(axis=1)
return area_intersection / (area_bbox + area_candidates - area_intersection)
def iou_cost(tracks, detections, track_indices=None,
detection_indices=None):
"""An intersection over union distance metric.
Parameters
----------
tracks : List[deep_sort.track.Track]
A list of tracks.
detections : List[deep_sort.detection.Detection]
A list of detections.
track_indices : Optional[List[int]]
A list of indices to tracks that should be matched. Defaults to
all `tracks`.
detection_indices : Optional[List[int]]
A list of indices to detections that should be matched. Defaults
to all `detections`.
Returns
-------
ndarray
Returns a cost matrix of shape
len(track_indices), len(detection_indices) where entry (i, j) is
`1 - iou(tracks[track_indices[i]], detections[detection_indices[j]])`.
"""
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
cost_matrix = np.zeros((len(track_indices), len(detection_indices)))
for row, track_idx in enumerate(track_indices):
if tracks[track_idx].time_since_update > 1:
cost_matrix[row, :] = linear_assignment.INFTY_COST
continue
bbox = tracks[track_idx].to_tlwh()
candidates = np.asarray(
[detections[i].tlwh for i in detection_indices])
cost_matrix[row, :] = 1. - iou(bbox, candidates)
return cost_matrix

View File

@ -0,0 +1,229 @@
# vim: expandtab:ts=4:sw=4
import numpy as np
import scipy.linalg
"""
Table for the 0.95 quantile of the chi-square distribution with N degrees of
freedom (contains values for N=1, ..., 9). Taken from MATLAB/Octave's chi2inv
function and used as Mahalanobis gating threshold.
"""
chi2inv95 = {
1: 3.8415,
2: 5.9915,
3: 7.8147,
4: 9.4877,
5: 11.070,
6: 12.592,
7: 14.067,
8: 15.507,
9: 16.919}
class KalmanFilter(object):
"""
A simple Kalman filter for tracking bounding boxes in image space.
The 8-dimensional state space
x, y, a, h, vx, vy, va, vh
contains the bounding box center position (x, y), aspect ratio a, height h,
and their respective velocities.
Object motion follows a constant velocity model. The bounding box location
(x, y, a, h) is taken as direct observation of the state space (linear
observation model).
"""
def __init__(self):
ndim, dt = 4, 1.
# Create Kalman filter model matrices.
self._motion_mat = np.eye(2 * ndim, 2 * ndim)
for i in range(ndim):
self._motion_mat[i, ndim + i] = dt
self._update_mat = np.eye(ndim, 2 * ndim)
# Motion and observation uncertainty are chosen relative to the current
# state estimate. These weights control the amount of uncertainty in
# the model. This is a bit hacky.
self._std_weight_position = 1. / 20
self._std_weight_velocity = 1. / 160
def initiate(self, measurement):
"""Create track from unassociated measurement.
Parameters
----------
measurement : ndarray
Bounding box coordinates (x, y, a, h) with center position (x, y),
aspect ratio a, and height h.
Returns
-------
(ndarray, ndarray)
Returns the mean vector (8 dimensional) and covariance matrix (8x8
dimensional) of the new track. Unobserved velocities are initialized
to 0 mean.
"""
mean_pos = measurement
mean_vel = np.zeros_like(mean_pos)
mean = np.r_[mean_pos, mean_vel]
std = [
2 * self._std_weight_position * measurement[3],
2 * self._std_weight_position * measurement[3],
1e-2,
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[3],
10 * self._std_weight_velocity * measurement[3],
1e-5,
10 * self._std_weight_velocity * measurement[3]]
covariance = np.diag(np.square(std))
return mean, covariance
def predict(self, mean, covariance):
"""Run Kalman filter prediction step.
Parameters
----------
mean : ndarray
The 8 dimensional mean vector of the object state at the previous
time step.
covariance : ndarray
The 8x8 dimensional covariance matrix of the object state at the
previous time step.
Returns
-------
(ndarray, ndarray)
Returns the mean vector and covariance matrix of the predicted
state. Unobserved velocities are initialized to 0 mean.
"""
std_pos = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-2,
self._std_weight_position * mean[3]]
std_vel = [
self._std_weight_velocity * mean[3],
self._std_weight_velocity * mean[3],
1e-5,
self._std_weight_velocity * mean[3]]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel]))
mean = np.dot(self._motion_mat, mean)
covariance = np.linalg.multi_dot((
self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
def project(self, mean, covariance):
"""Project state distribution to measurement space.
Parameters
----------
mean : ndarray
The state's mean vector (8 dimensional array).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
Returns
-------
(ndarray, ndarray)
Returns the projected mean and covariance matrix of the given state
estimate.
"""
std = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-1,
self._std_weight_position * mean[3]]
innovation_cov = np.diag(np.square(std))
mean = np.dot(self._update_mat, mean)
covariance = np.linalg.multi_dot((
self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
def update(self, mean, covariance, measurement):
"""Run Kalman filter correction step.
Parameters
----------
mean : ndarray
The predicted state's mean vector (8 dimensional).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
measurement : ndarray
The 4 dimensional measurement vector (x, y, a, h), where (x, y)
is the center position, a the aspect ratio, and h the height of the
bounding box.
Returns
-------
(ndarray, ndarray)
Returns the measurement-corrected state distribution.
"""
projected_mean, projected_cov = self.project(mean, covariance)
chol_factor, lower = scipy.linalg.cho_factor(
projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve(
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
check_finite=False).T
innovation = measurement - projected_mean
new_mean = mean + np.dot(innovation, kalman_gain.T)
new_covariance = covariance - np.linalg.multi_dot((
kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
def gating_distance(self, mean, covariance, measurements,
only_position=False):
"""Compute gating distance between state distribution and measurements.
A suitable distance threshold can be obtained from `chi2inv95`. If
`only_position` is False, the chi-square distribution has 4 degrees of
freedom, otherwise 2.
Parameters
----------
mean : ndarray
Mean vector over the state distribution (8 dimensional).
covariance : ndarray
Covariance of the state distribution (8x8 dimensional).
measurements : ndarray
An Nx4 dimensional matrix of N measurements, each in
format (x, y, a, h) where (x, y) is the bounding box center
position, a the aspect ratio, and h the height.
only_position : Optional[bool]
If True, distance computation is done with respect to the bounding
box center position only.
Returns
-------
ndarray
Returns an array of length N, where the i-th element contains the
squared Mahalanobis distance between (mean, covariance) and
`measurements[i]`.
"""
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]
cholesky_factor = np.linalg.cholesky(covariance)
d = measurements - mean
z = scipy.linalg.solve_triangular(
cholesky_factor, d.T, lower=True, check_finite=False,
overwrite_b=True)
squared_maha = np.sum(z * z, axis=0)
return squared_maha

View File

@ -0,0 +1,192 @@
# vim: expandtab:ts=4:sw=4
from __future__ import absolute_import
import numpy as np
# from sklearn.utils.linear_assignment_ import linear_assignment
from scipy.optimize import linear_sum_assignment as linear_assignment
from . import kalman_filter
INFTY_COST = 1e+5
def min_cost_matching(
distance_metric, max_distance, tracks, detections, track_indices=None,
detection_indices=None):
"""Solve linear assignment problem.
Parameters
----------
distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
The distance metric is given a list of tracks and detections as well as
a list of N track indices and M detection indices. The metric should
return the NxM dimensional cost matrix, where element (i, j) is the
association cost between the i-th track in the given track indices and
the j-th detection in the given detection_indices.
max_distance : float
Gating threshold. Associations with cost larger than this value are
disregarded.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : List[int]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above).
detection_indices : List[int]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above).
Returns
-------
(List[(int, int)], List[int], List[int])
Returns a tuple with the following three entries:
* A list of matched track and detection indices.
* A list of unmatched track indices.
* A list of unmatched detection indices.
"""
if track_indices is None:
track_indices = np.arange(len(tracks))
if detection_indices is None:
detection_indices = np.arange(len(detections))
if len(detection_indices) == 0 or len(track_indices) == 0:
return [], track_indices, detection_indices # Nothing to match.
cost_matrix = distance_metric(
tracks, detections, track_indices, detection_indices)
cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
row_indices, col_indices = linear_assignment(cost_matrix)
matches, unmatched_tracks, unmatched_detections = [], [], []
for col, detection_idx in enumerate(detection_indices):
if col not in col_indices:
unmatched_detections.append(detection_idx)
for row, track_idx in enumerate(track_indices):
if row not in row_indices:
unmatched_tracks.append(track_idx)
for row, col in zip(row_indices, col_indices):
track_idx = track_indices[row]
detection_idx = detection_indices[col]
if cost_matrix[row, col] > max_distance:
unmatched_tracks.append(track_idx)
unmatched_detections.append(detection_idx)
else:
matches.append((track_idx, detection_idx))
return matches, unmatched_tracks, unmatched_detections
def matching_cascade(
distance_metric, max_distance, cascade_depth, tracks, detections,
track_indices=None, detection_indices=None):
"""Run matching cascade.
Parameters
----------
distance_metric : Callable[List[Track], List[Detection], List[int], List[int]) -> ndarray
The distance metric is given a list of tracks and detections as well as
a list of N track indices and M detection indices. The metric should
return the NxM dimensional cost matrix, where element (i, j) is the
association cost between the i-th track in the given track indices and
the j-th detection in the given detection indices.
max_distance : float
Gating threshold. Associations with cost larger than this value are
disregarded.
cascade_depth: int
The cascade depth, should be se to the maximum track age.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : Optional[List[int]]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above). Defaults to all tracks.
detection_indices : Optional[List[int]]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above). Defaults to all
detections.
Returns
-------
(List[(int, int)], List[int], List[int])
Returns a tuple with the following three entries:
* A list of matched track and detection indices.
* A list of unmatched track indices.
* A list of unmatched detection indices.
"""
if track_indices is None:
track_indices = list(range(len(tracks)))
if detection_indices is None:
detection_indices = list(range(len(detections)))
unmatched_detections = detection_indices
matches = []
for level in range(cascade_depth):
if len(unmatched_detections) == 0: # No detections left
break
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == 1 + level
]
if len(track_indices_l) == 0: # Nothing to match at this level
continue
matches_l, _, unmatched_detections = \
min_cost_matching(
distance_metric, max_distance, tracks, detections,
track_indices_l, unmatched_detections)
matches += matches_l
unmatched_tracks = list(set(track_indices) - set(k for k, _ in matches))
return matches, unmatched_tracks, unmatched_detections
def gate_cost_matrix(
kf, cost_matrix, tracks, detections, track_indices, detection_indices,
gated_cost=INFTY_COST, only_position=False):
"""Invalidate infeasible entries in cost matrix based on the state
distributions obtained by Kalman filtering.
Parameters
----------
kf : The Kalman filter.
cost_matrix : ndarray
The NxM dimensional cost matrix, where N is the number of track indices
and M is the number of detection indices, such that entry (i, j) is the
association cost between `tracks[track_indices[i]]` and
`detections[detection_indices[j]]`.
tracks : List[track.Track]
A list of predicted tracks at the current time step.
detections : List[detection.Detection]
A list of detections at the current time step.
track_indices : List[int]
List of track indices that maps rows in `cost_matrix` to tracks in
`tracks` (see description above).
detection_indices : List[int]
List of detection indices that maps columns in `cost_matrix` to
detections in `detections` (see description above).
gated_cost : Optional[float]
Entries in the cost matrix corresponding to infeasible associations are
set this value. Defaults to a very large value.
only_position : Optional[bool]
If True, only the x, y position of the state distribution is considered
during gating. Defaults to False.
Returns
-------
ndarray
Returns the modified cost matrix.
"""
gating_dim = 2 if only_position else 4
gating_threshold = kalman_filter.chi2inv95[gating_dim]
measurements = np.asarray(
[detections[i].to_xyah() for i in detection_indices])
for row, track_idx in enumerate(track_indices):
track = tracks[track_idx]
gating_distance = kf.gating_distance(
track.mean, track.covariance, measurements, only_position)
cost_matrix[row, gating_distance > gating_threshold] = gated_cost
return cost_matrix

View File

@ -0,0 +1,176 @@
# vim: expandtab:ts=4:sw=4
import numpy as np
def _pdist(a, b):
"""Compute pair-wise squared distance between points in `a` and `b`.
Parameters
----------
a : array_like
An NxM matrix of N samples of dimensionality M.
b : array_like
An LxM matrix of L samples of dimensionality M.
Returns
-------
ndarray
Returns a matrix of size len(a), len(b) such that eleement (i, j)
contains the squared distance between `a[i]` and `b[j]`.
"""
a, b = np.asarray(a), np.asarray(b)
if len(a) == 0 or len(b) == 0:
return np.zeros((len(a), len(b)))
a2, b2 = np.square(a).sum(axis=1), np.square(b).sum(axis=1)
r2 = -2. * np.dot(a, b.T) + a2[:, None] + b2[None, :]
r2 = np.clip(r2, 0., float(np.inf))
return r2
def _cosine_distance(a, b, data_is_normalized=False):
"""Compute pair-wise cosine distance between points in `a` and `b`.
Parameters
----------
a : array_like
An NxM matrix of N samples of dimensionality M.
b : array_like
An LxM matrix of L samples of dimensionality M.
data_is_normalized : Optional[bool]
If True, assumes rows in a and b are unit length vectors.
Otherwise, a and b are explicitly normalized to lenght 1.
Returns
-------
ndarray
Returns a matrix of size len(a), len(b) such that eleement (i, j)
contains the squared distance between `a[i]` and `b[j]`.
"""
if not data_is_normalized:
a = np.asarray(a) / np.linalg.norm(a, axis=1, keepdims=True)
b = np.asarray(b) / np.linalg.norm(b, axis=1, keepdims=True)
return 1. - np.dot(a, b.T)
def _nn_euclidean_distance(x, y):
""" Helper function for nearest neighbor distance metric (Euclidean).
Parameters
----------
x : ndarray
A matrix of N row-vectors (sample points).
y : ndarray
A matrix of M row-vectors (query points).
Returns
-------
ndarray
A vector of length M that contains for each entry in `y` the
smallest Euclidean distance to a sample in `x`.
"""
distances = _pdist(x, y)
return np.maximum(0.0, distances.min(axis=0))
def _nn_cosine_distance(x, y):
""" Helper function for nearest neighbor distance metric (cosine).
Parameters
----------
x : ndarray
A matrix of N row-vectors (sample points).
y : ndarray
A matrix of M row-vectors (query points).
Returns
-------
ndarray
A vector of length M that contains for each entry in `y` the
smallest cosine distance to a sample in `x`.
"""
distances = _cosine_distance(x, y)
return distances.min(axis=0)
class NearestNeighborDistanceMetric(object):
"""
A nearest neighbor distance metric that, for each target, returns
the closest distance to any sample that has been observed so far.
Parameters
----------
metric : str
Either "euclidean" or "cosine".
matching_threshold: float
The matching threshold. Samples with larger distance are considered an
invalid match.
budget : Optional[int]
If not None, fix samples per class to at most this number. Removes
the oldest samples when the budget is reached.
Attributes
----------
samples : Dict[int -> List[ndarray]]
A dictionary that maps from target identities to the list of samples
that have been observed so far.
"""
def __init__(self, metric, matching_threshold, budget=None):
if metric == "euclidean":
self._metric = _nn_euclidean_distance
elif metric == "cosine":
self._metric = _nn_cosine_distance
else:
raise ValueError(
"Invalid metric; must be either 'euclidean' or 'cosine'")
self.matching_threshold = matching_threshold
self.budget = budget
self.samples = {}
def partial_fit(self, features, targets, active_targets):
"""Update the distance metric with new data.
Parameters
----------
features : ndarray
An NxM matrix of N features of dimensionality M.
targets : ndarray
An integer array of associated target identities.
active_targets : List[int]
A list of targets that are currently present in the scene.
"""
for feature, target in zip(features, targets):
self.samples.setdefault(target, []).append(feature)
if self.budget is not None:
self.samples[target] = self.samples[target][-self.budget:]
self.samples = {k: self.samples[k] for k in active_targets}
def distance(self, features, targets):
"""Compute distance between features and targets.
Parameters
----------
features : ndarray
An NxM matrix of N features of dimensionality M.
targets : List[int]
A list of targets to match the given `features` against.
Returns
-------
ndarray
Returns a cost matrix of shape len(targets), len(features), where
element (i, j) contains the closest squared distance between
`targets[i]` and `features[j]`.
"""
cost_matrix = np.zeros((len(targets), len(features)))
for i, target in enumerate(targets):
cost_matrix[i, :] = self._metric(self.samples[target], features)
return cost_matrix

View File

@ -0,0 +1,73 @@
# vim: expandtab:ts=4:sw=4
import numpy as np
import cv2
def non_max_suppression(boxes, max_bbox_overlap, scores=None):
"""Suppress overlapping detections.
Original code from [1]_ has been adapted to include confidence score.
.. [1] http://www.pyimagesearch.com/2015/02/16/
faster-non-maximum-suppression-python/
Examples
--------
>>> boxes = [d.roi for d in detections]
>>> scores = [d.confidence for d in detections]
>>> indices = non_max_suppression(boxes, max_bbox_overlap, scores)
>>> detections = [detections[i] for i in indices]
Parameters
----------
boxes : ndarray
Array of ROIs (x, y, width, height).
max_bbox_overlap : float
ROIs that overlap more than this values are suppressed.
scores : Optional[array_like]
Detector confidence score.
Returns
-------
List[int]
Returns indices of detections that have survived non-maxima suppression.
"""
if len(boxes) == 0:
return []
boxes = boxes.astype(np.float)
pick = []
x1 = boxes[:, 0]
y1 = boxes[:, 1]
x2 = boxes[:, 2] + boxes[:, 0]
y2 = boxes[:, 3] + boxes[:, 1]
area = (x2 - x1 + 1) * (y2 - y1 + 1)
if scores is not None:
idxs = np.argsort(scores)
else:
idxs = np.argsort(y2)
while len(idxs) > 0:
last = len(idxs) - 1
i = idxs[last]
pick.append(i)
xx1 = np.maximum(x1[i], x1[idxs[:last]])
yy1 = np.maximum(y1[i], y1[idxs[:last]])
xx2 = np.minimum(x2[i], x2[idxs[:last]])
yy2 = np.minimum(y2[i], y2[idxs[:last]])
w = np.maximum(0, xx2 - xx1 + 1)
h = np.maximum(0, yy2 - yy1 + 1)
overlap = (w * h) / area[idxs[:last]]
idxs = np.delete(
idxs, np.concatenate(
([last], np.where(overlap > max_bbox_overlap)[0])))
return pick

View File

@ -0,0 +1,169 @@
# vim: expandtab:ts=4:sw=4
class TrackState:
"""
Enumeration type for the single target track state. Newly created tracks are
classified as `tentative` until enough evidence has been collected. Then,
the track state is changed to `confirmed`. Tracks that are no longer alive
are classified as `deleted` to mark them for removal from the set of active
tracks.
"""
Tentative = 1
Confirmed = 2
Deleted = 3
class Track:
"""
A single target track with state space `(x, y, a, h)` and associated
velocities, where `(x, y)` is the center of the bounding box, `a` is the
aspect ratio and `h` is the height.
Parameters
----------
mean : ndarray
Mean vector of the initial state distribution.
covariance : ndarray
Covariance matrix of the initial state distribution.
track_id : int
A unique track identifier.
n_init : int
Number of consecutive detections before the track is confirmed. The
track state is set to `Deleted` if a miss occurs within the first
`n_init` frames.
max_age : int
The maximum number of consecutive misses before the track state is
set to `Deleted`.
feature : Optional[ndarray]
Feature vector of the detection this track originates from. If not None,
this feature is added to the `features` cache.
Attributes
----------
mean : ndarray
Mean vector of the initial state distribution.
covariance : ndarray
Covariance matrix of the initial state distribution.
track_id : int
A unique track identifier.
hits : int
Total number of measurement updates.
age : int
Total number of frames since first occurance.
time_since_update : int
Total number of frames since last measurement update.
state : TrackState
The current track state.
features : List[ndarray]
A cache of features. On each measurement update, the associated feature
vector is added to this list.
"""
def __init__(self, mean, covariance, track_id, n_init, max_age,
feature=None):
self.mean = mean
self.covariance = covariance
self.track_id = track_id
self.hits = 1
self.age = 1
self.time_since_update = 0
self.state = TrackState.Tentative
self.features = []
if feature is not None:
self.features.append(feature)
self._n_init = n_init
self._max_age = max_age
def to_tlwh(self):
"""Get current position in bounding box format `(top left x, top left y,
width, height)`.
Returns
-------
ndarray
The bounding box.
"""
ret = self.mean[:4].copy()
ret[2] *= ret[3]
ret[:2] -= ret[2:] / 2
return ret
def to_tlbr(self):
"""Get current position in bounding box format `(min x, miny, max x,
max y)`.
Returns
-------
ndarray
The bounding box.
"""
ret = self.to_tlwh()
ret[2:] = ret[:2] + ret[2:]
return ret
def increment_age(self):
self.age += 1
self.time_since_update += 1
def predict(self, kf):
"""Propagate the state distribution to the current time step using a
Kalman filter prediction step.
Parameters
----------
kf : kalman_filter.KalmanFilter
The Kalman filter.
"""
self.mean, self.covariance = kf.predict(self.mean, self.covariance)
self.increment_age()
def update(self, kf, detection):
"""Perform Kalman filter measurement update step and update the feature
cache.
Parameters
----------
kf : kalman_filter.KalmanFilter
The Kalman filter.
detection : Detection
The associated detection.
"""
self.mean, self.covariance = kf.update(
self.mean, self.covariance, detection.to_xyah())
self.features.append(detection.feature)
self.hits += 1
self.time_since_update = 0
if self.state == TrackState.Tentative and self.hits >= self._n_init:
self.state = TrackState.Confirmed
def mark_missed(self):
"""Mark this track as missed (no association at the current time step).
"""
if self.state == TrackState.Tentative:
self.state = TrackState.Deleted
elif self.time_since_update > self._max_age:
self.state = TrackState.Deleted
def is_tentative(self):
"""Returns True if this track is tentative (unconfirmed).
"""
return self.state == TrackState.Tentative
def is_confirmed(self):
"""Returns True if this track is confirmed."""
return self.state == TrackState.Confirmed
def is_deleted(self):
"""Returns True if this track is dead and should be deleted."""
return self.state == TrackState.Deleted

View File

@ -0,0 +1,143 @@
# vim: expandtab:ts=4:sw=4
from __future__ import absolute_import
import numpy as np
from . import kalman_filter
from . import linear_assignment
from . import iou_matching
from .track import Track
class Tracker:
"""
This is the multi-target tracker.
Parameters
----------
metric : nn_matching.NearestNeighborDistanceMetric
A distance metric for measurement-to-track association.
max_age : int
Maximum number of missed misses before a track is deleted.
n_init : int
Number of consecutive detections before the track is confirmed. The
track state is set to `Deleted` if a miss occurs within the first
`n_init` frames.
Attributes
----------
metric : nn_matching.NearestNeighborDistanceMetric
The distance metric used for measurement to track association.
max_age : int
Maximum number of missed misses before a track is deleted.
n_init : int
Number of frames that a track remains in initialization phase.
kf : kalman_filter.KalmanFilter
A Kalman filter to filter target trajectories in image space.
tracks : List[Track]
The list of active tracks at the current time step.
"""
def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
self.metric = metric
self.max_iou_distance = max_iou_distance
self.max_age = max_age
self.n_init = n_init
self.kf = kalman_filter.KalmanFilter()
self.tracks = []
self._next_id = 1
def predict(self):
"""Propagate track state distributions one time step forward.
This function should be called once every time step, before `update`.
"""
for track in self.tracks:
track.predict(self.kf)
def increment_ages(self):
for track in self.tracks:
track.increment_age()
track.mark_missed()
def update(self, detections):
"""Perform measurement update and track management.
Parameters
----------
detections : List[deep_sort.detection.Detection]
A list of detections at the current time step.
"""
# Run matching cascade.
matches, unmatched_tracks, unmatched_detections = \
self._match(detections)
# Update track set.
for track_idx, detection_idx in matches:
self.tracks[track_idx].update(
self.kf, detections[detection_idx])
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx])
self.tracks = [t for t in self.tracks if not t.is_deleted()]
# Update distance metric.
active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
features, targets = [], []
for track in self.tracks:
if not track.is_confirmed():
continue
features += track.features
targets += [track.track_id for _ in track.features]
track.features = []
self.metric.partial_fit(
np.asarray(features), np.asarray(targets), active_targets)
def _match(self, detections):
def gated_metric(tracks, dets, track_indices, detection_indices):
features = np.array([dets[i].feature for i in detection_indices])
targets = np.array([tracks[i].track_id for i in track_indices])
cost_matrix = self.metric.distance(features, targets)
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
return cost_matrix
# Split track set into confirmed and unconfirmed tracks.
confirmed_tracks = [
i for i, t in enumerate(self.tracks) if t.is_confirmed()]
unconfirmed_tracks = [
i for i, t in enumerate(self.tracks) if not t.is_confirmed()]
# Associate confirmed tracks using appearance features.
matches_a, unmatched_tracks_a, unmatched_detections = \
linear_assignment.matching_cascade(
gated_metric, self.metric.matching_threshold, self.max_age,
self.tracks, detections, confirmed_tracks)
# Associate remaining tracks together with unconfirmed tracks using IOU.
iou_track_candidates = unconfirmed_tracks + [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update == 1]
unmatched_tracks_a = [
k for k in unmatched_tracks_a if
self.tracks[k].time_since_update != 1]
matches_b, unmatched_tracks_b, unmatched_detections = \
linear_assignment.min_cost_matching(
iou_matching.iou_cost, self.max_iou_distance, self.tracks,
detections, iou_track_candidates, unmatched_detections)
matches = matches_a + matches_b
unmatched_tracks = list(set(unmatched_tracks_a + unmatched_tracks_b))
return matches, unmatched_tracks, unmatched_detections
def _initiate_track(self, detection):
mean, covariance = self.kf.initiate(detection.to_xyah())
self.tracks.append(Track(
mean, covariance, self._next_id, self.n_init, self.max_age,
detection.feature))
self._next_id += 1

View File

@ -0,0 +1,13 @@
from os import environ
def assert_in(file, files_to_check):
if file not in files_to_check:
raise AssertionError("{} does not exist in the list".format(str(file)))
return True
def assert_in_env(check_list: list):
for item in check_list:
assert_in(item, environ.keys())
return True

View File

@ -0,0 +1,36 @@
import numpy as np
import cv2
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)
def compute_color_for_labels(label):
"""
Simple function that adds fixed color depending on the class
"""
color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
return tuple(color)
def draw_boxes(img, bbox, identities=None, offset=(0,0)):
for i,box in enumerate(bbox):
x1,y1,x2,y2 = [int(i) for i in box]
x1 += offset[0]
x2 += offset[0]
y1 += offset[1]
y2 += offset[1]
# box text and bar
id = int(identities[i]) if identities is not None else 0
color = compute_color_for_labels(id)
label = '{}{:d}'.format("", id)
t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2 , 2)[0]
cv2.rectangle(img,(x1, y1),(x2,y2),color,3)
cv2.rectangle(img,(x1, y1),(x1+t_size[0]+3,y1+t_size[1]+4), color,-1)
cv2.putText(img,label,(x1,y1+t_size[1]+4), cv2.FONT_HERSHEY_PLAIN, 2, [255,255,255], 2)
return img
if __name__ == '__main__':
for i in range(82):
print(compute_color_for_labels(i))

View File

@ -0,0 +1,103 @@
import os
import numpy as np
import copy
import motmetrics as mm
mm.lap.default_solver = 'lap'
from utils.io import read_results, unzip_objs
class Evaluator(object):
def __init__(self, data_root, seq_name, data_type):
self.data_root = data_root
self.seq_name = seq_name
self.data_type = data_type
self.load_annotations()
self.reset_accumulator()
def load_annotations(self):
assert self.data_type == 'mot'
gt_filename = os.path.join(self.data_root, self.seq_name, 'gt', 'gt.txt')
self.gt_frame_dict = read_results(gt_filename, self.data_type, is_gt=True)
self.gt_ignore_frame_dict = read_results(gt_filename, self.data_type, is_ignore=True)
def reset_accumulator(self):
self.acc = mm.MOTAccumulator(auto_id=True)
def eval_frame(self, frame_id, trk_tlwhs, trk_ids, rtn_events=False):
# results
trk_tlwhs = np.copy(trk_tlwhs)
trk_ids = np.copy(trk_ids)
# gts
gt_objs = self.gt_frame_dict.get(frame_id, [])
gt_tlwhs, gt_ids = unzip_objs(gt_objs)[:2]
# ignore boxes
ignore_objs = self.gt_ignore_frame_dict.get(frame_id, [])
ignore_tlwhs = unzip_objs(ignore_objs)[0]
# remove ignored results
keep = np.ones(len(trk_tlwhs), dtype=bool)
iou_distance = mm.distances.iou_matrix(ignore_tlwhs, trk_tlwhs, max_iou=0.5)
if len(iou_distance) > 0:
match_is, match_js = mm.lap.linear_sum_assignment(iou_distance)
match_is, match_js = map(lambda a: np.asarray(a, dtype=int), [match_is, match_js])
match_ious = iou_distance[match_is, match_js]
match_js = np.asarray(match_js, dtype=int)
match_js = match_js[np.logical_not(np.isnan(match_ious))]
keep[match_js] = False
trk_tlwhs = trk_tlwhs[keep]
trk_ids = trk_ids[keep]
# get distance matrix
iou_distance = mm.distances.iou_matrix(gt_tlwhs, trk_tlwhs, max_iou=0.5)
# acc
self.acc.update(gt_ids, trk_ids, iou_distance)
if rtn_events and iou_distance.size > 0 and hasattr(self.acc, 'last_mot_events'):
events = self.acc.last_mot_events # only supported by https://github.com/longcw/py-motmetrics
else:
events = None
return events
def eval_file(self, filename):
self.reset_accumulator()
result_frame_dict = read_results(filename, self.data_type, is_gt=False)
frames = sorted(list(set(self.gt_frame_dict.keys()) | set(result_frame_dict.keys())))
for frame_id in frames:
trk_objs = result_frame_dict.get(frame_id, [])
trk_tlwhs, trk_ids = unzip_objs(trk_objs)[:2]
self.eval_frame(frame_id, trk_tlwhs, trk_ids, rtn_events=False)
return self.acc
@staticmethod
def get_summary(accs, names, metrics=('mota', 'num_switches', 'idp', 'idr', 'idf1', 'precision', 'recall')):
names = copy.deepcopy(names)
if metrics is None:
metrics = mm.metrics.motchallenge_metrics
metrics = copy.deepcopy(metrics)
mh = mm.metrics.create()
summary = mh.compute_many(
accs,
metrics=metrics,
names=names,
generate_overall=True
)
return summary
@staticmethod
def save_summary(summary, filename):
import pandas as pd
writer = pd.ExcelWriter(filename)
summary.to_excel(writer)
writer.save()

View File

@ -0,0 +1,133 @@
import os
from typing import Dict
import numpy as np
# from utils.log import get_logger
def write_results(filename, results, data_type):
if data_type == 'mot':
save_format = '{frame},{id},{x1},{y1},{w},{h},-1,-1,-1,-1\n'
elif data_type == 'kitti':
save_format = '{frame} {id} pedestrian 0 0 -10 {x1} {y1} {x2} {y2} -10 -10 -10 -1000 -1000 -1000 -10\n'
else:
raise ValueError(data_type)
with open(filename, 'w') as f:
for frame_id, tlwhs, track_ids in results:
if data_type == 'kitti':
frame_id -= 1
for tlwh, track_id in zip(tlwhs, track_ids):
if track_id < 0:
continue
x1, y1, w, h = tlwh
x2, y2 = x1 + w, y1 + h
line = save_format.format(frame=frame_id, id=track_id, x1=x1, y1=y1, x2=x2, y2=y2, w=w, h=h)
f.write(line)
# def write_results(filename, results_dict: Dict, data_type: str):
# if not filename:
# return
# path = os.path.dirname(filename)
# if not os.path.exists(path):
# os.makedirs(path)
# if data_type in ('mot', 'mcmot', 'lab'):
# save_format = '{frame},{id},{x1},{y1},{w},{h},1,-1,-1,-1\n'
# elif data_type == 'kitti':
# save_format = '{frame} {id} pedestrian -1 -1 -10 {x1} {y1} {x2} {y2} -1 -1 -1 -1000 -1000 -1000 -10 {score}\n'
# else:
# raise ValueError(data_type)
# with open(filename, 'w') as f:
# for frame_id, frame_data in results_dict.items():
# if data_type == 'kitti':
# frame_id -= 1
# for tlwh, track_id in frame_data:
# if track_id < 0:
# continue
# x1, y1, w, h = tlwh
# x2, y2 = x1 + w, y1 + h
# line = save_format.format(frame=frame_id, id=track_id, x1=x1, y1=y1, x2=x2, y2=y2, w=w, h=h, score=1.0)
# f.write(line)
# logger.info('Save results to {}'.format(filename))
def read_results(filename, data_type: str, is_gt=False, is_ignore=False):
if data_type in ('mot', 'lab'):
read_fun = read_mot_results
else:
raise ValueError('Unknown data type: {}'.format(data_type))
return read_fun(filename, is_gt, is_ignore)
"""
labels={'ped', ... % 1
'person_on_vhcl', ... % 2
'car', ... % 3
'bicycle', ... % 4
'mbike', ... % 5
'non_mot_vhcl', ... % 6
'static_person', ... % 7
'distractor', ... % 8
'occluder', ... % 9
'occluder_on_grnd', ... %10
'occluder_full', ... % 11
'reflection', ... % 12
'crowd' ... % 13
};
"""
def read_mot_results(filename, is_gt, is_ignore):
valid_labels = {1}
ignore_labels = {2, 7, 8, 12}
results_dict = dict()
if os.path.isfile(filename):
with open(filename, 'r') as f:
for line in f.readlines():
linelist = line.split(',')
if len(linelist) < 7:
continue
fid = int(linelist[0])
if fid < 1:
continue
results_dict.setdefault(fid, list())
if is_gt:
if 'MOT16-' in filename or 'MOT17-' in filename:
label = int(float(linelist[7]))
mark = int(float(linelist[6]))
if mark == 0 or label not in valid_labels:
continue
score = 1
elif is_ignore:
if 'MOT16-' in filename or 'MOT17-' in filename:
label = int(float(linelist[7]))
vis_ratio = float(linelist[8])
if label not in ignore_labels and vis_ratio >= 0:
continue
else:
continue
score = 1
else:
score = float(linelist[6])
tlwh = tuple(map(float, linelist[2:6]))
target_id = int(linelist[1])
results_dict[fid].append((tlwh, target_id, score))
return results_dict
def unzip_objs(objs):
if len(objs) > 0:
tlwhs, ids, scores = zip(*objs)
else:
tlwhs, ids, scores = [], [], []
tlwhs = np.asarray(tlwhs, dtype=float).reshape(-1, 4)
return tlwhs, ids, scores

View File

@ -0,0 +1,383 @@
"""
References:
https://medium.com/analytics-vidhya/creating-a-custom-logging-mechanism-for-real-time-object-detection-using-tdd-4ca2cfcd0a2f
"""
import json
from os import makedirs
from os.path import exists, join
from datetime import datetime
class JsonMeta(object):
HOURS = 3
MINUTES = 59
SECONDS = 59
PATH_TO_SAVE = 'LOGS'
DEFAULT_FILE_NAME = 'remaining'
class BaseJsonLogger(object):
"""
This is the base class that returns __dict__ of its own
it also returns the dicts of objects in the attributes that are list instances
"""
def dic(self):
# returns dicts of objects
out = {}
for k, v in self.__dict__.items():
if hasattr(v, 'dic'):
out[k] = v.dic()
elif isinstance(v, list):
out[k] = self.list(v)
else:
out[k] = v
return out
@staticmethod
def list(values):
# applies the dic method on items in the list
return [v.dic() if hasattr(v, 'dic') else v for v in values]
class Label(BaseJsonLogger):
"""
For each bounding box there are various categories with confidences. Label class keeps track of that information.
"""
def __init__(self, category: str, confidence: float):
self.category = category
self.confidence = confidence
class Bbox(BaseJsonLogger):
"""
This module stores the information for each frame and use them in JsonParser
Attributes:
labels (list): List of label module.
top (int):
left (int):
width (int):
height (int):
Args:
bbox_id (float):
top (int):
left (int):
width (int):
height (int):
References:
Check Label module for better understanding.
"""
def __init__(self, bbox_id, top, left, width, height):
self.labels = []
self.bbox_id = bbox_id
self.top = top
self.left = left
self.width = width
self.height = height
def add_label(self, category, confidence):
# adds category and confidence only if top_k is not exceeded.
self.labels.append(Label(category, confidence))
def labels_full(self, value):
return len(self.labels) == value
class Frame(BaseJsonLogger):
"""
This module stores the information for each frame and use them in JsonParser
Attributes:
timestamp (float): The elapsed time of captured frame
frame_id (int): The frame number of the captured video
bboxes (list of Bbox objects): Stores the list of bbox objects.
References:
Check Bbox class for better information
Args:
timestamp (float):
frame_id (int):
"""
def __init__(self, frame_id: int, timestamp: float = None):
self.frame_id = frame_id
self.timestamp = timestamp
self.bboxes = []
def add_bbox(self, bbox_id: int, top: int, left: int, width: int, height: int):
bboxes_ids = [bbox.bbox_id for bbox in self.bboxes]
if bbox_id not in bboxes_ids:
self.bboxes.append(Bbox(bbox_id, top, left, width, height))
else:
raise ValueError("Frame with id: {} already has a Bbox with id: {}".format(self.frame_id, bbox_id))
def add_label_to_bbox(self, bbox_id: int, category: str, confidence: float):
bboxes = {bbox.id: bbox for bbox in self.bboxes}
if bbox_id in bboxes.keys():
res = bboxes.get(bbox_id)
res.add_label(category, confidence)
else:
raise ValueError('the bbox with id: {} does not exists!'.format(bbox_id))
class BboxToJsonLogger(BaseJsonLogger):
"""
ُ This module is designed to automate the task of logging jsons. An example json is used
to show the contents of json file shortly
Example:
{
"video_details": {
"frame_width": 1920,
"frame_height": 1080,
"frame_rate": 20,
"video_name": "/home/gpu/codes/MSD/pedestrian_2/project/public/camera1.avi"
},
"frames": [
{
"frame_id": 329,
"timestamp": 3365.1254
"bboxes": [
{
"labels": [
{
"category": "pedestrian",
"confidence": 0.9
}
],
"bbox_id": 0,
"top": 1257,
"left": 138,
"width": 68,
"height": 109
}
]
}],
Attributes:
frames (dict): It's a dictionary that maps each frame_id to json attributes.
video_details (dict): information about video file.
top_k_labels (int): shows the allowed number of labels
start_time (datetime object): we use it to automate the json output by time.
Args:
top_k_labels (int): shows the allowed number of labels
"""
def __init__(self, top_k_labels: int = 1):
self.frames = {}
self.video_details = self.video_details = dict(frame_width=None, frame_height=None, frame_rate=None,
video_name=None)
self.top_k_labels = top_k_labels
self.start_time = datetime.now()
def set_top_k(self, value):
self.top_k_labels = value
def frame_exists(self, frame_id: int) -> bool:
"""
Args:
frame_id (int):
Returns:
bool: true if frame_id is recognized
"""
return frame_id in self.frames.keys()
def add_frame(self, frame_id: int, timestamp: float = None) -> None:
"""
Args:
frame_id (int):
timestamp (float): opencv captured frame time property
Raises:
ValueError: if frame_id would not exist in class frames attribute
Returns:
None
"""
if not self.frame_exists(frame_id):
self.frames[frame_id] = Frame(frame_id, timestamp)
else:
raise ValueError("Frame id: {} already exists".format(frame_id))
def bbox_exists(self, frame_id: int, bbox_id: int) -> bool:
"""
Args:
frame_id:
bbox_id:
Returns:
bool: if bbox exists in frame bboxes list
"""
bboxes = []
if self.frame_exists(frame_id=frame_id):
bboxes = [bbox.bbox_id for bbox in self.frames[frame_id].bboxes]
return bbox_id in bboxes
def find_bbox(self, frame_id: int, bbox_id: int):
"""
Args:
frame_id:
bbox_id:
Returns:
bbox_id (int):
Raises:
ValueError: if bbox_id does not exist in the bbox list of specific frame.
"""
if not self.bbox_exists(frame_id, bbox_id):
raise ValueError("frame with id: {} does not contain bbox with id: {}".format(frame_id, bbox_id))
bboxes = {bbox.bbox_id: bbox for bbox in self.frames[frame_id].bboxes}
return bboxes.get(bbox_id)
def add_bbox_to_frame(self, frame_id: int, bbox_id: int, top: int, left: int, width: int, height: int) -> None:
"""
Args:
frame_id (int):
bbox_id (int):
top (int):
left (int):
width (int):
height (int):
Returns:
None
Raises:
ValueError: if bbox_id already exist in frame information with frame_id
ValueError: if frame_id does not exist in frames attribute
"""
if self.frame_exists(frame_id):
frame = self.frames[frame_id]
if not self.bbox_exists(frame_id, bbox_id):
frame.add_bbox(bbox_id, top, left, width, height)
else:
raise ValueError(
"frame with frame_id: {} already contains the bbox with id: {} ".format(frame_id, bbox_id))
else:
raise ValueError("frame with frame_id: {} does not exist".format(frame_id))
def add_label_to_bbox(self, frame_id: int, bbox_id: int, category: str, confidence: float):
"""
Args:
frame_id:
bbox_id:
category:
confidence: the confidence value returned from yolo detection
Returns:
None
Raises:
ValueError: if labels quota (top_k_labels) exceeds.
"""
bbox = self.find_bbox(frame_id, bbox_id)
if not bbox.labels_full(self.top_k_labels):
bbox.add_label(category, confidence)
else:
raise ValueError("labels in frame_id: {}, bbox_id: {} is fulled".format(frame_id, bbox_id))
def add_video_details(self, frame_width: int = None, frame_height: int = None, frame_rate: int = None,
video_name: str = None):
self.video_details['frame_width'] = frame_width
self.video_details['frame_height'] = frame_height
self.video_details['frame_rate'] = frame_rate
self.video_details['video_name'] = video_name
def output(self):
output = {'video_details': self.video_details}
result = list(self.frames.values())
output['frames'] = [item.dic() for item in result]
return output
def json_output(self, output_name):
"""
Args:
output_name:
Returns:
None
Notes:
It creates the json output with `output_name` name.
"""
if not output_name.endswith('.json'):
output_name += '.json'
with open(output_name, 'w') as file:
json.dump(self.output(), file)
file.close()
def set_start(self):
self.start_time = datetime.now()
def schedule_output_by_time(self, output_dir=JsonMeta.PATH_TO_SAVE, hours: int = 0, minutes: int = 0,
seconds: int = 60) -> None:
"""
Notes:
Creates folder and then periodically stores the jsons on that address.
Args:
output_dir (str): the directory where output files will be stored
hours (int):
minutes (int):
seconds (int):
Returns:
None
"""
end = datetime.now()
interval = 0
interval += abs(min([hours, JsonMeta.HOURS]) * 3600)
interval += abs(min([minutes, JsonMeta.MINUTES]) * 60)
interval += abs(min([seconds, JsonMeta.SECONDS]))
diff = (end - self.start_time).seconds
if diff > interval:
output_name = self.start_time.strftime('%Y-%m-%d %H-%M-%S') + '.json'
if not exists(output_dir):
makedirs(output_dir)
output = join(output_dir, output_name)
self.json_output(output_name=output)
self.frames = {}
self.start_time = datetime.now()
def schedule_output_by_frames(self, frames_quota, frame_counter, output_dir=JsonMeta.PATH_TO_SAVE):
"""
saves as the number of frames quota increases higher.
:param frames_quota:
:param frame_counter:
:param output_dir:
:return:
"""
pass
def flush(self, output_dir):
"""
Notes:
We use this function to output jsons whenever possible.
like the time that we exit the while loop of opencv.
Args:
output_dir:
Returns:
None
"""
filename = self.start_time.strftime('%Y-%m-%d %H-%M-%S') + '-remaining.json'
output = join(output_dir, filename)
self.json_output(output_name=output)

View File

@ -0,0 +1,17 @@
import logging
def get_logger(name='root'):
formatter = logging.Formatter(
# fmt='%(asctime)s [%(levelname)s]: %(filename)s(%(funcName)s:%(lineno)s) >> %(message)s')
fmt='%(asctime)s [%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
logger.addHandler(handler)
return logger

View File

@ -0,0 +1,39 @@
import os
import yaml
from easydict import EasyDict as edict
class YamlParser(edict):
"""
This is yaml parser based on EasyDict.
"""
def __init__(self, cfg_dict=None, config_file=None):
if cfg_dict is None:
cfg_dict = {}
if config_file is not None:
assert(os.path.isfile(config_file))
with open(config_file, 'r') as fo:
cfg_dict.update(yaml.load(fo.read()))
super(YamlParser, self).__init__(cfg_dict)
def merge_from_file(self, config_file):
with open(config_file, 'r') as fo:
self.update(yaml.load(fo.read()))
def merge_from_dict(self, config_dict):
self.update(config_dict)
def get_config(config_file=None):
return YamlParser(config_file=config_file)
if __name__ == "__main__":
cfg = YamlParser(config_file="../configs/yolov3.yaml")
cfg.merge_from_file("../configs/deep_sort.yaml")
import ipdb
ipdb.set_trace()

View File

@ -0,0 +1,39 @@
from functools import wraps
from time import time
def is_video(ext: str):
"""
Returns true if ext exists in
allowed_exts for video files.
Args:
ext:
Returns:
"""
allowed_exts = ('.mp4', '.webm', '.ogg', '.avi', '.wmv', '.mkv', '.3gp')
return any((ext.endswith(x) for x in allowed_exts))
def tik_tok(func):
"""
keep track of time for each process.
Args:
func:
Returns:
"""
@wraps(func)
def _time_it(*args, **kwargs):
start = time()
try:
return func(*args, **kwargs)
finally:
end_ = time()
print("time: {:.03f}s, fps: {:.03f}".format(end_ - start, 1 / (end_ - start)))
return _time_it

View File

@ -1,10 +1,13 @@
import argparse import argparse
import time import time
import os
from pathlib import Path from pathlib import Path
import cv2 import cv2
import sklearn
import torch import torch
import torch.backends.cudnn as cudnn import torch.backends.cudnn as cudnn
import numpy as np
from numpy import random from numpy import random
from models.experimental import attempt_load from models.experimental import attempt_load
@ -13,6 +16,52 @@ from utils.general import check_img_size, non_max_suppression, apply_classifier,
strip_optimizer, set_logging, increment_path strip_optimizer, set_logging, increment_path
from utils.plots import plot_one_box from utils.plots import plot_one_box
from utils.torch_utils import select_device, load_classifier, time_synchronized from utils.torch_utils import select_device, load_classifier, time_synchronized
from deep_sort_pytorch.utils.parser import get_config
from deep_sort_pytorch.deep_sort import DeepSort
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)
def bbox_rel(*xyxy):
"""" Calculates the relative bounding box from absolute pixel values. """
bbox_left = min([xyxy[0].item(), xyxy[2].item()])
bbox_top = min([xyxy[1].item(), xyxy[3].item()])
bbox_w = abs(xyxy[0].item() - xyxy[2].item())
bbox_h = abs(xyxy[1].item() - xyxy[3].item())
x_c = (bbox_left + bbox_w / 2)
y_c = (bbox_top + bbox_h / 2)
w = bbox_w
h = bbox_h
return x_c, y_c, w, h
def compute_color_for_labels(label):
"""
Simple function that adds fixed color depending on the class
"""
color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
return tuple(color)
def draw_boxes(img, bbox, identities=None, offset=(0, 0)):
for i, box in enumerate(bbox):
x1, y1, x2, y2 = [int(i) for i in box]
x1 += offset[0]
x2 += offset[0]
y1 += offset[1]
y2 += offset[1]
# box text and bar
id = int(identities[i]) if identities is not None else 0
color = compute_color_for_labels(id)
label = '{}{:d}'.format("", id)
t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 2, 2)[0]
cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)
cv2.rectangle(
img, (x1, y1), (x1 + t_size[0] + 3, y1 + t_size[1] + 4), color, -1)
cv2.putText(img, label, (x1, y1 +
t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 2, [255, 255, 255], 2)
return img
def detect(save_img=False): def detect(save_img=False):
@ -24,6 +73,15 @@ def detect(save_img=False):
save_dir = Path(increment_path(Path(opt.project) / opt.name, exist_ok=opt.exist_ok)) # increment run save_dir = Path(increment_path(Path(opt.project) / opt.name, exist_ok=opt.exist_ok)) # increment run
(save_dir / 'labels' if save_txt else save_dir).mkdir(parents=True, exist_ok=True) # make dir (save_dir / 'labels' if save_txt else save_dir).mkdir(parents=True, exist_ok=True) # make dir
# DeepSort Initialize
cfg = get_config()
cfg.merge_from_file(opt.config_deepsort)
deepsort = DeepSort(cfg.DEEPSORT.REID_CKPT,
max_dist=cfg.DEEPSORT.MAX_DIST, min_confidence=cfg.DEEPSORT.MIN_CONFIDENCE,
nms_max_overlap=cfg.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg.DEEPSORT.MAX_IOU_DISTANCE,
max_age=cfg.DEEPSORT.MAX_AGE, n_init=cfg.DEEPSORT.N_INIT, nn_budget=cfg.DEEPSORT.NN_BUDGET,
use_cuda=True)
# Initialize # Initialize
set_logging() set_logging()
device = select_device(opt.device) device = select_device(opt.device)
@ -51,14 +109,14 @@ def detect(save_img=False):
save_img = True save_img = True
dataset = LoadImages(source, img_size=imgsz) dataset = LoadImages(source, img_size=imgsz)
# Get names and colors # Get names
names = model.module.names if hasattr(model, 'module') else model.names names = model.module.names if hasattr(model, 'module') else model.names
colors = [[random.randint(0, 255) for _ in range(3)] for _ in names]
# Run inference # Run inference
t0 = time.time() t0 = time.time()
img = torch.zeros((1, 3, imgsz, imgsz), device=device) # init img img = torch.zeros((1, 3, imgsz, imgsz), device=device) # init img
_ = model(img.half() if half else img) if device.type != 'cpu' else None # run once _ = model(img.half() if half else img) if device.type != 'cpu' else None # run once
for path, img, im0s, vid_cap in dataset: for path, img, im0s, vid_cap in dataset:
img = torch.from_numpy(img).to(device) img = torch.from_numpy(img).to(device)
img = img.half() if half else img.float() # uint8 to fp16/32 img = img.half() if half else img.float() # uint8 to fp16/32
@ -77,17 +135,15 @@ def detect(save_img=False):
# Apply Classifier # Apply Classifier
if classify: if classify:
pred = apply_classifier(pred, modelc, img, im0s) pred = apply_classifier(pred, modelc, img, im0s)
# Process detections # Process detections
for i, det in enumerate(pred): # detections per image for i, det in enumerate(pred): # detections per image
if webcam: # batch_size >= 1 if webcam: # batch_size >= 1
p, s, im0, frame = path[i], '%g: ' % i, im0s[i].copy(), dataset.count p, s, im0, frame = Path(path[i]), '%g: ' % i, im0s[i].copy(), dataset.count
else: else:
p, s, im0, frame = path, '', im0s, getattr(dataset, 'frame', 0) p, s, im0, frame = Path(path), '', im0s, getattr(dataset, 'frame', 0)
p = Path(p) # to Path save_path = str(save_dir / p.name)
save_path = str(save_dir / p.name) # img.jpg txt_path = str(save_dir / 'labels' / p.stem) + ('' if dataset.mode == 'image' else f'_{frame}')
txt_path = str(save_dir / 'labels' / p.stem) + ('' if dataset.mode == 'image' else f'_{frame}') # img.txt
s += '%gx%g ' % img.shape[2:] # print string s += '%gx%g ' % img.shape[2:] # print string
gn = torch.tensor(im0.shape)[[1, 0, 1, 0]] # normalization gain whwh gn = torch.tensor(im0.shape)[[1, 0, 1, 0]] # normalization gain whwh
if len(det): if len(det):
@ -99,7 +155,30 @@ def detect(save_img=False):
n = (det[:, -1] == c).sum() # detections per class n = (det[:, -1] == c).sum() # detections per class
s += f'{n} {names[int(c)]}s, ' # add to string s += f'{n} {names[int(c)]}s, ' # add to string
# Write results bbox_xywh = []
confs = []
# Adapt detections to deep sort input format
for *xyxy, conf, cls in det:
if cls == 0:
x_c, y_c, bbox_w, bbox_h = bbox_rel(*xyxy)
obj = [x_c, y_c, bbox_w, bbox_h]
bbox_xywh.append(obj)
confs.append([conf.item()])
xywhs = torch.Tensor(bbox_xywh)
confss = torch.Tensor(confs)
# Pass detections to deepsort
outputs = deepsort.update(xywhs, confss, im0)
# draw boxes for visualization
if len(outputs) > 0:
bbox_xyxy = outputs[:, :4]
identities = outputs[:, -1]
draw_boxes(im0, bbox_xyxy, identities)
for *xyxy, conf, cls in reversed(det): for *xyxy, conf, cls in reversed(det):
if save_txt: # Write to file if save_txt: # Write to file
xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist() # normalized xywh xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist() # normalized xywh
@ -107,9 +186,26 @@ def detect(save_img=False):
with open(txt_path + '.txt', 'a') as f: with open(txt_path + '.txt', 'a') as f:
f.write(('%g ' * len(line)).rstrip() % line + '\n') f.write(('%g ' * len(line)).rstrip() % line + '\n')
if save_img or view_img: # Add bbox to image if cls == 32 and (save_img or view_img): # Add bbox to ball
label = f'{names[int(cls)]} {conf:.2f}' #label = f'{names[int(cls)]}'
plot_one_box(xyxy, im0, label=label, color=colors[int(cls)], line_thickness=3) label = 'ball'
plot_one_box(xyxy, im0, label=label, color=[0,0,0], line_thickness=2)
# Write MOT compliant results to file
""" if save_txt and len(outputs) != 0:
for j, output in enumerate(outputs):
bbox_left = output[0]
bbox_top = output[1]
bbox_w = output[2]
bbox_h = output[3]
identity = output[-1]
with open(txt_path + '.txt', 'a') as f:
f.write(('%g ' * 10 + '\n') % (j, identity, bbox_left,
bbox_top, bbox_w, bbox_h, -1, -1, -1, -1)) # label format """
else:
deepsort.increment_ages()
# Print time (inference + NMS) # Print time (inference + NMS)
print(f'{s}Done. ({t2 - t1:.3f}s)') print(f'{s}Done. ({t2 - t1:.3f}s)')
@ -155,13 +251,14 @@ if __name__ == '__main__':
parser.add_argument('--view-img', action='store_true', help='display results') parser.add_argument('--view-img', action='store_true', help='display results')
parser.add_argument('--save-txt', action='store_true', help='save results to *.txt') parser.add_argument('--save-txt', action='store_true', help='save results to *.txt')
parser.add_argument('--save-conf', action='store_true', help='save confidences in --save-txt labels') parser.add_argument('--save-conf', action='store_true', help='save confidences in --save-txt labels')
parser.add_argument('--classes', nargs='+', type=int, help='filter by class: --class 0, or --class 0 2 3') parser.add_argument('--classes', nargs='+', type=int, default=[0, 32], help='filter by class: --class 0, or --class 0 2 3')
parser.add_argument('--agnostic-nms', action='store_true', help='class-agnostic NMS') parser.add_argument('--agnostic-nms', action='store_true', help='class-agnostic NMS')
parser.add_argument('--augment', action='store_true', help='augmented inference') parser.add_argument('--augment', action='store_true', help='augmented inference')
parser.add_argument('--update', action='store_true', help='update all models') parser.add_argument('--update', action='store_true', help='update all models')
parser.add_argument('--project', default='runs/detect', help='save results to project/name') parser.add_argument('--project', default='../files/output', help='save results to project/name')
parser.add_argument('--name', default='exp', help='save results to project/name') parser.add_argument('--name', default='exp', help='save results to project/name')
parser.add_argument('--exist-ok', action='store_true', help='existing project/name ok, do not increment') parser.add_argument('--exist-ok', action='store_true', help='existing project/name ok, do not increment')
parser.add_argument("--config_deepsort", type=str, default="deep_sort_pytorch/configs/deep_sort.yaml")
opt = parser.parse_args() opt = parser.parse_args()
print(opt) print(opt)
@ -171,4 +268,4 @@ if __name__ == '__main__':
detect() detect()
strip_optimizer(opt.weights) strip_optimizer(opt.weights)
else: else:
detect() detect()