Module narya.utils.tracker
Expand source code
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import mxnet as mx
from scipy.signal import savgol_filter
import pandas as pd
import numpy as np
def add_nan_trajectories(trajectories, max_frame):
"""Add np.nan to frame where the x,y coordinates are missing
Arguments:
trajectories: Dict mapping each id to a list of tuple (x,y,frame)
max_frame: Max number of frame
Returns:
trajectories: Dict mapping each id to a list of tuple (x,y,frame)
Raises:
"""
frame_range = [i for i in range(1, max_frame)]
full_trajectories = {}
for ids in trajectories.keys():
traj = trajectories[ids]
full_trajectories[ids] = []
cnt = 0
for x_, y_, frame_ in traj:
if cnt == 0:
full_trajectories[ids].append([x_, y_, frame_])
last_x, last_y, last_frame = x_, y_, frame_
cnt += 1
else:
nb_fake_data_to_add = frame_ - last_frame
for i in range(nb_fake_data_to_add - 1):
full_trajectories[ids].append([np.nan, np.nan, last_frame + i + 1])
full_trajectories[ids].append([x_, y_, frame_])
last_x, last_y, last_frame = x_, y_, frame_
cnt += 1
last_frame = frame_
if last_frame < frame_range[-1]:
for i in range(last_frame + 1, frame_range[-1] + 1):
full_trajectories[ids].append([np.nan, np.nan, i])
return full_trajectories
def get_trajectory_from_id(trajectories, id_):
"""Get the trajectory of an id, and map them to lastRow data format.
Arguments:
trajectories: Dict mapping each id to a list of tuple (x,y,frame)
id_: the id of the trajectory we want
Returns:
x,y, frames: List of x,y coordinates with frames
Raises:
"""
tab = trajectories[id_]
x = []
y = []
frames = []
for data_points in tab:
x_, y_, fr = data_points[0], data_points[1], data_points[2]
if np.isnan(x_):
x.append(x_)
y.append(y_)
else:
new_x = x_ * (100.0 / 320.0)
new_y = y_ * (100.0 / 320.0)
new_y = 100.0 - new_y
x.append(new_x)
y.append(new_y)
frames.append(fr)
return x, y, frames
def build_df_per_id(trajectories):
"""Build one dataframe per id
Arguments:
trajectories: Dict mapping each id to a list of tuple (x,y,frame)
Returns:
df_per_id: dict mapping each id to a dataframe with its trajectory
Raises:
"""
df_per_id = {}
full_trajectories = add_nan_trajectories(trajectories, max_frame=50)
for ids in trajectories.keys():
x, y, frame = get_trajectory_from_id(full_trajectories,ids)
results_ids = {"x": [], "y": [], "frame": []}
for x_, y_, frame_ in zip(x, y, frame):
results_ids["x"].append(x_)
results_ids["y"].append(y_)
results_ids["frame"].append(frame_)
df_per_id[int(ids)] = pd.DataFrame(results_ids)
return df_per_id
def fill_nan_trajectories(df_per_id,window_size=21):
"""Fill each trajectory, and apply a savgol filter
Arguments:
df_per_id: dict mapping each id to a dataframe with its trajectory
Returns:
df_per_id: dict mapping each id to a dataframe with its trajectory
Raises:
"""
for ids in df_per_id.keys():
df_id = df_per_id[ids]
test = df_id.set_index("frame")
test["x"] = test["x"].interpolate(method="slinear", limit_direction="both")
test["y"] = test["y"].interpolate(method="slinear", limit_direction="both")
test["x"] = test["x"].interpolate(
method="pad", limit_direction="forward", limit_area="outside"
)
test["y"] = test["y"].interpolate(
method="pad", limit_direction="forward", limit_area="outside"
)
test["x"] = savgol_filter(test["x"], window_size, 3) if len(test["x"]) > 3 else test["x"]
test["y"] = savgol_filter(test["y"], window_size, 3) if len(test["y"]) > 3 else test["y"]
df_per_id[ids] = test
return df_per_id
def get_full_results(df_per_id):
"""Build a dataframe of tracking data, in the same format as Last Row
Arguments:
df_per_id: dict mapping each id to a dataframe with its trajectory
Returns:
full_results: A dataframe with each coordinates at each frame
Raises:
"""
full_results = {"frame": [], "id": [], "x": [], "y": []}
for ids in df_per_id.keys():
df_id = df_per_id[ids].reset_index()
for line in df_id.values:
full_results["frame"].append(int(line[0]))
full_results["x"].append(line[1])
full_results["y"].append(line[2])
full_results["id"].append(ids)
return pd.DataFrame(full_results).set_index("frame")
def _get_max_id(traj):
"""Get the biggest id within a trajectory
"""
return max([int(ids) for ids in traj.keys()])
def _remove_coords(traj, ids, frame):
"""Remove the x,y coordinates of an id at a given frame
Arguments:
traj: Dict mapping each id to a list of trajectory
ids: the id to target
frame: int, the frame we want to remove
Returns:
traj: Dict mapping each id to a list of trajectory
Raises:
"""
new_traj = []
for data_points in traj[ids]:
if data_points[2] != frame:
new_traj.append(data_points)
traj[ids] = new_traj
return traj
def _remove_ids(traj, list_ids):
"""Remove ids from a trajectory
Arguments:
traj: Dict mapping each id to a list of trajectory
list_ids: List of id
Returns:
traj: Dict mapping each id to a list of trajectory
Raises:
"""
for t in list_ids:
traj.pop(t, None)
return traj
def add_entity(traj, entity_id, entity_traj):
"""Adds a new id with a trajectory
Arguments:
traj: Dict mapping each id to a list of trajectory
entity_id: the id to add
entity_traj: the trajectory linked to entity_id we want to add
Returns:
traj: Dict mapping each id to a list of trajectory
Raises:
"""
assert (
entity_id not in traj.keys()
), "This id is already present in the trajectory, find another one"
traj[entity_id] = entity_traj
return traj
def add_entity_coords(traj, entity_id, entity_traj, max_frame):
"""Add some coordinates to the trajectory of a given id
Arguments:
traj: Dict mapping each id to a list of trajectory
entity_id: the id to target
entity_traj: List of (x,y,frame) to add to the trajectory of entity_id
max_frame: int, the maximum number of frame in trajectories
Returns:
traj: Dict mapping each id to a list of trajectory
Raises:
"""
# entity_traj with shape list of (x,y,frame)
assert entity_id in traj.keys(), "This id is not present in the trajectory"
traj_to_add = {frame: [x, y] for x, y, frame in entity_traj}
traj_ = {
data_point[2]: [data_point[0], data_point[1]] for data_point in traj[entity_id]
}
temp_traj = []
for frame in range(1, max_frame):
if frame in traj_to_add.keys():
temp_traj.append([traj_to_add[frame][0], traj_to_add[frame][1], frame])
elif frame in traj_.keys():
temp_traj.append([traj_[frame][0], traj_[frame][1], frame])
traj[entity_id] = temp_traj
return traj
def merge_id(traj, list_ids_frame):
"""Merge trajectories of different ids.
e.g.: (10,0,110),(12,110,300) will merge the trajectory of 10 between frame 0 and 110 to the
trajectory of 12 between frame 110 and 300.
Arguments:
traj: Dict mapping each id to a list of trajectory
list_ids_frame: List of (id,frame_start,frame_end)
Returns:
traj: Dict mapping each id to a list of trajectory
Raises:
"""
# list_ids: sort by order of appearance
# with the shape : list of (id,frame_start,frame_end)
new_traj = []
for ids, frame_start, frame_end in list_ids_frame:
for data_points in traj[ids]:
x, y, frame = data_points[0], data_points[1], data_points[2]
if frame >= frame_start and frame < frame_end:
new_traj.append([x, y, frame])
for ids, _, _ in list_ids_frame:
traj.pop(ids, None)
traj[list_ids_frame[0][0]] = new_traj
return traj
def merge_2_trajectories(traj1, traj2, id_mapper, max_frame_traj1):
"""Merge 2 dict of trajectories, if you want to merge the results of 2 tracking
Arguments:
traj1: Dict mapping each id to a list of trajectory
traj2: Dict mapping each id to a list of trajectory
id_mapper: A dict mapping each id in traj1 to id in traj2
max_frame_traj1: Maximum number of frame in the first trajectory
Returns:
traj1: Dict mapping each id to a list of trajectory
Raises:
"""
for id_1 in id_mapper.keys():
id_2 = id_mapper[id_1]
traj_id1 = traj1[id_1]
traj_id2 = traj2[id_2]
for data_points in traj_id2:
x, y, frame = data_points[0], data_points[1], data_points[2]
frame += max_frame_traj1
traj_id1.append([x, y, frame])
for id_2 in traj2.keys():
if id_2 not in traj1.keys():
traj_id2 = traj2[id_2]
new_traj = []
for data_points in traj_id2:
x, y, frame = data_points[0], data_points[1], data_points[2]
frame += max_frame_traj1
new_traj.append([x, y, frame])
traj1[id_2] = new_traj
return traj1
Functions
def add_entity(traj, entity_id, entity_traj)
-
Adds a new id with a trajectory
Arguments
traj: Dict mapping each id to a list of trajectory entity_id: the id to add entity_traj: the trajectory linked to entity_id we want to add
Returns
traj
- Dict mapping each id to a list of trajectory
Raises:
Expand source code
def add_entity(traj, entity_id, entity_traj): """Adds a new id with a trajectory Arguments: traj: Dict mapping each id to a list of trajectory entity_id: the id to add entity_traj: the trajectory linked to entity_id we want to add Returns: traj: Dict mapping each id to a list of trajectory Raises: """ assert ( entity_id not in traj.keys() ), "This id is already present in the trajectory, find another one" traj[entity_id] = entity_traj return traj
def add_entity_coords(traj, entity_id, entity_traj, max_frame)
-
Add some coordinates to the trajectory of a given id
Arguments
traj: Dict mapping each id to a list of trajectory entity_id: the id to target entity_traj: List of (x,y,frame) to add to the trajectory of entity_id max_frame: int, the maximum number of frame in trajectories
Returns
traj
- Dict mapping each id to a list of trajectory
Raises:
Expand source code
def add_entity_coords(traj, entity_id, entity_traj, max_frame): """Add some coordinates to the trajectory of a given id Arguments: traj: Dict mapping each id to a list of trajectory entity_id: the id to target entity_traj: List of (x,y,frame) to add to the trajectory of entity_id max_frame: int, the maximum number of frame in trajectories Returns: traj: Dict mapping each id to a list of trajectory Raises: """ # entity_traj with shape list of (x,y,frame) assert entity_id in traj.keys(), "This id is not present in the trajectory" traj_to_add = {frame: [x, y] for x, y, frame in entity_traj} traj_ = { data_point[2]: [data_point[0], data_point[1]] for data_point in traj[entity_id] } temp_traj = [] for frame in range(1, max_frame): if frame in traj_to_add.keys(): temp_traj.append([traj_to_add[frame][0], traj_to_add[frame][1], frame]) elif frame in traj_.keys(): temp_traj.append([traj_[frame][0], traj_[frame][1], frame]) traj[entity_id] = temp_traj return traj
def add_nan_trajectories(trajectories, max_frame)
-
Add np.nan to frame where the x,y coordinates are missing
Arguments
trajectories: Dict mapping each id to a list of tuple (x,y,frame) max_frame: Max number of frame
Returns
trajectories
- Dict mapping each id to a list of tuple (x,y,frame)
Raises:
Expand source code
def add_nan_trajectories(trajectories, max_frame): """Add np.nan to frame where the x,y coordinates are missing Arguments: trajectories: Dict mapping each id to a list of tuple (x,y,frame) max_frame: Max number of frame Returns: trajectories: Dict mapping each id to a list of tuple (x,y,frame) Raises: """ frame_range = [i for i in range(1, max_frame)] full_trajectories = {} for ids in trajectories.keys(): traj = trajectories[ids] full_trajectories[ids] = [] cnt = 0 for x_, y_, frame_ in traj: if cnt == 0: full_trajectories[ids].append([x_, y_, frame_]) last_x, last_y, last_frame = x_, y_, frame_ cnt += 1 else: nb_fake_data_to_add = frame_ - last_frame for i in range(nb_fake_data_to_add - 1): full_trajectories[ids].append([np.nan, np.nan, last_frame + i + 1]) full_trajectories[ids].append([x_, y_, frame_]) last_x, last_y, last_frame = x_, y_, frame_ cnt += 1 last_frame = frame_ if last_frame < frame_range[-1]: for i in range(last_frame + 1, frame_range[-1] + 1): full_trajectories[ids].append([np.nan, np.nan, i]) return full_trajectories
def build_df_per_id(trajectories)
-
Build one dataframe per id
Arguments
trajectories: Dict mapping each id to a list of tuple (x,y,frame)
Returns
df_per_id
- dict mapping each id to a dataframe with its trajectory
Raises:
Expand source code
def build_df_per_id(trajectories): """Build one dataframe per id Arguments: trajectories: Dict mapping each id to a list of tuple (x,y,frame) Returns: df_per_id: dict mapping each id to a dataframe with its trajectory Raises: """ df_per_id = {} full_trajectories = add_nan_trajectories(trajectories, max_frame=50) for ids in trajectories.keys(): x, y, frame = get_trajectory_from_id(full_trajectories,ids) results_ids = {"x": [], "y": [], "frame": []} for x_, y_, frame_ in zip(x, y, frame): results_ids["x"].append(x_) results_ids["y"].append(y_) results_ids["frame"].append(frame_) df_per_id[int(ids)] = pd.DataFrame(results_ids) return df_per_id
def fill_nan_trajectories(df_per_id, window_size=21)
-
Fill each trajectory, and apply a savgol filter
Arguments
df_per_id: dict mapping each id to a dataframe with its trajectory
Returns
df_per_id
- dict mapping each id to a dataframe with its trajectory
Raises:
Expand source code
def fill_nan_trajectories(df_per_id,window_size=21): """Fill each trajectory, and apply a savgol filter Arguments: df_per_id: dict mapping each id to a dataframe with its trajectory Returns: df_per_id: dict mapping each id to a dataframe with its trajectory Raises: """ for ids in df_per_id.keys(): df_id = df_per_id[ids] test = df_id.set_index("frame") test["x"] = test["x"].interpolate(method="slinear", limit_direction="both") test["y"] = test["y"].interpolate(method="slinear", limit_direction="both") test["x"] = test["x"].interpolate( method="pad", limit_direction="forward", limit_area="outside" ) test["y"] = test["y"].interpolate( method="pad", limit_direction="forward", limit_area="outside" ) test["x"] = savgol_filter(test["x"], window_size, 3) if len(test["x"]) > 3 else test["x"] test["y"] = savgol_filter(test["y"], window_size, 3) if len(test["y"]) > 3 else test["y"] df_per_id[ids] = test return df_per_id
def get_full_results(df_per_id)
-
Build a dataframe of tracking data, in the same format as Last Row
Arguments
df_per_id: dict mapping each id to a dataframe with its trajectory
Returns
full_results
- A dataframe with each coordinates at each frame
Raises:
Expand source code
def get_full_results(df_per_id): """Build a dataframe of tracking data, in the same format as Last Row Arguments: df_per_id: dict mapping each id to a dataframe with its trajectory Returns: full_results: A dataframe with each coordinates at each frame Raises: """ full_results = {"frame": [], "id": [], "x": [], "y": []} for ids in df_per_id.keys(): df_id = df_per_id[ids].reset_index() for line in df_id.values: full_results["frame"].append(int(line[0])) full_results["x"].append(line[1]) full_results["y"].append(line[2]) full_results["id"].append(ids) return pd.DataFrame(full_results).set_index("frame")
def get_trajectory_from_id(trajectories, id_)
-
Get the trajectory of an id, and map them to lastRow data format.
Arguments
trajectories: Dict mapping each id to a list of tuple (x,y,frame) id_: the id of the trajectory we want
Returns
x,y, frames
- List of x,y coordinates with frames
Raises:
Expand source code
def get_trajectory_from_id(trajectories, id_): """Get the trajectory of an id, and map them to lastRow data format. Arguments: trajectories: Dict mapping each id to a list of tuple (x,y,frame) id_: the id of the trajectory we want Returns: x,y, frames: List of x,y coordinates with frames Raises: """ tab = trajectories[id_] x = [] y = [] frames = [] for data_points in tab: x_, y_, fr = data_points[0], data_points[1], data_points[2] if np.isnan(x_): x.append(x_) y.append(y_) else: new_x = x_ * (100.0 / 320.0) new_y = y_ * (100.0 / 320.0) new_y = 100.0 - new_y x.append(new_x) y.append(new_y) frames.append(fr) return x, y, frames
def merge_2_trajectories(traj1, traj2, id_mapper, max_frame_traj1)
-
Merge 2 dict of trajectories, if you want to merge the results of 2 tracking
Arguments
traj1: Dict mapping each id to a list of trajectory traj2: Dict mapping each id to a list of trajectory id_mapper: A dict mapping each id in traj1 to id in traj2 max_frame_traj1: Maximum number of frame in the first trajectory
Returns
traj1
- Dict mapping each id to a list of trajectory
Raises:
Expand source code
def merge_2_trajectories(traj1, traj2, id_mapper, max_frame_traj1): """Merge 2 dict of trajectories, if you want to merge the results of 2 tracking Arguments: traj1: Dict mapping each id to a list of trajectory traj2: Dict mapping each id to a list of trajectory id_mapper: A dict mapping each id in traj1 to id in traj2 max_frame_traj1: Maximum number of frame in the first trajectory Returns: traj1: Dict mapping each id to a list of trajectory Raises: """ for id_1 in id_mapper.keys(): id_2 = id_mapper[id_1] traj_id1 = traj1[id_1] traj_id2 = traj2[id_2] for data_points in traj_id2: x, y, frame = data_points[0], data_points[1], data_points[2] frame += max_frame_traj1 traj_id1.append([x, y, frame]) for id_2 in traj2.keys(): if id_2 not in traj1.keys(): traj_id2 = traj2[id_2] new_traj = [] for data_points in traj_id2: x, y, frame = data_points[0], data_points[1], data_points[2] frame += max_frame_traj1 new_traj.append([x, y, frame]) traj1[id_2] = new_traj return traj1
def merge_id(traj, list_ids_frame)
-
Merge trajectories of different ids. e.g.: (10,0,110),(12,110,300) will merge the trajectory of 10 between frame 0 and 110 to the trajectory of 12 between frame 110 and 300.
Arguments
traj: Dict mapping each id to a list of trajectory list_ids_frame: List of (id,frame_start,frame_end)
Returns
traj
- Dict mapping each id to a list of trajectory
Raises:
Expand source code
def merge_id(traj, list_ids_frame): """Merge trajectories of different ids. e.g.: (10,0,110),(12,110,300) will merge the trajectory of 10 between frame 0 and 110 to the trajectory of 12 between frame 110 and 300. Arguments: traj: Dict mapping each id to a list of trajectory list_ids_frame: List of (id,frame_start,frame_end) Returns: traj: Dict mapping each id to a list of trajectory Raises: """ # list_ids: sort by order of appearance # with the shape : list of (id,frame_start,frame_end) new_traj = [] for ids, frame_start, frame_end in list_ids_frame: for data_points in traj[ids]: x, y, frame = data_points[0], data_points[1], data_points[2] if frame >= frame_start and frame < frame_end: new_traj.append([x, y, frame]) for ids, _, _ in list_ids_frame: traj.pop(ids, None) traj[list_ids_frame[0][0]] = new_traj return traj