Loading Kafka Data into Greenplum
Apache Kafka is a fault-tolerant, low-latency, distributed publish-subscribe message system. The Greenplum Streaming Server supports loading Kafka data from the Apache and Confluent Kafka distributions. Refer to the Apache Kafka Documentation for more information about Apache Kafka.
A Kafka message may include a key and a value. Kafka stores streams of messages (or records) in categories called topics. A Kafka producer publishes records to partitions in one or more topics. A Kafka consumer subscribes to a topic and receives records in the order that they were sent within a given Kafka partition. Kafka does not guarantee the order of data originating from different Kafka partitions.
You can use the gpsscli or gpkafka load utilities to load Kafka data into Greenplum Database.
Both the gpsscli and gpkafka load utilities act as a Kafka consumer. They ingest streaming data from a single Kafka topic, using Greenplum Database readable external tables to transform and insert or update the data in a target Greenplum table. You identify the Kafka source, the data format, the Greenplum Database connection options, and the target table definition in a YAML-formatted load configuration file that you provide to the utility. If the utility is interrupted or exits, a subsequent load operation that specifies the same Kafka topic and target Greenplum Database table names resumes from the last recorded offset.
Requirements
PROPERTIES:
  api.version.request: false
  broker.version.fallback: 0.8.2.1
Load Procedure
You will perform the following tasks when you use the Greenplum Streaming Server to load Kafka data into a Greenplum Database table:
- Ensure that you meet the Prerequisites.
- Register the Greenplum Streaming Server extension.
- Identify the format of the Kafka data.
- (Optional) Register custom data formatters.
- Construct the load configuration file.
- Create the target Greenplum Database table.
- Assign Greenplum Database role permissions to the table, if required.
- Run the gpkafka load command to load the Kafka data into Greenplum Database.
- Check the progress of the load operation.
- Check for load errors. (Note that the naming format for gpkafka log files is gpkafka_date.log.)
Prerequisites
Before using the gpkafka utilities to load Kafka data to Greenplum Database, ensure that you:
- Meet the Prerequisites documented for the Greenplum Streaming Server.
- Have access to a running Kafka cluster with ZooKeeper, and can identify the hostname(s) and port number(s) of the Kafka broker(s) serving the data.
- Can identify the Kafka topic of interest.
- Can run the command on a host that has connectivity to:
- Each Kafka broker host in the Kafka cluster.
- The Greenplum Database master and all segment hosts.
About Supported Kafka Message Data Formats
The Greenplum Streaming Server supports Kafka message key and value data in the following formats:
Format | Description |
---|---|
avro | Avro-format data. gpkafka supports Avro data that uses the Confluent Schema Registry and single-object encoded Avro data. In both cases, gpkafka reads Avro data from Kafka only as a single JSON-type column. gpkafka supports libz-, lzma-, and snappy-compressed Avro data from Kafka. |
binary | Binary-format data. gpkafka reads binary data from Kafka only as a single bytea-type column. |
csv | Comma-delimited text format data. |
custom | Data of a custom format, parsed by a custom formatter. |
delimited | Text data separated by a configurable delimiter. |
json | JSON- or JSONB-format data. gpkafka reads JSON data from Kafka only as a single column. Note: GPSS supports JSONB-format data only when loading to Greenplum 6. |
To write Kafka message data into a Greenplum Database table, you must identify the data format in the load configuration file.
Avro
Specify the avro format when your Kafka message data is a single-object encoded Avro file or you are using the Confluent Schema Registry to load Avro message key and/or value data. gpkafka reads Avro data from Kafka and loads it into a single JSON-type column. You must define a mapping if you want gpkafka to write the data into specific columns in the target Greenplum Database table.
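For example, a minimal excerpt of a Version 2 load configuration file for Schema Registry Avro data pairs a single JSON-type input column with a MAPPING. This is only a sketch; the column name c1, the target column cust_id, and the registry address are illustrative, and the complete sample file appears later in this topic:

KAFKA:
  INPUT:
    VALUE:
      COLUMNS:
        - NAME: c1            # Avro value data arrives as a single JSON-type column
          TYPE: json
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://localhost:8081
  OUTPUT:
    MAPPING:
      - NAME: cust_id                       # target Greenplum table column
        EXPRESSION: (c1->>'cust_id')::int   # extract one field from the JSON column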
Binary
Use the binary format when your Kafka message data is a stream of bytes. gpkafka reads binary data from Kafka and loads it into a single bytea-type column.
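A minimal sketch of a VALUE block for binary message data follows. The column name data is illustrative; the bytea type reflects the single-column behavior described above:

VALUE:
  COLUMNS:
    - NAME: data      # binary message data is read as a single bytea-type column
      TYPE: bytea
  FORMAT: binary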
CSV
Use the csv format when your Kafka message data is comma-delimited text and conforms to RFC 4180. The message content may not contain line ending characters (CR and LF).
Data in csv format may appear in Kafka messages as follows:
"1313131","12","backorder","1313.13" "3535353","11","shipped","761.35" "7979797","11","partial","18.72"
Custom
The Greenplum Streaming Server provides a custom data formatter plug-in framework for Kafka messages using user-defined functions. The type of Kafka message data supported by a custom formatter is formatter-specific. For example, a custom formatter may support compressed or complex data.
Delimited Text
The Greenplum Streaming Server supports loading Kafka data delimited by one or more characters that you specify. Use the delimited format for such data. The delimiter may be a multi-byte value and up to 32 bytes in length. You cannot specify a quote or an escape character in the delimiter.
Sample data using a pipe ('|') delimiter character follows:
1313131|12|backorder|1313.13
3535353|11|shipped|761.35
7979797|11|partial|18.72
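A sketch of a VALUE block for this pipe-delimited data follows. The column names are illustrative, and the DELIMITED_OPTION:DELIMITER property shown here is an assumption; consult the gpkafka-v2.yaml reference for the exact option name and syntax:

VALUE:
  COLUMNS:
    - NAME: cust_id
      TYPE: int
    - NAME: month
      TYPE: int
    - NAME: status
      TYPE: text
    - NAME: amount_paid
      TYPE: decimal(9,2)
  FORMAT: delimited
  DELIMITED_OPTION:     # assumed property name; verify against the reference page
    DELIMITER: "|"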
JSON
Specify the json format when your Kafka message data is in JSON or JSONB format. gpkafka reads JSON data from Kafka only as a single column. You must define a mapping if you want gpkafka to write the data into specific columns in the target Greenplum Database table.
Sample JSON message data:
{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 } { "cust_id": 3535353, "month": 11, "amount_paid":761.35 } { "cust_id": 7979797, "month": 11, "amount_paid":18.82 }
Registering a Custom Formatter
A custom data formatter for Kafka messages is a user-defined function. If you are using a custom formatter, you must create and register the formatter function in each database in which you will use it to write Kafka data to Greenplum tables.
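The function signature and implementation depend entirely on your formatter; the following is only a sketch that registers a hypothetical formatter packaged as a C shared library, using the standard Greenplum CREATE FUNCTION syntax. The library, symbol, and function names are placeholders:

-- Hypothetical example: register a custom formatter implemented in
-- my_formatter.so; repeat in each database that uses the formatter.
CREATE OR REPLACE FUNCTION my_kafka_formatter()
RETURNS record
AS '$libdir/my_formatter.so', 'my_kafka_formatter'
LANGUAGE C STABLE;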
Constructing the gpkafka.yaml Configuration File
You configure a data load operation from Kafka to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source Kafka data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.
The Greenplum Streaming Server supports three versions of the YAML configuration file: Version 1 (deprecated), Version 2, and Version 3 (Beta). Versions 2 and 3 of the configuration file format support all features of Version 1, and introduce support for loading both the Kafka message key and value to Greenplum, as well as for loading metadata.
Refer to the gpkafka.yaml reference page for Version 1 configuration file contents and syntax. Refer to the gpkafka-v2.yaml reference page for Version 2 configuration file format and the configuration parameters that this version supports. gpkafka-v3.yaml describes the Version 3 (Beta) format.
The contents of a sample gpkafka Version 2 YAML configuration file named loadcfg2.yaml follow:
DATABASE: ops
USER: gpadmin
PASSWORD: changeme
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: kbrokerhost1:9092
      TOPIC: customer_expenses2
      PARTITIONS: (1, 2...4, 7)
    VALUE:
      COLUMNS:
        - NAME: c1
          TYPE: json
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://localhost:8081
    KEY:
      COLUMNS:
        - NAME: key
          TYPE: json
      FORMAT: avro
      AVRO_OPTION:
        SCHEMA_REGISTRY_ADDR: http://localhost:8081
    FILTER: (c1->>'month')::int = 11
    ERROR_LIMIT: 25
  OUTPUT:
    SCHEMA: payables
    TABLE: expenses2
    MAPPING:
      - NAME: customer_id
        EXPRESSION: (c1->>'cust_id')::int
      - NAME: newcust
        EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
      - NAME: expenses
        EXPRESSION: (c1->>'expenses')::decimal
      - NAME: tax_due
        EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
  METADATA:
    SCHEMA: gpkafka_internal
  COMMIT:
    MINIMAL_INTERVAL: 2000
Greenplum Database Options (Version 2-Focused)
You identify the Greenplum Database connection options via the DATABASE, USER, PASSWORD, HOST, and PORT parameters.
The VERSION parameter identifies the version of the gpkafka YAML configuration file. The default version is Version 1.
KAFKA:INPUT Options
Specify the Kafka brokers and topic of interest in the SOURCE block. You must create the Kafka topic before you load data. By default, GPSS reads Kafka messages from all partitions. You may specify a single partition number, a comma-separated list of partition numbers, and/or a range of partition numbers to restrict the partitions from which GPSS reads messages. The PARTITIONS property is supported only in the Version 2 and 3 load configuration file formats.
When you provide a VALUE block, you must specify the COLUMNS and FORMAT parameters. The VALUE:COLUMNS block includes the name and type of each data element in the Kafka message. The default source-to-target data mapping behavior of gpkafka is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database OUTPUT:TABLE:
- You must identify the Kafka data elements in the order in which they appear in the Kafka message.
- You may specify NAME: __IGNORED__ to omit a Kafka message value data element from the load operation.
- You must provide the same name for each non-ignored Kafka data element and its associated Greenplum Database table column.
- You must specify an equivalent data type for each non-ignored Kafka data element and its associated Greenplum Database table column.
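For example, the following illustrative VALUE:COLUMNS block loads the first, second, and fourth elements of a four-element csv message and skips the third; each non-ignored name must match a column of an equivalent type in the target table (all names here are placeholders):

VALUE:
  COLUMNS:
    - NAME: cust_id        # matches a cust_id column in the target table
      TYPE: int
    - NAME: month          # matches a month column in the target table
      TYPE: int
    - NAME: __IGNORED__    # the third message element is not loaded
      TYPE: text
    - NAME: amount_paid    # matches an amount_paid column in the target table
      TYPE: decimal(9,2)
  FORMAT: csv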
The VALUE:FORMAT keyword identifies the format of the Kafka message value. gpkafka supports comma-delimited text format (csv) and data that is separated by a configurable delimiter (delimited). gpkafka also supports binary (binary), JSON/JSONB (json), custom (custom), and Avro (avro) format value data.
When you provide a META block, it identifies the Kafka message metadata properties to load. The available Kafka metadata properties and their data types are:
- topic - text
- partition - int
- offset - bigint
When you provide a KEY block, you must specify the COLUMNS and FORMAT parameters. The KEY:COLUMNS block includes the name and type of each element of the Kafka message key, and is subject to the same restrictions as identified for VALUE:COLUMNS above. The KEY:FORMAT keyword identifies the format of the Kafka message key. gpkafka supports avro, binary, csv, custom, delimited, and json format key data.
The FILTER parameter identifies a filter to apply to the Kafka input messages before the data is loaded into Greenplum Database. If the filter evaluates to true, gpkafka loads the message. The message is dropped if the filter evaluates to false. The filter string must be a valid SQL conditional expression and may reference one or more KEY or VALUE column names.
The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which gpkafka should exit the load operation. The default ERROR_LIMIT is zero; the load operation is stopped when the first error is encountered.
KAFKA:OUTPUT Options
You identify the target Greenplum Database schema name and table name via the KAFKA:OUTPUT: SCHEMA and TABLE parameters. You must pre-create the Greenplum Database table before you attempt to load Kafka data.
The default load mode is to insert Kafka data into the Greenplum Database table. gpkafka also supports updating and merging Kafka message data into a Greenplum table. You specify the load MODE, the MATCH_COLUMNS and UPDATE_COLUMNS, and any UPDATE_CONDITIONs that must be met to merge or update the data. In MERGE MODE, you can also specify ORDER_COLUMNS to filter out duplicates and a DELETE_CONDITION.
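A sketch of an OUTPUT block that merges Kafka data follows. The property names are those described above; the table, column, and condition values are illustrative only, and the list form shown for MATCH_COLUMNS, ORDER_COLUMNS, and UPDATE_COLUMNS is an assumption, so verify the exact syntax in the gpkafka-v2.yaml reference:

OUTPUT:
  SCHEMA: payables
  TABLE: expenses2
  MODE: MERGE
  MATCH_COLUMNS:
    - customer_id            # rows match on this column
  ORDER_COLUMNS:
    - expenses               # used with MATCH_COLUMNS to filter out duplicates
  UPDATE_COLUMNS:
    - expenses
    - tax_due
  UPDATE_CONDITION: expenses > 0
  DELETE_CONDITION: expenses = 0   # matching rows that satisfy this condition are deleted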
About the Merge Load Mode
MERGE mode is similar to an UPSERT operation; gpkafka may insert new rows in the database, or may update an existing database row that satisfies match and update conditions. gpkafka deletes rows in MERGE mode when the data satisfies an optional DELETE_CONDITION that you specify.
gpkafka stages a merge operation in a temporary table, generating the SQL to populate the temp table from the set of OUTPUT configuration properties that you provide.
gpkafka uses the following algorithm for MERGE mode processing:
- Create a temporary table like the target table.
- Generate the SQL to insert the source data into the temporary table.
- Add the MAPPINGS.
- Add the FILTER.
- Use MATCH_COLUMNS and ORDER_COLUMNS to filter out duplicates.
- Update the target table from rows in the temporary table that satisfy MATCH_COLUMNS, UPDATE_COLUMNS, and UPDATE_CONDITION.
- Insert non-matching rows into the target table.
- Delete rows in the target table that satisfy MATCH_COLUMNS and the DELETE_CONDITION.
- Truncate the temporary table.
Other Options
The KAFKA:METADATA:SCHEMA parameter specifies the name of the Greenplum Database schema in which gpkafka creates external and history tables.
gpkafka commits Kafka data to the Greenplum Database table at the row and/or time intervals that you specify in the KAFKA:COMMIT: MAX_ROW and/or MINIMAL_INTERVAL parameters. You must specify at least one of these parameters.
You can configure gpkafka to execute a task (user-defined function or SQL commands) after GPSS reads a configurable number of batches from Kafka. Use the KAFKA:TASK: POST_BATCH_SQL and BATCH_INTERVAL configuration parameters to specify the task and the batch interval.
Specify a KAFKA:PROPERTIES block to set Kafka consumer configuration properties. gpkafka sends the property names and values to Kafka when it instantiates a consumer for the load operation.
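The following sketch shows these blocks together within the KAFKA block. The block and property names are those described above; the values, the user-defined function, and the consumer properties are illustrative only:

KAFKA:
  # INPUT and OUTPUT blocks omitted from this sketch
  COMMIT:
    MAX_ROW: 1000              # commit after at most this many rows
    MINIMAL_INTERVAL: 2000     # time-based commit interval
  TASK:
    BATCH_INTERVAL: 10                            # run the task after every 10 batches
    POST_BATCH_SQL: SELECT my_post_batch_func()   # hypothetical user-defined function
  PROPERTIES:
    api.version.request: false        # example Kafka consumer properties
    broker.version.fallback: 0.8.2.1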
About KEYs, VALUEs, and FORMATs
You can specify any data format in the Version 2 configuration file KEY:FORMAT and VALUE:FORMAT parameters, with some restrictions. The Greenplum Streaming Server supports the following KEY:FORMAT and VALUE:FORMAT combinations:
KEY:FORMAT | VALUE:FORMAT | Description |
---|---|---|
any | none (VALUE block omitted) | gpkafka loads only the Kafka message key data, subject to any MAPPING that you specify, to Greenplum Database. |
none (KEY block omitted) | any | Equivalent to gpkafka configuration file Version 1. gpkafka ignores the Kafka message key and loads only the Kafka message value data, subject to any MAPPING that you specify, to Greenplum Database. |
csv | any | Not permitted. |
any | csv | Not permitted. |
avro, binary, delimited, or json | avro, binary, delimited, or json | Any combination is permitted. gpkafka loads both the Kafka message key and value data, subject to any MAPPING that you specify, to Greenplum Database. |
About Transforming and Mapping Kafka Input Data
You can define a MAPPING between the Kafka input data (VALUE:COLUMNS, KEY:COLUMNS, and META:COLUMNS) and the columns in the target Greenplum Database table. Defining a mapping may be useful when you have a multi-field input column (such as a JSON-type column), and you want to assign individual components of the input field to specific columns in the target table.
You might also use a MAPPING to assign a value expression to a target table column. The expression must be one that you could specify in the SELECT list of a query, and can include a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so forth.
If you choose to map more than one input column in an expression, you can create a user-defined function to parse and transform the input column and return the columns of interest.
For example, suppose a Kafka producer emits the following JSON messages to a topic:
{ "customer_id": 1313131, "some_intfield": 12 } { "customer_id": 77, "some_intfield": 7 } { "customer_id": 1234, "some_intfield": 56 }
You could define a user-defined function, udf_parse_json(), to parse the data as follows:
=> CREATE OR REPLACE FUNCTION udf_parse_json(value json)
     RETURNS TABLE (x int, y text)
   LANGUAGE plpgsql AS $$
   BEGIN
     RETURN query
       SELECT ((value->>'customer_id')::int), ((value->>'some_intfield')::text);
   END $$;
This function returns the two fields in each JSON record, casting the fields to integer and text, respectively.
An example MAPPING for the topic data, read into a single JSON-type KAFKA:INPUT:COLUMNS column named jdata, follows:
MAPPING:
  cust_id: (jdata->>'customer_id')
  field2: ((jdata->>'some_intfield') * .075)::decimal
  j1, j2: (udf_parse_json(jdata)).*
The Greenplum Database table definition for this example scenario is:
=> CREATE TABLE t1map(
     cust_id int,
     field2 decimal(7,2),
     j1 int,
     j2 text
   );
About Mapping Avro Bytes Fields to Base64-Encoded Strings
When you specify AVRO_OPTION:BYTES_TO_BASE64, GPSS maps Avro bytes fields to base64-encoded strings. You can provide a MAPPING to decode these strings and write the data to a Greenplum bytea column.
For example, if the Avro schema is:
{ "type": "record", "name": "bytes_test", "fields": [ {"name": "id", "type": "long"}, {"name": "string", "type": "string"}, {"name": "bytes", "type": "bytes"}, { "name": "inner_record", "type": { "type": "map", "values": { "type": "bytes", "name": "nested_bytes" } } } ] }
And if your load configuration file includes these input property settings:
VALUE:
  COLUMNS:
    - NAME: c1
      TYPE: json
  FORMAT: avro
  AVRO_OPTION:
    SCHEMA_REGISTRY_ADDR: http://localhost:8081
    BYTES_TO_BASE64: true
You can define a MAPPING to decode the encoded strings as follows:
MAPPING:
  - NAME: id
    EXPRESSION: (c1->>'id')::int
  - NAME: bytes1
    EXPRESSION: (decode(c1->>'bytes', 'base64'))
  - NAME: bytes2
    EXPRESSION: (decode((c1->>'inner_record')::json->>'nested_bytes', 'base64'))
This mapping decodes the bytes1 and bytes2 fields to the Greenplum bytea data type. GPSS would expect to load these mapped fields to a Greenplum table with the following definition:
CREATE TABLE avbyte(
    id int,
    bytes1 bytea,
    bytes2 bytea
);
Creating the Greenplum Table
You must pre-create the Greenplum table before you load Kafka data into Greenplum Database. You use the KAFKA:OUTPUT: SCHEMA and TABLE load configuration file parameters to identify the schema and table names.
The target Greenplum table definition must include each column that gpkafka will load into the table. The table definition may include additional columns; gpkafka ignores these columns, and loads no data into them.
The name and data type that you specify for a column of the target Greenplum Database table must match the name and data type of the related, non-ignored Kafka message element. If you have defined a column mapping, the name of the Greenplum Database column must match the target column name that you specified in the mapping, and its data type must be compatible with the type of the mapping expression that you define.
The following CREATE TABLE command creates the target Greenplum Database table for the Kafka topic data defined in the loadcfg2.yaml file presented in the Constructing the gpkafka.yaml Configuration File section:
testdb=# CREATE TABLE payables.expenses2(
           customer_id int8,
           newcust bool,
           expenses decimal(9,2),
           tax_due decimal(7,2)
         );
Running the gpkafka load Command
When you run gpkafka load, the command submits, starts, and stops a GPSS job on your behalf.
VMware recommends that you migrate to using the GPSS utilities directly.
You run the gpkafka load command to load Kafka data to Greenplum. When you run the command, you provide the name of the configuration file that defines the parameters of the load operation. For example:
$ gpkafka load loadcfg2.yaml
By default, gpkafka load reads all pending messages and then waits for, and consumes, new Kafka messages. When running in this mode, gpkafka load waits indefinitely; you can interrupt and exit the command with Control-c.
To run the command in batch mode, you provide the --quit-at-eof option. In this mode, gpkafka load exits when there are no new messages in the Kafka stream.
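For example, to run the sample load operation in batch mode:

$ gpkafka load --quit-at-eof loadcfg2.yaml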
gpkafka load resumes a subsequent data load operation specifying the same Kafka topic and target Greenplum Database table names from the last recorded offset.
Refer to the gpkafka load reference page for additional information about this command.
Configuring the gpfdist Server Instance
As an alternative to providing gpfdist server configuration settings in the gpfdistconfig.json file, you may specify gpfdist host or port settings on the gpkafka load command line with the --gpfdist-host hostaddr or --gpfdist-port portnum options. Any options that you specify on the command line override settings provided in the gpfdistconfig.json file.
About Kafka Offsets, Message Retention, and Loading
Kafka maintains a partitioned log for each topic, assigning each record/message within a partition a unique sequential id number. This id is referred to as an offset. Kafka retains, for each gpkafka load invocation specifying the same Kafka topic and Greenplum Database table names, the last offset within the log consumed by the load operation. The Greenplum Streaming Server also records this offset value.
Kafka persists a message for a configurable retention time period and/or log size, after which it purges messages from the log. Kafka topics or messages can also be purged on demand. This may result in an offset mismatch between Kafka and the Greenplum Streaming Server.
gpkafka load returns an error if its recorded offset for the Kafka topic and Greenplum Database table combination is behind that of the current earliest Kafka message offset for the topic, or when the earliest and latest offsets do not match.
When you receive one of these messages, you can choose to:
- Resume the load operation from the earliest available message published to the topic by specifying the --force-reset-earliest option to gpkafka load:

  $ gpkafka load --force-reset-earliest loadcfg2.yaml

- Load only new messages published to the Kafka topic by specifying the --force-reset-latest option with the command:

  $ gpkafka load --force-reset-latest loadcfg2.yaml

- Load messages published since a specific timestamp (milliseconds since epoch) by specifying the --force-reset-timestamp option to gpkafka load. To determine the create time epoch timestamp for a Kafka message, run the Kafka console consumer on the topic with the --property print.timestamp=true option and review the output. You can also use a converter such as EpochConverter to convert a human-readable date to epoch time.

  $ gpkafka load --force-reset-timestamp 1571066212000 loadcfg2.yaml
Checking the Progress of a Load Operation
- If you are loading Kafka data to Greenplum with the gpkafka load command, GPSS writes the progress log file to the directory that you specified with the -l | --log-dir option to the command, or to the $HOME/gpAdminLogs directory.
- If you are loading Kafka data to Greenplum with the gpsscli commands, GPSS writes the progress log file to the directory that you specified with the -l | --log-dir option when you started the GPSS server instance, or to the $HOME/gpAdminLogs directory.
A progress log file includes information and statistics about the load time, data size, and speed. It also includes the number of rows written to the Greenplum table, and the number of rows rejected by Greenplum.
A progress log file includes the following header row:
timestamp,pid,level,batch_id,start_time,end_time,total_byte,speed,total_read_count,inserted_rows,rejected_rows
Example Kafka progress log message:
20200804 10:17:00.52827,101417,info,1,2020-08-04 17:16:33.421+00,2020-08-04 17:17:00.497+00,79712,2.88KB,997,991,6