Over the first few posts of this series, I’ve been sharing my experience building a Python implementation of the KillrVideo microservice tier. In the previous posts I explained why I started this project and covered building GRPC service stubs, advertising the endpoints in etcd, and setting up integration tests to exercise the service APIs.

So it took four posts to describe all of the setup prior to implementing any business logic or data access code! All that work made me wonder if maybe the rest would be easy in comparison.

This post is all about writing the business logic and reading and writing data to DataStax Enterprise (DSE) using the Cassandra Query Language (CQL).


Business logic? What business logic?

As it turns out, there’s really not a lot of business logic to speak of in the KillrVideo service tier. A previous version of the system that allowed uploading of actual video files via an Uploads Service certainly had more interesting business logic.

However, in the current situation, most of the business logic is validation code. There are a couple of locations where a minor amount of work is done with the provided inputs, for example:

  • the AddYouTubeVideo operation in the Video Catalog Service infers the location of the video preview image from the URL of a YouTube video being added to the system.
  • the User Management Service converts a password to an MD5 hashed value and compares hashed password strings to support user login.
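To make the first bullet concrete, here is a minimal sketch of inferring a preview image location from a YouTube URL. The function names are hypothetical and the thumbnail URL pattern (img.youtube.com/vi/<id>/hqdefault.jpg) is an assumption for illustration, not necessarily what the KillrVideo code uses:

```python
from urllib.parse import urlparse, parse_qs

# Hosts recognized by this sketch (an assumption, not an exhaustive list)
YOUTUBE_HOSTS = ('youtube.com', 'www.youtube.com', 'm.youtube.com')


def extract_youtube_video_id(url):
    """Pull the video ID out of a watch URL or a youtu.be short URL."""
    parsed = urlparse(url)
    host = (parsed.hostname or '').lower()
    if host == 'youtu.be':
        # Short form: https://youtu.be/<id>
        video_id = parsed.path.lstrip('/')
    elif host in YOUTUBE_HOSTS:
        # Long form: https://www.youtube.com/watch?v=<id>
        video_id = parse_qs(parsed.query).get('v', [''])[0]
    else:
        video_id = ''
    if not video_id:
        raise ValueError('Not a recognizable YouTube video URL: ' + url)
    return video_id


def preview_image_location(url):
    """Build the (assumed) thumbnail URL for the video behind `url`."""
    return 'https://img.youtube.com/vi/' + extract_youtube_video_id(url) + '/hqdefault.jpg'
```

Rejecting unrecognized URLs with a ValueError keeps this in line with the point above that most of the logic in the service tier is validation.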

Data access code — no sweat! (mostly)

As I mentioned in the previous post, the process I followed for the Python services was to implement service operations in the order implied by the killrvideo-integration-tests, starting with the User Management Service. I’ll walk you through how the User Management Service was implemented, as the same basic pattern was followed for the other services.

Installing the driver

The first step is to install the DataStax Enterprise Python Driver:

pip install dse-driver

If we were using the DataStax Python Driver for Cassandra (not DSE), we would install cassandra-driver. However, for the purposes of KillrVideo, we know that we’re intending to use DSE features for a couple of the services, specifically DSE Search for the Search Service and DSE Graph for the Suggested Videos Service. There’s a feature matrix on the KillrVideo documentation site that describes usage of DSE features by implementation language in detail.

Initializing Cluster and Session objects

In order to read and write data to Cassandra (DSE), we’ll need to initialize the driver and establish connections to the cluster. We do this through the Cluster and Session classes provided by the driver, as you can read about on the Getting Started page of the documentation.

Since I’ve chosen to implement all of the KillrVideo services within a single application, I put the connection logic in the __init__.py file for the application:

import json

from dse.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from dse import ConsistencyLevel
import dse.cqlengine.connection

def serve():
    # Load driver settings from the JSON configuration file
    with open('config.json', 'r') as file:
        config = json.load(file)
    contact_points = config['CONTACT_POINTS']
    default_consistency_level = config['DEFAULT_CONSISTENCY_LEVEL']

    # Wait for Cassandra (DSE) to be up, aka registered in etcd

    # Initialize Cassandra Driver and Mapper
    profile = ExecutionProfile(
        consistency_level=ConsistencyLevel.name_to_value[default_consistency_level])
    cluster = Cluster(contact_points=contact_points,
                      execution_profiles={EXEC_PROFILE_DEFAULT: profile})
    session = cluster.connect("killrvideo")
    dse.cqlengine.connection.set_session(session)

(Note: code not directly associated with initializing the driver has been omitted for brevity.)

As you may have noticed above, I’ve chosen to put configuration values associated with the driver in a JSON configuration file (based on a helpful article on various ways to approach configuration in Python). The JSON config file is very simple, looking something like this:

{ "CONTACT_POINTS": ["10.0.75.1"], "DEFAULT_CONSISTENCY_LEVEL": "LOCAL_QUORUM" }
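If you want the application to fail fast on a bad configuration file, a small validation step can run before the driver is initialized. The sketch below is not part of the KillrVideo code; the function name and the checks are assumptions. The set of consistency level names is hard-coded here so the example runs without the driver installed, whereas the driver exposes the same names through ConsistencyLevel.name_to_value:

```python
import json

REQUIRED_KEYS = ('CONTACT_POINTS', 'DEFAULT_CONSISTENCY_LEVEL')

# Hard-coded for this sketch; with the driver installed, the keys of
# ConsistencyLevel.name_to_value could be used instead.
KNOWN_CONSISTENCY_LEVELS = {
    'ANY', 'ONE', 'TWO', 'THREE', 'QUORUM', 'ALL', 'LOCAL_QUORUM',
    'EACH_QUORUM', 'SERIAL', 'LOCAL_SERIAL', 'LOCAL_ONE',
}


def parse_config(text):
    """Parse the JSON config text and check the values the driver setup relies on."""
    config = json.loads(text)
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise KeyError('Missing configuration keys: ' + ', '.join(missing))
    contact_points = config['CONTACT_POINTS']
    if not isinstance(contact_points, list) or not contact_points:
        raise ValueError('CONTACT_POINTS must be a non-empty list of addresses')
    level = config['DEFAULT_CONSISTENCY_LEVEL']
    if level not in KNOWN_CONSISTENCY_LEVELS:
        raise ValueError('Unknown consistency level: ' + str(level))
    return config
```

Calling parse_config(file.read()) in place of json.load(file) would surface a typo such as "LOCAL_QUORM" as a clear error instead of a KeyError deep in the driver setup.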

Unpacking this connection code a bit, the first statement creates an ExecutionProfile where we can configure settings such as retry and load balancing policies. Here I’m using an ExecutionProfile to set a default consistency level for queries in the KillrVideo application:

profile = ExecutionProfile(
    consistency_level=ConsistencyLevel.name_to_value[default_consistency_level])

The next step is to create a Cluster object using the ExecutionProfile:

cluster = Cluster(contact_points=contact_points, execution_profiles={EXEC_PROFILE_DEFAULT: profile})

Then we create a Session object bound to our killrvideo keyspace. This works because all of the tables used by the KillrVideo services are in the same keyspace:

session = cluster.connect("killrvideo")

Finally, we initialize the cqlengine, the mapper that is provided as part of the DataStax Python driver. As we will see, the cqlengine is not suitable for every data access pattern we will need in our application, but is sufficient in many cases and I’ve used it where possible for the simplicity it provides.

Creating mapper classes

In order to use the mapper, we’re going to need some classes that define the types that will be mapped to our Cassandra tables. In keeping with the principle of modeling Cassandra tables around our application queries, the User Management Service stores its data in two tables, each designed to support one of the queries we’ll discuss below.

The entity classes that model our Cassandra tables are defined in the file user_management_service.py:

from dse.cqlengine import columns
from dse.cqlengine.models import Model
class UserModel(Model):
   """Model class that maps to the user table"""
   __table_name__ = 'users'
   user_id = columns.UUID(db_field='userid', primary_key=True)
   first_name = columns.Text(db_field='firstname')
   last_name = columns.Text(db_field='lastname')
   email = columns.Text()
   created_date = columns.Date()

class UserCredentialsModel(Model):
   """Model class that maps to the user_credentials table"""
   __table_name__ = 'user_credentials'
   email = columns.Text(primary_key=True)
   user_id = columns.UUID(db_field='userid')
   password = columns.Text()

Each entity class extends the dse.cqlengine.models.Model class. You’ll note the use of the __table_name__ attribute to specify the Cassandra table we’re using (we’ve already specified the keyspace the mapper is using above). We define an attribute of the class for each table column by referencing the appropriate type from the dse.cqlengine.columns module. Note the use of the primary_key designation to identify Cassandra primary key columns. These tables don’t involve any clustering columns, but the mapper can describe those as well: the first primary_key column on a model is the partition key (unless partition_key is set explicitly), any further primary_key columns become clustering columns, and the clustering_order option controls their sort order. The final item to note is the use of the db_field designation, which allows us to have an attribute name that conforms to Python naming conventions but differs slightly from the CQL column name.

Inserting data using the mapper

Following the integration test order, the first operation to implement in the User Management Service is create_user. Note that we need to insert into two different tables to support our access patterns:

from dse.cqlengine.query import LWTException

# insert into user_credentials table first so we can ensure uniqueness with LWT
try:
    UserCredentialsModel.if_not_exists().create(user_id=user_id, email=email, password=hashed_password)
except LWTException:
    # Exact string in this message is expected by integration test
    raise ValueError('Exception creating user because it already exists for ' + email)

# insert into users table
UserModel.create(user_id=user_id, first_name=first_name, last_name=last_name, email=email)

One way to approach this insert would be to use Cassandra’s batch feature. However, there is an additional concern that comes into play here: we need to enforce uniqueness and not allow multiple user accounts for the same email. The logic we use here is to first attempt to create a record for the user in the user_credentials table using a lightweight transaction, which is what the if_not_exists() operation does. If the account creation is for a new email address, the insert will succeed and we can proceed to inserting into the users table. Otherwise, the mapper raises an LWTException, which we translate into a ValueError for the caller.

The actual notation of each insert is quite simple, using the create() operation inherited from the Model class and specifying a value for each of the columns by name.

Retrieving data using the mapper

Next it’s time to implement the methods for retrieving user account data. The first query our application requires is used to support login in the verify_credentials operation:

# retrieve the credentials for provided email from user_credentials table
user_credentials = UserCredentialsModel.get(email=email)

This is a very simple query — we’re retrieving a record from the user_credentials table based on the partition key for that table — the email address. We can then hash the provided password and compare it to the hashed password retrieved from the database to enforce a secure login.
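The hash-and-compare step might look something like the sketch below. It is a simplified stand-in rather than the actual service code: lookup_hashed_password is a hypothetical callable that takes the place of the UserCredentialsModel.get() call and returns the stored hash (or None when no record exists), and the use of hmac.compare_digest for the comparison is my own assumption:

```python
import hashlib
import hmac


def hash_password(password):
    """Return the hex MD5 digest of the password, as described in this post."""
    return hashlib.md5(password.encode('utf-8')).hexdigest()


def verify_credentials(email, password, lookup_hashed_password):
    """Check a login attempt against the stored hash for `email`.

    `lookup_hashed_password` is a hypothetical stand-in for the database
    read; it should return the stored hash, or None if the email is unknown.
    """
    stored_hash = lookup_hashed_password(email)
    if stored_hash is None:
        return False
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(hash_password(password), stored_hash)
```

Passing the lookup in as a function keeps the comparison logic easy to exercise without a running cluster, for example verify_credentials(email, password, {email: stored}.get).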

The second query is used to obtain information about one or more user accounts. Because the API of the service allows lookup of multiple accounts by user ID, we’ve used the filter() operation of the UserModel class instead of a get():

# filter().all() returns a ModelQuerySet, we iterate over the query set to get the Model instances
user_results = UserModel.filter(user_id__in=user_ids).all()
users = list()
for user in user_results:
    users.append(user)
return users

Note the use of the __in notation appended to the column name to indicate our desire to use a CQL IN clause. The mapper will convert this into a query that looks like this:

SELECT * FROM killrvideo.users WHERE userid IN (<user_id_1>, <user_id_2>, ...)

We can then iterate over the results to build a list of user account information to return to the client.

Note that we could also have iterated over the provided user IDs and performed individual get() operations, but by using an in operation, we’re able to query across multiple partition keys at once.

Because the KillrVideo application is not configured to request a large number of user accounts at once, our approach should perform well, and should be quicker than iterating over the IDs and performing individual queries one after another. However, you should definitely read more in this classic blog post about the limitations of using IN for queries involving multiple partitions, and a workaround involving asynchronous programming.
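To give a feel for the shape of that workaround, here is a rough sketch that issues one lookup per user ID concurrently and gathers the results. It deliberately swaps in a standard library thread pool for the driver’s asynchronous query support so that it runs on its own, and fetch_user is a hypothetical callable standing in for a single-partition read such as a get() on the users table:

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_users_concurrently(user_ids, fetch_user, max_workers=8):
    """Run one single-partition lookup per ID in parallel.

    `fetch_user` is a hypothetical callable that returns a user record
    for one ID, or None when no such user exists.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves the order of user_ids in its results
        results = list(pool.map(fetch_user, user_ids))
    # Drop IDs that did not match any account
    return [user for user in results if user is not None]
```

Each lookup touches a single partition, so no one coordinator node has to gather rows from many partitions on behalf of a single IN query.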

Next time: more complex data access cases

So, that was pretty easy, right? If you’re suspicious, you’re right, there are some cases where the “easy way” doesn’t work.

In the next post, I’ll show some more complex examples including cases where the cqlengine is not a good fit, as well as some interesting details I discovered about implementing queries using DSE Search in the Search Service.
