Technology · August 7, 2015

Cassandra User Defined Aggregates using the Python Driver

Kishan Karunaratne
Kishan Karunaratne
Cassandra User Defined Aggregates using the Python Driver
cluster = Cluster(protocol_version=4)
session = cluster.connect()
session.execute("CREATE KEYSPACE simplex WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")
session.execute("USE simplex")
CREATE FUNCTION function_name(arg0 type0, arg1 type1)
    RETURNS NULL ON NULL INPUT
    RETURNS type0
    LANGUAGE java
    AS '
    return (type0) arg0 + arg1';
CREATE AGGREGATE aggregate_name(arg1)
    SFUNC function_name
    STYPE type0
    FINALFUNC function_name2
    INITCOND null;
session.execute("""
    CREATE TABLE prices (
        item_id uuid,
        time timeuuid,
        price double,
        PRIMARY KEY (item_id, time)
    ) WITH CLUSTERING ORDER BY (time DESC)
""")
from uuid import UUID, uuid1
insert = session.prepare("""INSERT INTO prices
    (item_id, time, price)
    VALUES
    (?, ?, ?)
""")
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), 194.95])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), 165.99])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), 234.66])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), 149.94])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), 255.55])
item_id                              | time                                 | price
--------------------------------------+--------------------------------------+--------
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e75c416-3bc4-11e5-9287-a0cec801ccca | 255.55
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7553fa-3bc4-11e5-9287-a0cec801ccca | 149.94
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e743092-3bc4-11e5-9287-a0cec801ccca | 234.66
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e719670-3bc4-11e5-9287-a0cec801ccca | 165.99
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e70b188-3bc4-11e5-9287-a0cec801ccca | 194.95
(5 rows)
print session.execute("SELECT avg(price) FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
# OUTPUT:
Row(system_avg_price=200.218)
print session.execute("SELECT avg(price) AS average FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
# OUTPUT:
Row(average=200.218)
session.execute("""
    CREATE FUNCTION state_min_int(minimum double, current double)
        CALLED ON NULL INPUT
        RETURNS double
        LANGUAGE java
        AS '
        if (minimum == null)
            return current;
        else
            return Math.min(minimum, current);'
""")
session.execute("""
    CREATE AGGREGATE min_value(double)
    SFUNC state_min_int
    STYPE double
    INITCOND null;
""")
result = session.execute("SELECT min_value(price) FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
print result.simplex_min_value_price
# OUTPUT:
149.94
session.execute("""
    CREATE TABLE reviews (
        item_id uuid,
        time timeuuid,
        review_id uuid,
        star_rating int,
        PRIMARY KEY (item_id, time)
    ) WITH CLUSTERING ORDER BY (time DESC)
""")
from uuid import UUID, uuid1, uuid4
insert = session.prepare("""INSERT INTO reviews
    (item_id, time, review_id, star_rating)
    VALUES
    (?, ?, ?, ?)
""")
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 5])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 4])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 4])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 3])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 3])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 4])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 2])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 5])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 4])
session.execute(insert, [UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5'), uuid1(), uuid4(), 5])
item_id                              | time                                 | review_id                            | star_rating
--------------------------------------+--------------------------------------+--------------------------------------+-------------
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e839f28-3bc4-11e5-9287-a0cec801ccca | 0642e0a6-abe3-4a48-9fe2-57054264165a |           5
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e819534-3bc4-11e5-9287-a0cec801ccca | d7516b54-09c0-4a5b-baa9-1da5a47b25a1 |           4
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e80c9a6-3bc4-11e5-9287-a0cec801ccca | 7ea8c81e-2f12-4afc-b6db-a0893c2ce323 |           5
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e80159c-3bc4-11e5-9287-a0cec801ccca | 449d476f-827d-47bd-8173-c866a37cdc76 |           2
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7fc8d0-3bc4-11e5-9287-a0cec801ccca | 6f211c10-fadc-416d-9fff-4c1295a52600 |           4
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7f04f4-3bc4-11e5-9287-a0cec801ccca | e1f59e74-8bf2-453e-9283-18040485b66e |           3
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7dce90-3bc4-11e5-9287-a0cec801ccca | 92757226-fef1-4857-a92d-58136c125317 |           3
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7bd324-3bc4-11e5-9287-a0cec801ccca | 62529d70-23c4-40fc-afd8-27ddb62cd98e |           4
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7a9bbc-3bc4-11e5-9287-a0cec801ccca | 66edc202-4bca-4335-83e0-789de3bda7c9 |           4
 0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7a388e-3bc4-11e5-9287-a0cec801ccca | 11143fbf-a4df-44fd-8c1b-4dc127add75f |           5
(10 rows)
# Fetch every star rating stored for the item and tally them client-side.
results = session.execute("SELECT star_rating FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")
stars = {}
for row in results:
    # A rating not seen yet counts as zero, so its first occurrence becomes 1.
    stars[row.star_rating] = stars.get(row.star_rating, 0) + 1
print(stars)
# OUTPUT:
{2: 1, 3: 2, 4: 4, 5: 3}
session.execute("""
    CREATE FUNCTION state_group_and_sum(state map<int, int>, star_rating int)
    CALLED ON NULL INPUT
    RETURNS map<int, int>
    LANGUAGE java
    AS '
    if (state.get(star_rating) == null)
        state.put(star_rating, 1);
    else
        state.put(star_rating, ((Integer) state.get(star_rating)) + 1);
    return state;'
""")
session.execute("""
    CREATE AGGREGATE group_and_sum(int)
    SFUNC state_group_and_sum
    STYPE map<int, int>
    INITCOND {}
""")
result = session.execute("SELECT group_and_sum(star_rating) FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
print result.simplex_group_and_sum_star_rating
# OUTPUT:
{2: 1, 3: 2, 4: 4, 5: 3}
# Total number of reviews across all ratings. Named `total` so the builtin
# `sum` is not shadowed for the rest of the script.
total = sum(stars.values())
# Integer percentage per star rating. `//` makes the truncating division
# explicit, so the result stays an int whether or not `/` is true division.
results = {rating: count * 100 // total for rating, count in stars.items()}
print(results)
# OUTPUT
{2: 10, 3: 20, 4: 40, 5: 30}
session.execute("""
    CREATE FUNCTION percent_stars(state map<int,int>)
        RETURNS NULL ON NULL INPUT
        RETURNS map<int, int>
        LANGUAGE java
        AS '
        Integer sum = 0;
        for(Object k : state.keySet()) {
            sum = sum + (Integer) state.get((Integer) k);
        }
        java.util.Map<Integer, Integer> results = new java.util.HashMap<Integer, Integer>();
        for(Object k : state.keySet()) {
            results.put((Integer) k, ((Integer) state.get((Integer) k))*100 / sum);
        }
        return results;'
""")
session.execute("""
    CREATE OR REPLACE AGGREGATE group_and_sum(int)
    SFUNC state_group_and_sum
    STYPE map<int, int>
    FINALFUNC percent_stars
    INITCOND {}
""")
# session.execute() yields the result rows rather than a single row, so take
# the first (and only) row before reading the aggregate column by name.
result = session.execute("SELECT group_and_sum(star_rating) FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
print(result.simplex_group_and_sum_star_rating)
# OUTPUT
{2: 10, 3: 20, 4: 40, 5: 30}
/**
 * UDFunction subclass that wraps the body of a user-defined function in
 * {@code executeInternal}.
 *
 * NOTE(review): the class name suggests this is the class generated for the
 * {@code percent_stars} function in the {@code simplex} keyspace; confirm
 * against the Cassandra UDF code generator.
 */
public final class Csimplexpercent_stars_2 extends org.apache.cassandra.cql3.functions.UDFunction
{
    /** Forwards every argument to the superclass, fixing the language to "java". */
    public Csimplexpercent_stars_2(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes,
                        DataType[] argDataTypes, AbstractType<?> returnType, DataType returnDataType, boolean calledOnNullInput, String body)
    {
        super(name, argNames, argTypes, argDataTypes, returnType, returnDataType, calledOnNullInput, "java", body);
    }
    /**
     * Deserializes the single parameter into a Map, runs the function body,
     * and serializes the returned Map.
     */
    protected ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> params) throws InvalidRequestException
    {
        try
        {
            java.util.Map result = executeInternal(
                (java.util.Map) compose(protocolVersion, 0, params.get(0))
            );
            return decompose(protocolVersion, result);
        }
        catch (Throwable t)
        {
            logger.debug("Invocation of function '{}' failed", this, t);
            // VirtualMachineError is rethrown untouched; anything else is
            // wrapped in a FunctionExecutionException.
            if (t instanceof VirtualMachineError)
                throw (VirtualMachineError)t;
            throw FunctionExecutionException.create(this, t);
        }
    }
    /**
     * The function body: sums all values in {@code state}, then returns a new
     * map from each key to {@code value * 100 / sum} (integer division).
     */
    private java.util.Map executeInternal(java.util.Map state)
    {
        // NOTE: an import declaration inside a method body is not legal Java
        // (imports are only allowed at the top of a compilation unit), so this
        // class does not compile as written. Presumably it is shown to
        // illustrate why the percent_stars body in the CREATE FUNCTION
        // statement uses fully-qualified java.util names instead.
        import java.util.*;
        Integer sum = 0;
        for(Object k : state.keySet()) {
            sum = sum + (Integer) state.get((Integer) k);
        }
        Map<Integer, Integer> results = new HashMap<Integer, Integer>();
        for(Object k : state.keySet()) {
            results.put((Integer) k, ((Integer) state.get((Integer) k))*100 / sum);
        }
        return results;
    }
}
Discover more
PythonDrivers
Share

One-stop Data API for Production GenAI

Astra DB gives JavaScript developers a complete data API and out-of-the-box integrations that make it easier to build production RAG apps with high relevancy and low latency.