Module narya.linker.matching

Expand source code
import lap
import numpy as np
import scipy
from cython_bbox import bbox_overlaps as bbox_ious
from scipy.spatial.distance import cdist
from .kalman_filter import chi2inv95
from ..utils.utils import to_torch, to_numpy

"""

Cloned from https://github.com/Zhongdao/Towards-Realtime-MOT

"""


def linear_assignment(cost_matrix, thresh):
    """Assigns ids based on their cost.
    Arguments:
        cost_matrix: np.array, cost_matrix for pairs of ids
        tresh: float in [0,1], the treshold for id attributions
    Returns:
        matchs: np.array, the list of matches ids
        unmatched_a, unmatched_b: np.array, list of unmatched ids
    Raises:
    """
    if cost_matrix.size == 0:
        return (
            np.empty((0, 2), dtype=int),
            tuple(range(cost_matrix.shape[0])),
            tuple(range(cost_matrix.shape[1])),
        )
    matches, unmatched_a, unmatched_b = [], [], []
    cost, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
    for ix, mx in enumerate(x):
        if mx >= 0:
            matches.append([ix, mx])
    unmatched_a = np.where(x < 0)[0]
    unmatched_b = np.where(y < 0)[0]
    matches = np.asarray(matches)
    return matches, unmatched_a, unmatched_b


def ious(atlbrs, btlbrs):
    """Compute cost based on IoU
    Arguments:
        atlbrs: np.array, list of boxes
        btlbrs: np.array, list of boxes
    Returns:
        ious: np.array, matrix of IoUs between each box
    Raises:
    """
    ious = np.zeros((len(atlbrs), len(btlbrs)), dtype=np.float)
    if ious.size == 0:
        return ious

    ious = bbox_ious(
        np.ascontiguousarray(atlbrs, dtype=np.float),
        np.ascontiguousarray(btlbrs, dtype=np.float),
    )

    return ious


def iou_distance(atracks, btracks):
    """Compute cost matrix based on IoU for tracks
    Arguments:
        atracks: np.array, list of tracks
        btracks: np.array, list of tracks
    Returns:
        cost_matrix: np.array, matrix of IoU cost for each pair of track
    Raises:
    """

    if (len(atracks) > 0 and isinstance(atracks[0], np.ndarray)) or (
        len(btracks) > 0 and isinstance(btracks[0], np.ndarray)
    ):
        atlbrs = atracks
        btlbrs = btracks
    else:
        atlbrs = [track.tlbr for track in atracks]
        btlbrs = [track.tlbr for track in btracks]
    _ious = ious(atlbrs, btlbrs)
    cost_matrix = 1 - _ious

    return cost_matrix


def embedding_distance(tracks, detections, metric="cosine"):
    """Compute cost based on embedding cosine similarity
    Arguments:
        tracks: list of STrack
        detections: list of BaseTrack
    Returns:
        cost_matrix: np.array, matrix of similarity between each track
    Raises:
    """
    cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float)
    if cost_matrix.size == 0:
        return cost_matrix
    det_features = np.array([to_numpy(track.curr_feat) for track in detections])
    # for i, track in enumerate(tracks):
    # cost_matrix[i, :] = np.maximum(0.0, cdist(track.smooth_feat.reshape(1,-1), det_features, metric))
    track_features = np.array([to_numpy(track.smooth_feat) for track in tracks])
    cost_matrix = np.maximum(
        0.0, cdist(track_features, det_features, metric)
    )  # Nomalized features
    return cost_matrix


def gate_cost_matrix(kf, cost_matrix, tracks, detections, only_position=False):
    """Apply a falman-filter and a gating treshold to a cost matrix
    Arguments:
        kf: a KalmanFilter
        cost_matrix: the cost matrix to use
        tracks: a list of STrack
        detections: a list of BaseTrack
    Returns:
        cost_matrix: np.array
    Raises:
    """
    if cost_matrix.size == 0:
        return cost_matrix
    gating_dim = 2 if only_position else 4
    gating_threshold = chi2inv95[gating_dim]
    measurements = np.asarray([det.to_xyah() for det in detections])
    for row, track in enumerate(tracks):
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position
        )
        cost_matrix[row, gating_distance > gating_threshold] = np.inf
    return cost_matrix


def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98):
    if cost_matrix.size == 0:
        return cost_matrix
    gating_dim = 2 if only_position else 4
    gating_threshold = chi2inv95[gating_dim]
    measurements = np.asarray([det.to_xyah() for det in detections])
    for row, track in enumerate(tracks):
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position, metric="maha"
        )
        cost_matrix[row, gating_distance > gating_threshold] = np.inf
        cost_matrix[row] = lambda_ * cost_matrix[row] + (1 - lambda_) * gating_distance
    return cost_matrix

Functions

def embedding_distance(tracks, detections, metric='cosine')

Compute cost based on embedding cosine similarity

Arguments

tracks: list of STrack detections: list of BaseTrack

Returns

cost_matrix
np.array, matrix of similarity between each track

Raises:

Expand source code
def embedding_distance(tracks, detections, metric="cosine"):
    """Compute cost based on embedding cosine similarity
    Arguments:
        tracks: list of STrack
        detections: list of BaseTrack
    Returns:
        cost_matrix: np.array, matrix of similarity between each track
    Raises:
    """
    cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float)
    if cost_matrix.size == 0:
        return cost_matrix
    det_features = np.array([to_numpy(track.curr_feat) for track in detections])
    # for i, track in enumerate(tracks):
    # cost_matrix[i, :] = np.maximum(0.0, cdist(track.smooth_feat.reshape(1,-1), det_features, metric))
    track_features = np.array([to_numpy(track.smooth_feat) for track in tracks])
    cost_matrix = np.maximum(
        0.0, cdist(track_features, det_features, metric)
    )  # Nomalized features
    return cost_matrix
def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98)
Expand source code
def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98):
    if cost_matrix.size == 0:
        return cost_matrix
    gating_dim = 2 if only_position else 4
    gating_threshold = chi2inv95[gating_dim]
    measurements = np.asarray([det.to_xyah() for det in detections])
    for row, track in enumerate(tracks):
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position, metric="maha"
        )
        cost_matrix[row, gating_distance > gating_threshold] = np.inf
        cost_matrix[row] = lambda_ * cost_matrix[row] + (1 - lambda_) * gating_distance
    return cost_matrix
def gate_cost_matrix(kf, cost_matrix, tracks, detections, only_position=False)

Apply a falman-filter and a gating treshold to a cost matrix

Arguments

kf: a KalmanFilter cost_matrix: the cost matrix to use tracks: a list of STrack detections: a list of BaseTrack

Returns

cost_matrix
np.array

Raises:

Expand source code
def gate_cost_matrix(kf, cost_matrix, tracks, detections, only_position=False):
    """Apply a falman-filter and a gating treshold to a cost matrix
    Arguments:
        kf: a KalmanFilter
        cost_matrix: the cost matrix to use
        tracks: a list of STrack
        detections: a list of BaseTrack
    Returns:
        cost_matrix: np.array
    Raises:
    """
    if cost_matrix.size == 0:
        return cost_matrix
    gating_dim = 2 if only_position else 4
    gating_threshold = chi2inv95[gating_dim]
    measurements = np.asarray([det.to_xyah() for det in detections])
    for row, track in enumerate(tracks):
        gating_distance = kf.gating_distance(
            track.mean, track.covariance, measurements, only_position
        )
        cost_matrix[row, gating_distance > gating_threshold] = np.inf
    return cost_matrix
def iou_distance(atracks, btracks)

Compute cost matrix based on IoU for tracks

Arguments

atracks: np.array, list of tracks btracks: np.array, list of tracks

Returns

cost_matrix
np.array, matrix of IoU cost for each pair of track

Raises:

Expand source code
def iou_distance(atracks, btracks):
    """Compute cost matrix based on IoU for tracks
    Arguments:
        atracks: np.array, list of tracks
        btracks: np.array, list of tracks
    Returns:
        cost_matrix: np.array, matrix of IoU cost for each pair of track
    Raises:
    """

    if (len(atracks) > 0 and isinstance(atracks[0], np.ndarray)) or (
        len(btracks) > 0 and isinstance(btracks[0], np.ndarray)
    ):
        atlbrs = atracks
        btlbrs = btracks
    else:
        atlbrs = [track.tlbr for track in atracks]
        btlbrs = [track.tlbr for track in btracks]
    _ious = ious(atlbrs, btlbrs)
    cost_matrix = 1 - _ious

    return cost_matrix
def ious(atlbrs, btlbrs)

Compute cost based on IoU

Arguments

atlbrs: np.array, list of boxes btlbrs: np.array, list of boxes

Returns

ious()
np.array, matrix of IoUs between each box

Raises:

Expand source code
def ious(atlbrs, btlbrs):
    """Compute cost based on IoU
    Arguments:
        atlbrs: np.array, list of boxes
        btlbrs: np.array, list of boxes
    Returns:
        ious: np.array, matrix of IoUs between each box
    Raises:
    """
    ious = np.zeros((len(atlbrs), len(btlbrs)), dtype=np.float)
    if ious.size == 0:
        return ious

    ious = bbox_ious(
        np.ascontiguousarray(atlbrs, dtype=np.float),
        np.ascontiguousarray(btlbrs, dtype=np.float),
    )

    return ious
def linear_assignment(cost_matrix, thresh)

Assigns ids based on their cost.

Arguments

cost_matrix: np.array, cost_matrix for pairs of ids tresh: float in [0,1], the treshold for id attributions

Returns

matchs
np.array, the list of matches ids
unmatched_a, unmatched_b
np.array, list of unmatched ids

Raises:

Expand source code
def linear_assignment(cost_matrix, thresh):
    """Assigns ids based on their cost.
    Arguments:
        cost_matrix: np.array, cost_matrix for pairs of ids
        tresh: float in [0,1], the treshold for id attributions
    Returns:
        matchs: np.array, the list of matches ids
        unmatched_a, unmatched_b: np.array, list of unmatched ids
    Raises:
    """
    if cost_matrix.size == 0:
        return (
            np.empty((0, 2), dtype=int),
            tuple(range(cost_matrix.shape[0])),
            tuple(range(cost_matrix.shape[1])),
        )
    matches, unmatched_a, unmatched_b = [], [], []
    cost, x, y = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
    for ix, mx in enumerate(x):
        if mx >= 0:
            matches.append([ix, mx])
    unmatched_a = np.where(x < 0)[0]
    unmatched_b = np.where(y < 0)[0]
    matches = np.asarray(matches)
    return matches, unmatched_a, unmatched_b