269 lines
9.1 KiB
Python
269 lines
9.1 KiB
Python
from datetime import datetime
|
|
import requests
|
|
import threading
|
|
from sqlalchemy.orm import mapped_column, relationship, scoped_session
|
|
from sqlalchemy.orm.attributes import Mapped
|
|
from sqlalchemy.orm.properties import ForeignKey
|
|
from sqlalchemy.orm.session import Session
|
|
from sqlalchemy.schema import UniqueConstraint
|
|
from sqlalchemy.types import TIMESTAMP, BigInteger, Integer, String, Text
|
|
from sqlalchemy.sql import func
|
|
from sqlalchemy_utc import UtcDateTime
|
|
import app_db
|
|
import spec
|
|
import os
|
|
|
|
|
|
class Event(app_db.BaseModel):
|
|
__tablename__ = "events"
|
|
|
|
# surrogate key
|
|
id: Mapped[int] = mapped_column(Integer, autoincrement=True, primary_key=True)
|
|
|
|
# primary key
|
|
team_id: Mapped[int] = mapped_column(ForeignKey("teams.id"), nullable=False)
|
|
name: Mapped[str] = mapped_column(String(255), nullable=False)
|
|
start_time: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
|
|
|
|
description: Mapped[str | None] = mapped_column(Text, nullable=True)
|
|
created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())
|
|
discord_message_id: Mapped[int | None] = mapped_column(BigInteger, nullable=True, unique=True)
|
|
|
|
team: Mapped["Team"] = relationship("Team", back_populates="events")
|
|
players: Mapped[list["PlayerEvent"]] = relationship(
|
|
"PlayerEvent",
|
|
back_populates="event",
|
|
cascade="all, delete-orphan"
|
|
)
|
|
|
|
__table_args__ = (
|
|
UniqueConstraint("team_id", "name", "start_time"),
|
|
)
|
|
|
|
def get_maximum_matching(self):
|
|
players_teams_roles = app_db.db_session.query(
|
|
PlayerTeamRole
|
|
).join(
|
|
PlayerTeam
|
|
).join(
|
|
PlayerEvent,
|
|
PlayerTeam.player_id == PlayerEvent.player_id
|
|
).where(
|
|
PlayerTeam.team_id == self.team_id
|
|
).where(
|
|
PlayerTeam.player_id == PlayerEvent.player_id
|
|
).where(
|
|
PlayerEvent.event_id == self.id
|
|
).all()
|
|
|
|
role_map = {}
|
|
for roles in players_teams_roles:
|
|
if roles.player_team_id not in role_map:
|
|
role_map[roles.player_team_id] = []
|
|
role_map[roles.player_team_id].append(roles.role)
|
|
import sys
|
|
print(role_map, file=sys.stderr)
|
|
|
|
required_roles = [
|
|
PlayerTeamRole.Role.PocketScout,
|
|
PlayerTeamRole.Role.FlankScout,
|
|
PlayerTeamRole.Role.PocketSoldier,
|
|
PlayerTeamRole.Role.Roamer,
|
|
PlayerTeamRole.Role.Demoman,
|
|
PlayerTeamRole.Role.Medic,
|
|
]
|
|
graph = BipartiteGraph(role_map, required_roles)
|
|
return graph.hopcroft_karp()
|
|
|
|
def get_discord_content(self):
|
|
start_timestamp = int(self.start_time.timestamp())
|
|
players = list(self.players)
|
|
# players should be sorted by their role, leaving no-role players last
|
|
players.sort(key=lambda p: p.role.role.value if p.role else 1023)
|
|
players_info = []
|
|
matchings = self.get_maximum_matching()
|
|
ringers_needed = 6 - matchings
|
|
|
|
role_emojis = {
|
|
"blank": "1373226295651209226",
|
|
"Spy": "1373040448561741895",
|
|
"Sniper": "1373040439250255932",
|
|
"Medic": "1373040382492938385",
|
|
"Engineer": "1373040371415781467",
|
|
"Heavy": "1373040363060592720",
|
|
"Demoman": "1373040354470924400",
|
|
"Pyro": "1373040345197187145",
|
|
"Roamer": "1373040339144806420",
|
|
"PocketSoldier": "1373040333775962132",
|
|
"Soldier": "1373040326872141924",
|
|
"FlankScout": "1373040315187073034",
|
|
"Scout": "1373040306089623663",
|
|
"PocketScout": "1373040277924614254"
|
|
}
|
|
|
|
for player in players:
|
|
player_info = ""
|
|
|
|
if player.role:
|
|
player_info += "<:" + player.role.role.name + ":" + role_emojis.get(
|
|
player.role.role.name, "1373226295651209226"
|
|
) + ">"
|
|
else:
|
|
player_info += "<:blank:1373226295651209226>"
|
|
|
|
if player.has_confirmed:
|
|
player_info += " ✅"
|
|
else:
|
|
player_info += " ⏳"
|
|
|
|
if player.player.discord_id:
|
|
player_info += f" <@{player.player.discord_id}>"
|
|
else:
|
|
player_info += f" {player.player.username}"
|
|
|
|
players_info.append(player_info)
|
|
|
|
ringers_needed_msg = ""
|
|
if ringers_needed > 0:
|
|
if ringers_needed == 1:
|
|
ringers_needed_msg = " **(1 ringer needed)**"
|
|
else:
|
|
ringers_needed_msg = f" **({ringers_needed} ringers needed)**"
|
|
|
|
return "\n".join([
|
|
f"# {self.name}",
|
|
"",
|
|
self.description or "*No description.*",
|
|
"",
|
|
f"<t:{start_timestamp}:f>",
|
|
"\n".join(players_info),
|
|
f"Maximum roles filled: {matchings}" + ringers_needed_msg,
|
|
#"",
|
|
#"[Confirm attendance here]" +
|
|
# f"(https://{domain}/team/id/{self.team.id})",
|
|
])
|
|
|
|
def get_discord_message_components(self):
|
|
domain = os.environ.get("DOMAIN", "availabili.tf")
|
|
|
|
return [
|
|
{
|
|
"type": 10,
|
|
"content": self.get_discord_content(),
|
|
},
|
|
{
|
|
"type": 1,
|
|
"components": [
|
|
{
|
|
"type": 2,
|
|
"label": "✅",
|
|
"style": 3,
|
|
"custom_id": "click_attending"
|
|
},
|
|
{
|
|
"type": 2,
|
|
"label": "⌛",
|
|
"style": 2,
|
|
"custom_id": "click_pending"
|
|
},
|
|
{
|
|
"type": 2,
|
|
"label": "❌",
|
|
"style": 2,
|
|
"custom_id": "click_not_attending"
|
|
},
|
|
{
|
|
"type": 2,
|
|
"label": "Edit",
|
|
"style": 2,
|
|
"custom_id": "click_edit_event"
|
|
},
|
|
{
|
|
"type": 2,
|
|
"label": "View in browser",
|
|
"style": 5,
|
|
"url": f"https://{domain}/team/id/{self.team_id}"
|
|
}
|
|
]
|
|
}
|
|
]
|
|
|
|
def get_or_create_webhook(self):
|
|
integration = app_db.db_session.query(
|
|
TeamDiscordIntegration
|
|
).where(
|
|
TeamDiscordIntegration.team_id == self.team_id
|
|
).first()
|
|
|
|
if not integration:
|
|
return None, ""
|
|
|
|
webhook = {
|
|
"username": integration.webhook_bot_name,
|
|
"avatar_url": integration.webhook_bot_profile_picture,
|
|
"flags": 1 << 15,
|
|
}
|
|
|
|
return webhook, integration.webhook_url
|
|
|
|
def update_discord_message(self):
|
|
domain = os.environ.get("DOMAIN", "availabili.tf")
|
|
webhook, webhook_url = self.get_or_create_webhook()
|
|
if webhook:
|
|
params = "?with_components=true&wait=true"
|
|
webhook["components"] = self.get_discord_message_components()
|
|
if self.discord_message_id:
|
|
# fire and forget
|
|
#threading.Thread(target=webhook.edit).start()
|
|
del webhook["username"]
|
|
del webhook["avatar_url"]
|
|
webhook_url += f"/messages/{self.discord_message_id}"
|
|
requests.patch(webhook_url + params, json=webhook)
|
|
else:
|
|
#webhook.execute()
|
|
response = requests.post(webhook_url + params, json=webhook)
|
|
response = response.json()
|
|
|
|
if webhook_id := response["id"]:
|
|
self.discord_message_id = int(webhook_id)
|
|
app_db.db_session.commit()
|
|
else:
|
|
raise Exception("Failed to create webhook")
|
|
|
|
class EventSchema(spec.BaseModel):
|
|
id: int
|
|
team_id: int
|
|
name: str
|
|
description: str | None
|
|
start_time: datetime
|
|
created_at: datetime
|
|
|
|
@classmethod
|
|
def from_model(cls, model: Event) -> "EventSchema":
|
|
return cls(
|
|
id=model.id,
|
|
name=model.name,
|
|
description=model.description,
|
|
start_time=model.start_time,
|
|
team_id=model.team_id,
|
|
created_at=model.created_at,
|
|
)
|
|
|
|
#class EventPlayersSchema(spec.BaseModel):
|
|
# players: list["PlayerEventRolesSchema"]
|
|
#
|
|
# @classmethod
|
|
# def from_model(cls, model: Event) -> "EventPlayersSchema":
|
|
# return cls(
|
|
# players=[PlayerEventRolesSchema.from_model(player) for player in model.players],
|
|
# roles=[RoleSchema.from_model(player.role.role) for player in model.players if player.role],
|
|
# )
|
|
|
|
|
|
from models.team import Team
|
|
from models.player_event import PlayerEvent, PlayerEventRolesSchema
|
|
from models.team_integration import TeamDiscordIntegration
|
|
from models.player_team import PlayerTeam
|
|
from models.player_team_role import PlayerTeamRole
|
|
from utils.bipartite_graph import BipartiteGraph
|