Functional tests with Python and Cassandra

We have all faced the problem of functionally testing our applications to a level where we can confidently say it's 100% tested. WAIT, WHAT!! In most cases 100% is not feasible, no matter what your manager says. But we can strive to get to an acceptable level. In this guide we will show you, step by step, how to implement functional tests with Python and Cassandra.

Now add a microservice architecture to this scenario, written in Python using the FastAPI framework. On top of that, this microservice is responsible for data layer communication to and from a Cassandra database, and it has to be accessible to all other microservices in the environment.

How do we test?

So how do we functionally test something like this? Let's start with a few goals (I would say requirements, but we dislike that word):

  • We want to test all API calls for, at minimum, the happy and error paths
  • We want to test all CQL, including table schemas, keyspace schemas and prepared statements
  • We need to test against Cassandra, but we don’t want to write to the main production keyspace
  • It would be great if our tests could run in a pipeline on check-ins and merges
  • It would be great to have performance timings from the tests to highlight any slow reads/writes or places where a code refactor is necessary

We faced this task recently and, after some research, came up with what we think is a good solution. Let's dive into the details and describe, step by step, how to achieve this.

Set up our test application

We are going to test a FastAPI application responsible for interactions with a customers table in Cassandra. If you don’t know how to set up a FastAPI application with Cassandra, please check out our post on that here TODO.

Let's look at the code in our application first.

1. keyspaces.cql – Here we have the CQL responsible for creating a keyspace in Cassandra.

CREATE KEYSPACE IF NOT EXISTS main_keyspace
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}
AND durable_writes = true;

2. customers.cql – Here we have the table definition for our customers table

CREATE TABLE IF NOT EXISTS customers
(
    customer_email_id varchar,
    customer_id uuid,
    customer_name varchar,
    config   map<text, text>,
    PRIMARY KEY (customer_email_id)
)
WITH compaction = { 'class' : 'LeveledCompactionStrategy' };

3. cass_db – This is our Cassandra connection file, which handles the connection to the database as well as the definition and preparation of the prepared statements.

import logging
import os
from time import sleep

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import dict_factory

from app.conf import CASSANDRA_PROTOCOL_VERSION


class CassandraDB:
    def __init__(self):

        self.host = os.environ.get("CASSANDRA_HOST", "1.2.3.4")
        # Environment variables are strings; pass the port on as an int
        self.port = int(os.environ.get("CASSANDRA_PORT", "9042"))
        self.user = os.environ.get("CASSANDRA_USER", "user")
        self.pwd = os.environ.get("CASSANDRA_PWD", "password")
        self.keyspace = os.environ.get("CASSANDRA_KEYSPACE", "main_keyspace")
        self.cluster = None
        self.session = self.createConnection()
        self.prepared_statements = self.getPreparedStatements()

    def createConnection(self):
        self.session = None
        # Wait one second longer after each failed attempt and give up
        # after 9 attempts, so we never loop forever
        sleep_time = 1
        while not self.session:
            try:
                self.set_session_and_cluster()
            except Exception as e:
                logging.error(
                    "Cassandra connection error. Attempt reconnection"
                )
                logging.error(e)
                sleep(sleep_time)
                sleep_time += 1
                if sleep_time == 10:
                    break
        return self.session

    def set_session_and_cluster(self):
        logging.info("CREATE CONNECTION")
        auth_provider = PlainTextAuthProvider(
            username=self.user, password=self.pwd
        )
        self.cluster = Cluster(
            [self.host],
            port=self.port,
            auth_provider=auth_provider,
            protocol_version=CASSANDRA_PROTOCOL_VERSION,
            load_balancing_policy=DCAwareRoundRobinPolicy(),
        )
        self.session = self.cluster.connect()
        self.session.row_factory = dict_factory
        self.session.set_keyspace(self.keyspace)

    def getSession(self):
        logging.info("GET SESSION")
        return self.session

    def getCluster(self):
        return self.cluster

    def getPreparedStatements(self):
        logging.info("PREPARING STATEMENTS")
        prepared_statements = {}
        prepared_statements.update(self.customersStatements())
        return prepared_statements

    def customersStatements(self):
        return {
            "CUSTOMERS_GET_SINGLE": self.session.prepare(
                "select * from customers where customer_email_id=?"
            ),
            "CUSTOMERS_INSERT": self.session.prepare(
                "insert into customers ("
                "customer_email_id, customer_id, config, customer_name) "
                "values (?,?,?,?) if not exists"
            ),
            "CUSTOMERS_UPDATE": self.session.prepare(
                "update customers set"
                " config=?, customer_name=?"
                " where customer_email_id=? if exists"
            ),
            "CUSTOMERS_DELETE": self.session.prepare(
                "delete from customers where customer_email_id=? if exists"
            ),
        }
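
The reconnection loop in createConnection is easier to reason about (and to test) when it is pulled out of the class. Below is a minimal sketch of the same idea, assuming nothing about Cassandra itself: connect_with_retries, flaky_connect and the pause parameter are hypothetical names introduced only for this illustration, and the fake connect function stands in for set_session_and_cluster.

```python
import logging
from time import sleep


def connect_with_retries(connect, max_attempts=9, pause=sleep):
    """Call connect() until it succeeds or max_attempts is reached.

    The wait grows by one second after each failure, mirroring the loop
    in createConnection. Returns the session, or None if every attempt
    failed.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except Exception as exc:  # broad on purpose, like the original loop
            logging.error("Connection attempt %d failed: %s", attempt, exc)
            pause(attempt)
    return None


# Usage with a stand-in connect function that fails twice, then succeeds.
attempts = []


def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("node not ready")
    return "fake-session"


# Record the requested waits instead of really sleeping.
waits = []
session = connect_with_retries(flaky_connect, pause=waits.append)
```

Passing the pause function in as a parameter is only a convenience for testing: the retry logic can then be checked without waiting for real sleeps.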

4. customers.py – This is where our API endpoints are defined. We only show one below, as this tutorial focuses on testing the APIs rather than on creating the application. Assume we have all endpoints for POST/GET/PUT/DELETE created at /api/v1/customers.

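from fastapi import APIRouter, Request, Response, status

# CustomerOut (the response model) and get_single_exception_log_handler
# are defined elsewhere in the application and are not shown here.

tag = "customers"

routerV1 = APIRouter(
    # The endpoints live at /api/v1/customers
    prefix="/api/v1/" + tag,
    tags=[tag],
    dependencies=[],
    responses={404: {"description": "Not found"}},
)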

#
# GET /api/v1/customers/{customer_email_id}
#
@routerV1.get(
    "/{customer_email_id}",
    tags=[tag],
    description="Returns a single customer record",
    summary="Retrieve single customer record",
    response_model=CustomerOut,
    status_code=200,
)
@get_single_exception_log_handler
def get_customer_record(
    request: Request,
    response: Response,
    customer_email_id: str
):
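    # The session and prepared statements are shared through app.state,
    # which main.py fills in on startup
    session = request.app.state.db
    prepared_statements = request.app.state.ps
    query = prepared_statements["CUSTOMERS_GET_SINGLE"].bind(
        [customer_email_id]
    )
    result = session.execute(query)
    if not result:
        response.status_code = status.HTTP_404_NOT_FOUND
        return response
    # customer_email_id is the primary key, so at most one row matches
    return result.one()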

5. main.py – Responsible for configuring and running the FastAPI application and registering the customers.py router.

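import logging

import uvicorn
from fastapi import FastAPI

# NOTE: the two module paths below are assumptions; adjust them to
# wherever cass_db and customers.py live in your project
from app.cass_db import CassandraDB
from app import customers

# One shared connection; the tests import this from app.main as well
cassandra_instance = CassandraDB()

app = FastAPI(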
    title="Blog_app",
    description="description",
    docs_url="/docs/test",
    openapi_url="/docs/test/openapi.json",
)


@app.on_event("startup")
async def startup():
    logging.info("STARTUP")
    # Set session and prepared statements to be shared with request
    app.state.db = cassandra_instance.getSession()
    app.state.ps = cassandra_instance.getPreparedStatements()
    # registering the routing
    logging.info("STARTING ROUTE SETUP")

    logging.debug("Registering customers")
    app.include_router(customers.routerV1)


    logging.info("ROUTE SETUP COMPLETE")
    logging.info("STARTUP COMPLETE")


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        reload=True,
    )

Set up functional tests with pytest

  1. Create a /tests folder in your project at the /app level, then create two files in it: conftest.py and test_customers.py.
    The folder structure should look like this:
    /app
    /tests
    -/conftest.py
    -/test_customers.py
  2. Below is the conftest.py file. Please read the code comments for a clear explanation of what is happening.
# Here we import pytest and fastapi test client
# Also we get our db connection to cassandra
import pytest
from fastapi.testclient import TestClient
from app.main import app, cassandra_instance
import os

# This file holds all shared fixtures and functions for tests


# Here we define a fixture that returns our cassandra instance
# NOTE the decorator @pytest.fixture(scope="session")
# scope="session" means the fixture is created once and destroyed
# at the end of the test session.
@pytest.fixture(scope="session")
def cass_instance():
    return cassandra_instance


@pytest.fixture(scope="session")
def app_():
    return app

# This is simply returning a bearer token for auth in our request
@pytest.fixture(scope="session")
def bearer():
    return "dd0d2190-ba61-11eb-889f-f3ef5e535655"


# Create test app
# This must be done to allow app.startup to run for router reg
@pytest.fixture(scope="session")
def client():
    # getting client app for test
    with TestClient(app) as c:
        # Similar to "return c"; more on yield below
        yield c


@pytest.fixture(scope="session")
def session(client):
    # get session from test client
    return client.app.state.db


@pytest.fixture(scope="package", autouse=True)
def keyspace(session):
    # Setup new keyspace for testing on same cluster and set keyspace
    session.execute(
        """CREATE KEYSPACE IF NOT EXISTS test_keyspace
        WITH replication = {
            'class': 'SimpleStrategy', 'replication_factor': '1'}
        AND durable_writes = true;"""
    )
    session.set_keyspace("test_keyspace")
    # Set up sessions table for tests and insert session for bearer token
    if session.keyspace == "test_keyspace":
        cql_sessions = read_cql_file("./app/<path>sessions.cql")
        cql_sessions = cql_sessions.replace(
            " IF NOT EXISTS ", " IF NOT EXISTS test_keyspace."
        )
        session.execute(cql_sessions)
        # Setup customers table for testing
        cql_customers = read_cql_file("./app/<path>customers.cql")
        cql_customers = cql_customers.replace(
            " IF NOT EXISTS ", " IF NOT EXISTS test_keyspace."
        )
        session.execute(cql_customers)
    # yield acts as a pause: everything before it runs before the tests
    # start, and everything after it runs once all the tests have run.
    # This gives us a setup and a teardown.
    yield
    # Cleanup, drop keyspace when all tests have run and set keyspace back
    session.execute("""DROP TABLE IF EXISTS test_keyspace.sessions;""")
    session.execute("""DROP TABLE IF EXISTS test_keyspace.customers;""")
    session.execute("""DROP KEYSPACE test_keyspace;""")
    session.set_keyspace("main_keyspace")
    # reload prepared statements after tests have finished
    app.state.ps = cassandra_instance.getPreparedStatements()


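# This is a function to read the cql files in the project
def read_cql_file(path):
    if not os.path.isfile(path):
        # Fail loudly: returning None here would only surface later
        # as an AttributeError when the caller calls .replace()
        raise FileNotFoundError(f"Invalid CQL file: {path}")
    with open(path, "r") as f:
        return f.read()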
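
The keyspace fixture points the project's CQL at the test keyspace with a plain string replace. The sketch below isolates that step so its effect is visible; qualify_with_keyspace is a hypothetical helper name, not part of the project, and the CQL is inlined here instead of being read from disk. Unlike the fixture, this version replaces only the first occurrence and raises if the marker is missing, which is an extra safety assumption on our side.

```python
def qualify_with_keyspace(cql: str, keyspace: str) -> str:
    """Prefix the table name in a CREATE TABLE IF NOT EXISTS statement."""
    marker = " IF NOT EXISTS "
    if marker not in cql:
        raise ValueError("statement has no 'IF NOT EXISTS' clause to rewrite")
    # Replace only the first occurrence so nothing else in the file changes
    return cql.replace(marker, f"{marker}{keyspace}.", 1)


# Same table definition as customers.cql, inlined for the example
customers_cql = """CREATE TABLE IF NOT EXISTS customers
(
    customer_email_id varchar,
    customer_id uuid,
    customer_name varchar,
    config map<text, text>,
    PRIMARY KEY (customer_email_id)
)
WITH compaction = { 'class' : 'LeveledCompactionStrategy' };"""

test_cql = qualify_with_keyspace(customers_cql, "test_keyspace")
print(test_cql.splitlines()[0])
# prints: CREATE TABLE IF NOT EXISTS test_keyspace.customers
```

Because the statement is fully qualified, the table lands in test_keyspace regardless of which keyspace the session currently has set.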

In brief, the conftest sets a scope for each fixture and defines a setup and a teardown for the tests.

So when the tests run, we:

  • Set up the test client app
  • Read the CQL schema from the project
  • Connect to the Cassandra database
  • Create a new test_keyspace
  • Switch to that keyspace for testing
  • Create the tables and data needed for testing in the test keyspace
  • Run the tests
  • After the tests have run (the code after the yield), drop the test tables in the test keyspace
  • Drop the test keyspace
  • Switch back to the main keyspace and reload the prepared statements
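
The ordering above can be seen without pytest or Cassandra. The sketch below uses contextlib.contextmanager from the standard library as a stand-in for a yield fixture, since both run a generator up to the yield, hand control to the caller, and resume afterwards. The events list and the step names are illustrative only. One difference to keep in mind: a context manager re-raises a failure at the yield, so the sketch needs try/finally to guarantee the teardown.

```python
from contextlib import contextmanager

events = []


@contextmanager
def keyspace():
    # Setup: everything before the yield
    events.append("create test_keyspace")
    events.append("create tables")
    try:
        yield
    finally:
        # Teardown: everything after the yield
        events.append("drop tables")
        events.append("drop test_keyspace")


with keyspace():
    events.append("run tests")

for step in events:
    print(step)
```

The printed steps come out in the same order as the list above: setup, tests, then teardown.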

3. Now let's have a look at test_customers.py

Here we set up the table for the customers resource, which lets us test our CQL table definitions. As the test case shows, we test the API functionality, and we also exercise a prepared statement during cleanup by deleting the customer test object once the test case has run.

import pytest
from .conftest import read_cql_file
from uuid import uuid4

URL = "/api/v1/customers"


# Here we read the cql to create the table for customers and load the
# customer prepared statements for tests
@pytest.fixture(autouse=True)
def create_table(session, client, app_, cass_instance):
    # Setup table for testing from project cql schema
    if session.keyspace == "test_keyspace":
        cql_customers = read_cql_file("./app/<path>customers.cql")
        cql_customers = cql_customers.replace(
            " IF NOT EXISTS ", " IF NOT EXISTS test_keyspace."
        )
        session.execute(cql_customers)
        # load prepared statements for customers
        app_.state.ps = {}
        app_.state.ps.update(cass_instance.sessionsStatements())
        cass_instance.prepared_statements = cass_instance.sessionsStatements()
        app_.state.ps.update(cass_instance.customersStatements())

    yield


# Customer POST
def test_customer_post_and_delete_statement_success_201(
    client, session, bearer, app_
):
    # Arrange test data
    customer_email_id = "test@s.com"
    customer_name = "Test"
    # Act
    response = client.post(
        URL,
        headers={"Authorization": f"Bearer {bearer}"},
        json={
            "customer_email_id": customer_email_id,
            "customer_name": customer_name,
            "config": {"test": "data"},
        },
    )
    expected = {
        "customer_email_id": customer_email_id,
        "customer_name": customer_name,
        "config": {"test": "data"},
    }
    # Assert
    assert response.status_code == 201
    assert response.json() == expected

    # Cleanup: deleting the row also exercises the CUSTOMERS_DELETE statement
    query = app_.state.ps["CUSTOMERS_DELETE"].bind([customer_email_id])
    result = session.execute(query)
    assert result.was_applied is True
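
One of our goals was to get timings that highlight slow reads and writes. pytest can already report the slowest tests through its --durations option (for example, pytest --durations=10). If you want a timing around a single call inside a test, a small helper like the one below is one possible approach. It is only a sketch: timed, timings and the 0.5 second threshold are hypothetical choices, and time.sleep stands in for a real client.get(...) or session.execute(...) call.

```python
import time
from contextlib import contextmanager

# Collected timings in seconds, keyed by a label of your choosing
timings = {}


@contextmanager
def timed(label, slow_after=0.5):
    """Record how long the wrapped block takes and flag slow ones."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        timings[label] = elapsed
        if elapsed > slow_after:
            print(f"SLOW: {label} took {elapsed:.3f}s")


with timed("customers_get_single"):
    time.sleep(0.01)  # stand-in for the real API or database call
```

After a run, the timings dictionary can be asserted against a budget or simply printed alongside the test report.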



Conclusion

We showed you, step by step, how to functionally test your FastAPI service with Cassandra using pytest. We went into detail on connecting to Cassandra, setting up a test_keyspace, creating and testing our CQL schema and prepared statements, and testing our API against the database. When running the tests you will also get timings from your test runner, which will highlight any bottlenecks or refactoring work to be done. We have also shown you how to clean up after the tests and leave everything as it was.

We hope you enjoyed this and it helps you out.

KeepForYourself or share with others, you have the power.

The Green Guy

Experienced Software engineer working on a wide range of technologies. Always eager to acquire new skills and share learnings. Also fully qualified carpenter and I really enjoy tinkering with projects to merge software solutions to real life scenarios to enhance and automate our ever changing world :D
