diff --git a/backend-flask/app_db.py b/backend-flask/app_db.py index 5bdf231..7d893e8 100644 --- a/backend-flask/app_db.py +++ b/backend-flask/app_db.py @@ -4,6 +4,7 @@ from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy from sqlalchemy import MetaData from sqlalchemy.orm import DeclarativeBase +from celery import Celery, Task class BaseModel(DeclarativeBase): pass @@ -33,9 +34,11 @@ def connect_db_with_app(database_uri: str | None, include_migrate=True): print("Creating tables if they do not exist") db.create_all() -def connect_celery_with_app(): +def connect_celery_with_app() -> Celery: + if "celery" in app.extensions: + return app.extensions["celery"] + def celery_init_app(app): - from celery import Celery, Task class FlaskTask(Task): def __call__(self, *args: object, **kwargs: object) -> object: with app.app_context(): @@ -55,7 +58,7 @@ def connect_celery_with_app(): ) ) app.config.from_prefixed_env() - celery_init_app(app) + return celery_init_app(app) def create_app() -> Flask: return Flask(__name__) diff --git a/backend-flask/jobs/fetch_logstf.py b/backend-flask/jobs/fetch_logstf.py index 89b71fb..9f56dfe 100644 --- a/backend-flask/jobs/fetch_logstf.py +++ b/backend-flask/jobs/fetch_logstf.py @@ -14,6 +14,11 @@ from models.player_team import PlayerTeam from models.team_integration import TeamLogsTfIntegration from celery import shared_task +celery = app_db.connect_celery_with_app() + +@celery.on_after_configure.connect +def setup_periodic_task(sender, **kwargs): + sender.add_periodic_task(30.0, etl_periodic.s(), name="Fetch logs every 5 minutes") FETCH_URL = "https://logs.tf/api/v1/log/{}" SEARCH_URL = "https://logs.tf/api/v1/log?limit=25?offset={}" @@ -26,10 +31,13 @@ def get_log_ids(last_log_id: int): for summary in response.json()["logs"]: id: int = summary["id"] if id == last_log_id: - break + print("Reached last log ID", id) + return # yield models.match.RawLogSummary.from_response(summary) yield id current = id + sleep(5) + break def extract(log_id: int) -> models.match.RawLogDetails: response = requests.get(FETCH_URL.format(log_id)) @@ -68,20 +76,6 @@ def extract_steam_ids(players: dict[str, models.match.LogPlayer]): @shared_task def update_playtime(steam_ids: list[int]): - # update players with playtime (recalculate through aggregation) - #subquery = ( - # app_db.db.session.query( - # PlayerTeam.id, - # func.sum(Match.duration).label("total_playtime") - # ) - # .join(PlayerMatch, PlayerMatch.player_id == PlayerTeam.player_id) - # .join(Match, PlayerMatch.match_id == Match.logs_tf_id) - # .join(TeamMatch, TeamMatch.match_id == Match.logs_tf_id) - # .where(PlayerTeam.player_id.in_(steam_ids)) - # .where(PlayerTeam.team_id == TeamMatch.team_id) - # .group_by(PlayerTeam.id) - # .subquery() - #) steam_ids_int = list(map(lambda x: int(x), steam_ids)) ptp = ( select( @@ -154,9 +148,6 @@ def transform( .all() ) - if len(players) == 0: - return - if not existing_match: match = Match() match.logs_tf_id = log_id @@ -169,7 +160,8 @@ def transform( else: match = existing_match - #app_db.db.session.add(match) + if len(players) == 0: + return for player in players: player_data = details["players"][steam64_to_steam3(player.steam_id)] @@ -230,18 +222,22 @@ def load_specific_match(id: int, team_id: int | None): ) raw_match = extract(id) + print("Loading match: " + str(id)) app_db.db.session.bulk_save_objects(transform(id, raw_match, match, team_id)) app_db.db.session.commit() - sleep(3) # avoid rate limiting if multiple tasks are queued + sleep(3) # avoid rate limiting if multiple tasks are queued -def main(): +@celery.task +def etl_periodic(): last: int = ( app_db.db.session.query( func.max(models.match.Match.logs_tf_id) ).scalar() - ) or 3767233 + ) or 3768715 - for summary in get_log_ids(last): - print(summary) - sleep(3) + print("Last log ID: " + str(last)) + + for id in get_log_ids(last): + print("Found log: " + str(id)) + load_specific_match.delay(id, None)