
Cassandra User Defined Aggregates using the Python Driver

date: August 7, 2015

Introduction

Apache Cassandra 2.2 introduced two useful new features: User Defined Functions (UDFs) and User Defined Aggregates (UDAs). These features allow certain types of computation to occur server-side, directly on the Cassandra cluster. In my previous post, I discussed UDFs and some scenarios that take advantage of them. In this post, I will briefly demonstrate some scenarios where UDAs can be used, and how they can simplify client code written with the DataStax Python Driver.

Examples in this post use the same keyspace as the previous post, shown here created via the Python driver:

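from cassandra.cluster import Cluster

cluster = Cluster(protocol_version=4)
session = cluster.connect()

# IF NOT EXISTS lets this run even if the keyspace was created in the previous post
session.execute("CREATE KEYSPACE IF NOT EXISTS simplex WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")
session.execute("USE simplex")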

User Defined Aggregates (UDAs)

UDAs are aggregate functions that run directly on Cassandra. A UDA is composed of two parts: a UDF (called the 'state function' in the context of UDAs) and the aggregate itself, which calls that UDF once for each row returned by the query. The state function takes a state argument that is carried from one call to the next, so the aggregate works much like a fold or a reduce. This state argument must be the first argument of the UDF, and it is also what the UDF returns: on each subsequent call, the UDA automatically passes the return value of the previous call as the first argument. Here's the syntax for creating a UDA:

CREATE FUNCTION function_name(arg0 type0, arg1 type1)
    CALLED ON NULL INPUT
    RETURNS type0
    LANGUAGE java
    AS '
    return (type0) arg0 + arg1';

CREATE AGGREGATE aggregate_name(type1)
    SFUNC function_name
    STYPE type0
    FINALFUNC function_name2
    INITCOND null;

The SFUNC (state function) is the UDF that will be called for each row, and the STYPE (state type) is that function's return type, which is also the type of the first argument in the next call to the UDF. INITCOND (initial condition) is the value passed as the state in the first call to the UDF, and must be of type STYPE. If INITCOND is null, the state function needs to be declared CALLED ON NULL INPUT: a function declared RETURNS NULL ON NULL INPUT returns null without running its body whenever an argument is null, so the state would never move past null. The FINALFUNC (final function) is optional; if given, the UDA calls it once after all rows have been processed by the state function. Its input is of type STYPE, namely the output of the last call to the state function, while its output can be of any type and is not declared in the UDA.
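The semantics above amount to a left fold followed by an optional final step. As a rough mental model only, the pure-Python sketch below mimics that flow; `run_aggregate`, `count_and_sum`, and `mean` are hypothetical names invented for illustration, and the sketch assumes the state function is applied to the rows in the order the query returns them.

```python
def run_aggregate(sfunc, initcond, values, finalfunc=None):
    """Hypothetical model of UDA evaluation: a left fold plus an optional final step."""
    state = initcond                  # INITCOND seeds the state (of type STYPE)
    for value in values:              # one SFUNC call per row returned by the query
        state = sfunc(state, value)   # each return value becomes the next call's state
    if finalfunc is not None:         # FINALFUNC, if any, sees only the last state
        return finalfunc(state)
    return state


def count_and_sum(state, value):
    """Example state function: the state is a (row_count, running_total) tuple."""
    count, total = state
    return (count + 1, total + value)


def mean(state):
    """Example final function: turn (row_count, running_total) into an average."""
    count, total = state
    return float(total) / count if count else None


print(run_aggregate(count_and_sum, (0, 0), [2, 4, 6]))        # (3, 12)
print(run_aggregate(count_and_sum, (0, 0), [2, 4, 6], mean))  # 4.0
```

The tuple state shows why a final function can be handy: the state may carry more than the final answer, and the final function reduces it to the single value the query returns.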

Like UDFs, UDAs are defined in a specific keyspace, and the state functions they use must come from the same keyspace. UDFs and UDAs are executed on the coordinator node of the Cassandra cluster. Because an aggregate scans every row returned by a query, we should be careful to restrict the query to a specific partition key. Otherwise, the rows matching the query must first be gathered from across the cluster onto the coordinator node before the functions and aggregates are executed, resulting in a significant performance hit.

Examples 1 and 2 below use the following schema:

session.execute("""
    CREATE TABLE prices (
        item_id uuid,
        time timeuuid,
        price double,
        PRIMARY KEY (item_id, time)
    ) WITH CLUSTERING ORDER BY (time DESC)
""")

The data is inserted via the Python driver:

from uuid import UUID, uuid1

insert = session.prepare("""INSERT INTO prices
    (item_id, time, price)
    VALUES
    (?, ?, ?)
""")

# All rows share one partition key; uuid1() gives each row a new timeuuid
item_id = UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5')
for price in [194.95, 165.99, 234.66, 149.94, 255.55]:
    session.execute(insert, [item_id, uuid1(), price])

And the resulting data in the prices table:

 item_id                              | time                                 | price
--------------------------------------+--------------------------------------+--------
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e75c416-3bc4-11e5-9287-a0cec801ccca | 255.55
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7553fa-3bc4-11e5-9287-a0cec801ccca | 149.94
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e743092-3bc4-11e5-9287-a0cec801ccca | 234.66
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e719670-3bc4-11e5-9287-a0cec801ccca | 165.99
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e70b188-3bc4-11e5-9287-a0cec801ccca | 194.95

(5 rows)

Example #1: Using native aggregates

In addition to UDAs, Cassandra provides built-in aggregates, called native aggregates, defined in the system keyspace. These include count, min, max, sum, and avg. For example, we can use the avg native aggregate to retrieve the average price for a particular item:

print session.execute("SELECT avg(price) FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]

# OUTPUT:
# Row(system_avg_price=200.218)

As with UDFs, notice that the column name in the result of the SELECT query takes the form <keyspace>_<aggregate_name>_<column_name_queried_by_aggregate>; native aggregates live in the system keyspace, hence system_avg_price. UDAs follow the same form. We can also alias the column to give it a friendlier name:

print session.execute("SELECT avg(price) AS average FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]

# OUTPUT:
# Row(average=200.218)
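The system_avg_price name seen in the first query appears to come from the Python driver rather than from Cassandra itself. My understanding is that the server labels the column along the lines of `system.avg(price)`, and the driver's default named-tuple row factory turns that label into a valid Python identifier. The sketch below imitates that cleanup with regular expressions; `clean_column_name` is a hypothetical helper that only approximates the driver's internal logic, so check the driver source before relying on the exact rules.

```python
import re


def clean_column_name(name):
    """Hypothetical approximation of how a raw column label becomes a Row attribute."""
    name = re.sub(r'[^a-zA-Z0-9_]*$', '', name)  # drop trailing punctuation, e.g. ')'
    name = re.sub(r'^[^a-zA-Z0-9]*', '', name)   # drop leading punctuation
    return re.sub(r'[^a-zA-Z0-9]', '_', name)    # remaining '.', '(' etc. become '_'


print(clean_column_name("system.avg(price)"))                   # system_avg_price
print(clean_column_name("simplex.min_value(price)"))            # simplex_min_value_price
print(clean_column_name("simplex.group_and_sum(star_rating)"))  # simplex_group_and_sum_star_rating
```

An alias such as `AS average` is already a plain identifier, which is why it passes through unchanged.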

Example #2: Minimum of a column

Now suppose we would like to retrieve the minimum price for an item. We could simply use the min native aggregate, or we could define our own min aggregate:

session.execute("""
    CREATE FUNCTION state_min_int(minimum double, current double)
        CALLED ON NULL INPUT
        RETURNS double
        LANGUAGE java
        AS '
        if (minimum == null)
            return current;
        else
            return Math.min(minimum, current);'
""")

session.execute("""
    CREATE AGGREGATE min_value(double)
    SFUNC state_min_int
    STYPE double
    INITCOND null;
""")

result = session.execute("SELECT min_value(price) FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
print result.simplex_min_value_price

# OUTPUT:
# 149.94

In this example, we have defined a state function whose first argument and return type are both double. The second argument, current, is the value passed in by the UDA for the current row. In the UDA, we declare that it aggregates over a column of type double, with SFUNC and STYPE set to match the state function. We also set INITCOND to null: on the first call the state function sees a null minimum and simply returns the current value, which then serves as the running minimum (and, if there is only one row, is by definition the result). This is also why the state function is declared CALLED ON NULL INPUT.
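To reason about the null handling in state_min_int without a cluster, we can mirror it in plain Python, with `None` standing in for CQL null and `functools.reduce` playing the role of the UDA. This is a client-side sketch only; `state_min` is a hypothetical name, and the list holds the five prices inserted earlier, in clustering order.

```python
from functools import reduce


def state_min(minimum, current):
    """Python mirror of the state_min_int UDF; None plays the role of CQL null."""
    if minimum is None:            # first row: INITCOND is null, so adopt the row value
        return current
    return min(minimum, current)   # later rows: keep the smaller of state and row value


prices = [255.55, 149.94, 234.66, 165.99, 194.95]  # rows in clustering order (time DESC)
print(reduce(state_min, prices, None))   # 149.94
print(reduce(state_min, [42.0], None))   # 42.0: a single row yields its own value
print(reduce(state_min, [], None))       # None: in this model, no rows leaves the initial state
```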

Example #3: Aggregating over a column

Consider a more complex example, given the following schema:

session.execute("""
    CREATE TABLE reviews (
        item_id uuid,
        time timeuuid,
        review_id uuid,
        star_rating int,
        PRIMARY KEY (item_id, time)
    ) WITH CLUSTERING ORDER BY (time DESC)
""")

The data is inserted via the Python driver:

from uuid import UUID, uuid1, uuid4

insert = session.prepare("""INSERT INTO reviews
    (item_id, time, review_id, star_rating)
    VALUES
    (?, ?, ?, ?)
""")

# One partition; each review gets a new timeuuid and a random review_id
item_id = UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5')
for star_rating in [5, 4, 4, 3, 3, 4, 2, 5, 4, 5]:
    session.execute(insert, [item_id, uuid1(), uuid4(), star_rating])

And the resulting data in the reviews table:

 item_id                              | time                                 | review_id                            | star_rating
--------------------------------------+--------------------------------------+--------------------------------------+-------------
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e839f28-3bc4-11e5-9287-a0cec801ccca | 0642e0a6-abe3-4a48-9fe2-57054264165a |           5
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e819534-3bc4-11e5-9287-a0cec801ccca | d7516b54-09c0-4a5b-baa9-1da5a47b25a1 |           4
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e80c9a6-3bc4-11e5-9287-a0cec801ccca | 7ea8c81e-2f12-4afc-b6db-a0893c2ce323 |           5
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e80159c-3bc4-11e5-9287-a0cec801ccca | 449d476f-827d-47bd-8173-c866a37cdc76 |           2
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7fc8d0-3bc4-11e5-9287-a0cec801ccca | 6f211c10-fadc-416d-9fff-4c1295a52600 |           4
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7f04f4-3bc4-11e5-9287-a0cec801ccca | e1f59e74-8bf2-453e-9283-18040485b66e |           3
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7dce90-3bc4-11e5-9287-a0cec801ccca | 92757226-fef1-4857-a92d-58136c125317 |           3
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7bd324-3bc4-11e5-9287-a0cec801ccca | 62529d70-23c4-40fc-afd8-27ddb62cd98e |           4
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7a9bbc-3bc4-11e5-9287-a0cec801ccca | 66edc202-4bca-4335-83e0-789de3bda7c9 |           4
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7a388e-3bc4-11e5-9287-a0cec801ccca | 11143fbf-a4df-44fd-8c1b-4dc127add75f |           5

(10 rows)

Suppose that, for a particular item, we want to count how many reviews gave each star_rating. Before Cassandra 2.2, we could have done something like this:

results = session.execute("SELECT star_rating FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")
stars = {}
for item in results:
    rating = item.star_rating
    if rating not in stars:
        stars[rating] = 1
    else:
        stars[rating] = stars[rating] + 1
print stars

# OUTPUT:
# {2: 1, 3: 2, 4: 4, 5: 3}

Here, we pull every star_rating back to the client and perform the computation client-side by iterating through all the results. In Cassandra 2.2, we can move this computation to the server:

session.execute("""
    CREATE FUNCTION state_group_and_sum(state map<int, int>, star_rating int)
    CALLED ON NULL INPUT
    RETURNS map<int, int>
    LANGUAGE java
    AS '
    if (state.get(star_rating) == null)
        state.put(star_rating, 1);
    else
        state.put(star_rating, ((Integer) state.get(star_rating)) + 1);
    return state;'
""")

session.execute("""
    CREATE AGGREGATE group_and_sum(int)
    SFUNC state_group_and_sum
    STYPE map<int, int>
    INITCOND {}
""")

result = session.execute("SELECT group_and_sum(star_rating) FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
print result.simplex_group_and_sum_star_rating

# OUTPUT:
# {2: 1, 3: 2, 4: 4, 5: 3}

In this example, we have defined an aggregate over type int, namely the input column star_rating. For each row, the aggregate calls a state function that increments the count for that star rating in its state, a map<int, int>. Finally, we simply call our UDA to aggregate the star ratings and retrieve a dictionary ready to be used in our client. One interesting thing to point out is that the value retrieved from the state map must be explicitly cast to an Integer. This is because Cassandra passes the map to the Java code as a raw java.util.Map with no generic type information, so it behaves like a Map<Object, Object> and get() returns a plain Object.
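The same bucketing can be mirrored in Python to sanity-check the state function's logic before deploying it. In this sketch, a dict stands in for map<int, int>, `dict.get(key, 0)` replaces the explicit null check, and the empty dict passed to `reduce` corresponds to INITCOND {}; `state_group_and_sum` here is a hypothetical Python counterpart, not the UDF itself.

```python
from functools import reduce


def state_group_and_sum(state, star_rating):
    """Python mirror of the state_group_and_sum UDF; the dict plays the role of map<int, int>."""
    state[star_rating] = state.get(star_rating, 0) + 1  # new bucket starts at 1, else increment
    return state                                        # like the Java version, mutate and return


# Star ratings as listed in the reviews table above (newest first).
ratings = [5, 4, 5, 2, 4, 3, 3, 4, 4, 5]
counts = reduce(state_group_and_sum, ratings, {})       # {} corresponds to INITCOND {}
print(sorted(counts.items()))  # [(2, 1), (3, 2), (4, 4), (5, 3)]
```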

Example #4: Using a finalfunc to gather more aggregate data

As a final example, suppose we would like to gather even more aggregate data from the results of the previous example. In addition to bucketing the star ratings, perhaps we want the percentage of all reviews that each star rating accounts for. Before Cassandra 2.2, we would do something like the following (assuming we still have the stars dictionary from the previous bucketing):

# 'total' avoids shadowing Python's built-in sum()
total = sum(stars.values())

results = {k: v*100 / total for k, v in stars.iteritems()}
print results

# OUTPUT
# {2: 10, 3: 20, 4: 40, 5: 30}

In Cassandra 2.2, we can neatly roll this into our existing UDA as a FINALFUNC:

session.execute("""
    CREATE FUNCTION percent_stars(state map<int,int>)
        RETURNS NULL ON NULL INPUT
        RETURNS map<int, int>
        LANGUAGE java
        AS '
        Integer sum = 0;
        for(Object k : state.keySet()) {
            sum = sum + (Integer) state.get((Integer) k);
        }

        java.util.Map<Integer, Integer> results = new java.util.HashMap<Integer, Integer>();
        for(Object k : state.keySet()) {
            results.put((Integer) k, ((Integer) state.get((Integer) k))*100 / sum);
        }

        return results;'
""")

session.execute("""
    CREATE OR REPLACE AGGREGATE group_and_sum(int)
    SFUNC state_group_and_sum
    STYPE map<int, int>
    FINALFUNC percent_stars
    INITCOND {}
""")

# [0] selects the single row returned by the aggregate query
result = session.execute("SELECT group_and_sum(star_rating) FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
print result.simplex_group_and_sum_star_rating

# OUTPUT
# {2: 10, 3: 20, 4: 40, 5: 30}

The SFUNC, STYPE, and INITCOND are all the same as before. However, we have now defined a FINALFUNC, percent_stars, which takes as input the state map<int, int> produced by the final call to the SFUNC and performs the percentage calculations. Once again, the state map passed into the FINALFUNC carries no type information for its keys and values, so we must cast them explicitly before computing with them and storing the results in the new map.

One more thing to note here is that we have used the fully qualified names for the Map and HashMap classes. The code Cassandra generates around a Java function body does not import java.util.Map or java.util.HashMap (java.util is part of the standard library, but that alone does not make its classes available by short name), so these classes must be referenced by their full names. Explicitly importing a package in our function body will not work either, because Cassandra expands the body into a complete Java class in which our code becomes the body of a method inside a larger framework. Below is the generated code for our FINALFUNC in the case where we attempted to import java.util.* (note the invalid placement of the import statement inside the method):

public final class Csimplexpercent_stars_2 extends org.apache.cassandra.cql3.functions.UDFunction
{
    public Csimplexpercent_stars_2(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes,
                        DataType[] argDataTypes, AbstractType<?> returnType, DataType returnDataType, boolean calledOnNullInput, String body)
    {
        super(name, argNames, argTypes, argDataTypes, returnType, returnDataType, calledOnNullInput, "java", body);
    }

    protected ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> params) throws InvalidRequestException
    {
        try
        {
            java.util.Map result = executeInternal(
                (java.util.Map) compose(protocolVersion, 0, params.get(0))
            );
            return decompose(protocolVersion, result);
        }
        catch (Throwable t)
        {
            logger.debug("Invocation of function '{}' failed", this, t);
            if (t instanceof VirtualMachineError)
                throw (VirtualMachineError)t;
            throw FunctionExecutionException.create(this, t);
        }
    }

    private java.util.Map executeInternal(java.util.Map state)
    {
        import java.util.*;

        Integer sum = 0;
        for(Object k : state.keySet()) {
            sum = sum + (Integer) state.get((Integer) k);
        }

        Map<Integer, Integer> results = new HashMap<Integer, Integer>();
        for(Object k : state.keySet()) {
            results.put((Integer) k, ((Integer) state.get((Integer) k))*100 / sum);
        }

        return results;
    }
}
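To double-check the arithmetic in the percent_stars FINALFUNC from Example #4, here is a hypothetical Python counterpart applied to the state map produced in Example #3. Java's int division truncates, and Python's `//` gives the same result for the non-negative counts used here; `percent_stars` below is an illustrative mirror, not the UDF.

```python
def percent_stars(state):
    """Python mirror of the percent_stars FINALFUNC: counts per star -> integer percentages."""
    total = sum(state.values())
    # '//' truncates like Java's int division because all counts are non-negative.
    # An empty map produces no entries, so no division by zero occurs.
    return {star: count * 100 // total for star, count in state.items()}


final_state = {2: 1, 3: 2, 4: 4, 5: 3}  # output of the state function from Example #3
print(sorted(percent_stars(final_state).items()))  # [(2, 10), (3, 20), (4, 40), (5, 30)]
print(percent_stars({}))                            # {}
```

Because each percentage is truncated, the values need not add up to exactly 100; for example, counts of 1 and 2 yield 33 and 66.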

Conclusion

User Defined Functions and User Defined Aggregates are powerful new features introduced in Cassandra 2.2. They help to both conserve network bandwidth and cut down on client-side computations by performing functions and aggregations directly at the data source. Grab the latest Cassandra and try out UDFs and UDAs to simplify your client-side code!
