feat(backend): Add celery beat schedule

master
John Montagu, the 4th Earl of Sandvich 2024-12-11 18:03:37 -08:00
parent 13fd7fdfc0
commit 1d6dce5088
Signed by: sandvich
GPG Key ID: 9A39BE37E602B22D
2 changed files with 27 additions and 28 deletions

View File

@@ -4,6 +4,7 @@ from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import MetaData from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import DeclarativeBase
from celery import Celery, Task
class BaseModel(DeclarativeBase): class BaseModel(DeclarativeBase):
pass pass
@@ -33,9 +34,11 @@ def connect_db_with_app(database_uri: str | None, include_migrate=True):
print("Creating tables if they do not exist") print("Creating tables if they do not exist")
db.create_all() db.create_all()
def connect_celery_with_app(): def connect_celery_with_app() -> Celery:
if "celery" in app.extensions:
return app.extensions["celery"]
def celery_init_app(app): def celery_init_app(app):
from celery import Celery, Task
class FlaskTask(Task): class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object: def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context(): with app.app_context():
@@ -55,7 +58,7 @@ def connect_celery_with_app():
) )
) )
app.config.from_prefixed_env() app.config.from_prefixed_env()
celery_init_app(app) return celery_init_app(app)
def create_app() -> Flask: def create_app() -> Flask:
return Flask(__name__) return Flask(__name__)

View File

@@ -14,6 +14,11 @@ from models.player_team import PlayerTeam
from models.team_integration import TeamLogsTfIntegration from models.team_integration import TeamLogsTfIntegration
from celery import shared_task from celery import shared_task
celery = app_db.connect_celery_with_app()
@celery.on_after_configure.connect
def setup_periodic_task(sender, **kwargs):
sender.add_periodic_task(30.0, etl_periodic.s(), name="Fetch logs every 5 minutes")
FETCH_URL = "https://logs.tf/api/v1/log/{}" FETCH_URL = "https://logs.tf/api/v1/log/{}"
SEARCH_URL = "https://logs.tf/api/v1/log?limit=25?offset={}" SEARCH_URL = "https://logs.tf/api/v1/log?limit=25?offset={}"
@@ -26,10 +31,13 @@ def get_log_ids(last_log_id: int):
for summary in response.json()["logs"]: for summary in response.json()["logs"]:
id: int = summary["id"] id: int = summary["id"]
if id == last_log_id: if id == last_log_id:
break print("Reached last log ID", id)
return
# yield models.match.RawLogSummary.from_response(summary) # yield models.match.RawLogSummary.from_response(summary)
yield id yield id
current = id current = id
sleep(5)
break
def extract(log_id: int) -> models.match.RawLogDetails: def extract(log_id: int) -> models.match.RawLogDetails:
response = requests.get(FETCH_URL.format(log_id)) response = requests.get(FETCH_URL.format(log_id))
@@ -68,20 +76,6 @@ def extract_steam_ids(players: dict[str, models.match.LogPlayer]):
@shared_task @shared_task
def update_playtime(steam_ids: list[int]): def update_playtime(steam_ids: list[int]):
# update players with playtime (recalculate through aggregation)
#subquery = (
# app_db.db.session.query(
# PlayerTeam.id,
# func.sum(Match.duration).label("total_playtime")
# )
# .join(PlayerMatch, PlayerMatch.player_id == PlayerTeam.player_id)
# .join(Match, PlayerMatch.match_id == Match.logs_tf_id)
# .join(TeamMatch, TeamMatch.match_id == Match.logs_tf_id)
# .where(PlayerTeam.player_id.in_(steam_ids))
# .where(PlayerTeam.team_id == TeamMatch.team_id)
# .group_by(PlayerTeam.id)
# .subquery()
#)
steam_ids_int = list(map(lambda x: int(x), steam_ids)) steam_ids_int = list(map(lambda x: int(x), steam_ids))
ptp = ( ptp = (
select( select(
@@ -154,9 +148,6 @@ def transform(
.all() .all()
) )
if len(players) == 0:
return
if not existing_match: if not existing_match:
match = Match() match = Match()
match.logs_tf_id = log_id match.logs_tf_id = log_id
@@ -169,7 +160,8 @@ def transform(
else: else:
match = existing_match match = existing_match
#app_db.db.session.add(match) if len(players) == 0:
return
for player in players: for player in players:
player_data = details["players"][steam64_to_steam3(player.steam_id)] player_data = details["players"][steam64_to_steam3(player.steam_id)]
@@ -230,18 +222,22 @@ def load_specific_match(id: int, team_id: int | None):
) )
raw_match = extract(id) raw_match = extract(id)
print("Loading match: " + str(id))
app_db.db.session.bulk_save_objects(transform(id, raw_match, match, team_id)) app_db.db.session.bulk_save_objects(transform(id, raw_match, match, team_id))
app_db.db.session.commit() app_db.db.session.commit()
sleep(3) # avoid rate limiting if multiple tasks are queued sleep(3) # avoid rate limiting if multiple tasks are queued
def main(): @celery.task
def etl_periodic():
last: int = ( last: int = (
app_db.db.session.query( app_db.db.session.query(
func.max(models.match.Match.logs_tf_id) func.max(models.match.Match.logs_tf_id)
).scalar() ).scalar()
) or 3767233 ) or 3768715
for summary in get_log_ids(last): print("Last log ID: " + str(last))
print(summary)
sleep(3) for id in get_log_ids(last):
print("Found log: " + str(id))
load_specific_match.delay(id, None)