July 6, 2022

Building Scalable Streaming Applications with DataStax Astra Streaming

Astra Streaming enables developers to build streaming applications on top of an elastically scalable, multi-cloud messaging and event streaming platform powered by Apache Pulsar. DataStax Astra Streaming is currently in beta, and we’ll be releasing a full demo soon. In the meantime, this article walks through a short demo that gives you a good starting point for familiarizing yourself with this powerful new streaming service.

Here’s what you will learn:

  1. How to create an Astra Streaming tenant, complete with namespaces, topics, and sinks
  2. How to produce messages for a topic that makes use of serialized Java POJOs
  3. How to store topic messages in a DataStax Astra database

To illustrate, we’ll use Astra Streaming to replicate (albeit crudely) the streaming object tracking information provided by the Federal Aviation Administration (FAA) to each airport. This stream reports the location of every piece of equipment at the airport (planes, fuel trucks, aircraft tow tractors, baggage carts, etc.).

Let’s start building!

With Astra Streaming, you can not only feed information into the streaming “pipe” but also store those events in various destinations (aka "sinks"), such as a Kafka topic, Elasticsearch, Snowflake, and more. In this example, we will store the events in an Astra DB database for later analysis. This allows us to view our object tracking information in two ways:

  1. Where is everything located right now?
  2. Where has a specific object been located historically? This is useful for tracking the paths of the object over time.

To build our streaming pipeline for tracking objects in real-time and historically, we’ll have to build the following:

  • An Astra Streaming tenant with a single topic, object-location
  • A Java producer that will publish events to the object-location topic
  • An Astra DB database with two tables that get data from the object-location topic

 

Figure 1: All the moving parts we’ll need to build for this demo.

Continuing with our object tracking example, a single topic will feed data into two different tables: the object_location table and object_location_history table. 

The object_location table records only the most recent known location of each object, while the object_location_history table records every location an object has occupied over time. The location history data is useful for different types of analyses, like analyzing the flow of different objects through the airport terminal.

In addition to object tracking, this approach can be used for any use case that requires the ability to see both real-time streaming data and historical data. For example, you could track stock prices with one table holding the current price of each stock and another table holding the historical prices.
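To make the difference between the two views concrete, here is a minimal in-memory sketch of the pattern. It is not part of the demo code: the LocationStore class and its field names are hypothetical, and plain Java collections stand in for the two database tables. It also assumes events arrive in time order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the two tables; names are assumptions for illustration.
final class LocationStore {

    // One event from the stream.
    static final class Location {
        final String name;
        final double x;
        final double y;
        final long ts;

        Location(String name, double x, double y, long ts) {
            this.name = name;
            this.x = x;
            this.y = y;
            this.ts = ts;
        }
    }

    // Plays the role of object_location: one entry per object, overwritten on every event.
    private final Map<String, Location> latest = new HashMap<>();

    // Plays the role of object_location_history: every event is kept, newest first.
    private final Map<String, Deque<Location>> history = new HashMap<>();

    // A single incoming event feeds both views, just as one topic feeds both sinks.
    void record(Location loc) {
        latest.put(loc.name, loc);
        history.computeIfAbsent(loc.name, k -> new ArrayDeque<>()).addFirst(loc);
    }

    // "Where is everything located right now?"
    Location whereIsItNow(String name) {
        return latest.get(name);
    }

    // "Where has a specific object been located historically?"
    int historySize(String name) {
        Deque<Location> events = history.get(name);
        return events == null ? 0 : events.size();
    }

    public static void main(String[] args) {
        LocationStore store = new LocationStore();
        store.record(new Location("UA123", 1.0, 2.0, 100L));
        store.record(new Location("UA123", 3.0, 4.0, 200L));
        System.out.println(store.whereIsItNow("UA123").ts); // prints 200
        System.out.println(store.historySize("UA123"));     // prints 2
    }
}
```

The first map behaves like an upsert keyed on the object, while the second only ever appends, which is the same split the two tables give us.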

Create the database

The first step is to create the database. It is a very simple one: a single keyspace called airport with only two tables. The object_location table tracks where every object is at the moment (more accurately, its last known location), and the object_location_history table tracks each object's location over time, with the most recent update listed first.

If you are following along with your own Astra DB instance, create a database with the keyspace airport, picking a cloud provider and a region, and then run the database/create.cql file to create your tables. Remember your provider and region choices, as you will want to create your streaming instance in the same provider and region later.

Create a custom role

It is possible to create an access token that grants you access to all of your databases, but I highly recommend creating database-specific tokens based on custom roles. On more than one occasion, I’ve accidentally leaked security tokens into GitHub (errors that I corrected within minutes). The only thing that’s saved my bacon is the fact that the token was restricted to a single database. If you’re not familiar with the process for creating a token, I’ll show you how later in this blog.

After creating the database, click on the down arrow next to your organization name, as shown in the red square in the following image.

Figure 2. Accessing your organization settings.

This will bring up a menu. Click on the Organization Settings menu item. Once the page loads, click on the Role Management menu item on the left side of the page and press the Add Custom Role button. 

Give your role a meaningful name. As you can see in the following image, I named my custom role airport-demo. Then start selecting the permissions for your role. As this role will be specific to a database built for demo purposes, I'm being liberal with my permissions. Set your permissions to suit your needs and scroll down to access the rest of the page.

Figure 3. Screenshot showing how to define the permissions for your custom role.

Select the keyspace permission and table permissions as appropriate. I like to enable all of the APIs for my databases, so I usually select them all. The most important step occurs at the very bottom where you select the single database for which this role applies.

Figure 4. Screenshot showing how to finish the custom role definition.

When you are satisfied with your role configuration press the Create Role button.

Generating your database token

Now you can create a security token specific to your custom role and database. Select the Token Management menu item, followed by the custom role you created earlier. Click the Generate Token button.

Figure 5. Generating a Token.

You’ll need to be cautious here, as the dialog box that pops up will never be displayed again. You’ll need this token later to connect Astra Streaming to the database. Press the Download Token Details button to download a CSV file with your token.

Figure 6: You won't see this again. Take this opportunity to download it.

Create the Astra Streaming components

Now we’re going to shift gears and create the Astra Streaming components. Here we will create a tenant, namespace, and a topic.

Create the Astra Streaming tenant

An Astra Streaming tenant is the top-level object for streaming. Think of a tenant as akin to an application or a database. Create a new streaming tenant in the Astra DB web console and name it airport-events. When the tenant is fully created and running, you will see a small green dot to the left of its name, and the dashboard for the tenant will show up in your browser, as shown in Figure 7. 

Figure 7. The airport-events tenant dashboard.

Create the Astra Streaming namespace

This step is optional because a default namespace is created for you when you create a tenant. However, I like to keep things organized and isolated, so I strongly recommend you create a namespace for the airport-events tenant. Click on the Namespaces tab.

Figure 8. Creating a namespace in Astra Streaming.

Set the namespace to airport and click on the Create namespace button. It’s just that easy!

Create the Astra Streaming Topic

Our next step is to create the topic for our object location events. Click on the Topics tab in the dashboard. By default, you will see both the new airport namespace and the default namespace listed in the dashboard. Click the Add Topic button in the airport namespace to create the new topic.

Figure 9. Creating a topic in the airport namespace.

You only need to specify the name of the topic, object-location, as shown in the next image.

Figure 10. Creating the object-location topic in the airport namespace.

Press the Add Topic button. 

At this point, we have a topic on which we can publish events, but those events don’t go anywhere just yet. Next, we will create two “sinks” that will consume the events and store them in a database. A “sink” in streaming terms is a destination for the messages. For this article, we will use Astra DB to store our events.

Create the Astra DB sinks

The mechanism Astra Streaming uses to store events to a database is a sink. We will need to create two sinks, one for each of our tables.

  1. Create the object-location sink

Our first sink will store events in the object_location table. This table differs from the object_location_history table in that it does not have the ts (timestamp) field. Click on the Sinks tab and then press the Create Sink button.

Figure 11. Creating the sink for the object_location table in our Astra DB.

In Step 1 of the wizard, select the fields as shown in the following image:

Figure 12. Step 1 – Create the object-location sink.

Be sure to select the object-location topic in Step 2 of the wizard, as shown next:

Figure 13. Step 2 – Select the topic.

Next, provide the connectivity information for your database. All of the information is important, but the database token is probably the most critical piece here. After pasting your token in, press the Tab key to exit the token field. This will prompt the Astra website to inspect your database and table and generate the field mappings, as you will see next:

Figure 14. Step 3 – Specify the database you want to use.

The field mapping is done automatically for you. Notice that the automatic mapping only concerns itself with the fields in the table you have specified. There is no schema for the overall topic yet because we haven’t sent any messages over to the topic (we will get into that in a bit). 

I have yet to find a condition where the automatic mapping is incorrect, but it never hurts to check twice! Also, you can now expand the area for the object_location schema and view the details there, as shown in the following image:

Figure 15. The field mapping is generated automatically for you.

Press the Create button to create the sink.

  2. Create the objloc-history sink

Next, we will create our second sink that will capture information into the object_location_history table. You will essentially perform the same steps you did for the first sink – but with some key differences:

  • Sink Name: objloc-history (names are limited to 18 characters)
  • Topic: Pick the object-location topic again. It will feed both of our tables!
  • Table Name: object_location_history

This time when you enter the database token and TAB out of the field, the mapping will appear a little differently, as shown below.

Figure 16. Notice the field mapping is slightly different for this sink.

This is because the ts or timestamp field (a Java long data type) is included in the mapping. Press the Create button to create this sink.

Create a Java producer

Things that generate messages on a topic are called “producers” in Apache Pulsar (and by extension in Astra Streaming). We need to create a producer that will send messages to the object-location topic. We don’t want to send simple string messages! We want to send an object that can be stored in database tables.

You can look at the Java code in the folder for this demo on GitHub, where you will see several files. The main entry point is the App.java file. It’s a pretty simple file that just instantiates a Flight object and causes the Flight’s run() method to be invoked every second. The interesting work is in the Flight class.
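If you are curious what "invoked every second" might look like, here is one way to do it with the JDK's ScheduledExecutorService. This is only a sketch under my own assumptions: the TickerSketch class and runPeriodically method are hypothetical names, a plain Runnable stands in for the Flight object, and the actual App.java in the repo may use a different mechanism.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; not taken from the demo repository.
final class TickerSketch {

    // Runs the task at a fixed period for the given number of ticks,
    // then stops the scheduler and returns how many runs completed.
    static int runPeriodically(Runnable task, long periodMillis, int ticks) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(ticks);
        AtomicInteger completed = new AtomicInteger();

        scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() == 0) {
                return; // all requested ticks are done; ignore any late firing
            }
            try {
                task.run();
                completed.incrementAndGet();
            } catch (RuntimeException ex) {
                // An uncaught exception would cancel all future runs, so log and carry on.
                System.out.println("tick failed: " + ex.getMessage());
            } finally {
                remaining.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);

        try {
            remaining.await(); // block until every tick has been attempted
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        // In the demo, the task would be something like flight::run.
        int runs = runPeriodically(() -> System.out.println("tick"), 1000, 3);
        System.out.println("ran " + runs + " times");
    }
}
```

A single-threaded scheduler keeps the runs from overlapping, which matters here because each run mutates the same location object.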

The Flight class produces messages on the object-location topic each time the run() method is invoked. The constructor of the Flight class takes care of creating the PulsarClient connection and then the Pulsar topic producer.

The most important thing to note here is the use of a JSON Schema based on the ObjectLocation class. This tells Pulsar the exact schema of the object being sent. Pulsar will expect the message to match the specified JSON Schema. If the message doesn't match the schema exactly, you will receive an error message.

public Flight(String flightID, String aircraftType) {
    try {
        // Initialize our location
        Date now = new Date();
        objLoc = new ObjectLocation(flightID, aircraftType, 0.0, 0.0,
                now.getTime());

        // Create client object
        client = PulsarClient.builder()
                .serviceUrl(BROKER_SERVICE_URL)
                .authentication(
                        AuthenticationFactory.token(Credentials.token)
                )
                .build();

        // Create producer on a topic
        producer = client.newProducer(JSONSchema.of(ObjectLocation.class))
                .topic("persistent://" + STREAM_NAME + "/" + NAMESPACE + "/" + TOPIC)
                .create();
    } catch (Exception ex) {
        System.out.println(ex.getMessage());
    }
}
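The topic string in the constructor follows Pulsar's persistent://tenant/namespace/topic naming pattern. As a small sketch, here is how it would resolve for the names used in this article, assuming STREAM_NAME holds the tenant name (airport-events), NAMESPACE is airport, and TOPIC is object-location. The TopicNames class is a hypothetical helper of mine, not something from the repo.

```java
// Hypothetical helper that mirrors the string concatenation in the Flight constructor.
final class TopicNames {

    // Builds a fully qualified persistent topic name from its three parts.
    static String persistentTopic(String tenant, String namespace, String topic) {
        if (tenant.isEmpty() || namespace.isEmpty() || topic.isEmpty()) {
            throw new IllegalArgumentException("tenant, namespace, and topic must not be empty");
        }
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    public static void main(String[] args) {
        // Values assumed from the tenant, namespace, and topic created earlier in this article.
        System.out.println(persistentTopic("airport-events", "airport", "object-location"));
        // prints persistent://airport-events/airport/object-location
    }
}
```

If your producer cannot find the topic, comparing this string against the names shown in your tenant dashboard is a quick first check.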

No messages are sent to the topic until the run() method is invoked. Here is the run() method implementation:

public void run() {
    // Send a message to the topic
    try {
        producer.send(objLoc);
        System.out.println(objLoc.toString());
        Date now = new Date();
        updatePosition(objLoc);
        objLoc.setTs(now.getTime());
    } catch (PulsarClientException pcex) {
        pcex.printStackTrace();
    }
}

The call to producer.send(objLoc) takes a native Java POJO that matches the expected schema and sends it over the topic. Note that you don’t have to serialize your object. The Pulsar libraries are smart enough to take care of that for you! Also, the very first time you run this code (which we will do next), Astra Streaming will record the schema for the message type. You can view that schema by navigating to your topic and clicking on the Schema tab, as shown next:

Figure 17. Viewing a topic schema.

Seeing it in action

If you load the project up in an editor like VS Code, you can run the App class to see the application in action. Before you do, you need to create your Credentials.java class. This file should be in the same folder as the App.java file. Your Credentials.java file should look like the following:

package com.datastax.pulsar;

public class Credentials {
    public static String token = "YOUR TOKEN INFO HERE";
}

Where do you get your token info? Well, go to the dashboard for your airport-events streaming tenant and click on the Connect tab. Locate and press the Download client.conf button.

Figure 18. Downloading your client configuration for Astra Streaming

Open the client.conf file that you downloaded and copy just the token value (not including the token: prefix!). Paste that value into the token variable in your Credentials.java file and you're ready to go!
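If you would rather not copy the value by hand, the same step can be sketched in code. This assumes the downloaded file uses the standard Pulsar client.conf layout, where the token sits on a line of the form authParams=token:VALUE. That key name is my assumption, so check it against your own file. The ClientConfToken class is hypothetical.

```java
import java.util.Optional;

// Hypothetical helper; assumes a line of the form "authParams=token:<value>" in client.conf.
final class ClientConfToken {

    private static final String TOKEN_PREFIX = "token:";

    // Returns the token value without the "token:" prefix, if such a line is present.
    static Optional<String> extractToken(String confText) {
        for (String line : confText.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue; // skip blank lines and comments
            }
            int eq = trimmed.indexOf('=');
            if (eq < 0) {
                continue; // not a key=value line
            }
            String key = trimmed.substring(0, eq).trim();
            String value = trimmed.substring(eq + 1).trim();
            if (key.equals("authParams") && value.startsWith(TOKEN_PREFIX)) {
                return Optional.of(value.substring(TOKEN_PREFIX.length()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Made-up sample content; a real token is much longer.
        String sample = "# sample client.conf\n"
                + "brokerServiceUrl=pulsar+ssl://example.invalid:6651\n"
                + "authParams=token:abc.def.ghi\n";
        System.out.println(extractToken(sample).orElse("no token found")); // prints abc.def.ghi
    }
}
```

Whichever way you copy it, treat this token like a password and keep Credentials.java out of version control.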

Now you can run the App.java class and see output similar to the following:

Figure 19: Events generated by the producer.

From the output above, we can see that the producer is generating events/messages on our topic. Now let’s check our database tables to see the recorded data. I’m going to use the CQLShell window on the Astra website to keep things simple. Let’s start by looking at the object_location table.

Figure 20. There should be a single record in your object_location table.

Remember, the purpose of this table is to record the last known location of an object, a Boeing 737 in this case. Your X and Y coordinates will vary depending on when you stopped the application from creating messages.

Now let’s take a look at our object_location_history table:

Figure 21. Our object_location_history table’s data.

Try it yourself!

As you can see, making real use of Astra Streaming is easy to do. Despite the many screenshots and the level of detail provided here, building this application involves just a few simple steps:

  • Create a database
  • Create the object_location table
  • Create the object_location_history table
  • Create the custom role (optional)
  • Generate a token for the database
  • Create a streaming tenant
  • Create the airport namespace
  • Create the object-location topic
  • Create the object-location sink
  • Create the objloc-history sink
  • Create a Java topic producer

That’s all there is to it! Now you have a recipe for sending and receiving event objects via Astra Streaming and storing them in an Astra DB database.

There’s never been a better time to start building streaming applications, and now with Astra Streaming, it’s never been easier. Try this code yourself by creating your free Astra account with no credit card required. Your Astra account will work for both Astra DB and Astra Streaming. When you sign up, you’ll get $25.00 worth of free credits each month in perpetuity! That’s enough to cover 30 million read requests, four million write requests, and 40GB of data storage. 

Pro Tip - Use Public Visibility

One quick tip about something that tripped me up while writing this: out of habit, I made my class attributes private instead of public. As a result, I was getting error messages about providing NULLs for the primary keys. Why? Because when the code introspects the objects being passed in, private variables are not serialized! To see an example of how your objects should be defined, take a look at the ObjectLocation.java file. Once I changed these member variables to be public, everything worked fine.
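Here is a rough way to see the difference for yourself using plain Java reflection. To be clear, this is only an analogy. Pulsar's serializer has its own rules and this sketch does not call it, and the two classes below are made-up stand-ins rather than the real ObjectLocation class.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration; these classes are not the demo's ObjectLocation.
final class VisibilitySketch {

    // Fields declared public, as the tip recommends.
    static class PublicLocation {
        public String name;
        public double x;
        public double y;
    }

    // The same fields declared private, as I first wrote them.
    static class PrivateLocation {
        private String name;
        private double x;
        private double y;
    }

    // Lists the fields that code outside the class can see without special access.
    static List<String> publicFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getFields()) { // getFields() returns public fields only
            names.add(field.getName());
        }
        Collections.sort(names); // reflection does not guarantee an order
        return names;
    }

    public static void main(String[] args) {
        System.out.println(publicFieldNames(PublicLocation.class));  // prints [name, x, y]
        System.out.println(publicFieldNames(PrivateLocation.class)); // prints []
    }
}
```

An introspecting library that only looks at publicly visible members would find nothing to write for the second class, which is consistent with the NULL values I was seeing.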

Resources

  1. DataStax Astra DB
  2. DataStax Astra Streaming
  3. Apache Pulsar
  4. GitHub repo for this project
  5. DataStax Tech Blog
  6. DataStax Developers YouTube Channel
  7. DataStax Developers on Twitter