Technology — August 7, 2015

Cassandra User Defined Aggregates using the Python Driver

Kishan Karunaratne
Kishan Karunaratne
Cassandra User Defined Aggregates using the Python Driver

# Build the cluster handle, requesting protocol version 4.
# NOTE(review): `Cluster` is not imported in the visible snippet — presumably
# `from cassandra.cluster import Cluster` (DataStax Python driver); confirm.
cluster = Cluster(protocol_version=4)

# Open the session that every later statement in this walkthrough goes through.
session = cluster.connect()

# Create the `simplex` keyspace with SimpleStrategy and a replication factor of 1.
session.execute("CREATE KEYSPACE simplex WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")

# Switch the session to `simplex` so the following statements can use unqualified names.
session.execute("USE simplex")

-- Syntax template for a user-defined function; every identifier is a placeholder,
-- so this is not meant to be executed as written.
CREATE FUNCTION function_name(arg0 type0, arg1 type1)

    -- Null-handling clause. The concrete functions later in this file use either
    -- this form or CALLED ON NULL INPUT.
    RETURNS NULL ON NULL INPUT

    -- Declared return type of the function.
    RETURNS type0

    -- Language of the body that follows AS.
    LANGUAGE java

    AS '

    return (type0) arg0 + arg1';

-- Syntax template for a user-defined aggregate; every identifier is a placeholder.
-- NOTE(review): the concrete aggregates later in this file put a *type* in the
-- parentheses (min_value(double), group_and_sum(int)); `arg1` here presumably
-- stands for the argument's type (type1) — confirm.
CREATE AGGREGATE aggregate_name(arg1)

    -- State function: the function_name placeholder from the CREATE FUNCTION template.
    SFUNC function_name

    -- Type of the running state; matches the first argument and return type (type0)
    -- of the state function template.
    STYPE type0

    -- A second function applied to the state. The first group_and_sum definition
    -- later in this file omits this clause.
    FINALFUNC function_name2

    -- Initial value of the state.
    INITCOND null;

# DDL for the `prices` table: rows are keyed by (item_id, time) and clustered
# by `time` in descending order.
create_prices_table = """

    CREATE TABLE prices (

        item_id uuid,

        time timeuuid,

        price double,

        PRIMARY KEY (item_id, time)

    ) WITH CLUSTERING ORDER BY (time DESC)

"""

session.execute(create_prices_table)

from uuid import UUID, uuid1

# Prepared INSERT reused for every sample price row.
insert_price_cql = """INSERT INTO prices

    (item_id, time, price)

    VALUES

    (?, ?, ?)

"""

insert = session.prepare(insert_price_cql)

# Five sample prices for the same item, inserted in this order; each row gets
# its own uuid1() value for the `time` column.
for sample_price in (194.95, 165.99, 234.66, 149.94, 255.55):
    session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), sample_price])

item_id                              | time                                 | price

--------------------------------------+--------------------------------------+--------

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e75c416-3bc4-11e5-9287-a0cec801ccca | 255.55

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7553fa-3bc4-11e5-9287-a0cec801ccca | 149.94

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e743092-3bc4-11e5-9287-a0cec801ccca | 234.66

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e719670-3bc4-11e5-9287-a0cec801ccca | 165.99

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e70b188-3bc4-11e5-9287-a0cec801ccca | 194.95

(5 rows)

# Average price for one item, computed by avg(); prints the first (only) row.
# Parenthesised print with a single argument behaves the same on Python 2 and 3.
print(session.execute("SELECT avg(price) FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0])

# OUTPUT:

Row(system_avg_price=200.218)

# Same average, with the result column aliased to `average` via AS.
# Parenthesised print with a single argument behaves the same on Python 2 and 3.
print(session.execute("SELECT avg(price) AS average FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0])

# OUTPUT:

Row(average=200.218)

# State function for the min_value aggregate: keeps the smaller of the running
# minimum and the current value.
# The function is CALLED ON NULL INPUT, so either argument can arrive as null:
#   - a null `current` is skipped (the running minimum is returned unchanged);
#     without this guard Math.min would unbox null and throw;
#   - a null `minimum` means no value has been seen yet, so `current` becomes
#     the minimum.
session.execute("""

    CREATE FUNCTION state_min_int(minimum double, current double)

        CALLED ON NULL INPUT

        RETURNS double

        LANGUAGE java

        AS '

        if (current == null)

            return minimum;

        if (minimum == null)

            return current;

        return Math.min(minimum, current);'

""")

# Aggregate `min_value` over doubles: state_min_int is the state function,
# the state is a double, and the state starts out as null.
create_min_value_aggregate = """

    CREATE AGGREGATE min_value(double)

    SFUNC state_min_int

    STYPE double

    INITCOND null;

"""

session.execute(create_min_value_aggregate)

# Run the custom aggregate and take the first (only) row.
result = session.execute("SELECT min_value(price) FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]

# Without an alias the column is read as simplex_min_value_price
# (looks like <keyspace>_<aggregate>_<column>).
# Parenthesised print with a single argument behaves the same on Python 2 and 3.
print(result.simplex_min_value_price)

# OUTPUT:

149.94

# DDL for the `reviews` table: rows are keyed by (item_id, time), clustered by
# `time` in descending order, and carry a review id plus an integer star rating.
create_reviews_table = """

    CREATE TABLE reviews (

        item_id uuid,

        time timeuuid,

        review_id uuid,

        star_rating int,

        PRIMARY KEY (item_id, time)

    ) WITH CLUSTERING ORDER BY (time DESC)

"""

session.execute(create_reviews_table)

from uuid import UUID, uuid1, uuid4

# Prepared INSERT reused for every sample review row.
insert_review_cql = """INSERT INTO reviews

    (item_id, time, review_id, star_rating)

    VALUES

    (?, ?, ?, ?)

"""

insert = session.prepare(insert_review_cql)

# Ten sample reviews for the same item, inserted in this order; each row gets
# its own uuid1() value for `time` and a random uuid4() review id.
for star_rating in (5, 4, 4, 3, 3, 4, 2, 5, 4, 5):
    session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), star_rating])

item_id                              | time                                 | review_id                            | star_rating

--------------------------------------+--------------------------------------+--------------------------------------+-------------

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e839f28-3bc4-11e5-9287-a0cec801ccca | 0642e0a6-abe3-4a48-9fe2-57054264165a |           5

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e819534-3bc4-11e5-9287-a0cec801ccca | d7516b54-09c0-4a5b-baa9-1da5a47b25a1 |           4

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e80c9a6-3bc4-11e5-9287-a0cec801ccca | 7ea8c81e-2f12-4afc-b6db-a0893c2ce323 |           5

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e80159c-3bc4-11e5-9287-a0cec801ccca | 449d476f-827d-47bd-8173-c866a37cdc76 |           2

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7fc8d0-3bc4-11e5-9287-a0cec801ccca | 6f211c10-fadc-416d-9fff-4c1295a52600 |           4

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7f04f4-3bc4-11e5-9287-a0cec801ccca | e1f59e74-8bf2-453e-9283-18040485b66e |           3

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7dce90-3bc4-11e5-9287-a0cec801ccca | 92757226-fef1-4857-a92d-58136c125317 |           3

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7bd324-3bc4-11e5-9287-a0cec801ccca | 62529d70-23c4-40fc-afd8-27ddb62cd98e |           4

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7a9bbc-3bc4-11e5-9287-a0cec801ccca | 66edc202-4bca-4335-83e0-789de3bda7c9 |           4

 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7a388e-3bc4-11e5-9287-a0cec801ccca | 11143fbf-a4df-44fd-8c1b-4dc127add75f |           5

(10 rows)

# Client-side grouping: fetch every star rating for the item and count how
# many reviews gave each rating.
results = session.execute("SELECT star_rating FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")

# Maps star rating -> number of reviews with that rating.
stars = {}

for item in results:
    rating = item.star_rating
    # dict.get supplies 0 for a rating seen for the first time.
    stars[rating] = stars.get(rating, 0) + 1

# Parenthesised print with a single argument behaves the same on Python 2 and 3.
print(stars)

# OUTPUT:

{2: 1, 3: 2, 4: 4, 5: 3}

# State function for group_and_sum: the state is a map from star rating to
# count; each call bumps the count for the incoming rating (starting at 1 when
# the rating is not in the map yet) and returns the map.
create_state_group_and_sum = """

    CREATE FUNCTION state_group_and_sum(state map<int, int>, star_rating int)

    CALLED ON NULL INPUT

    RETURNS map<int, int>

    LANGUAGE java

    AS '

    if (state.get(star_rating) == null)

        state.put(star_rating, 1);

    else

        state.put(star_rating, ((Integer) state.get(star_rating)) + 1);

    return state;'

"""

session.execute(create_state_group_and_sum)

# Aggregate `group_and_sum` over ints: state_group_and_sum is the state
# function, the state is a map<int, int>, and it starts out as an empty map.
create_group_and_sum = """

    CREATE AGGREGATE group_and_sum(int)

    SFUNC state_group_and_sum

    STYPE map<int, int>

    INITCOND {}

"""

session.execute(create_group_and_sum)

# Run the grouping aggregate server-side and take the first (only) row.
result = session.execute("SELECT group_and_sum(star_rating) FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]

# Without an alias the column is read as simplex_group_and_sum_star_rating.
# Parenthesised print with a single argument behaves the same on Python 2 and 3.
print(result.simplex_group_and_sum_star_rating)

# OUTPUT:

{2: 1, 3: 2, 4: 4, 5: 3}

# Client-side percentages: convert each rating's review count in `stars` into
# a whole-number percentage of all reviews.
# The builtin sum() replaces the hand-written accumulation, which rebound the
# name `sum` itself.
total = sum(stars.values())

# Floor division keeps the integer results shown in the output on both
# Python 2 and Python 3; items() is available on both as well.
results = {rating: count * 100 // total for rating, count in stars.items()}

print(results)

# OUTPUT

{2: 10, 3: 20, 4: 40, 5: 30}

# Final function for group_and_sum: sums all counts in the state map, then
# returns a new map where each count is replaced by count*100/sum (integer
# arithmetic). The body spells out java.util.Map / java.util.HashMap in full.
create_percent_stars = """

    CREATE FUNCTION percent_stars(state map<int,int>)

        RETURNS NULL ON NULL INPUT

        RETURNS map<int, int>

        LANGUAGE java

        AS '

        Integer sum = 0;

        for(Object k : state.keySet()) {

            sum = sum + (Integer) state.get((Integer) k);

        }

        java.util.Map<Integer, Integer> results = new java.util.HashMap<Integer, Integer>();

        for(Object k : state.keySet()) {

            results.put((Integer) k, ((Integer) state.get((Integer) k))*100 / sum);

        }

        return results;'

"""

session.execute(create_percent_stars)

# Redefine group_and_sum with the same state function, state type and initial
# state as before, now adding percent_stars as the FINALFUNC.
replace_group_and_sum = """

    CREATE OR REPLACE AGGREGATE group_and_sum(int)

    SFUNC state_group_and_sum

    STYPE map<int, int>

    FINALFUNC percent_stars

    INITCOND {}

"""

session.execute(replace_group_and_sum)

# Run the redefined aggregate, which now applies percent_stars to the grouped counts.
results = session.execute("SELECT group_and_sum(star_rating) FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")

# The column lives on a row, not on the result set, so index the first (only)
# row before reading it — the same [0] the earlier queries in this file use.
# Parenthesised print with a single argument behaves the same on Python 2 and 3.
print(results[0].simplex_group_and_sum_star_rating)

# OUTPUT

{2: 10, 3: 20, 4: 40, 5: 30}

/**
 * UDFunction subclass wrapping a percent-of-total computation over a map.
 *
 * NOTE(review): judging by the class name and the body of executeInternal, this looks like
 * the Java class Cassandra generates around the percent_stars function of keyspace
 * simplex — confirm. It is shown as a listing; see the note on the import statement
 * inside executeInternal.
 */
public final class Csimplexpercent_stars_2 extends org.apache.cassandra.cql3.functions.UDFunction

{

    // Forwards every argument to the UDFunction constructor, passing "java" as the language.
    public Csimplexpercent_stars_2(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes,

                        DataType[] argDataTypes, AbstractType<?> returnType, DataType returnDataType, boolean calledOnNullInput, String body)

    {

        super(name, argNames, argTypes, argDataTypes, returnType, returnDataType, calledOnNullInput, "java", body);

    }

    // Composes the first serialized parameter into a java.util.Map, passes it to
    // executeInternal, and decomposes the returned map back into a ByteBuffer.
    protected ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> params) throws InvalidRequestException

    {

        try

        {

            java.util.Map result = executeInternal(

                (java.util.Map) compose(protocolVersion, 0, params.get(0))

            );

            return decompose(protocolVersion, result);

        }

        catch (Throwable t)

        {

            // Log the failure at debug level, then rethrow: VirtualMachineError is
            // propagated as-is, anything else is wrapped in a FunctionExecutionException.
            logger.debug("Invocation of function '{}' failed", this, t);

            if (t instanceof VirtualMachineError)

                throw (VirtualMachineError)t;

            throw FunctionExecutionException.create(this, t);

        }

    }

    // Body of the function: sums every value in `state`, then maps each key to
    // value*100/sum using Integer arithmetic (the division truncates).
    private java.util.Map executeInternal(java.util.Map state)

    {

        // NOTE(review): an import declaration is not legal inside a Java method body, so
        // this class does not compile as shown, and the unqualified Map/HashMap below
        // depend on it. Presumably the listing illustrates why the percent_stars body
        // passed to CREATE FUNCTION spells out java.util.Map / java.util.HashMap — confirm.
        import java.util.*;

        Integer sum = 0;

        for(Object k : state.keySet()) {

            sum = sum + (Integer) state.get((Integer) k);

        }

        Map<Integer, Integer> results = new HashMap<Integer, Integer>();

        for(Object k : state.keySet()) {

            results.put((Integer) k, ((Integer) state.get((Integer) k))*100 / sum);

        }

        return results;

    }

}

Discover more
PythonDrivers
Share

Open-Source,
Scale-Out, Cloud-Native
NoSQL Database

Astra DB is scale-out NoSQL built on Apache Cassandra™. Handle any workload with zero downtime and zero lock-in at global scale.

Company
Resources
Cloud Partners

DataStax is a registered trademark of DataStax, Inc. Apache, Apache Cassandra, Cassandra, Apache Pulsar, and Pulsar are either registered trademarks or trademarks of the Apache Software Foundation.

United States