gpkafka-v3.yaml (Beta)
GPSS load configuration file for a Kafka data source (version 3).
Synopsis
target:
  host: host
  port: greenplum_port
  user: user_name
  password: password
  database: db_name
  schema: schema_name
  table: table_name
source:
  kafka:
    topic: kafka_topic
    brokers: kafka_broker_host:broker_port, ...
    partitions: (partition_numbers)
    fallback_offset: earliest | latest
    key_content:
      data_format:
        column_spec
        other_props
    value_content:
      data_format:
        column_spec
        other_props
    meta:
      json:
        column:
          name: meta
          type: json
    rdkafka_prop:
      kafka_property_name: kafka_property_value
      ...
channel:
  gpdb_channel:
    mode:
      # specify a single mode property block (described below)
      insert:
        mode_specific_property: value
        ...
      update:
        mode_specific_property: value
        ...
      merge:
        mode_specific_property: value
        ...
    work_schema: work_schema_name
    error_limit: num_errors | percentage_errors
    window:
      batch:
        max_count: number_of_rows
        interval_ms: wait_time
        idle_duration_ms: idle_time
      window_size: num_batches
      window_statement: udf_or_sql_to_run
    mapping:
      target_column_name: source_column_name | expression
      ...
Where the column_spec and other_props that you can specify for each data_format follow:

avro:
  source_column_name: column_name
  schema_url: http://schemareg_host:schemareg_port, ...
  bytes_to_base64: boolean

binary:
  source_column_name: column_name

csv:
  columns:
    - name: column_name
      type: column_data_type
    ...

custom:
  columns:
    - name: column_name
      type: column_data_type
    ...
  name: formatter_name
  options:
    - optname=optvalue
    ...

delimited:
  columns:
    - name: column_name
      type: column_data_type
    ...
  delimiter: delimiter_string

json:
  column:
    name: column_name
    type: json | jsonb
Where the mode_specific_property settings that you can specify for each mode follow:
insert:
  filter_expression: filter_string
update:
  filter_expression: filter_string
  match_columns: [match_column_names]
  order_columns: [order_column_names]
  update_columns: [update_column_names]
  update_condition: update_condition
merge:
  filter_expression: filter_string
  match_columns: [match_column_names]
  update_columns: [update_column_names]
  order_columns: [order_column_names]
  update_condition: update_condition
  delete_condition: delete_condition
Description
- Brackets [] are literal and are used to specify a list in version 3. They are no longer used to signify the optionality of a property.
- Curly braces {} are literal and are used to specify YAML mappings in version 3 syntax. They are no longer used with the pipe symbol (|) to identify a list of choices.
You specify load configuration properties for a Greenplum Streaming Server (GPSS) Kafka load job in a YAML-formatted configuration file. (This reference page uses the name gpkafka-v3.yaml when referring to this file; you may choose your own name for the file.) Load properties include Greenplum Database connection and data import properties, Kafka broker, topic, and message format information, and properties specific to the GPSS job.
The gpkafka load utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant. Keywords are not case-sensitive.
Keywords and Values
- host: host
- The host name or IP address of the Greenplum Database master host.
- port: greenplum_port
- The port number of the Greenplum Database server on the master host.
- user: user_name
- The name of the Greenplum Database user/role. This user_name must have the permissions described in Configuring Greenplum Database Role Privileges.
- password: password
- The password for the Greenplum Database user/role.
- database: db_name
- The name of the Greenplum database.
- schema: schema_name
- The name of the Greenplum Database schema in which table_name resides. Optional; the default schema is the public schema.
- table: table_name
- The name of the Greenplum Database table into which GPSS loads the data.
- topic: kafka_topic
- The name of the Kafka topic from which to load data. The topic must exist.
- brokers: kafka_broker_host:broker_port
- A host and port number for each of one or more Kafka brokers.
- partitions: (partition_numbers)
- A single partition number, a comma-separated list of partition numbers, and/or a range of partition numbers from which GPSS reads messages from the Kafka topic. A range that you specify with the M...N syntax includes both the range start and end values. By default, GPSS reads messages from all partitions of the Kafka topic.
Note: Ensure that you do not configure multiple jobs that specify overlapping partition numbers in the same topic; GPSS behavior is undefined.
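For example, a partitions setting that reads only from partitions 0, 3, and 5 through 7 (illustrative partition numbers) might look like:

partitions: (0, 3, 5...7)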
- fallback_offset: earliest | latest
- Specifies the behavior of GPSS when it detects a Kafka message offset gap. When set to earliest, GPSS automatically resumes a load operation from the earliest available published message. When set to latest, GPSS loads only new messages published to the Kafka topic. If this property is not set, GPSS returns an error.
- key_content:
- The Kafka message data type, field names, and type-specific properties. You must specify all Kafka key elements in the order in which they appear in the Kafka message. Optional when you specify a value_content block; gpkafka ignores the Kafka message key in this circumstance.
- value_content:
- The Kafka message value data type, field names, and type-specific properties. You must specify all Kafka data elements in the order in which they appear in the Kafka message. Optional when you specify a key_content block; gpkafka ignores the Kafka message value in this circumstance.
- column_spec
- The source-to-Greenplum column mapping. The supported column specification differs for the different data formats as described below.
- The default source-to-target data mapping behavior of GPSS is to match a column name as defined in source_column_name, column:name, or columns:name with a column name in the target Greenplum Database table. You can override the default mapping by specifying a mapping: block.
- data_format
- The format of the Kafka message key or value data. You may specify a data_format of avro, binary, csv, custom, delimited, or json for the key and value, with some restrictions.
- avro
- When you specify the avro data format for a key or value, GPSS reads the data into a single json-type column. You may specify a schema registry location and optional SSL certificates and keys, and whether or not you want GPSS to convert bytes fields into base64-encoded strings.
- source_column_name: column_name
- The name of the single json-type column into which GPSS reads the Kafka key or value data.
- schema_url: schemareg_host:schemareg_port
- When you specify the avro format and the Avro schema of the JSON data that you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address, and at least one of the addresses must be legal.
- schema_ca_on_gpdb: sr_ca_file_path
- The file system path to the CA certificate that GPSS uses to verify the peer. This file must reside in sr_ca_file_path on all Greenplum Database segment hosts.
- schema_cert_on_gpdb: sr_cert_file_path
- The file system path to the client certificate that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_cert_file_path on all Greenplum Database segment hosts.
- schema_key_on_gpdb: sr_key_file_path
- The file system path to the private key file that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_key_file_path on all Greenplum Database segment hosts.
- schema_min_tls_version: minimum_version
- The minimum transport layer security (TLS) version that GPSS requests on the connection to the schema registry. Supported versions are 1.0, 1.1, 1.2, or 1.3. The default minimum TLS version is 1.0.
- schema_path_on_gpdb: path_to_file
- When you specify the avro format and the Avro schema of the JSON key or value data that you want to load is specified in a separate .avsc file, you must identify the file system location in path_to_file, and the file must reside in this location on every Greenplum Database segment host.
Note: GPSS does not cache the schema; it must reload the schema for every batch of Kafka data. Also, GPSS supports providing the schema for either the key or the value, but not both.
- bytes_to_base64: boolean
- When true, GPSS converts Avro bytes fields into base64-encoded strings. The default value is false; GPSS does not perform the conversion.
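For example, a value_content block for Avro data might look like the following sketch; the column name, schema registry address, and bytes_to_base64 setting are illustrative:

value_content:
  avro:
    source_column_name: value
    schema_url: http://localhost:8081
    bytes_to_base64: true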
- binary
- When you specify the binary data format, GPSS reads the data into a single bytea-type column.
- source_column_name: column_name
- The name of the single bytea-type column into which GPSS reads the Kafka key or value data.
- csv
- When you specify the csv data format, GPSS reads the data into the list of columns that you specify. The message content cannot contain line ending characters (CR and LF).
Note: You must not provide a value_content block when you specify csv format for the key_content block. Similarly, you must not provide a key_content block when you specify csv format for a value_content block.
- columns:
- A set of column name/type mappings. The value [] specifies all columns.
- name: column_name
- The name of a key or value column. column_name must match the column name of the target Greenplum Database table.
- type: column_data_type
- The data type of the column. You must specify an equivalent data type for each Kafka message data element and the associated Greenplum Database table column.
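For example, a value_content block that reads CSV-format data into two columns might look like the following sketch; the column names and types are illustrative:

value_content:
  csv:
    columns:
      - name: id
        type: int
      - name: amount
        type: numeric(10,2)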
- custom
- When you specify the custom data format, GPSS uses the custom formatter that you specify to process the input data before writing it to Greenplum Database.
- columns:
- A set of column name/type mappings. The value [] specifies all columns.
- name: column_name
- The name of a key or value column. column_name must match the column name of the target Greenplum Database table.
- type: column_data_type
- The data type of the column. You must specify an equivalent data type for each Kafka message data element and the associated Greenplum Database table column.
- name: formatter_name
- When you specify the custom data format, formatter_name is required and must identify the name of the formatter user-defined function that GPSS should use when loading the data.
- options:
- A set of function argument name=value pairs.
- optname=optvalue
- The name and value of the set of arguments to pass into the formatter_name UDF.
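For example, a value_content block that invokes a hypothetical formatter UDF named my_formatter might look like the following sketch; the column names and the option are illustrative:

value_content:
  custom:
    columns:
      - name: id
        type: int
      - name: payload
        type: text
    name: my_formatter
    options:
      - delim=|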
- delimited
- When you specify the delimited data format, GPSS reads the data into the list of columns that you specify. You must specify the data delimiter.
- columns:
- A set of column name/type mappings. The value [] specifies all columns.
- name: column_name
- The name of a key or value column. column_name must match the column name of the target Greenplum Database table.
- type: column_data_type
- The data type of the column. You must specify an equivalent data type for each Kafka message data element and the associated Greenplum Database table column.
- delimiter: delimiter_string
- When you specify the delimited data format, delimiter_string is required and must identify the Kafka message data element delimiter. delimiter_string may be a multi-byte value up to 32 bytes in length. It may not contain quote or escape characters.
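For example, a value_content block that reads pipe-delimited data might look like the following sketch; the column names and types are illustrative:

value_content:
  delimited:
    columns:
      - name: id
        type: int
      - name: descr
        type: text
    delimiter: '|'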
- json
- When you specify the json data format, GPSS reads the data into a single json- or jsonb-type column whose name you choose.
- meta:
- The data type and field name of the Kafka meta data. meta: must specify the json or jsonb (Greenplum 6 only) data format, and a single json-type column. The available Kafka meta data properties include:
- topic (text) - the Kafka topic name
- partition (int) - the partition number
- offset (bigint) - the record location within the partition
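For example, the meta block from the Synopsis, together with a mapping: entry (which resides in the channel: block) that references the Kafka offset, might look like the following sketch; the target column name msg_offset is illustrative:

meta:
  json:
    column:
      name: meta
      type: json

mapping:
  msg_offset: (meta->>'offset')::bigint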
- rdkafka_prop:
- Kafka consumer configuration property names and values.
- kafka_property_name
- The name of a Kafka property.
- kafka_property_value
- The Kafka property value.
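For example, an rdkafka_prop block that sets the Kafka consumer group and session timeout (illustrative values) might look like:

rdkafka_prop:
  group.id: gpss_consumer_group_1
  session.timeout.ms: 6000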
- mode:
- The table load mode: insert, merge, or update. The default mode is insert.
Note: update and merge are not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
- insert:
- Inserts source data into Greenplum.
- update:
- Updates the target table columns that are listed in update_columns when the input columns identified in match_columns match the named target table columns and the optional update_condition is true.
- merge:
- Inserts new rows and updates existing rows when:
- columns are listed in update_columns,
- the match_columns target table column values are equal to the input data, and
- an optional update_condition is specified and met.
- Deletes rows when:
- the match_columns target table column values are equal to the input data, and
- an optional delete_condition is specified and met.
- mode_property_name: value
- The name to value mapping for a mode property. Each mode supports one or more of the following properties as specified in the Synopsis.
- filter_expression: filter_string
- The filter to apply to the input data before GPSS loads the data into Greenplum Database. If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more source value, key, or meta column names.
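For example, an insert mode block with a filter that loads only messages whose value column contains an amount greater than 100 (illustrative column name) might look like:

insert:
  filter_expression: (value->>'amount')::int > 100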
- match_columns: [match_column_names]
- A comma-separated list that specifies the column(s) to use as the join condition for the update. The attribute value in the specified target column(s) must be equal to that of the corresponding source data column(s) in order for the row to be updated in the target table.
- Required when mode is merge or update.
- order_columns: [order_column_names]
- A comma-separated list that specifies the column(s) by which GPSS sorts the rows. When multiple matching rows exist in a batch, order_columns is used with match_columns to determine the input row with the largest value; GPSS uses that row to write/update the target.
- Optional. May be specified in merge mode to sort the input data rows.
- update_columns: [update_column_names]
- A comma-separated list that specifies the column(s) to update for the rows that meet the match_columns criteria and the optional update_condition.
- Required when mode is merge or update.
- update_condition: update_condition
- Specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a merge). Optional.
- delete_condition: delete_condition
- In merge mode, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the match_columns criteria. Optional.
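For example, a merge mode block might look like the following sketch; the column names and conditions are illustrative:

merge:
  match_columns: [id]
  order_columns: [kafka_offset]
  update_columns: [amount, updated_at]
  update_condition: amount > 0
  delete_condition: op = 'D'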
- work_schema: work_schema_name
- The name of the Greenplum Database schema in which GPSS creates internal tables. The default work_schema_name is public.
- error_limit: num_errors | percentage_errors
- The error threshold, specified as either an absolute number or a percentage. GPSS stops running the job when this limit is reached.
- window:
- The batch size and commit window.
- batch:
- Controls how GPSS commits data to Greenplum Database. You may specify both configuration properties as long as both values are not zero (0). Try setting and tuning interval_ms to your environment; introduce a max_count setting only if you encounter high memory usage associated with message buffering.
- max_count: number_of_rows
- The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of max_count is 0, which instructs GPSS to ignore this commit trigger condition.
- interval_ms: wait_time
- The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 5000.
- idle_duration_ms: idle_time
- The maximum amount of time to wait (milliseconds) for the first batch of Kafka data. When you use this property to enable lazy load, GPSS waits until Kafka data is available before locking the target Greenplum table. You can specify:
- 0 (lazy load is disabled)
- -1 (lazy load is enabled, the job never stops), or
- a positive value (lazy load is enabled, the job stops after idle_time duration of no data in the Kafka topic)
- window_size: num_batches
- The number of batches to read before executing window_statement. The default value of window_size is 0.
- window_statement: udf_or_sql_to_run
- A user-defined function or SQL command(s) that you want to run after GPSS reads window_size number of batches. The default is null; no command is executed.
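For example, a window block that commits at most every 2000 milliseconds, enables lazy load indefinitely, and runs an illustrative SQL command after every 10 batches might look like the following sketch (nesting follows the Synopsis above):

window:
  batch:
    interval_ms: 2000
    idle_duration_ms: -1
  window_size: 10
  window_statement: ANALYZE tbl_order_merge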
- mapping:
- Optional. Overrides the default source-to-target column mapping.
Note: When you specify a mapping, ensure that you provide a mapping for all source data elements of interest. GPSS does not automatically match column names when you provide a mapping block.
- target_column_name: source_column_name | expression
- target_column_name specifies the target Greenplum Database table column name. GPSS maps this column name to the source column name specified in source_column_name, or to an expression. When you specify an expression, you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
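For example, a mapping block that combines a simple expression and a computed expression might look like the following sketch; the target and source column names are illustrative:

mapping:
  id: (value->>'id')::int
  total_price: (value->>'qty')::int * (value->>'price')::numeric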
- name: job_name
- Identifies the name of the job.
- save_failing_batch: boolean
- Determines whether or not GPSS saves data into a backup table before it writes the data to Greenplum Database. Saving the data in this manner aids recovery when GPSS encounters errors during the evaluation of expressions. The default is false; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property to true, GPSS writes both the good and the bad data in the batch to a backup table named gpssbackup_jobhash, and continues to process incoming data. You must then manually load the good data from the backup table into Greenplum or set recover_failing_batch (Beta) to true to have GPSS automatically reload the good data.
Note: Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
- recover_failing_batch: boolean (Beta)
- When set to true and save_failing_batch is also true, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value is false; GPSS does not process the backup table.
Note: Enabling this property requires that GPSS has the Greenplum Database privileges to create a function.
- consistency: consistency_value
- Specify how GPSS should manage message offsets when it acts as a high-level Kafka consumer. Valid values are strong, at-least, at-most, and none. The default value is strong. Refer to Understanding Kafka Message Offset Management for more detailed information.
- schedule:
- Controls the frequency and interval of restarting failed jobs.
- retry_interval: retry_time
- The period of time that GPSS waits before retrying the job. You can specify the time interval in day (d), hour (h), minute (m), second (s), or millisecond (ms) integer units; do not mix units. The default retry interval is 5m (5 minutes).
- max_retries: num_retries
- The maximum number of times that GPSS attempts to retry the job. The default is 0; do not retry. If you specify a negative value, GPSS retries the job indefinitely.
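For example, a schedule block that retries a failed job up to 5 times, waiting 30 seconds between attempts (illustrative values), might look like:

schedule:
  retry_interval: 30s
  max_retries: 5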
Notes
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the load configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" (c1 text);
Your YAML configuration file would refer to the table name as:
target: table: '"MyTable"'
Kafka Properties
PROPERTIES:
  api.version.request: false
  broker.version.fallback: 0.8.2.1
Examples
Load data from Kafka as defined in the Version 3 configuration file named kafka2greenplumv3.yaml:
gpkafka load kafka2greenplumv3.yaml
Example kafka2greenplumv3.yaml configuration file:
target:
  host: mdw-1
  port: 15432
  user: gpadmin
  password: changeme
  database: testdb
  schema: public
  table: tbl_order_merge
source:
  kafka:
    topic: daily_orders
    brokers: localhost:9092
    key_content:
      binary:
        source_column_name: key
    value_content:
      json:
        column:
          name: value
          type: json
    meta:
      json:
        column:
          name: meta
          type: json
channel:
  gpdb_channel:
    mode:
      merge:
        match_columns: [pk]
        update_columns: [data, key]
        order_columns: [p, o]
    work_schema: public
    error_limit: "25"
    window:
      batch:
        interval_ms: 5000
    mapping:
      data: (value->>'data')::text
      o: (meta->>'offset')::bigint
      p: (meta->>'partition')::int
      pk: (value->>'pk')::int