New options and better performance in cqlsh copy
date: January 17, 2016
COPY FROM and COPY TO are cqlsh commands for importing and exporting data to/from Cassandra using csv files. They were introduced in version 1.1.3 and were previously discussed here. These commands were enhanced in the latest releases with the following new features:
- CASSANDRA-9302 improves the performance of COPY FROM with batched prepared statements that are sent to the correct replica via token aware routing.
- CASSANDRA-9304 improves the performance of COPY TO with token aware multi-process data export.
- CASSANDRA-9303 introduces new options to fine-tune COPY operations to specific use cases.
We will review these new features in this post; they will be available in the following Cassandra releases: 2.1.13, 2.2.5, 3.0.3 and 3.2.
Here is a brief reminder on how to use COPY commands in cqlsh:
COPY table_name ( column, ... ) FROM ( 'file_pattern1, file_pattern2, ...' | STDIN ) WITH option = 'value' AND ...

COPY table_name ( column, ... ) TO ( 'file_name' | STDOUT ) WITH option = 'value' AND ...
File patterns are either file names or valid Python glob expressions such as folder/*.csv and so forth. You can use file patterns to import multiple files. Refer to the examples at the end of this post.
How it works
COPY commands launch worker processes that perform the actual import and export in parallel. One worker process is launched per CPU core, except that one core is reserved for the parent process and the number of workers is capped at 16. This number can be changed with the new NUMPROCESSES option and set to any value with no maximum. Worker and parent processes communicate via Python queues.
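The default worker count described above can be sketched as follows; this is an illustrative reconstruction of the documented rule (one worker per core, one core reserved for the parent, capped at 16), not the actual cqlsh source:

```python
import multiprocessing

def default_num_processes(cpu_count=None):
    """Default COPY worker count: one per core, minus one core
    reserved for the parent process, capped at 16. Illustrative
    sketch of the documented rule, not the real cqlsh code."""
    if cpu_count is None:
        cpu_count = multiprocessing.cpu_count()
    return max(1, min(16, cpu_count - 1))

print(default_num_processes(8))   # 7 workers on an 8-core machine
print(default_num_processes(32))  # capped at 16
```

Passing NUMPROCESSES explicitly simply bypasses this computation.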
When exporting data, the ring tokens are first retrieved in order to split the ring into token ranges. These ranges are passed to worker processes, which send one export request per token range to one of the replicas that owns the range. The results are finally concatenated in the csv output file by the parent process; for this reason, the export order across partitions is not deterministic. Parallel export via token ranges is only available for the random and murmur3 partitioners. For other partitioners the behaviour of COPY TO has not changed: the entire data set is retrieved from the server cqlsh is connected to.
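Splitting the ring into token ranges can be sketched like this; it is a simplified illustration (real token ranges also carry replica information, which is omitted here):

```python
def split_into_ranges(ring_tokens):
    """Turn the ring tokens into (start, end] token ranges, with a
    wrap-around range from the last token back to the first.
    Simplified sketch of how COPY TO partitions the ring; replica
    lookup per range is omitted."""
    tokens = sorted(ring_tokens)
    ranges = [(tokens[i], tokens[i + 1]) for i in range(len(tokens) - 1)]
    ranges.append((tokens[-1], tokens[0]))  # wrap-around range
    return ranges

print(split_into_ranges([100, -50, 0]))
# [(-50, 0), (0, 100), (100, -50)]
```

Each of these ranges becomes one export request handed to a worker process.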
When importing data, the parent process reads chunks of CHUNKSIZE rows from the input file(s) and sends each chunk to a worker process. Each worker process then analyses its chunk for rows with common partition keys. If at least 2 rows with the same partition key are found, they are batched and sent to a replica that owns the partition; this minimum number of rows can be controlled with a new option, MINBATCHSIZE, but it is advisable to leave it set to 2. Rows that do not share a partition key are batched with other rows whose partition keys belong to a common replica. These left-over rows are split into batches of size MAXBATCHSIZE, currently 20 rows, and sent to the replicas where the partitions are located. Batches are of type UNLOGGED in both cases.
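The batching logic just described can be sketched as follows. This is a simplified illustration of the grouping step only: the real code also groups the left-over rows by replica before batching them, which is omitted here:

```python
from collections import defaultdict

def make_batches(chunk, min_batch_size=2, max_batch_size=20):
    """Group a chunk's rows by partition key; keys appearing at least
    min_batch_size times form their own batch, the rest are pooled
    into left-over batches of at most max_batch_size rows. Sketch
    only: per-replica grouping of left-overs is omitted."""
    by_key = defaultdict(list)
    for key, row in chunk:            # chunk = [(partition_key, row), ...]
        by_key[key].append(row)

    batches, leftovers = [], []
    for key, rows in by_key.items():
        if len(rows) >= min_batch_size:
            batches.append(rows)      # sent to a replica owning `key`
        else:
            leftovers.extend(rows)

    for i in range(0, len(leftovers), max_batch_size):
        batches.append(leftovers[i:i + max_batch_size])
    return batches

print(make_batches([('a', 1), ('a', 2), ('b', 3), ('c', 4)]))
# [[1, 2], [3, 4]]
```

Rows 1 and 2 share partition key 'a' and form one batch; the singletons 3 and 4 are pooled into a left-over batch.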
The benchmark below compares release 2.2.3 (before the changes were implemented) and 2.2.5 (2.2 HEAD as of January 12 2016). The data was generated by inserting 1 million rows with cassandra stress into a 3 node cluster running locally on a laptop (8 Intel cores i7-4702HQ CPU @ 2.20GHz with 16 GB memory and hybrid HDD). The data is exported to a csv file using default options and then re-imported.
We can observe an improvement of 70% for importing and 1,280% for exporting data. The data in this benchmark is random and mostly with unique partition keys. Data that shares partition keys more consistently would have a substantially improved import rate.
The following options can be used for both COPY TO and COPY FROM.
These options existed already and have not changed:
- DELIMITER, a character used to separate fields in the input and output files. It defaults to ','.
- QUOTE, a character used to quote fields containing special characters, such as the delimiter. It defaults to '"'.
- ESCAPE, a character used to escape other characters, such as the quoting character or the delimiter. It defaults to '\'.
- HEADER, a boolean indicating if the first line contains a header, which should be skipped when importing. When exporting, it indicates if the header should be printed. It defaults to false.
- NULL, a string that represents null values. It defaults to the empty string. When exporting, missing values are indicated by this string. When importing, if this string is encountered the value is imported as missing (null). You should set this string to some other character sequence if you need to import blank strings.
- DATETIMEFORMAT, which used to be called TIMEFORMAT, a string containing the Python strftime format for date and time values, such as '%Y-%m-%d %H:%M:%S%z'. It defaults to the time_format value in cqlshrc.
These options are new:
- MAXATTEMPTS, an integer indicating the maximum number of attempts to import a batch or to export a range in case of a server error. Note that for some permanent errors, such as parse errors, no additional attempts are performed. It defaults to 5.
- REPORTFREQUENCY, a decimal number indicating the frequency in seconds with which status updates are displayed. It defaults to 0.25, four updates per second.
- DECIMALSEP, a character representing the separator for decimal values. It defaults to '.'. When exporting, numbers are formatted with this separator; when importing, numbers are parsed using it.
- THOUSANDSSEP, a character representing the thousands separator for digits. It defaults to empty, meaning no thousands separator. When exporting, numbers are formatted with this separator; when importing, numbers are parsed using it.
- BOOLSTYLE, a string containing two case-insensitive words separated by a comma that represent boolean values. It defaults to 'true, false'. Valid examples are 'yes, no' or '1, 0' and so forth.
- NUMPROCESSES, an integer indicating the number of worker processes. It defaults to the number of cores minus one, capped at 16.
- CONFIGFILE, a string pointing to a configuration file with the same format as .cqlshrc (see the Python ConfigParser documentation). In this file you can specify other options under the following sections: [copy], [copy-to], [copy-from], [copy:ks.table], [copy-to:ks.table], [copy-from:ks.table], where ks is your keyspace name and table is your table name. Not all sections are mandatory and options are read from these sections in the order just specified. Command line options always override options in configuration files. Depending on the COPY direction, only the relevant copy-from or copy-to sections are used. If no configuration file is specified, then the default .cqlshrc is searched.
- RATEFILE, a string pointing to an optional file where output statistics will be printed. These are the same progress statistics that are normally displayed to standard output.
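The section lookup order described for CONFIGFILE can be sketched with Python's ConfigParser (which is what .cqlshrc-style files use). This is an illustrative reconstruction of the documented precedence for one COPY direction, not the actual cqlsh implementation; command-line options would override everything read here:

```python
from configparser import ConfigParser

def load_copy_options(config_text, direction, keyspace, table):
    """Read COPY options from a cqlshrc-style config, applying
    sections from general to specific so that later, more specific
    sections override earlier ones. Sketch of the documented lookup
    order for one direction ('to' or 'from')."""
    parser = ConfigParser()
    parser.read_string(config_text)
    sections = ['copy',
                'copy-%s' % direction,
                'copy:%s.%s' % (keyspace, table),
                'copy-%s:%s.%s' % (direction, keyspace, table)]
    options = {}
    for section in sections:
        if parser.has_section(section):
            options.update(dict(parser.items(section)))
    return options

cfg = "[copy]\nmaxattempts = 10\nchunksize = 1000\n[copy-from]\nchunksize = 5000\n"
print(load_copy_options(cfg, 'from', 'ks', 'table'))
# {'maxattempts': '10', 'chunksize': '5000'}
```

Here the [copy-from] section overrides the chunksize set in the more general [copy] section, as the documented ordering implies.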
The following options are applicable to COPY TO.
This option existed already and has not changed:
- ENCODING, a string representing the encoding for the output files. It defaults to 'utf8'.
These options are new:
- PAGESIZE, an integer indicating the page size for fetching results. It defaults to 1,000. The bigger the page size, the longer the page timeout should be. You may want to change this parameter if you have very large or very small partitions.
- PAGETIMEOUT, an integer indicating the timeout in seconds for fetching each page. It defaults to 10 seconds. Increase it for large page sizes or large partitions, and consider increasing it if you notice timeouts. In case of server timeouts, an exponential backoff policy kicks in automatically, so you may notice delays; this prevents overloading the server even further. The driver can also generate timeouts; in this case there is a small chance data may be missing or duplicated, since the driver doesn't know whether the server will later drop the request or return a result. Increasing this value is very helpful in preventing driver-generated timeouts.
- BEGINTOKEN, a string representing the minimum token to consider when exporting data. Records with smaller tokens will not be exported. It defaults to empty, which indicates no minimum token.
- ENDTOKEN, a string representing the maximum token to consider when exporting data. Records with bigger tokens will not be exported. It defaults to empty, which indicates no maximum token.
- MAXREQUESTS, an integer indicating the maximum number of in-flight requests each worker process can work on. The total level of parallelism when exporting data is given by the number of worker processes multiplied by this value. It defaults to 6. Each request will export the data of an entire token range.
- MAXOUTPUTSIZE, an integer indicating the maximum size of the output file measured in number of lines. Beyond this value, the output file will be split into segments. It defaults to -1, which indicates an unlimited maximum and therefore a unique output file.
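The exponential backoff mentioned for PAGETIMEOUT can be sketched as below. The doubling pattern follows the usual exponential backoff technique; the base delay and cap are assumptions for illustration, not the values cqlsh actually uses:

```python
def backoff_delays(base_delay=1.0, max_attempts=5, cap=60.0):
    """Delays (in seconds) between successive retries of a timed-out
    page request: each retry waits twice as long as the previous one,
    up to a cap. Illustrative sketch; the constants are assumptions,
    not cqlsh's actual values."""
    return [min(cap, base_delay * (2 ** attempt))
            for attempt in range(max_attempts)]

print(backoff_delays())  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

The growing gaps explain the delays you may observe during retries: the point is to give an overloaded server progressively more breathing room rather than hammering it at a fixed rate.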
The following options are applicable to COPY FROM; they are all new:
- CHUNKSIZE, an integer indicating the size of chunks passed to worker processes. It defaults to 1,000. The parent process reads chunks from the input files and delivers them to worker processes. Worker processes will then sort individual chunks to group records with the same partition key, and failing this, belonging to the same replica. The bigger the chunk the higher the probability of successfully batching records by partition key, but also the higher the amount of memory used.
- INGESTRATE, an integer indicating an approximate ingest rate in rows per second. It defaults to 100,000. The parent process will not send additional chunks to worker processes if this rate is exceeded. Note that 100,000 may be higher than the maximum rate your cluster supports, in which case the ingest rate reported by the output statistics would be lower.
- MINBATCHSIZE, an integer indicating the minimum size of an import batch. It defaults to 2. If there are at least MINBATCHSIZE records with the same partition key in a chunk, then a batch with these records is sent. It makes sense to keep this parameter as low as 2 because each single partition corresponds to a single insert operation server side, so 2 rows would be inserted at the price of 1.
- MAXBATCHSIZE, an integer indicating the maximum size of an import batch. It defaults to 20. Records that do not have common partition keys in a chunk, end up in a left-overs group for each replica. These rows are then batched together up to this maximum number of rows. Even though batched rows are sent to the correct replica, a number too large here may cause timeouts or batch size warnings server side.
- MAXROWS, an integer indicating the maximum number of rows to be imported. It defaults to -1, meaning unlimited. When this number is a positive integer, data import stops when this number of rows is reached. If there is a header row and you've specified HEADER=TRUE, it does not count towards this maximum.
- SKIPROWS, an integer indicating the number of rows to skip. It defaults to 0, so no rows will be skipped. You can specify this number to skip an initial number of rows. If there is a header row and you've specified HEADER=TRUE, it does not count since it is always skipped.
- SKIPCOLS, a string containing a comma-separated list of column names to skip. By default no columns are skipped. To specify which columns are in your input file, use the COPY FROM column list as usual; with this option you can then list columns that are in the file but should not be imported.
- MAXPARSEERRORS, an integer indicating the maximum global number of parsing errors. It defaults to -1, meaning unlimited. When this number is a positive integer and the total number of parse errors reaches it, data import stops.
- MAXINSERTERRORS, an integer indicating the maximum global number of insert errors. It defaults to -1, meaning unlimited. When this number is a positive integer and the total number of server-side insert errors reaches it, data import stops.
- ERRFILE, a string pointing to a file where all rows that could not be imported are saved. It defaults to import_ks_table.err where ks is your keyspace and table is your table name. You'll find in this file all rows that could not be imported for any reason. If an existing err file is found, it is renamed with a suffix that contains the current date and time.
- TTL, an integer indicating the time to live in seconds; by default data will not expire. Use this option to insert data that will expire after the specified number of seconds. This option is only available in Cassandra 3.2.
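The INGESTRATE throttle described earlier can be sketched as a simple pacing calculation: before sending the next chunk, the parent process checks whether the observed rate exceeds the target and, if so, how long to pause. This is an illustrative sketch of the idea, not the actual cqlsh implementation:

```python
def throttle_delay(rows_sent, elapsed_seconds, ingest_rate):
    """How long the parent process should pause before sending the
    next chunk so the observed rate stays at or below ingest_rate
    rows/second. Simplified sketch of INGESTRATE throttling."""
    min_elapsed = rows_sent / float(ingest_rate)  # time the rows should take
    return max(0.0, min_elapsed - elapsed_seconds)

# 5,000 rows in 0.02 s exceeds 100,000 rows/s: wait the remaining time
print(round(throttle_delay(5000, 0.02, 100000), 6))  # 0.03
```

If the cluster is slower than INGESTRATE, the delay is always zero and the reported rate in the output statistics simply stays below the target, as noted above.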
Importing all csv files in folder1 and folder2:
COPY ks.table FROM 'folder1/*.csv, folder2/*.csv'
Splitting an output file into segments of 1,000 lines each:
COPY ks.table TO 'table.csv' WITH MAXOUTPUTSIZE=1000
Here is how to fix the ingest rate to 50,000 rows per second:
COPY ks.table FROM 'table.csv' WITH INGESTRATE=50000
Here is how to add COPY options to the cqlshrc configuration file, taken from cqlshrc.sample:
; optional options for COPY TO and COPY FROM
[copy]
maxattempts=10
numprocesses=4

; optional options for COPY FROM
[copy-from]
chunksize=5000
ingestrate=50000

; optional options for COPY TO
[copy-to]
pagesize=2000
pagetimeout=20