Module narya.utils.google_football_utils
Expand source code
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import six
import os
import random
import numpy as np
import pandas as pd
import pickle
from six.moves import range
# Size, in cells, of one minimap (SMM) channel.
SMM_WIDTH = 96
SMM_HEIGHT = 72
# (width, height) pair used by the helpers below to size observations.
channel_dimensions = (SMM_WIDTH, SMM_HEIGHT)
# Blank single-channel minimap, indexed [y, x].
frame = np.zeros((channel_dimensions[1], channel_dimensions[0]))
# Layer names; _build_obs_stacked fills channels in groups of four in this order.
SMM_LAYERS = ["left_team", "right_team", "ball", "active"]
# Normalized minimap coordinates
MINIMAP_NORM_X_MIN = -1.0
MINIMAP_NORM_X_MAX = 1.0
MINIMAP_NORM_Y_MIN = -1.0 / 2.25
MINIMAP_NORM_Y_MAX = 1.0 / 2.25
# Value written into a minimap cell occupied by an entity.
_MARKER_VALUE = 255
def _get_values():
    """Return the lower bound of the normalized minimap y-axis."""
    lower_y_bound = MINIMAP_NORM_Y_MIN
    return lower_y_bound
def mark_points(frame, frame_cnt, points):
    """Mark every (x, y) pair of 'points' on one minimap channel.

    Arguments:
        frame: 2-d matrix representing one SMM channel ([y, x]); marked cells
            are set to the marker value in place
        frame_cnt: 2-d matrix indexed like 'frame'; every marked cell is
            incremented by one in place
        points: flat sequence [x0, y0, x1, y1, ...] of normalized coordinates;
            a trailing unpaired value is ignored
    Returns:
    Raises:
    """
    height, width = frame.shape[0], frame.shape[1]
    x_span = MINIMAP_NORM_X_MAX - MINIMAP_NORM_X_MIN
    y_span = MINIMAP_NORM_Y_MAX - MINIMAP_NORM_Y_MIN
    # Even positions hold x values, odd positions hold the matching y values.
    for norm_x, norm_y in zip(points[0::2], points[1::2]):
        col = int((norm_x - MINIMAP_NORM_X_MIN) / x_span * width)
        row = int((norm_y - MINIMAP_NORM_Y_MIN) / y_span * height)
        # Points that fall outside the minimap are pinned to its border.
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        frame[row, col] = _MARKER_VALUE
        frame_cnt[row, col] += 1
def _left_parser(tab, i):
    """Flatten the left team positions of frame i into one list.

    Arguments:
        tab: sequence of per-frame dicts holding a "left_team_positions" entry
        i: index of the frame to read
    Returns:
        A flat list [x0, y0, x1, y1, ...] in the stored player order
    Raises:
    """
    return [
        value
        for position in tab[i]["left_team_positions"]
        for value in position
    ]
def _right_parser(tab, i):
    """Flatten the right team positions of frame i into one list.

    Arguments:
        tab: sequence of per-frame dicts holding a "right_team_positions" entry
        i: index of the frame to read
    Returns:
        A flat list [x0, y0, x1, y1, ...] in the stored player order
    Raises:
    """
    return [
        value
        for position in tab[i]["right_team_positions"]
        for value in position
    ]
def _ball_parser(tab, i):
    """Return the ball position of frame i without its last component.

    Arguments:
        tab: sequence of per-frame dicts holding a "ball_position" entry
        i: index of the frame to read
    Returns:
        Every element of "ball_position" except the last one
    Raises:
    """
    ball_position = tab[i]["ball_position"]
    planar_position = ball_position[:-1]
    return planar_position
def _active_parser(tab, i):
    """Return the position of the left-team player closest to the ball.

    Arguments:
        tab: sequence of per-frame dicts holding "left_team_positions" and
            "ball_position" entries
        i: index of the frame to read
    Returns:
        The entry of "left_team_positions" with the smallest squared distance
        to the ball (the first one on ties), or an empty list when the left
        team has no entries
    Raises:
    """
    players = tab[i]["left_team_positions"]
    if len(players) == 0:
        # Nothing to select; an empty point list marks nothing downstream.
        return []
    ball = tab[i]["ball_position"][:-1]

    def _squared_distance(position):
        return (position[0] - ball[0]) ** 2 + (position[1] - ball[1]) ** 2

    # min() has no distance cap, so a player is always selected, however far
    # from the ball the whole team is.
    return min(players, key=_squared_distance)
def _build_obs_stacked(tab, i):
    """Computes an observation, readable by an agent, from _save_data output

    The result stacks four consecutive frames (i, i+1, i+2, i+3), each made of
    four channels: left team, right team, ball and active player. Frame
    indices past the end of 'tab' are replaced by the last frame.

    Arguments:
        tab: Output of _save_data : player tracking data in the right coordinates
        i: Index of the frame to create
    Returns:
        frame: Google observations of the tracking data, shape (height, width, 16)
        frame_count: per-channel count of the entities marked on each cell,
            same shape as frame
    Raises:
    """
    height, width = channel_dimensions[1], channel_dimensions[0]
    frame = np.zeros((height, width, 16))
    frame_count = np.zeros((height, width, 16))
    layer_parsers = (_left_parser, _right_parser, _ball_parser, _active_parser)
    last_index = len(tab) - 1
    for offset in range(4):
        # The first frame is read as-is; later ones are clamped to the last frame.
        source_index = i if offset == 0 else min(i + offset, last_index)
        for layer, parser in enumerate(layer_parsers):
            channel = 4 * offset + layer
            # Counts go to the channel being marked: change() later reads
            # observation_count[:, :, channel] to decide when a cell is empty.
            mark_points(
                frame[:, :, channel],
                frame_count[:, :, channel],
                parser(tab, source_index),
            )
    return frame, frame_count
# For each play, we add the ball coordinates to every player row at each frame:
def _add_ball_coordinates(dataframe: pd.DataFrame, id_ball=-1) -> pd.DataFrame:
    """Adds the ball coordinates at each row

    Arguments:
        dataframe: pd.DataFrame with player tracking data. Its index must have
            a "play" level; after selecting one play and resetting the index
            the rows must expose "frame", "id", "x", "y" and "z" columns.
        id_ball: value of the "id" column that identifies the ball rows
    Returns:
        dataframe: pd.DataFrame with player tracking data, extended with the
            "ball_x", "ball_y", "ball_z" and "play" columns. An empty
            DataFrame is returned when the input contains no play.
    Raises:
    """
    ball_columns = {"x": "ball_x", "y": "ball_y", "z": "ball_z"}

    # Computes the ball columns for one play at a time.
    def _add_coord(df):
        # One row per frame with the ball position, under ball_* names.
        ball_rows = df.loc[df["id"] == id_ball, ["frame", "x", "y", "z"]]
        ball_rows = ball_rows.rename(columns=ball_columns)
        merged = df.merge(ball_rows, on="frame", how="left")
        # "Unnamed: 0" is dropped when present; its absence is not an error.
        return merged.drop(columns=["Unnamed: 0"], errors="ignore")

    per_play = []
    for play in dataframe.index.get_level_values("play").unique():
        enriched = _add_coord(dataframe.loc[play].reset_index())
        enriched["play"] = play
        per_play.append(enriched)
    if not per_play:
        return pd.DataFrame()
    # A single concatenation avoids re-copying the accumulated rows per play.
    return pd.concat(per_play)
# We now add a boolean to mark the possession of the ball by a player
def _add_possession(dataframe: pd.DataFrame, indx_color=4) -> pd.DataFrame:
    """Add a boolean to mark which player has the ball

    Rows are read by position, not by column name: column 4 is compared with
    0 to skip the ball, columns 8/9 are compared with columns 11/12 as the
    player and ball coordinates (presumably x, y and ball_x, ball_y; verify
    against the caller's column order).

    Arguments:
        dataframe: pd.DataFrame with player tracking data; a "possession"
            column set to False is added to it in place
        indx_color: position of the column overwritten with "black" for rows
            in possession
    Returns:
        dataframe: a new pd.DataFrame built from the updated values, with the
            same column names and a fresh default index
    Raises:
    """
    dataframe["possession"] = False
    column_names = list(dataframe.columns.values)
    rows = dataframe.values
    for row_index in range(len(rows)):
        row = rows[row_index]
        # The ball itself is never marked as being in possession.
        if row[4] == 0:
            continue
        squared_gap = (row[8] - row[11]) ** 2 + (row[9] - row[12]) ** 2
        if squared_gap < 2:
            row[-1] = True
            # Recolored so the owner stands out when displayed later.
            row[indx_color] = "black"
    return pd.DataFrame(rows, columns=column_names)
def _prepare_dataset(
    dataframe: pd.DataFrame,
    bgcolor_mapping,
    team_mapping,
    ball_mapping,
    play="RM_VS_BARCA_BUT_3",
) -> pd.DataFrame:
    """Reshapes a tracking dataframe into the column layout used by this module

    The input dataframe is modified in place (columns are added and replaced)
    before the reordered selection is returned.

    Arguments:
        dataframe: pd.DataFrame with at least the "index", "id", "player",
            "edgecolor", "player_num", "x", "y", "ball_x", "ball_y" and
            "possession" columns
        bgcolor_mapping: replacement mapping applied to "id" to build "bgcolor"
        team_mapping: replacement mapping applied to "id" to build "team"
        ball_mapping: replacement mapping applied to "id" and "player"
        play: label written in the "play" column of every row
    Returns:
        dataframe: pd.DataFrame restricted to, and ordered by, the columns
            bgcolor, dx, dy, edgecolor, player, player_num, team, x, y, z,
            ball_x, ball_y, ball_z, play, possession
    Raises:
    """
    # Both derived columns start from the raw ids, before "id" is remapped.
    dataframe["bgcolor"] = dataframe["id"].replace(bgcolor_mapping)
    dataframe["team"] = dataframe["id"].replace(team_mapping)
    dataframe["id"] = dataframe["id"].replace(ball_mapping)
    dataframe["player"] = dataframe["player"].replace(ball_mapping)
    # Velocities and heights are filled with zeros.
    for zero_column in ("dx", "dy", "z", "ball_z"):
        dataframe[zero_column] = 0
    dataframe["play"] = play
    dataframe = dataframe.drop(columns=["index", "id"])
    ordered_cols = [
        "bgcolor",
        "dx",
        "dy",
        "edgecolor",
        "player",
        "player_num",
        "team",
        "x",
        "y",
        "z",
        "ball_x",
        "ball_y",
        "ball_z",
        "play",
        "possession",
    ]
    return dataframe[ordered_cols]
def _scale_mapper(x, y):
    """Maps coordinates to google coordinates

    Both inputs are divided by 100, centered on 0.5 and doubled; the y value
    is additionally negated.

    Arguments:
        x,y: coordinates
    Returns:
        new_x,new_y: Google env coordinates
    Raises:
    """

    def _centered(value):
        return (value / 100.0 - 0.5) * 2

    # The y-axis is flipped with respect to the input convention.
    return (_centered(x), -_centered(y))
def _save_data(df, filename):
    """Saves the tracking data into a google readable format

    Rows are read by position after the index reset: columns 8/9 are used as
    the entity coordinates and columns 11/12 as the ball coordinates
    (presumably x, y and ball_x, ball_y; verify against the caller's column
    order). The last column is read as the possession flag.

    Velocities are the difference between the next frame (i + 1) and the
    current one; when frame i + 1 does not exist the current frame is reused,
    which yields zero velocities. Rows of the next frame are matched by
    position, so it must list at least as many rows per team as the current
    frame.

    Arguments:
        df: pd.DataFrame with player tracking data, with "frame" and "team"
            columns once its index is reset; team is "attack" for the left
            team and "defense" for the right team
        filename: a .dump filename to store the informations
    Returns:
        full_info: A list with the tracking data in a google observations format
    Raises:
    """
    df = df.reset_index()
    # Frame ids are computed once; the set gives O(1) "does i + 1 exist" checks.
    frame_ids = list(df["frame"].unique())
    known_frames = set(frame_ids)
    full_info = []
    for i in frame_ids:
        temp_df = df[df["frame"] == i]
        next_frame = i + 1 if (i + 1) in known_frames else i
        next_temp_df = df[df["frame"] == next_frame]
        left_df = temp_df[temp_df["team"] == "attack"]
        right_df = temp_df[temp_df["team"] == "defense"]
        next_left_rows = next_temp_df[next_temp_df["team"] == "attack"].values
        next_right_rows = next_temp_df[next_temp_df["team"] == "defense"].values
        # The left team starts with a fixed entry at (-1, 0) with no velocity;
        # tracked players therefore start at index 1.
        left_team_positions = [[-1.0, 0.0]]
        right_team_positions = []
        left_team_velocity = [[0.0, 0.0]]
        right_team_velocity = []
        ball_position = []
        ball_velocity = []
        ball_team_owner = -1
        ball_player_owner = -1
        # Left team informations :
        for indx, line in enumerate(left_df.values):
            new_x, new_y = _scale_mapper(line[8], line[9])
            next_line = next_left_rows[indx]
            next_new_x, next_new_y = _scale_mapper(next_line[8], next_line[9])
            dx, dy = next_new_x - new_x, next_new_y - new_y
            left_team_positions.append([new_x, new_y])
            left_team_velocity.append([dx, dy])
            # Only the first flagged player is kept as owner. The explicit
            # comparison is kept because the flag comes from an object array.
            if line[-1] == True and ball_team_owner == -1:
                ball_team_owner = 0
                # +1 accounts for the fixed entry at index 0.
                ball_player_owner = indx + 1
            if indx == 0:
                # Ball information, read from the first left-team row; it
                # stays empty when the frame has no left-team row.
                ball_x, ball_y = _scale_mapper(line[11], line[12])
                next_ball_x, next_ball_y = _scale_mapper(next_line[11], next_line[12])
                dx, dy = next_ball_x - ball_x, next_ball_y - ball_y
                ball_position = [ball_x, ball_y, 0.0]
                ball_velocity = [dx, dy, 0.0]
        # Right team informations :
        for indx, line in enumerate(right_df.values):
            new_x, new_y = _scale_mapper(line[8], line[9])
            next_line = next_right_rows[indx]
            next_new_x, next_new_y = _scale_mapper(next_line[8], next_line[9])
            dx, dy = next_new_x - new_x, next_new_y - new_y
            right_team_positions.append([new_x, new_y])
            right_team_velocity.append([dx, dy])
        info = {
            "left_team_positions": left_team_positions,
            "right_team_positions": right_team_positions,
            "left_team_velocity": left_team_velocity,
            "right_team_velocity": right_team_velocity,
            "ball_position": ball_position,
            "ball_velocity": ball_velocity,
            "ball_team_owner": ball_team_owner,
            "ball_player_owner": ball_player_owner,
        }
        full_info.append(info)
    # The list is stored wrapped in a one-element tuple.
    with open(filename, "wb") as fh:
        pickle.dump((full_info,), fh, pickle.HIGHEST_PROTOCOL)
    return full_info
def _reverse_points(x, y):
    """Moves the left team to the right (and the right to the left)

    Each coordinate is mirrored inside the minimap grid.

    Arguments:
        x,y: Coordinates to reverse
    Returns:
        x,y: Reverse coordinates
    Raises:
    """
    last_column = channel_dimensions[0] - 1
    last_row = channel_dimensions[1] - 1
    return (last_column - x, last_row - y)
def _change(observation, observation_count, x, y, new_x, new_y):
    """Moves an entity from x,y to new_x,new_y on a single channel

    Both arrays are modified in place.

    Arguments:
        observation: np.array unstack of observations
        observation_count: np.array unstack of observations count
        x,y: old coordinates
        new_x,new_y: new coordinates
    Returns:
    Raises:
    """
    source = (y, x)
    target = (new_y, new_x)
    # The destination is marked first, so a move onto the same cell is then
    # undone by the bookkeeping below exactly like any other departure.
    observation[target] = _MARKER_VALUE
    observation_count[target] += 1
    observation_count[source] = max(0, observation_count[source] - 1)
    # The old cell is cleared only once no entity is left on it.
    if observation_count[source] == 0:
        observation[source] = 0
def change(observation, observation_count, x, y, new_x, new_y, entity):
    """Moves an entity from x,y to new_x,new_y on an entire observations

    The move is applied to every fourth channel, starting at channel 0 when
    entity is "player" and at channel 2 for any other value.

    Arguments:
        observation: np.array stacked observations
        observation_count: np.array stacked observations count
        x,y: old coordinates
        new_x,new_y: new coordinates
        entity: the entity to move (player or ball)
    Returns:
    Raises:
    """
    if entity == "player":
        first_channel = 0
    else:
        first_channel = 2
    for channel in range(first_channel, 16, 4):
        _change(
            observation[:, :, channel],
            observation_count[:, :, channel],
            x,
            y,
            new_x,
            new_y,
        )
def _change_random(observation, observation_count, x, y):
    """Moves an entity from x,y to a uniformly drawn cell of the field

    Arguments:
        observation: np.array unstack observations
        observation_count: np.array unstack observations count
        x,y: old coordinates
    Returns:
    Raises:
    """
    width, height = channel_dimensions[0], channel_dimensions[1]
    # x is drawn before y, one random number each.
    target_x = int(random.random() * width)
    target_y = int(random.random() * height)
    _change(observation, observation_count, x, y, target_x, target_y)
def _add_noise(observation, observation_count, x, y, x_std=5, y_std=5):
    """Moves an entity from x,y randomly on the field, with a gaussian noise

    The new position is drawn from normal distributions centered on x and y,
    truncated to integers and clamped to the last valid cell of the field.

    Arguments:
        observation: np.array unstack observations
        observation_count: np.array unstack observations count
        x,y: old coordinates
        x_std,y_std : std for the noises
    Returns:
    Raises:
    """
    # Valid indices stop at size - 1; clamping to the size itself would let
    # _change index one cell past the end of the arrays.
    max_x = channel_dimensions[0] - 1
    max_y = channel_dimensions[1] - 1
    new_x = min(max(int(np.random.normal(x, x_std, 1)[0]), 0), max_x)
    new_y = min(max(int(np.random.normal(y, y_std, 1)[0]), 0), max_y)
    _change(observation, observation_count, x, y, new_x, new_y)
def traverse(observation, observation_count, x, y, entity):
    """Moves an entity to its next possible position. Can be used to try the entire field

    The entity advances one cell along x; past the last column it restarts at
    column 0 of the next row. The new row is not bounds-checked here, so from
    the last cell of the last row the move is passed on with new_y equal to
    the field height.

    Arguments:
        observation: np.array stacked observations (forwarded to change)
        observation_count: np.array stacked observations count (forwarded to change)
        x,y: old coordinates
        entity: ball or player
    Returns:
        new_x,new_y: the Next available position
    Raises:
    """
    width = channel_dimensions[0]
    if x + 1 < width:
        new_x, new_y = x + 1, y
    else:
        # End of the row: wrap to the first column of the following row.
        new_x, new_y = 0, y + 1
    change(observation, observation_count, x, y, new_x, new_y, entity)
    return new_x, new_y
Functions
def change(observation, observation_count, x, y, new_x, new_y, entity)
-
Moves an entity from x,y to new_x,new_y on an entire observations
Arguments
observation: np.array of stacked observations; observation_count: np.array of stacked observation counts; x, y: old coordinates; new_x, new_y: new coordinates; entity: the entity to move (player or ball). Returns: nothing. Raises: nothing explicitly.
Expand source code
def change(observation, observation_count, x, y, new_x, new_y, entity):
    """Moves an entity from x,y to new_x,new_y on an entire observations

    The move is applied to every fourth channel, starting at channel 0 when
    entity is "player" and at channel 2 for any other value.

    Arguments:
        observation: np.array stacked observations
        observation_count: np.array stacked observations count
        x,y: old coordinates
        new_x,new_y: new coordinates
        entity: the entity to move (player or ball)
    Returns:
    Raises:
    """
    if entity == "player":
        first_channel = 0
    else:
        first_channel = 2
    for channel in range(first_channel, 16, 4):
        _change(
            observation[:, :, channel],
            observation_count[:, :, channel],
            x,
            y,
            new_x,
            new_y,
        )
def mark_points(frame, frame_cnt, points)
-
Draw dots corresponding to 'points'.
Arguments
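frame: 2-d matrix representing one SMM channel ([y, x]); frame_cnt: 2-d matrix, indexed like frame, counting the points marked on each cell; points: a flat list of x, y coordinates to be marked. Returns: nothing. Raises: nothing explicitly.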
Expand source code
def mark_points(frame, frame_cnt, points):
    """Mark every (x, y) pair of 'points' on one minimap channel.

    Arguments:
        frame: 2-d matrix representing one SMM channel ([y, x]); marked cells
            are set to the marker value in place
        frame_cnt: 2-d matrix indexed like 'frame'; every marked cell is
            incremented by one in place
        points: flat sequence [x0, y0, x1, y1, ...] of normalized coordinates;
            a trailing unpaired value is ignored
    Returns:
    Raises:
    """
    height, width = frame.shape[0], frame.shape[1]
    x_span = MINIMAP_NORM_X_MAX - MINIMAP_NORM_X_MIN
    y_span = MINIMAP_NORM_Y_MAX - MINIMAP_NORM_Y_MIN
    # Even positions hold x values, odd positions hold the matching y values.
    for norm_x, norm_y in zip(points[0::2], points[1::2]):
        col = int((norm_x - MINIMAP_NORM_X_MIN) / x_span * width)
        row = int((norm_y - MINIMAP_NORM_Y_MIN) / y_span * height)
        # Points that fall outside the minimap are pinned to its border.
        col = max(0, min(width - 1, col))
        row = max(0, min(height - 1, row))
        frame[row, col] = _MARKER_VALUE
        frame_cnt[row, col] += 1
def traverse(observation, observation_count, x, y, entity)
-
Moves an entity to its next possible position. Can be used to try the entire field
Arguments
observation: np.array of stacked observations; observation_count: np.array of stacked observation counts; x, y: old coordinates; entity: ball or player
Returns
new_x,new_y
- the Next available position
Raises:
Expand source code
def traverse(observation, observation_count, x, y, entity):
    """Moves an entity to its next possible position. Can be used to try the entire field

    The entity advances one cell along x; past the last column it restarts at
    column 0 of the next row. The new row is not bounds-checked here, so from
    the last cell of the last row the move is passed on with new_y equal to
    the field height.

    Arguments:
        observation: np.array stacked observations (forwarded to change)
        observation_count: np.array stacked observations count (forwarded to change)
        x,y: old coordinates
        entity: ball or player
    Returns:
        new_x,new_y: the Next available position
    Raises:
    """
    width = channel_dimensions[0]
    if x + 1 < width:
        new_x, new_y = x + 1, y
    else:
        # End of the row: wrap to the first column of the following row.
        new_x, new_y = 0, y + 1
    change(observation, observation_count, x, y, new_x, new_y, entity)
    return new_x, new_y