Deepsort源码阅读笔记¶
论文:Simple Online and Realtime Tracking with a Deep Association Metric,17年论文,引用4700次,可能有点过时,但对理解“视频目标追踪”(Online Realtime Tracking)问题很重要。
源码:Yolov5_DeepSort_Pytorch,deep_sort
目录结构,入口函数track.py
中的main
.
├── deep
│ ├── checkpoint
│ │ └── ckpt.t7
│ ├── evaluate.py
│ ├── feature_extractor.py
│ ├── __init__.py
│ ├── model.py
│ ├── original_model.py
│ ├── test.py
│ ├── train.jpg
│ └── train.py
├── deep_sort.py
├── __init__.py
├── README.md
└── sort
├── detection.py
├── __init__.py
├── iou_matching.py
├── kalman_filter.py
├── linear_assignment.py
├── nn_matching.py
├── preprocessing.py
├── tracker.py
└── track.py
3 directories, 21 files
Deepsort代码是个硬骨头,代码量真的不小,中间难点核心在于sort中的内容。重点关心几个问题,
- KalmanFilter到底是怎么用的,输入输出是什么,有需要训练的参数吗,
X'=AX
的状态转移矩阵A
是写死的吗,X
中的速度是怎么算的,怎么更新的。 - 匹配是匈牙利最大匹配,是否用了
scipy.linear_assignment
?代价矩阵怎么融合特征权重的,比如Kalman和ReID - Tracker的逻辑,是否是像我16年搞热点事件那样,mid归入group_id的逻辑一样,相似度满足阈值就append,不满足就create,还是有更细腻的操作。
复习时可以跳过,直接看最干的总结部分。
1、Deepsort¶
这个类是控制器入口,还有一部分代码被作者放到Deepsort外层入口脚本track.py
了,但实际相当于Deepsort.main()
。两个脚本可以结合起来看,依次调用,
- 视频抽帧:一张一张图处理
- Model:YOLO,输出Detections
- Extractor:Resnet输出512d的ReID feature
- Tracker:把当前帧识别的Detection挂载到已经识别的Tracker上。功能类似去重/聚类引擎,把文章id, emb/关键词流式传入,判断是同一个新闻热点还是新的热点,生成group_id
- 输出结果:轮训Trackers,在原图中画出
track.mean
的位置框,标记track.id
,把图片写入视频文件。
class DeepSort(object):
def __init__(self, model_path, max_dist=0.2, min_confidence=0.3, nms_max_overlap=1.0, max_iou_distance=0.7, max_age=70, n_init=3, nn_budget=100, use_cuda=True):
self.min_confidence = min_confidence
self.nms_max_overlap = nms_max_overlap
self.extractor = Extractor(model_path, use_cuda=use_cuda) # ReID模型封装inference
max_cosine_distance = max_dist
metric = NearestNeighborDistanceMetric(
"cosine", max_cosine_distance, nn_budget)
self.tracker = Tracker(
metric, max_iou_distance=max_iou_distance, max_age=max_age, n_init=n_init)
def update(self, bbox_xywh, confidences, ori_img):
self.height, self.width = ori_img.shape[:2]
# generate detections
features = self._get_features(bbox_xywh, ori_img) #获取当前bbox的ReID特征, [B, 512]
bbox_tlwh = self._xywh_to_tlwh(bbox_xywh) # [B, 4]
detections = [Detection(bbox_tlwh[i], conf, features[i]) for i, conf in enumerate(
confidences) if conf > self.min_confidence] #筛选,大于min_confidence的才保留并构造成Detection数据类,存储其坐标,置信度,人的特征
# update tracker,计算下一时刻的bbox,置信区间,并使用这次观测来update参数
self.tracker.predict()
self.tracker.update(detections)
# output bbox identities
outputs = []
for track in self.tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
box = track.to_tlwh()
x1, y1, x2, y2 = self._tlwh_to_xyxy(box)
track_id = track.track_id
outputs.append(np.array([x1, y1, x2, y2, track_id], dtype=np.int32))
if len(outputs) > 0:
outputs = np.stack(outputs, axis=0)
return outputs
2、视频抽帧¶
视频文件怎么加载的,没想到也是写了一个IterateDataset
类,path只传一个文件,每次读取一张截图,而不是读出一个列表来。多个文件也支持,但似乎不区分文件,应该是用在有连续性的两个文件上。
class LoadImages: # for inference
def __init__(self, path, img_size=640, stride=32):
p = str(Path(path).absolute()) # os-agnostic absolute path
if '*' in p:
files = sorted(glob.glob(p, recursive=True)) # glob
elif os.path.isdir(p):
files = sorted(glob.glob(os.path.join(p, '*.*'))) # dir
elif os.path.isfile(p):
files = [p] # files
else:
raise Exception(f'ERROR: {p} does not exist')
images = [x for x in files if x.split('.')[-1].lower() in img_formats]
videos = [x for x in files if x.split('.')[-1].lower() in vid_formats]
ni, nv = len(images), len(videos)
self.img_size = img_size
self.stride = stride
self.files = images + videos
self.nf = ni + nv # number of files
self.video_flag = [False] * ni + [True] * nv
self.mode = 'image'
if any(videos):
self.new_video(videos[0]) # new video
else:
self.cap = None
assert self.nf > 0, f'No images or videos found in {p}. ' \
f'Supported formats are:\nimages: {img_formats}\nvideos: {vid_formats}'
def __iter__(self):
self.count = 0
return self
def __next__(self):
if self.count == self.nf:
raise StopIteration
path = self.files[self.count]
if self.video_flag[self.count]:
# Read video
self.mode = 'video'
ret_val, img0 = self.cap.read() # 截图1次,类似f.readline()一次,原尺寸
if not ret_val: # 最后一帧会返回ret_val=False
self.count += 1
self.cap.release()
if self.count == self.nf: # last video
raise StopIteration
else:
path = self.files[self.count]
self.new_video(path)
ret_val, img0 = self.cap.read()
self.frame += 1
print(f'video {self.count + 1}/{self.nf} ({self.frame}/{self.nframes}) {path}: ', end='')
else:
# Read image
self.count += 1
img0 = cv2.imread(path) # BGR
assert img0 is not None, 'Image Not Found ' + path
print(f'image {self.count}/{self.nf} {path}: ', end='')
# Padded resize
img = letterbox(img0, self.img_size, stride=self.stride)[0]
# Convert
img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, to 3x416x416
img = np.ascontiguousarray(img)
return path, img, img0, self.cap
def new_video(self, path):
self.frame = 0
self.cap = cv2.VideoCapture(path)
self.nframes = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) # 获取video对象的帧数属性,cv2这个就是个enum的key名称
def __len__(self):
return self.nf # number of files
file = cv2.VideoCapture(path)
,file.read()
可以一次次读取所有帧(没有跳帧)- 不知道skimage, pillow之类是否也能做
- cv2转换为torch/numpy格式要做转化,两个差异,
- 把HWC改成CHW,
cv2_image = np.transpose(numpy_image, (1, 2, 0))
- 把Channel里面调换反向,
cv2_image = cv2.cvtColor(cv2_image, cv2.COLOR_BGR2RGB)
- 在上面的代码里写在一起了,
img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, to 3x416x416
补充一句,输出的时候,也是创建一个cv2.VideoWriter
,和文件写行一样,每次处理完一张图,open一下视频文件,write(img)
一次,就相当于给视频续写了。
3、Model¶
就是YOLO模型,从单独的yolov5目录中加载出来的,该目录中包含train/test/data等,使用的时候直接ckpt = torch.load('yolov5/weights/yolov5s.pt')
就能把模型加载出来了。1
用yolo跑出来bbox,
# Inference
t1 = time_synchronized()
pred = model(img, augment=opt.augment)[0] # [15120, 85],这个不懂,但是nms之后就是bbox了
# Apply NMS
pred = non_max_suppression(
pred, opt.conf_thres, opt.iou_thres, classes=opt.classes, agnostic=opt.agnostic_nms)
t2 = time_synchronized()
>>> pred[0]
tensor([[206.63669, 236.06554, 233.10620, 271.06351, 0.80423, 2.00000],
[111.02235, 333.11285, 164.43254, 371.24493, 0.79562, 2.00000],
[219.16492, 172.77596, 238.27371, 203.99550, 0.40342, 2.00000]])
存储了xyxy, confidence, class_id
,左下右上的xy, 置信度,类别整数id。
这个结果过滤置信度后,直接塞给Tracker算法
4、Extractor¶
ReID模型,作用是对提取出的crop图像输出embedding,从给定路径中加载参数,模型结构为4个block堆叠,每个block是4层conv,带residual,总深度16左右,输出flatten成的512d。embedding是训练的一个分类模型,取出的最后的emb。
class Net(nn.Module):
def __init__(self, num_classes=751, reid=False):
super(Net, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(3, 64, 3, stride=1, padding=1),
nn.BatchNorm2d(64),
nn.ReLU(inplace=True),
nn.MaxPool2d(3, 2, padding=1),
)
self.layer1 = make_layers(64, 64, 2, False) # [relu(conv-bn-relu-conv-bn + conv(x)) ] * 2
self.layer2 = make_layers(64, 128, 2, True)
self.layer3 = make_layers(128, 256, 2, True)
self.layer4 = make_layers(256, 512, 2, True)
self.avgpool = nn.AvgPool2d((8, 4), 1) # GlobalAvgPool
self.reid = reid
self.classifier = nn.Sequential(
nn.Linear(512, 256),
nn.BatchNorm1d(256),
nn.ReLU(inplace=True),
nn.Dropout(),
nn.Linear(256, num_classes),
)
def forward(self, x):
# B, 3, 128, 64 -> 所有输入都resize成长方形图(64,128),前置会/255变0-1取值,然后做一个zscore变换到浮点数
x = self.conv(x) # B, 64, 64, 32
x = self.layer1(x) # B, 64, 64, 32
x = self.layer2(x) # B, 128, 32, 16
x = self.layer3(x) # B, 256, 16, 8
x = self.layer4(x) # B, 512, 8, 4
x = self.avgpool(x) # B, 512, 1, 1
x = x.view(x.size(0), -1) # B, 512
if self.reid:
x = x.div(x.norm(p=2, dim=1, keepdim=True)) # B, 512
return x
x = self.classifier(x)
return x
这个ResidualBlock
是这么写的
class BasicBlock(nn.Module):
def __init__(self, c_in, c_out, is_downsample=False):
super(BasicBlock, self).__init__()
self.is_downsample = is_downsample
if is_downsample:
self.conv1 = nn.Conv2d(
c_in, c_out, 3, stride=2, padding=1, bias=False)
else:
self.conv1 = nn.Conv2d(
c_in, c_out, 3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(c_out)
self.relu = nn.ReLU(True)
self.conv2 = nn.Conv2d(c_out, c_out, 3, stride=1,
padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(c_out)
if is_downsample:
self.downsample = nn.Sequential(
nn.Conv2d(c_in, c_out, 1, stride=2, bias=False),
nn.BatchNorm2d(c_out)
)
elif c_in != c_out:
self.downsample = nn.Sequential(
nn.Conv2d(c_in, c_out, 1, stride=1, bias=False),
nn.BatchNorm2d(c_out)
)
self.is_downsample = True
def forward(self, x):
y = self.conv1(x)
y = self.bn1(y)
y = self.relu(y)
y = self.conv2(y)
y = self.bn2(y)
if self.is_downsample:
x = self.downsample(x)
return F.relu(x.add(y), True)
def make_layers(c_in, c_out, repeat_times, is_downsample=False):
blocks = []
for i in range(repeat_times):
if i == 0:
# 一个block只做一次downsample
blocks += [BasicBlock(c_in, c_out, is_downsample=is_downsample), ]
else:
blocks += [BasicBlock(c_out, c_out), ]
return nn.Sequential(*blocks)
- Block结构是
conv1
-bn
-relu
-conv2
-bn
+conv3(x)
conv(x)
是residual对齐的方式,1*1卷积对齐filter数量(不知道为啥图像上这种FN层也能shortcut,之前测试在MLP架构下就只能x不变变换直接加才能又点用,留观)- 降低图片大小的时候,conv1和conv3一起
stride=2
5、Tracker¶
这个类是追踪算法主要实现,track记录汇总聚合的信息,detection记录点的信息。追踪问题可以类比一个聚类问题,因此步骤也类似EM算法/Kmeans的过程,
-
E xpectation:计算相似度矩阵,计算最佳匹配,对应下面代码中的
_match()
,生成matches = [], unmatched_tracks = [], unmatched_detections = [0, 1, 2]
-
M aximization:根据匹配结果,更新track的信息,包括100条记录, 预估最可能的位置及方差,对应下面代码中的
update()
部分
class Tracker:
"""
This is the multi-target tracker.
Parameters
----------
metric : nn_matching.NearestNeighborDistanceMetric
A distance metric for measurement-to-track association.
max_age : int
Maximum number of missed misses before a track is deleted.
n_init : int
Number of consecutive detections before the track is confirmed. The
track state is set to `Deleted` if a miss occurs within the first
`n_init` frames.
Attributes
----------
metric : nn_matching.NearestNeighborDistanceMetric
The distance metric used for measurement to track association.
max_age : int
Maximum number of missed misses before a track is deleted.
n_init : int
Number of frames that a track remains in initialization phase.
kf : kalman_filter.KalmanFilter
A Kalman filter to filter target trajectories in image space.
tracks : List[Track]
The list of active tracks at the current time step.
"""
def __init__(self, metric, max_iou_distance=0.7, max_age=70, n_init=3):
self.metric = metric
self.max_iou_distance = max_iou_distance
self.max_age = max_age
self.n_init = n_init
self.kf = kalman_filter.KalmanFilter()
self.tracks = []
self._next_id = 1
def predict(self):
"""Propagate track state distributions one time step forward.
This function should be called once every time step, before `update`.
"""
for track in self.tracks:
track.predict(self.kf)
def update(self, detections):
"""Perform measurement update and track management.
Parameters
----------
detections : List[deep_sort.detection.Detection]
A list of detections at the current time step.
"""
# Run matching cascade.
matches, unmatched_tracks, unmatched_detections = \
self._match(detections)
# Update track set.
for track_idx, detection_idx in matches:
self.tracks[track_idx].update(
self.kf, detections[detection_idx])
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
for detection_idx in unmatched_detections:
self._initiate_track(detections[detection_idx]) #对未匹配的detection初始化
self.tracks = [t for t in self.tracks if not t.is_deleted()]
# Update distance metric.
active_targets = [t.track_id for t in self.tracks if t.is_confirmed()]
features, targets = [], []
for track in self.tracks:
if not track.is_confirmed():
continue
features += track.features
targets += [track.track_id for _ in track.features]
track.features = []
self.metric.partial_fit(
np.asarray(features), np.asarray(targets), active_targets)
5.1、更新步骤(M)¶
Track在新增detection的时候,除了append进去外,还会根据列表的运动轨迹,预估物体当前所在位置。换句话说,已知[bbox1, bbox2, ...]
以及最新的detection.bbox
,预估一个最可能的bbox,这个要用到的是KalmanFilter算法。
理论:卡尔曼滤波¶
卡尔曼滤波用于在存在噪声的动态系统中,递归估计状态变量的最佳值。我们怎么理解这句话,
-
首先是噪声问题,我们预估轨迹落在\(\bar{x}_k\)位置,结果观测到实际落在\(y_k\)位置,由于观测和预估都是有噪声的,所以最佳估计值是这两个位置的插值2,插值比例是KalmanFilter的公式给出来的一个\(K_k\)
-
其次,如何“预估轨迹落在\(\bar{x}_k\)位置”,这个是由动态系统的状态转移方程算出来的。转移方程是一个固定的数学表达式,就是位置对时刻/步数\(t\)的一阶展开式,不用训练。重点未知的是每个时刻的导数(速度),这个速度是递归估计出来的,也是KalmanFilter的更新公式算出来的。
- 将位置和导数信息concat在一起,就是“状态变量”,state;其中位置可观测,但有误差;导数不可观测,但位置和导数有确定的公式关系。Kalman就是在上述数学建模下,“递归估计”出所有时刻t的state值。
Kalman更新公式和MovingAverage类比
更新步骤可以类比一下在线计算均值的操作,不严谨,但是方便体会。要计算流式传入\(\{x_1, ..., x_t, x_{t+1}\}\),在步数为\(t+1\)处的移动平均值(记为\(\mu_{t+1}\),我们会这么做 $$ \mu_{t+1} = \mu_t\cdot(1-\alpha_t) + x_{t+1} \cdot \alpha_t $$ 其中\(\alpha(t) = \dfrac{1}{t + 1}\)随着步长增大而逐渐减小,已有的数据越多,新增数据在更新中的占比就约小。
卡尔曼估计公式中的\(K_t\)是在预估值和实际值之间的插值比例,比例随着两边方差大小而变化,更偏向方差小的那端。
为啥会有predict和update两步呢?
- predict是每走一帧,需要根据Transition状态转移矩阵把state值 从上一帧的位置移动到当前帧。
- update则是把最新的value更新进去形成这一帧的最终结果。KalmanFilter移动操作如下,
running_mean = Transition @ running_mean
running_var = Transition @ running_var @ Transition.T + Init
现在看看代码中是怎么实现的。
第一帧进来的时候,没有match结果,unmatch_detections
会触发create tracker操作,
def _initiate_track(self, detection):
mean, covariance = self.kf.initiate(detection.to_xyah())
self.tracks.append(Track(
mean, covariance, self._next_id, self.n_init, self.max_age,
detection.feature))
self._next_id += 1
mean是8维向量state,包括4维的可观测值(YOLO的bbox),还有4维隐变量(速度值)。covariance是衡量state的协方差。
协方差还是方差?
当一个tracker中记录比如很多states之后,就可以得到这8个维度之间的协方差。这个协方差描述的是bbox数据分布的特点。bbox是实际数据,所以会有一个数据分布,不是空间上随机分布的点。
协方差会告诉我们,这个tracker的x增加了,可能也意味着y增加,其中\(\delta{y}\)是\(\delta{x}\)带动的,减去这部分的才是y主动变化的部分。
比如一个人跑过来,同时站起来,跑过来的时候,x,y同比变大,站起来导致y额外增加。Kalman计算的时候需要把这部分独立变化的y拿来计算,独立变化的不确定有多少,从而确定插值比例。
步骤1:初始化参数¶
class KalmanFilter(object):
"""
A simple Kalman filter for tracking bounding boxes in image space.
The 8-dimensional state space
x, y, a, h, vx, vy, va, vh
contains the bounding box center position (x, y), aspect ratio a, height h,
and their respective velocities.
Object motion follows a constant velocity model. The bounding box location
(x, y, a, h) is taken as direct observation of the state space (linear
observation model).
"""
def __init__(self):
ndim, dt = 4, 1.
# Create Kalman filter model matrices.
self._motion_mat = np.eye(2 * ndim, 2 * ndim)
for i in range(ndim):
self._motion_mat[i, ndim + i] = dt#状态转移矩阵,8*8,ndim+i代表各自的速度,dt=1是因为咱们一帧帧来做的
self._update_mat = np.eye(ndim, 2 * ndim)
# Motion and observation uncertainty are chosen relative to the current
# state estimate. These weights control the amount of uncertainty in
# the model. This is a bit hacky.
self._std_weight_position = 1. / 20 #位置的方差
self._std_weight_velocity = 1. / 160 #速度的方差
def initiate(self, measurement):
"""Create track from unassociated measurement.
Parameters
----------
measurement : ndarray
Bounding box coordinates (x, y, a, h) with center position (x, y),
aspect ratio a, and height h.
Returns
-------
(ndarray, ndarray)
Returns the mean vector (8 dimensional) and covariance matrix (8x8
dimensional) of the new track. Unobserved velocities are initialized
to 0 mean.
""" #mean是位置信息,各个速度初始化为0;协方差是位置信息之间的关系
mean_pos = measurement # array([439.5, 483, 0.75714, 70], dtype=float32)
mean_vel = np.zeros_like(mean_pos) # [0, 0, 0, 0]
mean = np.r_[mean_pos, mean_vel] # concat
#感觉这些变量都跟高度关系比较紧密?高度越小就越远,高度越大就越近?与其当前位置无关
std = [
2 * self._std_weight_position * measurement[3],
2 * self._std_weight_position * measurement[3],
1e-2,
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[3],
10 * self._std_weight_velocity * measurement[3],
1e-5,
10 * self._std_weight_velocity * measurement[3]]
covariance = np.diag(np.square(std))
return mean, covariance
值如下,方差是拍了一个,和bbox的高度值相关的,毕竟大框变化也大,小框变化也小,大小框初始方差等比例就行。
不同两个框,对应的state,也就是mean和观测误差都是不同的。下图就是第一帧初始化后的三个tracks
这个covariance对应的是公式中的\(P_0\),初始(协)方差
步骤2:移动帧¶
也就是predict操作
class KalmanFilter(object):
...
def predict(self, mean, covariance):
"""Run Kalman filter prediction step.
Parameters
----------
mean : ndarray
The 8 dimensional mean vector of the object state at the previous
time step.
covariance : ndarray
The 8x8 dimensional covariance matrix of the object state at the
previous time step.
Returns
-------
(ndarray, ndarray)
Returns the mean vector and covariance matrix of the predicted
state. Unobserved velocities are initialized to 0 mean.
"""
std_pos = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-2,
self._std_weight_position * mean[3]]
std_vel = [
self._std_weight_velocity * mean[3],
self._std_weight_velocity * mean[3],
1e-5,
self._std_weight_velocity * mean[3]]
motion_cov = np.diag(np.square(np.r_[std_pos, std_vel])) #初始化噪声矩阵Q
mean = np.dot(self._motion_mat, mean) #x' = Fx 得到预测状态
covariance = np.linalg.multi_dot(( #p' = FPF^T + Q
self._motion_mat, covariance, self._motion_mat.T)) + motion_cov
return mean, covariance
这个值对应公式中的Q,移动中的累积恒定误差,值如下,
步骤3:叠加观测值¶
计算Kalman增益,
其中矩阵\(C\)是一个投影矩阵,投影到子空间的操作,也就是C = [I, O]
,对应到代码中的self._update_mat
,作用到P上,就是covariance也限制到了前四项(维度从[B, 8, 8]
-> [B, 4, 4]
);如果作用到state上,就是取了state的前四项(维度从[B, 8]
-> [B, 4]
)。
首先,代码在project
函数中计算了\(CPC^t + R\)
class KalmanFilter(object):
...
def project(self, mean, covariance):
"""Project state distribution to measurement space.
Parameters
----------
mean : ndarray
The state's mean vector (8 dimensional array).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
Returns
-------
(ndarray, ndarray)
Returns the projected mean and covariance matrix of the given state
estimate.
"""#噪声的协方差
std = [
self._std_weight_position * mean[3],
self._std_weight_position * mean[3],
1e-1,
self._std_weight_position * mean[3]]
innovation_cov = np.diag(np.square(std))#初始化噪声矩阵
mean = np.dot(self._update_mat, mean)#Hx' 将均值向量映射到检测空间,由于测量矩阵H是一个4×8的单位矩阵,所以就相当于提取出了前面的4个位置向量[cx,cy,r,h]
covariance = np.linalg.multi_dot(( #HP'H^T 将协方差矩阵映射到检测空间
self._update_mat, covariance, self._update_mat.T))
return mean, covariance + innovation_cov
公式中的R是在代码中是innovation_cov
,新观测值的固定误差
接着,计算矩阵求逆\(K = (CPC^t + R)^{-1}(PC^t)\),其中的求逆运算使用Cholesky分解辅助
class KalmanFilter(object):
...
def update(self, mean, covariance, measurement):
"""Run Kalman filter correction step.
Parameters
----------
mean : ndarray
The predicted state's mean vector (8 dimensional).
covariance : ndarray
The state's covariance matrix (8x8 dimensional).
measurement : ndarray
The 4 dimensional measurement vector (x, y, a, h), where (x, y)
is the center position, a the aspect ratio, and h the height of the
bounding box.
Returns
-------
(ndarray, ndarray)
Returns the measurement-corrected state distribution.
"""
projected_mean, projected_cov = self.project(mean, covariance)
chol_factor, lower = scipy.linalg.cho_factor( #矩阵分解
projected_cov, lower=True, check_finite=False)
kalman_gain = scipy.linalg.cho_solve( #计算卡尔曼增益
(chol_factor, lower), np.dot(covariance, self._update_mat.T).T,
check_finite=False).T
innovation = measurement - projected_mean #y=z-Hx',z为detection的均值向量,不包含速度变化值,
#z=[cx,cy,r,h],H称为测量矩阵,它将track的均值向量x'映射到检测空间,该公式计算detection和track的均值误差。
new_mean = mean + np.dot(innovation, kalman_gain.T) ##新的均值向量x=x'+Ky=x'+k(z-Hx')
new_covariance = covariance - np.linalg.multi_dot(( #新的协方差向量P=P'-KHK^T
kalman_gain, projected_cov, kalman_gain.T))
return new_mean, new_covariance
最后,叠加到观测值上,
速度状态是隐变量
- 观测值\(y\)只有4维位置信息,是不包含速度信息。速度不是直接计算的,也是通过增加梯度值来更新的。
- 梯度值通过\(y_k - C\hat{x}_{k}\)算出来的,预估和观测差异越大,梯度也大。
- 梯度乘以\(K_k\)的上半部分矩阵,算出位置的增量,更新\(x\)的1-4位的位置信息;梯度乘以\(K_k\)下半部分矩阵,算出速度的增量,更新\(x\)的4-8位。
对应代码(代码有个转置,向量是[1, 4]
维度的行向量,公式里是当成列向量的)
innovation = measurement - projected_mean
new_mean = mean + np.dot(innovation, kalman_gain.T)
协方差叠加观测值公式,
对应代码(看起来有点区别,但带入K的定义式,应该化简能得到)
new_covariance = covariance - np.linalg.multi_dot(kalman_gain, projected_cov, kalman_gain.T)
反思:何时会用到卡尔曼滤波¶
- 回顾:一个运动物体的坐标,及对应运动方程,通过观察坐标点
[P1, P2, ..., Pk]
,不断梯度更新当前位置及速度,学习率是是动态变化的K。 - 推广:假设不是运动问题,而是温度,用户对标签的兴趣分数之类,也是一样的。运动方程就还是那个泰勒一阶展开式,state中一半是函数值,一半是导数值,函数值可以观测,导数值是隐藏的。观测一系列坐标点,通过梯度更新得到当前位置的state,比如平滑后的兴趣值及不可观测的兴趣变化率,学习率是动态变化的K。
- 功能:Kalman可以(大概齐)看成简化版反向传播的RNN,输入一个\(\R^n\)序列
[x1, ..., xk]
,调用KalmanFilter().forward
,输出预估的[h1, ..., hk]
,经过模型修正后的x
,同时更新内部状态参数mean
和covar
,更新方式是计算梯度,学习率是动态的K。 - 适用要求:序列是满足一阶泰勒展开近似相等的(也就是\(S=S_0 + vt + O(1)\)),下一时刻的位置基本可以由之前的运动状态预测,不能跳来跳去的。(否则就还不如直接用观测值)
5.2、匹配步骤(E)¶
匹配算法有几个策略融合在一起
- 静止IOU距离:基于绝对的位置信息,匹配比较严格
- ReID距离:计算track.features与detection.feature的embedding计算cosine距离
- 带运动修正后的bbox距离:track.mean, track.covariance,与detection.bbox计算gauss分布的距离,track的信息由KalmanFilter.predict生成
- 时间间隔的距离:当前帧的检测结果,优先匹配上几帧里出现的track,匹配不上再去检查很久没出现的track,代码里叫cascade。
代码组织使用了StrategyPattern,按照不同条件(第几帧,track的确认状态,track的更新时间)调用不同的匹配策略。每种策略基本都是算出cost_matrix,叫代价矩阵或者相似度矩阵,然后调用匈牙利算法获得匹配结果。
匹配算法:匈牙利算法¶
什么情况下,匹配需要匈牙利算法?
举个例子,
A -> x1:0.9, x2:0.8
B -> x1:1.0
-
如果我们只需要输出满足阈值的,结果是
(A, x1), (A, x2), (B, x1)
-
如果我们输出top1,结果是
(A, x1), (B, x1)
- 如果要求一一映射,那么x1不能和两个同时匹配,这就需要匈牙利算法,输出
(A, x2), (B, x1)
在当前任务中,每一帧的detection和tracks是最多一一对应的,比如一个detection与两个track的距离都满足阈值,也会只选一个track。
扩展思考:匈牙利匹配实际应用案例
- 相似的问题可以是
- 人体关键点的匹配,一个头只能归属于一个躯干,不能两个
- 微博和小红书间的账号匹配,但无小号的情况下
- 不适用的问题
- 中文关键词,在知识图谱中寻找对应的object,可能有多个
- 一个新闻,归属到一个热点事件(话题)上,也可能是多个
步骤1:ReID距离和运动信息距离¶
候选的每个detection.feature
与已有[track.features]
计算cosine距离作为匹配度,一个track.features
长度有最多100个,取min(cost)
值作为汇总函数。
>>> features = np.array([dets[i].feature for i in detection_indices])
>>> targets = np.array([tracks[i].track_id for i in track_indices])
>>> cost_matrix = self.metric.distance(features, targets)
>>> cost_matrix
array([[ 0.0045737, 0.33172, 0.31459],
[ 0.29239, 0.010331, 0.32532],
[ 0.33085, 0.33551, 0.0096017]])
基于Kalman预估的track的位置,和detection的位置计算一个cost,逻辑叠加到cost_matrix
上
cost_matrix = linear_assignment.gate_cost_matrix(
self.kf, cost_matrix, tracks, dets, track_indices,
detection_indices)
cost定义方式,3个measurements的位置和track的mean, covariance来比较,看距离track的中心有几个covariance的距离
class KalmanFilter(object):
...
def gating_distance(self, mean, covariance, measurements,
only_position=False):
"""Compute gating distance between state distribution and measurements.
A suitable distance threshold can be obtained from `chi2inv95`. If
`only_position` is False, the chi-square distribution has 4 degrees of
freedom, otherwise 2.
Parameters
----------
mean : ndarray
Mean vector over the state distribution (8 dimensional).
covariance : ndarray
Covariance of the state distribution (8x8 dimensional).
measurements : ndarray
An Nx4 dimensional matrix of N measurements, each in
format (x, y, a, h) where (x, y) is the bounding box center
position, a the aspect ratio, and h the height.
only_position : Optional[bool]
If True, distance computation is done with respect to the bounding
box center position only.
Returns
-------
ndarray
Returns an array of length N, where the i-th element contains the
squared Mahalanobis distance between (mean, covariance) and
`measurements[i]`.
"""
mean, covariance = self.project(mean, covariance)
if only_position:
mean, covariance = mean[:2], covariance[:2, :2]
measurements = measurements[:, :2]
cholesky_factor = np.linalg.cholesky(covariance)
d = measurements - mean
z = scipy.linalg.solve_triangular(
cholesky_factor, d.T, lower=True, check_finite=False,
overwrite_b=True)
squared_maha = np.sum(z * z, axis=0)
return squared_maha
里面的solve计算的就是 $$ d = P^{-1}(y - x) $$ 得到4个位置维度上,距离track中心的方差\(\sigma\)的个数,就是一个无量纲的距离值
>>> covariance
array([[ 48, 0, 0, 0],
[ 0, 48, 0, 0],
[ 0, 0, 0.010473, 0],
[ 0, 0, 0, 48]])
>>> measurements
array([[ 435.5, 497, 0.76389, 72],
[ 264, 691, 1.7857, 56],
[ 453, 359.5, 0.60317, 63]], dtype=float32)
>>> z
array([[ -0.15468, -24.909, 2.3712],
[ 0.24247, 28.244, -19.604],
[ 0.055514, 10.04, -1.5149],
[ 0.1186, -2.1908, -1.1804]])
>>> squared_maha
array([ 0.099863, 1523.8, 393.62])
3个测量结果,距离tracks[1]
的距离分别如上面。
超过一定阈值的,把cost代价矩阵加到很大,相当于说两个之间不能匹配
>>> cost_matrix[row, gating_distance > gating_threshold] = gated_cost
>>> cost_matrix
array([[ 0.0045737, 1e+05, 1e+05],
[ 1e+05, 1e+05, 1e+05],
[ 1e+05, 1e+05, 0.0096017]])
>>> cost_matrix[cost_matrix > max_distance] = max_distance + 1e-5
array([[ 0.0045737, 0.20001, 0.20001],
[ 0.20001, 0.20001, 0.20001],
[ 0.20001, 0.20001, 0.0096017]])
匈牙利匹配算法,传入cost代价矩阵就完事了
>>> from scipy.optimize import linear_sum_assignment as linear_assignment
>>> row_indices, col_indices = linear_assignment(cost_matrix) #匹配
>>> row_indices
array([0, 1, 2])
>>> col_indices
array([0, 1, 2])
没在col_indices
中的,记录到没匹配上的detections,没在row_indices
中的记录到没匹配的tracks中,值为0.20001
的也放到没匹配的里面。代码不贴了,值是下面的
>>> unmatched_tracks
[1]
>>> unmatched_detections
[1]
>>> matches
[(0, 0), (2, 2)]
步骤2:IOU距离¶
IOU距离没有使用ReID信息,只是基于位置计算的相似度,大小位置都一样才行,比较严格,用于启动和补充召回。启动是说tracker.detections
只有1-2个的时候。补充召回是说,ReID+Kalman没匹配上的(unmatched_tracks
和unmatched_detections
),再用这个召回一下。
def iou(bbox, candidates):
"""Computer intersection over union.
Parameters
----------
bbox : ndarray
A bounding box in format `(top left x, top left y, width, height)`.
candidates : ndarray
A matrix of candidate bounding boxes (one per row) in the same format
as `bbox`.
Returns
-------
ndarray
The intersection over union in [0, 1] between the `bbox` and each
candidate. A higher score means a larger fraction of the `bbox` is
occluded by the candidate.
"""
bbox_tl, bbox_br = bbox[:2], bbox[:2] + bbox[2:]
candidates_tl = candidates[:, :2]
candidates_br = candidates[:, :2] + candidates[:, 2:]
tl = np.c_[np.maximum(bbox_tl[0], candidates_tl[:, 0])[:, np.newaxis],
np.maximum(bbox_tl[1], candidates_tl[:, 1])[:, np.newaxis]]
br = np.c_[np.minimum(bbox_br[0], candidates_br[:, 0])[:, np.newaxis],
np.minimum(bbox_br[1], candidates_br[:, 1])[:, np.newaxis]]
wh = np.maximum(0., br - tl)
area_intersection = wh.prod(axis=1)
area_bbox = bbox[2:].prod()
area_candidates = candidates[:, 2:].prod(axis=1)
return area_intersection / (area_bbox + area_candidates - area_intersection)
>>> bbox
array([ 221.61, 660.99, 82.976, 58.171])
>>> candidates
array([[ 214, 663, 100, 56]], dtype=float32)
>>> cost_matrix
array([[ 0.1961]])
>>> matches # 补充召回结果
[(1, 1)]
最终匹配结果还是
>>> matches
[(0, 0), (2, 2), (1, 1)]
6、总结¶
- 追踪,去重,匹配,这些都是类似的问题,算法框架是聚类,EM,Kmeans
- 视频处理借用cv2的
VideoCapture
和VideoWriter
,可以像文件的readline和write一样处理 - 聚类中的E步骤(将点和中心进行匹配),简单点是分配到距离最近的,复杂点的可以cost_matrix结合匈牙利匹配,再复杂的项目就是多种cost_matrix+匈牙利置信,设置不同阈值/顺序执行
- 惯性变化的状态(有momentum的)都可以用Kalman Filter来建模和平滑