Cassandra User-Defined Aggregates Using the Python Driver

# Connect using native protocol version 4 and open a session.
cluster = Cluster(protocol_version=4)
session = cluster.connect()
# Create the single-replica `simplex` keyspace (SimpleStrategy) and make it the
# session's current keyspace, so the unqualified table, function and aggregate
# names used below resolve inside it.
# NOTE(review): the statement has no IF NOT EXISTS, so it presumably fails when
# `simplex` already exists -- confirm before re-running this script.
session.execute("CREATE KEYSPACE simplex WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}")
session.execute("USE simplex")
-- Syntax template for a user-defined function (UDF).
-- function_name, arg0/arg1 and type0/type1 are placeholders.
CREATE FUNCTION function_name(arg0 type0, arg1 type1)
-- Null-handling clause: the function yields null when an argument is null.
RETURNS NULL ON NULL INPUT
-- Declared return type of the function.
RETURNS type0
-- The quoted body that follows is Java source.
LANGUAGE java
AS '
return (type0) arg0 + arg1';
-- Syntax template for a user-defined aggregate (UDA).
-- An aggregate is declared with the *types* of its input values, not with
-- argument names: here type1, the type of the state function's second
-- parameter (its first parameter is the state, of type type0).
CREATE AGGREGATE aggregate_name(type1)
-- State function, applied once per input value: (state, value) -> new state.
SFUNC function_name
-- Type of the accumulated state.
STYPE type0
-- Final function applied to the last state to produce the result.
FINALFUNC function_name2
-- Initial value of the state.
INITCOND null;
# Price history table: rows are partitioned by item_id and clustered by the
# `time` timeuuid in descending order (see CLUSTERING ORDER BY below).
session.execute("""
CREATE TABLE prices (
item_id uuid,
time timeuuid,
price double,
PRIMARY KEY (item_id, time)
) WITH CLUSTERING ORDER BY (time DESC)
""")
from uuid import UUID, uuid1
# Prepare the parameterised INSERT once; it is reused for every sample row.
insert = session.prepare("""INSERT INTO prices
(item_id, time, price)
VALUES
(?, ?, ?)
""")
# Every sample price belongs to the same item. uuid1() is called once per row
# so each row gets its own `time` value.
item_id = UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5')
for price in (194.95, 165.99, 234.66, 149.94, 255.55):
    session.execute(insert, [item_id, uuid1(), price])
item_id | time | price
--------------------------------------+--------------------------------------+--------
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e75c416-3bc4-11e5-9287-a0cec801ccca | 255.55
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7553fa-3bc4-11e5-9287-a0cec801ccca | 149.94
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e743092-3bc4-11e5-9287-a0cec801ccca | 234.66
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e719670-3bc4-11e5-9287-a0cec801ccca | 165.99
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e70b188-3bc4-11e5-9287-a0cec801ccca | 194.95
(5 rows)
# Built-in avg() over one item's prices; [0] selects the first row of the
# result. The parenthesised print form runs on both Python 2 and Python 3.
print(session.execute("SELECT avg(price) FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0])
# OUTPUT:
Row(system_avg_price=200.218)
# Same average query, with the result column aliased to `average` via AS.
# The parenthesised print form runs on both Python 2 and Python 3.
print(session.execute("SELECT avg(price) AS average FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0])
# OUTPUT:
Row(average=200.218)
# State function for a running minimum over doubles.
# It is declared CALLED ON NULL INPUT, so either argument may arrive as null:
#   * `minimum` is null until the first value has been seen;
#   * `current` is null for a row whose value is missing.
# Both cases are handled explicitly, because passing a null to Math.min would
# throw a NullPointerException when the boxed value is unboxed. Null inputs are
# skipped and the running minimum is left unchanged.
session.execute("""
CREATE FUNCTION state_min_int(minimum double, current double)
CALLED ON NULL INPUT
RETURNS double
LANGUAGE java
AS '
if (current == null)
    return minimum;
if (minimum == null)
    return current;
return Math.min(minimum, current);'
""")
# Define the min_value(double) aggregate: it folds state_min_int over the
# input values, carrying a double state whose initial value is null.
# No FINALFUNC is given, so the last state is what the aggregate returns.
session.execute("""
CREATE AGGREGATE min_value(double)
SFUNC state_min_int
STYPE double
INITCOND null;
""")
# Run the user-defined min_value aggregate and take the first result row.
result = session.execute("SELECT min_value(price) FROM prices WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
# The un-aliased aggregate column is read from the row as
# simplex_min_value_price. Parenthesised print runs on Python 2 and 3.
print(result.simplex_min_value_price)
# OUTPUT:
149.94
# Reviews table: rows are partitioned by item_id and clustered by the `time`
# timeuuid in descending order; each row carries a review id and an integer
# star rating.
session.execute("""
CREATE TABLE reviews (
item_id uuid,
time timeuuid,
review_id uuid,
star_rating int,
PRIMARY KEY (item_id, time)
) WITH CLUSTERING ORDER BY (time DESC)
""")
from uuid import UUID, uuid1, uuid4
# Prepare the parameterised INSERT once; it is reused for every sample review.
insert = session.prepare("""INSERT INTO reviews
(item_id, time, review_id, star_rating)
VALUES
(?, ?, ?, ?)
""")
# Ten sample reviews for one item, inserted in this order. uuid1() and uuid4()
# are called once per row, giving every review its own time and review id.
item_id = UUID('0979dea5-5a65-446d-bad6-27d04d5dd8a5')
for star_rating in (5, 4, 4, 3, 3, 4, 2, 5, 4, 5):
    session.execute(insert, [item_id, uuid1(), uuid4(), star_rating])
item_id | time | review_id | star_rating
--------------------------------------+--------------------------------------+--------------------------------------+-------------
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e839f28-3bc4-11e5-9287-a0cec801ccca | 0642e0a6-abe3-4a48-9fe2-57054264165a | 5
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e819534-3bc4-11e5-9287-a0cec801ccca | d7516b54-09c0-4a5b-baa9-1da5a47b25a1 | 4
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e80c9a6-3bc4-11e5-9287-a0cec801ccca | 7ea8c81e-2f12-4afc-b6db-a0893c2ce323 | 5
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e80159c-3bc4-11e5-9287-a0cec801ccca | 449d476f-827d-47bd-8173-c866a37cdc76 | 2
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7fc8d0-3bc4-11e5-9287-a0cec801ccca | 6f211c10-fadc-416d-9fff-4c1295a52600 | 4
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7f04f4-3bc4-11e5-9287-a0cec801ccca | e1f59e74-8bf2-453e-9283-18040485b66e | 3
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7dce90-3bc4-11e5-9287-a0cec801ccca | 92757226-fef1-4857-a92d-58136c125317 | 3
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7bd324-3bc4-11e5-9287-a0cec801ccca | 62529d70-23c4-40fc-afd8-27ddb62cd98e | 4
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7a9bbc-3bc4-11e5-9287-a0cec801ccca | 66edc202-4bca-4335-83e0-789de3bda7c9 | 4
0979dea5-5a65-446d-bad6-27d04d5dd8a5 | 6e7a388e-3bc4-11e5-9287-a0cec801ccca | 11143fbf-a4df-44fd-8c1b-4dc127add75f | 5
(10 rows)
# Client-side tally: fetch every star_rating for the item and count how many
# reviews gave each rating. `stars` maps rating -> number of reviews.
results = session.execute("SELECT star_rating FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")
stars = {}
for item in results:
    rating = item.star_rating
    # dict.get supplies 0 the first time a rating is seen.
    stars[rating] = stars.get(rating, 0) + 1
# Parenthesised print runs on both Python 2 and Python 3.
print(stars)
# OUTPUT:
{2: 1, 3: 2, 4: 4, 5: 3}
# State function that counts occurrences per star rating: it stores 1 under
# star_rating the first time that key is absent from the state map, otherwise
# increments the stored count, and returns the (mutated) map as the new state.
session.execute("""
CREATE FUNCTION state_group_and_sum(state map<int, int>, star_rating int)
CALLED ON NULL INPUT
RETURNS map<int, int>
LANGUAGE java
AS '
if (state.get(star_rating) == null)
state.put(star_rating, 1);
else
state.put(star_rating, ((Integer) state.get(star_rating)) + 1);
return state;'
""")
# Define the group_and_sum(int) aggregate: it folds state_group_and_sum over
# the input values, carrying a map<int, int> state that starts as an empty map.
session.execute("""
CREATE AGGREGATE group_and_sum(int)
SFUNC state_group_and_sum
STYPE map<int, int>
INITCOND {}
""")
# Run the server-side group_and_sum aggregate and take the first result row.
result = session.execute("SELECT group_and_sum(star_rating) FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
# The un-aliased aggregate column is read from the row as
# simplex_group_and_sum_star_rating. Parenthesised print runs on Python 2 and 3.
print(result.simplex_group_and_sum_star_rating)
# OUTPUT:
{2: 1, 3: 2, 4: 4, 5: 3}
# Client-side percentages: convert each rating's review count in `stars` into
# a whole-number percentage of all reviews.
# The accumulator is named `total` so the built-in sum() is not shadowed.
total = sum(stars.values())
# Floor division (//) yields the same integer percentages on Python 2 and 3.
# An empty `stars` never divides, so a zero total cannot raise here.
results = {k: v * 100 // total for k, v in stars.items()}
print(results)
# OUTPUT:
{2: 10, 3: 20, 4: 40, 5: 30}
# Final function for the star-rating aggregate: sums all counts in the state
# map, then builds a new map giving each key's count as a percentage of that
# sum. The arithmetic is on Integer values, so the division truncates.
# java.util types are written fully qualified inside the Java body.
session.execute("""
CREATE FUNCTION percent_stars(state map<int,int>)
RETURNS NULL ON NULL INPUT
RETURNS map<int, int>
LANGUAGE java
AS '
Integer sum = 0;
for(Object k : state.keySet()) {
sum = sum + (Integer) state.get((Integer) k);
}
java.util.Map<Integer, Integer> results = new java.util.HashMap<Integer, Integer>();
for(Object k : state.keySet()) {
results.put((Integer) k, ((Integer) state.get((Integer) k))*100 / sum);
}
return results;'
""")
# Redefine group_and_sum (CREATE OR REPLACE) with the same state function,
# state type and empty-map initial state, now adding percent_stars as the
# FINALFUNC applied to the accumulated map.
session.execute("""
CREATE OR REPLACE AGGREGATE group_and_sum(int)
SFUNC state_group_and_sum
STYPE map<int, int>
FINALFUNC percent_stars
INITCOND {}
""")
# Query group_and_sum again and print the aggregate column.
# session.execute() returns the result rows, not a single row, so the first
# (only) row must be selected with [0] before the column attribute is read.
result = session.execute("SELECT group_and_sum(star_rating) FROM reviews WHERE item_id=0979dea5-5a65-446d-bad6-27d04d5dd8a5")[0]
# Parenthesised print runs on both Python 2 and Python 3.
print(result.simplex_group_and_sum_star_rating)
# OUTPUT:
{2: 10, 3: 20, 4: 40, 5: 30}
// Wrapper class for the percent_stars function in the simplex keyspace
// (presumably the class Cassandra generates around a Java UDF body -- confirm).
// executeUserDefined() adapts the serialized arguments; executeInternal()
// holds the function's own source text.
public final class Csimplexpercent_stars_2 extends org.apache.cassandra.cql3.functions.UDFunction
{
// Passes the function metadata straight to UDFunction, with the language
// fixed to "java".
public Csimplexpercent_stars_2(FunctionName name, List<ColumnIdentifier> argNames, List<AbstractType<?>> argTypes,
DataType[] argDataTypes, AbstractType<?> returnType, DataType returnDataType, boolean calledOnNullInput, String body)
{
super(name, argNames, argTypes, argDataTypes, returnType, returnDataType, calledOnNullInput, "java", body);
}
// Entry point taking the arguments as ByteBuffers: the single argument
// (index 0) is turned into a java.util.Map via compose(), handed to
// executeInternal(), and the returned map is turned back into a ByteBuffer
// via decompose().
protected ByteBuffer executeUserDefined(int protocolVersion, List<ByteBuffer> params) throws InvalidRequestException
{
try
{
java.util.Map result = executeInternal(
(java.util.Map) compose(protocolVersion, 0, params.get(0))
);
return decompose(protocolVersion, result);
}
catch (Throwable t)
{
// Every failure is logged at debug level. VirtualMachineErrors are rethrown
// unchanged; anything else is wrapped in a FunctionExecutionException.
logger.debug("Invocation of function '{}' failed", this, t);
if (t instanceof VirtualMachineError)
throw (VirtualMachineError)t;
throw FunctionExecutionException.create(this, t);
}
}
// The function body as placed inside a method: sums the counts in `state`,
// then returns a new map of each key's count as an integer percentage.
private java.util.Map executeInternal(java.util.Map state)
{
// NOTE(review): an `import` statement is not legal inside a Java method body,
// so this method cannot compile as shown. The CREATE FUNCTION percent_stars
// statement in this file uses fully qualified java.util.Map / java.util.HashMap
// instead of this import -- presumably this listing illustrates why; confirm.
import java.util.*;
Integer sum = 0;
for(Object k : state.keySet()) {
sum = sum + (Integer) state.get((Integer) k);
}
Map<Integer, Integer> results = new HashMap<Integer, Integer>();
for(Object k : state.keySet()) {
results.put((Integer) k, ((Integer) state.get((Integer) k))*100 / sum);
}
return results;
}
}