A wrapper class to use Postgres with FastAPI and Celery

ยท

3 min read

The problem:

In Django, we can set up Celery, and Postgres and then associate them with each other by providing them in the settings.py file. This is provided out-of-the-box by Django.
So the problem arises in FastAPI as there is no out-of-the-box support to associate Celery, FastAPI, and Postgres together. So now inside celery workers, we are not able to access the db session and make any changes to the data.

Here is the link to the issue: Calling a common function that uses db session from inside fastapi as well as celery

The solution:

Naive but working solution:

We have to create a separate function to access the same database session that the FastAPI server uses so that we can make changes to the data.

  • core/db.py | database config this file
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from config import settings

# This function returns a Session (type of Session - Class)
def get_db_session() -> sessionmaker:
    uri = settings.SQLALCHEMY_DATABASE_URI

    # create db engine using the uri. pool_pre_ping is used to check the connection's health before each use. 
    engine = create_engine(uri, pool_pre_ping=True)

    # create a session with the above created engine using the sessionmaker.
    # autocommit: A presists a change to db
    # autoflush: A flush ensures that any pending changes in the session are synchronized with the database
    Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    return Session()
  • api/tasks.py | asynchronous tasks executed in this file
from core.db import get_db_session
@celery.task(queue='task-type-a')
def test_background_task(retry=8, delay=3):
    # this is an example to showcase a long running task.
    result = {}
    for i in range(retry):
        print(f"retried {i} times")
        result = some_api_call() 
        time.sleep(delay)

    # make an instance of the session class
    db_session = get_db_session()

    # Inside try, make the query and commit the changes.
    try:
        db_session.add(Transaction(**result))
        db_session.commit()
    # If any errors, rollback the entire change and raise exception.
    except Exception as e:
        db_session.rollback()
        raise Exception(e)
    # After the above, close the session.
    finally:
        db_session.close()

    return "done"
  • Drawbacks in this solution:

    1. Every time we make a query to db, we have to wrap everything in a try, except and finally block.

    2. The code below the except block is redundant.

Refactored solution:

Abstract all these redundant codes into a class and provide methods to access them.

  • Refactored core/db.py | database config this file
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from config import settings


class DBSessionManager:
    # on class initiation, get the db session.
    def __init__(self):
        self.session = self.get_db_session()

    def get_db_session(self):
        uri = settings.SQLALCHEMY_DATABASE_URI
        engine = create_engine(uri, pool_pre_ping=True)

        Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        return Session()

    # This takes the session from construtor method and does the commit. 
    # A close parameter is introduced so that if we want to make more than one queries in same session, we can do so.
    def commit(self, close=False):
        try:
            self.session.commit()
        except Exception as e:
            self.session.rollback()
            raise Exception(e)
        finally:
            if close:
                self.session.close()
  • Refactored api/tasks.py | asynchronous tasks executed in this file
from core.db import DBSessionManager
@celery.task(queue='task-type-a')
def test_background_task(retry=8, delay=3):
    # this is an example to showcase a long running task.
    result = {}
    for i in range(retry):
        print(f"retried {i} times")
        result = some_api_call() 
        time.sleep(delay)

    # make an instance of the session class
    db_sm = DBSessionManager()

    # remember to use .session as it is initiased in constructor method
    db_sm.session.add(Transaction(**result))
    db_sm.commit(close=True)

    return "done"

So we have beautifully made redundant code into a class with methods, now we just have to initiate the class and access them.

The End

Linkedin | GitHub | website
๐Ÿš€โ˜๏ธ๐Ÿ–ค

ย