DataStax Blog

Scalable Inventory

By Matt Stump - June 11, 2015

This example demonstrates best practices for building a robust, linearly scalable inventory management system on distributed infrastructure. Both robustness and scalability are achieved by avoiding shared mutable state. The approach is to partition the inventory by SKU and then isolate access to each SKU by leveraging partitionable queues.

Schema

The Cassandra schema below is an example consisting of two tables: inventory.counts, which tracks the actual counts, and inventory.log, which tracks history. The example assumes some knowledge of Cassandra data modeling and query patterns.


CREATE KEYSPACE IF NOT EXISTS inventory
   WITH replication = { 'class' : 'SimpleStrategy',
                        'replication_factor' : 1 };
CREATE TABLE IF NOT EXISTS inventory.counts(
   sku text,
   store text,
   type text,
   count int,
   PRIMARY KEY(sku, store, type));
   
CREATE TABLE IF NOT EXISTS inventory.log(
   sku text,
   store text,
   time timeuuid,
   type text,
   count int,
   data map<text,text>,
   PRIMARY KEY((sku, store), time, type))
   WITH default_time_to_live = 15552000 AND
   compaction = {'class': 'DateTieredCompactionStrategy',
                 'base_time_seconds':'3600',
                 'max_sstable_age_days':'14'};

Sample records


INSERT INTO inventory.counts(sku, store, type, count)
   VALUES('prd-1833080', 'redwoodcity-1389', 'demand', 1);
INSERT INTO inventory.counts(sku, store, type, count)
   VALUES('prd-1833080', 'redwoodcity-1389', 'available', 12);

Partition Key and Counts

In the sample above the partition key is the SKU and the clustering columns are store and type. Using the SKU as the partition key serves multiple purposes. First, it allows the service to obtain all counts for a SKU across all stores and distribution channels with a single-partition query, giving full visibility of demand and availability.

SELECT * FROM inventory.counts WHERE sku='prd-1833080';
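
As a rough sketch of what a service might do with the rows returned by this query, the Python below totals the counts by type across stores. The rows are hard-coded stand-ins for a driver result set; the second store name and the function name totals_by_type are made up for illustration.

```python
from collections import defaultdict

# Stand-in for the rows returned by the SELECT above.
# The second store ("sanmateo-0042") is hypothetical.
rows = [
    {"sku": "prd-1833080", "store": "redwoodcity-1389", "type": "available", "count": 12},
    {"sku": "prd-1833080", "store": "redwoodcity-1389", "type": "demand", "count": 1},
    {"sku": "prd-1833080", "store": "sanmateo-0042", "type": "available", "count": 7},
]


def totals_by_type(rows):
    """Sum the counts per type across every store in the SKU partition."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["type"]] += row["count"]
    return dict(totals)


print(totals_by_type(rows))
```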

The second benefit of using the SKU as the partition key is that isolation for lightweight transactions (LWT), which Cassandra implements with Paxos, is provided at the partition level. Although this example doesn’t require synchronization, it is available if necessary, provided that both the inventory demand and availability are clustered within the same partition.


BEGIN BATCH
UPDATE inventory.counts SET count = 11
   WHERE sku='prd-1833080'
   AND store='redwoodcity-1389'
   AND type='available'
   IF count = 12;
UPDATE inventory.counts SET count = 2
   WHERE sku='prd-1833080'
   AND store='redwoodcity-1389'
   AND type='demand';
APPLY BATCH;
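
The behavior of a conditional batch like this one can be modeled in memory: either the condition holds and every update in the batch is applied, or nothing is. The sketch below is only a model of that behavior, not driver code, and the function name apply_conditional_batch is an assumption made for illustration.

```python
def apply_conditional_batch(partition, condition, updates):
    """Apply every update only if the condition row holds the expected count.

    partition maps (store, type) -> count for a single SKU.
    condition is ((store, type), expected_count).
    Returns True when the batch was applied, similar in spirit to the
    [applied] column Cassandra returns for a conditional statement.
    """
    key, expected = condition
    if partition.get(key) != expected:
        return False  # condition failed: none of the updates are applied
    partition.update(updates)
    return True


counts = {("redwoodcity-1389", "available"): 12, ("redwoodcity-1389", "demand"): 1}
updates = {("redwoodcity-1389", "available"): 11, ("redwoodcity-1389", "demand"): 2}
condition = (("redwoodcity-1389", "available"), 12)

first = apply_conditional_batch(counts, condition, updates)   # condition holds
second = apply_conditional_batch(counts, condition, updates)  # count is now 11
```

A second attempt with the same condition is rejected because the available count has already changed, which is the property that makes the conditional form safe under contention.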

Transaction Log

The addition of a transaction log table, as demonstrated by inventory.log, lets us record every mutation of the store counters for a SKU. The log can then be used for auditing, analytics, and order reconciliation.


BEGIN UNLOGGED BATCH
INSERT INTO inventory.log(sku, store, time, type, count, data)
   VALUES(
       'prd-1833080',
       'redwoodcity-1389',
       fce96128-088c-11e5-a6c0-1697f925ec7b,
       'available', 11, {'order':'1234567'});       
INSERT INTO inventory.log(sku, store, time, type, count, data)
   VALUES(
       'prd-1833080',
       'redwoodcity-1389',
       fce96128-088c-11e5-a6c0-1697f925ec7b,
       'demand', 2, {'order':'1234567'});
APPLY BATCH;

The design of the transaction log includes two additional features. The first is a default TTL, which causes data to be removed automatically after, in this instance, 180 days (15552000 seconds). The second is the use of DateTieredCompactionStrategy, which is optimized for data written in an append-only fashion; it reduces the load required to store and manage the transaction log and increases data density.
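
The TTL in the schema is expressed in seconds. A quick check of the arithmetic, using nothing beyond the Python standard library:

```python
from datetime import timedelta

# default_time_to_live is given in seconds; 180 days is the retention period.
ttl_seconds = int(timedelta(days=180).total_seconds())
print(ttl_seconds)
```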

Request Flow and Concurrency Management

There are two ways to manage concurrency or synchronization in a multi-process system. The first is to use synchronization primitives in the form of exclusive locks. Locks are expensive in distributed systems because they require the coordination of multiple machines across the network. Because of this coordination overhead, locks should generally be avoided.

An alternative to locks is to use the chord pattern to ensure that only a single thread of execution is mutating a count at any given moment. In this model, concurrency isolation is achieved by assigning all modifications for a given SKU to a single-threaded worker through consistent hashing.

The Moving Pieces

Two classes of workers are required. The first services HTTP requests at the API interface; these are referred to as HTTP workers. The second performs the actual increment and decrement operations within the database; these are referred to as inventory service workers.

To ensure linear scalability and concurrency isolation, this pattern requires a partitionable queue such as Kafka, NSQ, RabbitMQ, or TIBCO.

Queue Partitioning

This pattern uses two queues, one for inventory worker requests and one for responses, so that the inventory workers can communicate with the HTTP workers asynchronously. Each queue is partitioned according to the maximum number of workers it is ever expected to serve; in this instance we’ll assume 256 partitions for the inventory workers and 100 for the HTTP workers.

The inventory service workers are assigned partitions of the queue such that no two workers are assigned the same partition. However, a worker may service multiple partitions.
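
One simple way to satisfy this rule is a round-robin assignment of partitions to workers. The sketch below assumes 8 workers purely for illustration; the function name assign_partitions is not part of any queue's API, and real queues typically provide their own assignment mechanism.

```python
QUEUE_PARTITIONS = 256  # maximum hypothetical number of inventory workers


def assign_partitions(num_workers, num_partitions=QUEUE_PARTITIONS):
    """Round-robin partitions onto workers so each partition has exactly one owner."""
    assignment = {worker: [] for worker in range(num_workers)}
    for partition in range(num_partitions):
        assignment[partition % num_workers].append(partition)
    return assignment


# With 8 workers (an assumed figure), each worker services 32 partitions.
assignment = assign_partitions(num_workers=8)
```

Because the partition count is fixed up front, adding workers only changes which worker owns a partition; the mapping from SKU to partition stays the same.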

Each HTTP worker has a dedicated partition in the response queue. After the inventory worker services a request, it places the response message into the response queue partition that belongs to the HTTP worker that originally made the request.

Request Flow

When a request is made, the HTTP API makes a corresponding decrement request to the service layer through the queue. The HTTP front end then creates a future for the pending response so that it can continue to service additional requests.

Example Request


{
   "http_worker": 11,
   "sku":"prd-1833080",
   "store":"redwoodcity-1389",
   "count": 1,
   "timeuuid": "fce96128-088c-11e5-a6c0-1697f925ec7b",
   "order": 1234567,
   "http_session": "19804eef-398a-4d37-b5bf-5e2542f2450b"
}

The decrement request is assigned to a queue partition by taking the hash of the SKU modulo the number of queue partitions.


var queuePartitions = 256
var partition = (HashFunction('prd-1833080') % queuePartitions)
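
HashFunction above is left unspecified. Whatever is chosen must return the same value for a SKU in every process, otherwise two HTTP workers could route the same SKU to different partitions. As a minimal sketch in Python, assuming CRC32 is an acceptable hash (the built-in hash() is unsuitable for strings because it is salted per process by default):

```python
import zlib

QUEUE_PARTITIONS = 256


def partition_for(sku, num_partitions=QUEUE_PARTITIONS):
    """Map a SKU to a queue partition using a hash that is stable across processes."""
    return zlib.crc32(sku.encode("utf-8")) % num_partitions


partition = partition_for("prd-1833080")
```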

Inventory workers fetch requests from the queue serially, determine whether each request can be fulfilled, apply any mutations to the counters as demonstrated above, and generate a response. The response is placed into the response queue using the partition for the requesting HTTP worker.
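
The core of an inventory worker might look roughly like the following. This is a sketch under several assumptions: an in-memory dict stands in for inventory.counts, the queue itself is omitted, and the response fields "partition" and "fulfilled" are invented for illustration.

```python
def handle_request(counts, request):
    """Process one decrement request and return the response message.

    counts maps (sku, store, type) -> count and stands in for inventory.counts.
    """
    available_key = (request["sku"], request["store"], "available")
    demand_key = (request["sku"], request["store"], "demand")

    available = counts.get(available_key, 0)
    fulfilled = available >= request["count"]
    if fulfilled:
        # Mirror the batch shown earlier: available goes down, demand goes up.
        counts[available_key] = available - request["count"]
        counts[demand_key] = counts.get(demand_key, 0) + request["count"]

    return {
        "partition": request["http_worker"],  # response queue partition
        "http_session": request["http_session"],
        "fulfilled": fulfilled,
    }


counts = {
    ("prd-1833080", "redwoodcity-1389", "available"): 12,
    ("prd-1833080", "redwoodcity-1389", "demand"): 1,
}
request = {
    "http_worker": 11,
    "sku": "prd-1833080",
    "store": "redwoodcity-1389",
    "count": 1,
    "http_session": "19804eef-398a-4d37-b5bf-5e2542f2450b",
}
response = handle_request(counts, request)
```

Because a single worker owns every request for a given SKU, the read-then-write sequence inside handle_request needs no lock.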

Request.png

The HTTP worker then fetches the response message from the queue, locates the corresponding future and responds to the HTTP request.
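
The bookkeeping on the HTTP worker side can be sketched with Python's concurrent.futures.Future. Keying the pending futures by http_session is an assumption based on the example request; the function names and the "fulfilled" response field are illustrative, and the enqueue step is omitted.

```python
from concurrent.futures import Future

# http_session -> Future that will eventually hold the inventory response
pending = {}


def submit(request):
    """Register a future for the request; a real worker would also enqueue it."""
    future = Future()
    pending[request["http_session"]] = future
    return future


def on_response(response):
    """Called for each message read from this worker's response partition."""
    future = pending.pop(response["http_session"])
    future.set_result(response)


session = "19804eef-398a-4d37-b5bf-5e2542f2450b"
future = submit({"http_session": session, "sku": "prd-1833080", "count": 1})
on_response({"http_session": session, "fulfilled": True})
result = future.result(timeout=1)
```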

Response.png

Scalability and Reliability

By separating responsibilities into two classes of workers that communicate through the queue, each side of the system can scale independently and linearly, bounded only by the total throughput of the queue and the Cassandra cluster. The data model within Cassandra is isolated at the SKU level, allowing the cluster to scale linearly with SKU and node count.

Because all state is managed by either the queue or Cassandra, the workers remain stateless. In the event of a process crash, the worker can simply be restarted and it will resume servicing its assigned queue partitions. At most a single request will be lost in a crash, and this can be mitigated by using either a transactional queue or a queue with a high water mark, either of which ensures that each request is serviced with an “at least once” guarantee.
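
To illustrate the high water mark idea, the sketch below models one queue partition in memory and advances the mark only after a message has been handled. The class and function names are hypothetical and the crash is simulated; a real queue would persist the mark itself.

```python
class PartitionLog:
    """In-memory stand-in for one queue partition with a high water mark."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # everything below this index has been handled

    def poll(self):
        """Return the first unhandled message, or None if the log is drained."""
        if self.committed < len(self.messages):
            return self.messages[self.committed]
        return None

    def commit(self):
        self.committed += 1


def run_worker(log, handler, crash_after_handling=None):
    """Handle messages in order, committing only after the handler returns."""
    handled = []
    while (message := log.poll()) is not None:
        handler(message)
        handled.append(message)
        if message == crash_after_handling:
            return handled  # simulated crash before the commit
        log.commit()
    return handled


seen = []
log = PartitionLog(["a", "b", "c"])
first_run = run_worker(log, seen.append, crash_after_handling="b")
second_run = run_worker(log, seen.append)  # restarted worker resumes at "b"
```

Nothing is lost across the restart, but the message in flight at the time of the crash is delivered a second time, which is what "at least once" implies.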


