test: Fix failing tests

master
John Montagu, the 4th Earl of Sandvich 2025-05-06 10:50:09 -07:00
parent 25f8da7010
commit cbeeb764a0
Signed by: sandvich
GPG Key ID: 9A39BE37E602B22D
6 changed files with 119 additions and 81 deletions

View File

@ -17,22 +17,28 @@ convention = {
"pk": "pk_%(table_name)s"
}
def connect_db_with_app(database_uri: str | None, include_migrate=True):
database_uri = database_uri or environ.get("DATABASE_URI")
DATABASE_URI = None
def connect_db_with_app(database_uri: str | None, include_migrate=True, flask_app: Flask | None = None, db_instance: SQLAlchemy | None = None):
flask_app = flask_app or app
db_instance = db_instance or db
database_uri = database_uri or environ.get("DATABASE_URI") or DATABASE_URI
if not database_uri:
raise ValueError("Database URI is not provided")
print("Connecting to database: " + database_uri)
app.config["SQLALCHEMY_DATABASE_URI"] = database_uri
db.init_app(app)
flask_app.config["SQLALCHEMY_DATABASE_URI"] = database_uri
db_instance.init_app(flask_app)
if include_migrate:
migrate.init_app(app, db)
with app.app_context():
print("Running dialect: " + db.engine.dialect.name)
migrate.init_app(flask_app, db_instance)
with flask_app.app_context():
print("Running dialect: " + db_instance.engine.dialect.name)
import models as _
if environ.get("FLASK_ENV") == "production":
print("Creating tables if they do not exist")
db.create_all()
db_instance.create_all()
def connect_celery_with_app() -> Celery:
if "celery" in app.extensions:
@ -64,6 +70,11 @@ def create_app() -> Flask:
return Flask(__name__)
metadata = MetaData(naming_convention=convention)
def create_db() -> SQLAlchemy:
return SQLAlchemy(model_class=BaseModel, metadata=metadata)
app = create_app()
db = SQLAlchemy(model_class=BaseModel, metadata=metadata)
#db = SQLAlchemy(model_class=BaseModel, metadata=metadata)
db = create_db()
migrate = Migrate(app, db, render_as_batch=True)

View File

@ -1,3 +1,4 @@
from models.auth_session import AuthSession
from models.player import Player
from models.team import Team
from models.player_team import PlayerTeam
@ -13,6 +14,7 @@ class BaseTestCase(TestCase):
TESTING = True
def create_app(self):
app.config["TESTING"] = True
return app
def setUp(self):
@ -34,7 +36,6 @@ class BaseTestCase(TestCase):
db.session.add(team)
db.session.flush()
player_team = PlayerTeam(
player_id=player.steam_id,
@ -50,4 +51,11 @@ class BaseTestCase(TestCase):
db.session.add(player_team)
db.session.add(logs_tf_integration)
auth_session = AuthSession(
player_id=player.steam_id,
key="test_key",
)
db.session.add(auth_session)
db.session.commit()

View File

@ -76,7 +76,7 @@ def extract_steam_ids(players: dict[str, models.match.LogPlayer]):
@shared_task
def update_playtime(steam_ids: list[int]):
steam_ids_int = list(map(lambda x: int(x), steam_ids))
#steam_ids_int = list(map(lambda x: int(x), steam_ids))
ptp = (
select(
PlayerTeam.id.label("id"),
@ -87,7 +87,7 @@ def update_playtime(steam_ids: list[int]):
.join(Match, PlayerMatch.match_id == Match.logs_tf_id)
.join(TeamMatch, TeamMatch.match_id == Match.logs_tf_id)
.where(
PlayerTeam.player_id.in_(steam_ids_int),
PlayerTeam.player_id.in_(steam_ids),
PlayerTeam.team_id == TeamMatch.team_id
)
.group_by(PlayerTeam.id)
@ -111,6 +111,8 @@ def update_playtime(steam_ids: list[int]):
)
)
print(stmt)
app_db.db.session.execute(stmt)
app_db.db.session.commit()
@ -183,9 +185,13 @@ def transform(
row_tuple = tuple(row)
team_id = row_tuple[0]
player_count = row_tuple[1]
log_min_player_count = app_db.db.session.query(
TeamLogsTfIntegration.min_team_member_count
).where(TeamLogsTfIntegration.team_id == team_id).one_or_none() or 100
logs_integration = app_db.db.session.query(
TeamLogsTfIntegration
).where(TeamLogsTfIntegration.team_id == team_id).one_or_none()
log_min_player_count = 100
if logs_integration:
log_min_player_count = logs_integration.min_team_member_count
should_create_team_match = False
@ -207,7 +213,7 @@ def transform(
yield team_match
#app_db.db.session.flush()
update_playtime.delay(list(map(lambda x: str(x), steam_ids)))
update_playtime.delay(steam_ids)
@shared_task

View File

@ -18,8 +18,8 @@ class Match(app_db.BaseModel):
red_score: Mapped[int] = mapped_column(Integer)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP, server_default=func.now())
teams: Mapped["TeamMatch"] = relationship("TeamMatch", back_populates="match")
players: Mapped["PlayerMatch"] = relationship("PlayerMatch", back_populates="match")
teams: Mapped[list["TeamMatch"]] = relationship("TeamMatch", back_populates="match")
players: Mapped[list["PlayerMatch"]] = relationship("PlayerMatch", back_populates="match")
class MatchSchema(spec.BaseModel):
logs_tf_id: int

View File

@ -12,6 +12,7 @@ class Team(app_db.BaseModel):
id: Mapped[int] = mapped_column(Integer, autoincrement=True, primary_key=True)
team_name: Mapped[str] = mapped_column(String(63), unique=True)
#team_tag: Mapped[str] = mapped_column(String(63), nullable=True)
discord_webhook_url: Mapped[str] = mapped_column(String(255), nullable=True)
tz_timezone: Mapped[str] = mapped_column(String(31), default="Etc/UTC")
minute_offset: Mapped[int] = mapped_column(SmallInteger, default=0)

View File

@ -5,12 +5,15 @@ from pydantic.v1 import validator
from spectree import Response
from sqlalchemy.orm import joinedload
from app_db import db
from models.event import Event
from models.player import Player, PlayerSchema
from models.player_team import PlayerTeam
from models.player_team_availability import PlayerTeamAvailability
from models.player_team_role import PlayerTeamRole, RoleSchema
from models.team import Team, TeamSchema, TeamWithRoleSchema
from middleware import assert_team_authority, requires_authentication, requires_team_membership
from models.team_integration import TeamDiscordIntegration, TeamLogsTfIntegration
from models.team_invite import TeamInvite
from spec import spec, BaseModel
from team_invite import api_team_invite
from team_integration import api_team_integration
@ -29,6 +32,7 @@ def map_player_to_schema(player: Player):
class CreateTeamJson(BaseModel):
team_name: str
#team_tag: str | None = None
discord_webhook_url: str | None = None
minute_offset: int = 0
league_timezone: str
@ -65,6 +69,7 @@ class ViewTeamsResponse(BaseModel):
def create_team(json: CreateTeamJson, player: Player, **kwargs):
team = Team(
team_name=json.team_name,
#team_tag=json.team_tag,
tz_timezone=json.league_timezone,
minute_offset=json.minute_offset,
)
@ -107,34 +112,6 @@ def update_team(player_team: PlayerTeam, team_id: int, json: CreateTeamJson, **k
return TeamSchema.from_model(team).dict(by_alias=True), 200
@api_team.delete("/id/<int:team_id>/")
@spec.validate(
resp=Response(
HTTP_200=None,
HTTP_403=None,
HTTP_404=None,
),
operation_id="delete_team"
)
def delete_team(player: Player, team_id: int):
player_team = db.session.query(
PlayerTeam
).where(
PlayerTeam.team_id == team_id
).where(
PlayerTeam.player_id == player.steam_id
).one_or_none()
if not player_team:
abort(404)
if not player_team.is_team_leader:
abort(403)
db.session.delete(player_team.team)
db.session.commit()
return make_response(200)
@api_team.delete("/id/<int:team_id>/player/<target_player_id>/")
@spec.validate(
resp=Response(
@ -176,11 +153,44 @@ def remove_player_from_team(player: Player, team_id: int, target_player_id: str,
team = target_player_team.team
# cascade delete all roles and availability
for role in target_player_team.player_roles:
db.session.delete(role)
db.session.delete(target_player_team)
db.session.flush()
db.session.refresh(team)
if len(team.players) == 0:
# delete the team if the only member
# cascade delete integrations, invites, and events
db.session.query(
TeamLogsTfIntegration
).where(
TeamLogsTfIntegration.team_id == team.id
).delete()
db.session.query(
TeamDiscordIntegration
).where(
TeamDiscordIntegration.team_id == team.id
).delete()
db.session.query(
TeamInvite
).where(
TeamInvite.team_id == team.id
).delete()
db.session.query(
Event
).where(
Event.team_id == team.id
).delete()
db.session.delete(team)
else:
# if there doesn't exist another team leader, promote the first player
@ -212,49 +222,51 @@ class AddPlayerJson(BaseModel):
),
operation_id="create_or_update_player"
)
def add_player(player: Player, team_id: int, player_id: str, json: AddPlayerJson):
player_id: int = int(player_id)
player_team = db.session.query(
PlayerTeam
).where(
PlayerTeam.player_id == player.steam_id
).where(
PlayerTeam.team_id == team_id
).one_or_none()
@requires_authentication
def add_player(player: Player, team_id: int, player_id: str, json: AddPlayerJson, **kwargs):
raise NotImplementedError("This endpoint is not implemented yet")
#player_id: int = int(player_id)
#player_team = db.session.query(
# PlayerTeam
#).where(
# PlayerTeam.player_id == player.steam_id
#).where(
# PlayerTeam.team_id == team_id
#).one_or_none()
if not player_team:
abort(404)
#if not player_team:
# abort(404)
if not player_team.is_team_leader:
abort(403)
#if not player_team.is_team_leader:
# abort(403)
target_player_team = db.session.query(
PlayerTeam
).where(
PlayerTeam.player_id == player_id
).where(
PlayerTeam.team_id == team_id
).one_or_none()
#target_player_team = db.session.query(
# PlayerTeam
#).where(
# PlayerTeam.player_id == player_id
#).where(
# PlayerTeam.team_id == team_id
#).one_or_none()
if not target_player_team:
target_player = db.session.query(
Player
).where(
Player.steam_id == player_id
).one_or_none()
#if not target_player_team:
# target_player = db.session.query(
# Player
# ).where(
# Player.steam_id == player_id
# ).one_or_none()
if not target_player:
abort(404)
# if not target_player:
# abort(404)
target_player_team = PlayerTeam()
target_player_team.player_id = player_id
target_player_team.team_id = player_team.team_id
# target_player_team = PlayerTeam()
# target_player_team.player_id = player_id
# target_player_team.team_id = player_team.team_id
target_player_team.team_role = json.team_role
target_player_team.is_team_leader = json.is_team_leader
#target_player_team.team_role = json.team_role
#target_player_team.is_team_leader = json.is_team_leader
db.session.commit()
return make_response(200)
#db.session.commit()
#return make_response(200)
@api_team.get("/all/")
@spec.validate(