Deephaven and asyncio for fantasy football

Blog Deephaven 23 May 2024

Keith McConigly

KDB_New
KDB is not just a time-series database, for many it also stands for Manchester City midfielder Kevin De Bruyne, one of the best players in the Premier League who has been a consistently high scorer in fantasy land and a regular candidate for the captain's armband.

Recently while upskilling on Deephaven, I came up with the idea for a project that combined my passion for learning about new technologies with my enjoyment of fantasy football. Fantasy Premier League (FPL) is a game that keeps growing in popularity and now has over 10 million players worldwide.

Recalculated

On the official FPL website the bonus points and league table positions are not updated until the end of each match day, long after the close of play.

My idea was to utilise a FPL Python package, which is a wrapper for the official FPL REST API, and populate a Deephaven table with the live points scored as games unfold. This provides an added level of excitement as you can track your performance in near real-time and watch as teams leapfrog one another in your mini-league tables.

Deephaven

Deephaven is a real-time, time-series, column-oriented analytics engine with relational database features. Queries can seamlessly operate upon both historical and real-time data. Deephaven includes an intuitive user experience and visualization tools. It can ingest data from a variety of sources, apply computation and analysis algorithms to that data, and build rich queries, dashboards, and representations with the results.

I am running the free Community Core version of Deephaven within Docker Desktop.

Docker command

Below is the Docker command I used to launch Deephaven, including some environment variables I needed to authenticate.

docker run --rm --name deephaven -p 10000:10000 --env START_OPTS=-DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler --env FPL_EMAIL="dev@mail.com" --env FPL_PASSWORD="Deephaven!" --env FPL_COOKIE="datadome=mduvowtanq4UMeaKA6aMj~0Uma~aLiBywaBSuvQLyTS6y8jIwvXUrZTb6HUqhZXclIxRUAOCjBQWbJ_ORlgZ16Z0tVAsHC1eK~uiVL76QfflE43G~oSCy6pCHAlzna;pl_profile=eyJzIjogIld6SXNPREUyTmpjMU5ERmQ6MXJmZjSyOkZIQ0FKREtkMFowYzJjY1pwckFSMUNJN2tkODdNc251dTFWWGNLaGstUkkiLCAidSI6IHsiaWQiOiA4MTY2NzU0MSwgImZuIjogIktlaXRoIiwgImxuIjogIk1jQ29uaWdseSIsICJmYyI6IDF9fQ==" ghcr.io/deephaven/server:0.34.2

Authentication

Cookies

In order to authenticate the web service calls I created a new account and while logged in captured the values of certain cookies from the Developer Tools section of Google Chrome, under the Application tab I copied the ‘datadome’ and ‘pl_profile’ cookies. I then combined these to make the FPL_COOKIE environment variable which I set in the previous step.

Python script

I wrote a Python script that uses the asyncio, aiohttp and fpl packages to get the required data asynchronously. Under the hood, asyncio uses an event loop to manage and execute asynchronous tasks. The event loop is responsible for scheduling tasks, handling I/O operations, and switching between tasks efficiently. When an asynchronous function encounters an I/O operation, it yields control to the event loop, allowing other tasks to run.

One of the endpoints, /event-status, was not catered for in the wrapper so I called that endpoint directly using a utility function in order to get the current gameweek number and whether or not bonus points have been added yet. Credit to lead developer Amos Bastian for this workaround which I found on the Discord channel.

import os

os.system("pip install aiohttp")
os.system("pip install fpl")
import aiohttp
from fpl import FPL
import asyncio
from threading import Thread
from dataclasses import dataclass
from typing import Callable
from concurrent.futures import CancelledError

from deephaven.table import Table
from deephaven.column import string_col, int_col
from deephaven import dtypes
from deephaven.table_factory import new_table
from deephaven.stream.table_publisher import table_publisher, TablePublisher


SLEEP_TIME = 1
SQUAD_SIZE = 15
GW_NUM = 0
BONUS_ADDED = False

@dataclass
class MiniLeagueEntry:
    id: int
    event_total: int
    player_name: str
    rank: int
    last_rank: int
    rank_sort: int
    total: int
    entry: int
    entry_name: str
    picks: dict
    transfers_cost: int
    live_game_week_points: int = 0
    live_total_points: int = 0


async def fetch(session, url, retries=10, cooldown=1):
    retries_count = 0
    while True:
        try:
            async with session.get(url) as response:
                result = await response.json()
                return result
        except aiohttp.client_exceptions.ContentTypeError:
            retries_count += 1

            if retries_count > retries:
                raise Exception(f"Could not fetch {url} after {retries} retries")

            if cooldown:
                await asyncio.sleep(cooldown)


async def get_event_status(session):
    stat = await fetch(
        session, "https://fantasy.premierleague.com/api/event-status/")
    global GW_NUM, BONUS_ADDED
    GW_NUM = stat["status"][0]["event"]
    BONUS_ADDED = stat["status"][0]["bonus_added"]


async def get_static_data(fpl_session, ml_id):
    # Get the current team standings from a mini-league at time of the last update
    teams = []
    classic_league = await fpl_session.get_classic_league(ml_id)
    for result in classic_league.standings["results"]:
        user = await fpl_session.get_user(result["entry"])
        picks = await user.get_picks(GW_NUM)
        result.update({"picks": picks})
        history = await user.get_user_history()
        result["transfers_cost"] = history[-1]["event_transfers_cost"]
        teams.append(MiniLeagueEntry(**result))
    return teams


async def calculate_live_score(ml_id, message_handler: Callable[[MiniLeagueEntry], None]):
    async with aiohttp.ClientSession() as session:
        await get_event_status(session)
        fpl_session = FPL(session)
        await fpl_session.login()
        list_of_teams = await get_static_data(fpl_session, ml_id)
        while True:
            game_week = await fpl_session.get_gameweek(GW_NUM, include_live=True, return_json=True)
            squad_range = range(SQUAD_SIZE)
            for team in list_of_teams:
                live_score = 0
                for position in squad_range:
                    player_stats = game_week["elements"][team.picks[GW_NUM][position]["element"]]["stats"]
                    live_score += player_stats["total_points"] * team.picks[GW_NUM][position]["multiplier"]
                team.live_game_week_points = live_score
                live_score -= (0 if BONUS_ADDED else team.transfers_cost)
                team.live_total_points = team.total + (live_score - team.event_total)
                message_handler(team)
            await asyncio.sleep(SLEEP_TIME)


def to_table(matches: list[MiniLeagueEntry]):
    return new_table(
        [
            int_col("Rank", [x.rank for x in matches]),
            int_col("LastRank", [x.last_rank for x in matches]),
            int_col("Entry", [x.entry for x in matches]),
            string_col("Team", [x.entry_name for x in matches]),
            string_col("Manager", [x.player_name.split()[0] for x in matches]),
            int_col("GW", [x.live_game_week_points for x in matches]),
            int_col("Hit", [x.transfers_cost for x in matches]),
            int_col("GWMinusHit", [(x.live_game_week_points - x.transfers_cost) for x in matches]),
            int_col("Total", [x.live_total_points for x in matches]),
        ]
    )


def create_matches(ml_id, event_loop) -> tuple[Table, Callable[[], None]]:
    on_shutdown_callbacks = []

    def on_shutdown():
        nonlocal on_shutdown_callbacks
        for c in on_shutdown_callbacks:
            c()

    my_matches: list[MiniLeagueEntry] = []

    def on_flush(tp: TablePublisher):
        nonlocal my_matches
        my_matches_copy = my_matches.copy()
        my_matches.clear()
        tp.add(to_table(my_matches_copy))

    table, publisher = table_publisher(
        f"Matches for {GW_NUM}",
        {
            "Rank": dtypes.int32,
            "LastRank": dtypes.int32,
            "Entry": dtypes.int32,
            "Team": dtypes.string,
            "Manager": dtypes.string,
            "GW": dtypes.int32,
            "Hit": dtypes.int32,
            "GWMinusHit": dtypes.int32,
            "Total": dtypes.int32,
        },
        on_flush_callback=on_flush,
        on_shutdown_callback=on_shutdown,
    )

    future = asyncio.run_coroutine_threadsafe(
        calculate_live_score(ml_id, my_matches.append), event_loop
    )

    def on_future_done(f):
        nonlocal publisher
        try:
            e = f.exception(timeout=0) or RuntimeError("completed")
        except CancelledError as c:
            e = RuntimeError("cancelled")
        publisher.publish_failure(e)

    future.add_done_callback(on_future_done)

    on_shutdown_callbacks.append(future.cancel)

    return table, future.cancel


my_event_loop = asyncio.new_event_loop()
Thread(target=my_event_loop.run_forever).start()


def subscribe_stats(ml_id):
    blink_table, on_done = create_matches(ml_id, my_event_loop)

    blink_table = (blink_table
                   .sort_descending(order_by="Total")
                   .update("Rank = i+1"))

    return blink_table, on_done


fpl_ml, fpl_cancel = subscribe_stats(291289)
fpl_ml = fpl_ml.format_columns(["Rank = Rank < LastRank ? DARK_BLUEGREEN : Rank > LastRank ? DARK_RED : NO_FORMATTING"]).layout_hints(hide=["LastRank","Entry"])

# call this to explicitly cancel
# fpl_cancel()

Blink

The table type I used was a Blink table which is aptly named as it blinks while refreshing. A Blink table only retains the rows added in the current update cycle, one advantage of this table type is that the memory usage does not keep building up. I set the application to sleep for 1 second between calls to the API in order to match the default Barrage update propagation period. I also used a Table Publisher to programmatically add data to the table.

Find out in more detail the basics of Deephaven and Barrage – https://dataintellect.com/blog/deephaven-as-a-tool-for-real-time-analytics-the-basics/

LiveTableFinal

I implemented colour formatting functionality on the Rank column in order to indicate a position change, green for upwards and red for downwards. I also set certain columns such as the LastRank column to auto hide as I didn’t need to display this following the formatting change.

Plot

Another feature I tried was the plotting functionality in order to visualise the league data in a XY series plot. You can see in the screenshot below how the top 10 teams fared over the course of the season. The lines would update at the same rate as the blink table because the source table was joined to it.

GraphPlus

The 2023/2024 season came to an end last weekend, congratulations to Liam on winning the Data Intellect Classic league this year with an impressive overall ranking of 13,455!

I found this to be a fun and interesting project while building upon my Deephaven and Python skills, there is much more that you can use Deephaven for and so far I have only scratched the surface.

Share this:

LET'S CHAT ABOUT YOUR PROJECT.

GET IN TOUCH