Back to Blog

Cassandra User Defined Functions using the Python Driver

date: August 4, 2015

Introduction

Apache Cassandra 2.2 introduced two new useful features: User Defined Functions (UDFs) and User Defined Aggregates (UDAs). These new features allow certain types of computation to occur server-side, directly on the Cassandra cluster. This can reduce network traffic to the client and reduce client-side resource utilization. The combination of these features can improve performance in certain scenarios. As a part of a series of two blog posts, this first post will briefly demonstrate some of these scenarios where UDFs can be used, how these would have been implemented pre-Cassandra 2.2, and finally how these can be simplified using UDFs via the DataStax Python Driver. My second post will go over using UDAs with the Python Driver.

All examples in this post will be using the following simple keyspace, shown here created via the Python driver:

1

2

3

4

5

cluster = Cluster(protocol_version=4)

session = cluster.connect()

 

session.execute("CREATE KEYSPACE simplex WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")

session.execute("USE simplex")

User Defined Functions (UDFs)

UDFs are functions that are run directly on Cassandra as part of query execution. The scripting portion of the UDF can be performed by any language that supports the Java Scripting API, such as Java, Javascript, Python, Ruby, and many other languages (JARs need to be dropped into the classpath to support Python/Ruby). Similar to User Defined Types (UDTs), UDFs are associated with a specific keyspace, and if no keyspace is explicitly specified, the current keyspace is used. As UDFs are a part of the Cassandra schema, they are automatically propagated to all nodes in the cluster. To use UDFs, they must be explicitly enabled in cassandra.yaml:

1

enable_user_defined_functions: true

Here's the syntax for creating a UDF:

1

2

3

4

5

6

CREATE function_name(arg type)

    RETURNS NULL ON NULL INPUT

    RETURNS type

    LANGUAGE java

    AS '

    return arg';

UDFs are identified by their signature: the combination of its keyspace, function name and arguments. As such, it's possible to overload UDFs by having the same function name as long as the argument size or argument types are different. The arguments can be literals or terms, including collection types such as lists, maps and UDTs. Cassandra uses a bundled version of the DataStax Java Driver to perform collection and UDT conversions internally. As UDFs are simple scripts, it is possible for them to fail during runtime due to conditions such as illegal arguments or null pointer exceptions, thus we must take care to handle these cases. If a UDF does fail this way, the entire query is aborted.

All examples in this section will be using the following schema:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

session.execute("""

    CREATE TYPE Address (

        street text,

        city text,

        state text,

        zip int)

""")

 

session.execute("""

    CREATE TABLE inventory (

        item_id uuid PRIMARY KEY,

        name text,

        dimensions Tuple<double,double,double,text>,

        available_states Set<text>,

        item_location frozen<Address>)

""")

Inserted via the Python Driver:

1

2

3

4

5

6

7

8

9

10

11

12

13

from collections import namedtuple

from uuid import uuid4

 

insert = session.prepare("""INSERT INTO inventory

    (item_id, name, dimensions, available_states, item_location)

    VALUES

    (?, ?, ?, ?, ?)

""")

Address = namedtuple('address', ('street', 'city', 'state', 'zip'))

 

session.execute(insert, [uuid4(), 'camping_tent', (8.5, 5.0, 9.5, 'ft'), set(['WA', 'OR', 'CA']), Address('487 Franciscan Ct', 'Santa Clara', 'CA', 95050)])

session.execute(insert, [uuid4(), 'cooking_pot', (8.4, 4.7, 8.4, 'in'), set(['WA', 'OR', 'CA', 'AZ', 'NV', 'UT', 'TX']), Address('623 Shaver St', 'Portland', 'OR', 97212)])

session.execute(insert, [uuid4(), 'curved_knife', (2.96, 0.450, 0.100, 'in'), set(['WA', 'OR', 'CA', 'AZ', 'NV', 'UT', 'TX', 'MN', 'KY', 'NY', 'NJ', 'FL', 'GA']), Address('728 Denny Way', 'Seattle', 'WA', 98122)])

And the resulting data in the inventory table:

1

2

3

4

5

6

7

item_id                              | available_states                                                               | dimensions              | item_location                                                               | name

--------------------------------------+--------------------------------------------------------------------------------+-------------------------+-----------------------------------------------------------------------------+--------------

 3c0958bb-a9ff-49b9-b076-d9ee878bb0a8 |                                                             {'CA', 'OR', 'WA'} |     (8.5, 5, 9.5, 'ft') | {street: '487 Franciscan Ct', city: 'Santa Clara', state: 'CA', zip: 95050} | camping_tent

 fed6171e-329a-4b23-a53a-cbed87d841c1 |                                     {'AZ', 'CA', 'NV', 'OR', 'TX', 'UT', 'WA'} |   (8.4, 4.7, 8.4, 'in') |        {street: '623 Shaver St', city: 'Portland', state: 'OR', zip: 97212} |  cooking_pot

 196d5b4b-7e7a-45c5-b993-5f397a9ebe8b | {'AZ', 'CA', 'FL', 'GA', 'KY', 'MN', 'NJ', 'NV', 'NY', 'OR', 'TX', 'UT', 'WA'} | (2.96, 0.45, 0.1, 'in') |         {street: '728 Denny Way', city: 'Seattle', state: 'WA', zip: 98122} | curved_knife

 

(3 rows)

Example #1: Finding the length of a column

Let's say we're interested in all items that can be sold in at least 10 states. In pre-Cassandra 2.2, we would have to retrieve all the rows from the database, and the entire available_states column to do some computation on the client side and find the number of states per item:

1

2

3

4

5

6

7

8

9

results = session.execute("SELECT item_id, available_states FROM inventory")

sale_items = []

for item in results:

    if len(item.available_states) > 10:

        sale_items.append(item.item_id)

print sale_items

 

# OUTPUT:

[UUID('615db411-ab52-48df-a696-ac00c5ca9c79')]

This is costly not only because we have to do computations on the client-side, but we're also reading in the entire available_states column when we simply wanted its size. This can be simplified using a UDF:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

session.execute("""

    CREATE FUNCTION len_states(states Set<text>)

        RETURNS NULL ON NULL INPUT

        RETURNS int

        LANGUAGE java

        AS '

        return states.size();'

""")

 

results = session.execute("SELECT item_id, len_states(available_states) FROM inventory")

sale_items = []

for item in results:

    if item.simplex_len_states_available_states > 10:

        sale_items.append(item.item_id)

print sale_items

 

# OUTPUT:

[UUID('615db411-ab52-48df-a696-ac00c5ca9c79')]

Here, we're creating a UDF called len_states, that takes in a list of values and returns its length. This greatly reduces the amount of network traffic on the wire, as now we're simply returning the size (which is what we care about), rather than the entire list of states. On the client-side, now we simply iterate through the list of items to check its size. In my next post, we will see how we can use UDAs to simplify even this computation and directly retrieve the list of items that match our conditions. Notice here that the resulting column name of the SELECT query after calling the UDF is simplex_len_states_available_states, which is in the form <keyspace>_<udf_name>_<column_name_queried_by_udf>.

Example #2: Retrieving an element from inside a UDT

Similarly, let's say we're only interested in the zipcode of the item_location (for calculating postal charges, for example) from inside of the Address UDT. In pre-Cassandra 2.2, we would so something like:

1

2

3

4

5

6

7

8

9

results = session.execute("SELECT item_id, item_location FROM inventory")

sale_items = []

for item in results:

    if item.item_location.zip < 97000:

        sale_items.append(item.item_id)

print sale_items

 

# OUTPUT:

[UUID('4c9616dc-f649-42c3-ace3-ecdbcda1802c')]

Here once again we're retrieving the entire UDT, where we only need one piece of it-- namely the zipcode. In Cassandra 2.2, we can use a UDF to simplify and speed up the process:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

session.execute("""

    CREATE FUNCTION extract_zip(address Address)

        RETURNS NULL ON NULL INPUT

        RETURNS int

        LANGUAGE java

        AS '

        return address.getInt("zip");'

""")

 

results = session.execute("SELECT item_id, extract_zip(item_location) FROM inventory")

sale_items = []

for item in results:

    if item.simplex_extract_zip_item_location < 97000:

        sale_items.append(item.item_id)

print sale_items

 

# OUTPUT:

[UUID('4c9616dc-f649-42c3-ace3-ecdbcda1802c')]

Notice that we can directly use the UDT by name as an argument to our UDF, and all of the Java driver's UDT manipulation mechanisms are available to us.

Example #3: Performing a server-side calculation of a column

As a final example, consider perhaps a use case where we need to calculate the volume of an item (perhaps once again for postal charges). We're interested in both the volume, as well as the proper units for the calculated volume. In pre-Cassandra 2.2, we would retrieve all the dimensions and perform the volume calculation on the client-side:

1

2

3

4

5

6

7

8

9

results = session.execute("SELECT item_id, dimensions FROM inventory")

volumes = []

for item in results:

    volume = str(item.dimensions[0] * item.dimensions[1] * item.dimensions[2]) + " " + item.dimensions[3] + "^3"

    volumes.append((item.item_id, volume))

print volumes

 

# OUTPUT:

[(UUID('6d76a559-ecd8-4a76-ad96-a8a6aa3a6f83'), u'331.632 in^3'), (UUID('8b485426-f054-4406-9e20-2463a42b0d65'), u'0.1332 in^3'), (UUID('27d74750-1b0e-45b9-9628-e6df9ce342ef'), u'403.75 ft^3')]

In Cassandra 2.2, we can move the volume computation to the server via a UDF. Since UDFs work horizontally across a single column in a single row, this is readily implemented:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

session.execute("""

    CREATE FUNCTION volume(dimensions tuple<double, double, double, text>)

        RETURNS NULL ON NULL INPUT

        RETURNS text

        LANGUAGE java

        AS '

        return String.valueOf(dimensions.getDouble(0) * dimensions.getDouble(1) * dimensions.getDouble(2)) + " " + dimensions.getString(3) + "^3";'

""")

 

results = session.execute("SELECT item_id, volume(dimensions) FROM inventory")

volumes = []

for item in results:

    volumes.append((item.item_id, item.simplex_volume_dimensions))

print volumes

 

# OUTPUT:

[(UUID('6d76a559-ecd8-4a76-ad96-a8a6aa3a6f83'), u'331.632 in^3'), (UUID('8b485426-f054-4406-9e20-2463a42b0d65'), u'0.1332 in^3'), (UUID('27d74750-1b0e-45b9-9628-e6df9ce342ef'), u'403.75 ft^3')]

Another benefit here is that with the volume computation stored on the server, any client reading data from this Cassandra cluster can retrieve the same calculated volume information, cutting down on redunant code across multiple applications.

Conclusion

By supporting JSR 223 scripting languages, UDFs allows you to write server-side executable scripts in your favorite language among many supported languages. While UDFs are useful by itself, UDFs in conjunction with UDAs bring out the full functionality of Cassandra 2.2. In my next post, I will discuss taking advantage of User Defined Aggregates using the Python driver.

Subscribe to Our Blog Now

Thank You for Signing Up!