Source code for chicken_dinner.models.telemetry.telemetry

"""Telemetry class."""
import datetime

from chicken_dinner.constants import map_to_map_name
from chicken_dinner.constants import map_name_to_map
from chicken_dinner.models.telemetry.events import TelemetryEvent


[docs]class Telemetry(object): """Telemetry model. :param pubg: a PUBG instance :param str url: the url for this telemetry :param list telemetry_json: (optional) the raw telemetry response :param str shard: the shard for the match associated with this telemetry :param bool map_assets: whether to map asset ids to named values, e.g. map ``Item_Weapon_AK47_C`` to ``AKM``. """ def __init__(self, pubg, url, telemetry_json=None, shard=None, map_assets=False): self._pubg = pubg self._shard = shard #: Whether asset ids are mapped to names self.map_assets = map_assets if telemetry_json is not None: #: The API response associated with this object self.response = telemetry_json else: self.response = self._pubg._core.telemetry(url) #: Snake cased object-attribute models for telemetry events and objects self.events = [TelemetryEvent(e, map_assets) for e in self.response] if getattr(self.events[-1], "common", None) is not None: #: The platform for this game, "pc" or "xbox" self.platform = "pc" else: self.platform = "xbox" #: Whether this game was played on PC self.is_pc = "pc" == self.platform #: Whether this game was played on xbox self.is_xbox = "xbox" == self.platform def __getitem__(self, key): return self.events[key] @property def shard(self): """The shard for this match.""" return self._shard or self._pubg.shard
[docs] def event_types(self): """A sorted list of event type names from this telemetry.""" return sorted(list(set([e.event_type for e in self.events])))
[docs] def filter_by(self, event_type=None): """Get a list of telemetry events for a specific event type. :param event_type: the event type to filter """ events = None if event_type is not None: events = [ event for event in self.events if event.event_type == event_type ] else: events = [event for event in self.events] return events
[docs] def player_ids(self): """The account ids of all players in the match.""" accounts = [] for event in self.events: if event.event_type == "log_player_login": accounts.append(event.account_id) return accounts
[docs] def players(self): """A map of player names to account ids for all players this match.""" players = {} for event in self.events: if event.event_type == "log_player_create": players[event.character.name] = event.character.account_id return players
[docs] def player_names(self): """A list of player names for all match pariticipants.""" player_names = [] for event in self.events: if event.event_type == "log_player_create": player_names.append(event.character.name) return player_names
[docs] def damage_done(self, player=None, combat_only=True, distribution=False): """Damage done by each player in the match. :param str player: a player name to filter on :param bool combat_only: return only PvP damage (default True) :param bool distribution: return a player to player distribution of damage for the match if true. if false return total damage done by each player. (default False) """ start = datetime.datetime.strptime( self.filter_by("log_match_start")[0].timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) damage = {} damage_events = self.filter_by("log_player_take_damage") for event in damage_events: timestamp = datetime.datetime.strptime( event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) dt = (timestamp - start).total_seconds() if dt < 0 or event.attack_id == -1: continue victim = event.victim.name try: attacker = event.attacker.name except AttributeError: if combat_only: continue else: attacker = "[" + event.damage_type_category + "]" if player is not None and player != attacker: continue if distribution: if attacker not in damage: damage[attacker] = {} if victim not in damage[attacker]: damage[attacker][victim] = event.damage else: damage[attacker][victim] += event.damage else: if attacker not in damage: damage[attacker] = event.damage else: damage[attacker] += event.damage return damage
[docs] def damage_taken(self, player=None, combat_only=True, distribution=False): """Damage taken by each player in the match. :param str player: a player name to filter on :param bool combat_only: return only PvP damage (default True) :param bool distribution: return a player to player distribution of damage for the match if true. if false return total damage taken by each player. (default False) """ start = datetime.datetime.strptime( self.filter_by("log_match_start")[0].timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) damage = {} damage_events = self.filter_by("log_player_take_damage") for event in damage_events: timestamp = datetime.datetime.strptime( event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) dt = (timestamp - start).total_seconds() if dt < 0 or event.attack_id == -1: continue victim = event.victim.name if player is not None and player != victim: continue try: attacker = event.attacker.name except AttributeError: if combat_only: continue else: attacker = "[" + event.damage_type_category + "]" if distribution: if victim not in damage: damage[victim] = {} if attacker not in damage[victim]: damage[victim][attacker] = event.damage else: damage[victim][attacker] += event.damage else: if victim not in damage: damage[victim] = event.damage else: damage[victim] += event.damage return damage
[docs] def rosters(self): """The team rosters for the match.""" rosters = {} for event in self.events[::-1]: if event.event_type == "log_match_end": for player in event.characters: team = player.team_id player_name = player.name if team not in rosters: rosters[team] = [] rosters[team].append(player_name) return rosters
[docs] def num_players(self): """Number of participants in this match.""" return len(self.player_names())
[docs] def num_teams(self): """Number of teams (rosters) in this match.""" return len(self.rosters())
[docs] def rankings(self, rank=None): """The rankings of each team from this match. Returns a map of rank : [players] for each team in the match. :param int rank: Get the specific rank number players for the match. """ rankings = {} for event in self.events[::-1]: if event.event_type == "log_match_end": for player in event.characters: ranking = player.ranking if ranking not in rankings: rankings[ranking] = [] rankings[ranking].append(player.name) if rank is not None: return rankings.get(rank, None) return rankings
[docs] def winner(self): """The winner(s) of the match. Match winners as a list of player names. """ return self.rankings(rank=1)
[docs] @classmethod def from_json(cls, telemetry_json, pubg=None, url=None, shard=None): """Construct an instance of telemetry from the json response.""" return cls(pubg, url, telemetry_json, shard)
[docs] @classmethod def from_match_id(cls, match_id, pubg, shard=None): """Construct an instance of telemetry from a match id.""" match = pubg.match(match_id, shard) url = match.telemetry_url() return cls(pubg, url, shard=shard)
[docs] def map_name(self): """Get the map name for PC matches. None if not PC.""" for event in self.events: if event.event_type == "log_match_start": map_id = getattr(event, "map_name", None) if map_id is not None: return map_to_map_name.get(map_id, map_id) else: return self._pubg.match(self.match_id()).map_name
[docs] def map_id(self): """Get the map id for PC matches. None if not PC.""" for event in self.events: if event.event_type == "log_match_start": map_id = getattr(event, "map_name", None) if map_id is not None: return map_name_to_map.get(map_id, map_id) else: return self._pubg.match(self.match_id()).map_id
[docs] def match_id(self): """The match id for the match.""" for event in self.events: if event.event_type == "log_match_definition": return event.match_id
[docs] def player_damages(self, include_pregame=False): """Get the player damages for the match. Returns a dict of attacker players as keys and values of damage attacker and victim positions with the values tuples. Each tuple has seven elements being (t, x_a, y_a, z_a, x_v, y_v, y_z) coordinates where a is attacker and v is victim. :param bool include_pregame: (default False) whether to include pre-game damage positions. """ start = datetime.datetime.strptime( self.filter_by("log_match_start")[0].timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) damages = {} attack_events = self.filter_by("log_player_attack") attackers = {} for event in attack_events: attackers[event.attack_id] = event.attacker damage_events = self.filter_by("log_player_take_damage") for event in damage_events: try: attacker = event.attacker.name except AttributeError: continue if attacker != "": timestamp = datetime.datetime.strptime( event.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) dt = (timestamp - start).total_seconds() if (not include_pregame and dt < 0) or event.attack_id == -1: continue if attacker not in damages: damages[attacker] = [] attacker_location = attackers[event.attack_id].location damages[attacker].append( ( dt, attacker_location.x, attacker_location.y, attacker_location.z, event.victim.location.x, event.victim.location.y, event.victim.location.z, ) ) return damages
[docs] def player_positions(self, include_pregame=False): """Get the player positions for the match. Returns a dict of players positions up to death with keys being player names and values being a list of tuples. Each tuple has four elements being (t, x, y, z) coordinates where t is taken from the event timestamps. :param bool include_pregame: (default False) whether to include pre-game player positions. """ start = datetime.datetime.strptime( self.filter_by("log_match_start")[0].timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) locations = self.filter_by("log_player_position") if not include_pregame: locations = [ location for location in locations if location.elapsed_time > 0 ] player_positions = {} dead = [] for location in locations: timestamp = datetime.datetime.strptime( location.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) dt = (timestamp - start).total_seconds() player = location.character.name if player not in player_positions: player_positions[player] = [] elif player in dead: continue # (t, x, y, z) player_positions[player].append( ( dt, location.character.location.x, location.character.location.y, location.character.location.z, ) ) if location.character.ranking > 1: dead.append(player) # cleanup for player, positions in player_positions.items(): count = 0 last = positions[-1] for pos in positions[::-1]: if pos == last: count += 1 else: break if count > 0: player_positions[player] = positions[:-count] return player_positions
[docs] def circle_positions(self): """Get the circle positions for the match. Returns a dict of circle positions with keys being circle colors and values being a list of tuples. Each tuple has five elements being (t, x, y, z, r) coordinates where t is taken from the "elapsedTime" field in the JSON response and r is the circle radius. The circle colors are "white", "blue", and "red" """ game_states = self.filter_by("log_game_state_periodic") circle_positions = { "white": [], "blue": [], "red": [], } start = datetime.datetime.strptime( game_states[0].timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) for game_state in game_states: timestamp = datetime.datetime.strptime( game_state.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) dt = (timestamp - start).total_seconds() circle_positions["blue"].append( ( dt, game_state.game_state.safety_zone_position.x, game_state.game_state.safety_zone_position.y, game_state.game_state.safety_zone_position.z, game_state.game_state.safety_zone_radius, ) ) circle_positions["red"].append( ( dt, game_state.game_state.red_zone_position.x, game_state.game_state.red_zone_position.y, game_state.game_state.red_zone_position.z, game_state.game_state.red_zone_radius, ) ) circle_positions["white"].append( ( dt, game_state.game_state.poison_gas_warning_position.x, game_state.game_state.poison_gas_warning_position.y, game_state.game_state.poison_gas_warning_position.z, game_state.game_state.poison_gas_warning_radius, ) ) return circle_positions
[docs] def care_package_positions(self, land=True): """Get the crate positions for the match. Returns the crate positions for a match as a list of tuples. Each tuple has four elements being (t, x, y, z) coordinates where t is taken from the "elapsedTime" field in the JSON response. """ start = datetime.datetime.strptime( self.filter_by("log_match_start")[0].timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) if land: care_package_spawns = self.filter_by("log_care_package_land") else: care_package_spawns = self.filter_by("log_care_package_spawn") care_package_positions = [] for care_package in care_package_spawns: package_time = datetime.datetime.strptime( care_package.timestamp, "%Y-%m-%dT%H:%M:%S.%fZ" ) time_elapsed = (package_time - start).total_seconds() care_package_positions.append( ( time_elapsed, care_package.item_package.location.x, care_package.item_package.location.y, care_package.item_package.location.z, ) ) return care_package_positions
[docs] def match_length(self): """The length of the match in seconds.""" for event in self.events[::-1]: elapsed_time = getattr(event, "elapsed_time", None) if elapsed_time is not None: return elapsed_time
[docs] def started(self): """A timestamp of when the match started.""" return self.events[0].timestamp
[docs] def killed(self): """A list of player names of all killed players this match.""" deaths = self.filter_by("log_player_kill") players_killed = [] for death in deaths: players_killed.append(death.victim.name) # Telemetry data sometimes doesn't log all deaths # This is more reliable players = self.player_names() winner = self.winner() killed = set(players_killed) | (set(players) - set(winner)) return list(killed)
[docs] def playback_animation(self, filename="playback.html", **kwargs): """Generate a playback animation from the telemetry data. Generate an HTML5 animation using matplotlib and ffmpeg. Requires installation via ``pip install chicken-dinner[visual]``. :param filename: a file to generate for the animation (default "playback.html") :param bool labels: whether to label players by name :param int disable_labels_after: if passed, turns off player labels after number of seconds elapsed in game :param list label_players: a list of strings of player names that should be labeled :param bool dead_players: whether to mark dead players :param list dead_player_labels: a list of strings of players that should be labeled when dead :param bool zoom: whether to zoom with the circles through the playback :param float zoom_edge_buffer: how much to buffer the blue circle edge when zooming :param bool use_hi_res: whether to use the hi-res image, best to be set to True when using zoom :param bool use_no_text: whether to use the image with no text for town/location names :param bool color_teams: whether to color code different teams :param list highlight_teams: a list of strings of player names whose teams should be highlighted :param list highlight_players: a list of strings of player names who should be highlighted :param str highlight_color: a color to use for highlights :param bool highlight_winner: whether to highlight the winner(s) :param bool label_highlights: whether to label the highlights :param bool care_packages: whether to show care packages :param bool damage: whether to show PvP damage :param int end_frames: the number of extra end frames after game has been completed :param int size: the size of the resulting animation frame :param int dpi: the dpi to use when processing the animation :param bool interpolate: use linear interpolation to get frames with second-interval granularity :param int interval: interval between gameplay frames in seconds :param int fps: the frames per second for the animation """ try: from chicken_dinner.visual.playback import create_playback_animation except ModuleNotFoundError as exc: print( "Use `pip install chicken_dinner[visual]` " "for visualization dependencies." ) raise exc return create_playback_animation(self, filename, **kwargs)