Example: Loading JSON Data from Kafka Using the Streaming Server
In this example, you load JSON-format data from a Kafka topic named topic_json_gpkafka into a Greenplum Database table named json_from_kafka. You perform the load as the Greenplum role gpadmin. The table json_from_kafka resides in the public schema in a Greenplum database named testdb. Each Kafka message identifies a customer, a month, and an expense amount. For example:

{ "cust_id": 123, "month": 9, "expenses": 456.78 }
You will run a Kafka console producer to emit JSON-format customer expense messages, start a Greenplum Streaming Server instance, and use the GPSS gpsscli subcommands to load the data into the json_from_kafka table.
Prerequisites
Before you start this procedure, ensure that you:
- Have administrative access to running Kafka and Greenplum Database clusters.
- Have configured connectivity as described in both the Greenplum Streaming Server Prerequisites section and the Prerequisites section in the Greenplum-Kafka Integration documentation.
- Identify and note the ZooKeeper hostname and port.
- Identify and note the hostname and port of the Kafka broker(s).
- Identify and note the hostname and port of the Greenplum Database master node.
- Register the GPSS extension.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Procedure
- Log in to a host in your Kafka cluster. For example:

$ ssh kafkauser@kafkahost
kafkahost$
- Create a Kafka topic named topic_json_gpkafka. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
    --topic topic_json_gpkafka
- Open a file named sample_data.json in the editor of your choice.
For example:
kafkahost$ vi sample_data.json
- Copy/paste the following text to add JSON-format data into the file, and then
save and exit:
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
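The console producer sends one message per line, so each line of sample_data.json must parse as a self-contained JSON object. The following hypothetical Python check (not part of Kafka or GPSS) illustrates that newline-delimited format using two of the sample records:

```python
import json

# Hypothetical sanity check: each line of sample_data.json is a
# self-contained JSON object carrying the expected keys, because the
# Kafka console producer emits one message per input line.
sample_lines = [
    '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }',
    '{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }',
]

records = [json.loads(line) for line in sample_lines]
assert all({"cust_id", "month", "expenses"} <= rec.keys() for rec in records)
```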
- Stream the contents of the sample_data.json file to a Kafka
console producer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic topic_json_gpkafka < sample_data.json
- Verify that the Kafka console producer published the messages to the topic by
running a Kafka console consumer. For example:
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic topic_json_gpkafka \
    --from-beginning
- Open a new terminal window, log in to the Greenplum Database master host as
the gpadmin administrative user, and set up the Greenplum
environment. For example:
$ ssh gpadmin@gpmaster
gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
- Construct the Greenplum Streaming Server configuration file. For example, open a
file named gpsscfg_ex.json in the editor of your choice:
gpmaster$ vi gpsscfg_ex.json
- Designate a GPSS listen port number of 50007 and a gpfdist port
number of 8319 in the configuration file. For example, copy/paste the following
into the gpsscfg_ex.json file, and then save and exit the editor:
{
    "ListenAddress": {
        "Host": "",
        "Port": 50007
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
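A malformed configuration file will prevent the server from starting cleanly, so it can be worth validating the JSON before launching gpss. This is a minimal sketch, not a GPSS tool; the port numbers 50007 and 8319 are just this example's choices, and the two ports must differ:

```python
import json

# Pre-flight check of gpsscfg_ex.json (illustrative only, not part of
# GPSS): the file must be valid JSON and the GPSS listen port must not
# collide with the gpfdist port.
cfg_text = """
{
    "ListenAddress": { "Host": "", "Port": 50007 },
    "Gpfdist": { "Host": "", "Port": 8319 }
}
"""
cfg = json.loads(cfg_text)
assert cfg["ListenAddress"]["Port"] != cfg["Gpfdist"]["Port"]
```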
- Start the Greenplum Streaming Server instance in the background, specifying the log directory ./gpsslogs. For example:

gpmaster$ gpss gpsscfg_ex.json --log-dir ./gpsslogs &
- Construct the gpkafka load configuration file. Open a file
named jsonload_cfg.yaml in the editor of your choice.
For example:
gpmaster$ vi jsonload_cfg.yaml
- Fill in the load configuration parameter values based on your environment.
This example assumes:
- Your Greenplum Database master hostname is gpmaster.
- The Greenplum Database server is running on the default port.
- Your Kafka broker host and port is localhost:9092.
- You want to write the Kafka data to a Greenplum Database table named json_from_kafka located in the public schema of a database named testdb.
- You want to write the customer identifier and expenses data to Greenplum.
The jsonload_cfg.yaml file would include the following contents:

DATABASE: testdb
USER: gpadmin
HOST: gpmaster
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_json_gpkafka
      COLUMNS:
         - NAME: jdata
           TYPE: json
      FORMAT: json
      ERROR_LIMIT: 10
   OUTPUT:
      TABLE: json_from_kafka
      MAPPING:
         - NAME: customer_id
           EXPRESSION: (jdata->>'cust_id')::int
         - NAME: month
           EXPRESSION: (jdata->>'month')::int
         - NAME: amount_paid
           EXPRESSION: (jdata->>'expenses')::decimal
   COMMIT:
      MINIMAL_INTERVAL: 2000
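The MAPPING section is where the JSON payload becomes typed table columns: each message lands in the single json column jdata, and an expression like (jdata->>'cust_id')::int extracts a field as text and casts it. A Python analogue of that transformation, for illustration only, looks like this:

```python
import json
from decimal import Decimal

# Illustrative analogue of the MAPPING expressions (this runs in
# Greenplum, not Python): extract each field from the jdata JSON value
# and cast it to the target column's type.
message = '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }'
jdata = json.loads(message)

row = {
    "customer_id": int(jdata["cust_id"]),      # (jdata->>'cust_id')::int
    "month": int(jdata["month"]),              # (jdata->>'month')::int
    "amount_paid": Decimal(str(jdata["expenses"])),  # (jdata->>'expenses')::decimal
}
```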
- Create the target Greenplum Database table named
json_from_kafka. For example:
gpmaster$ psql -d testdb
testdb=# CREATE TABLE json_from_kafka(
    customer_id int8,
    month int4,
    amount_paid decimal(9,2) );
- Exit the psql subsystem:
testdb=# \q
- Submit the Kafka data load job to the GPSS instance running on
port number 50007. (You may want to open a new terminal window to run the
command.) For example, to submit a job named
kafkajson2gp:
gpmaster$ gpsscli submit --name kafkajson2gp --gpss-port 50007 ./jsonload_cfg.yaml
20181214:22:37:49.168 gpsscli:gpadmin:gpmaster:075435-[INFO]:-JobID: kafkajson2gp
- List all GPSS jobs. For example:
gpmaster$ gpsscli list --all --gpss-port 50007
JobID          GPHost     GPPort  DataBase  Schema  Table            Topic               Status
kafkajson2gp   localhost  5432    testdb    public  json_from_kafka  topic_json_gpkafka  Stopped
The list subcommand displays all jobs. Notice the entry for the kafkajson2gp job that you just submitted, and that the job is in the Stopped state.
- Start the job named kafkajson2gp. For example:
gpmaster$ gpsscli start kafkajson2gp --gpss-port 50007
20181214:22:43:32.590 gpsscli:gpadmin:gpmaster:075490-[INFO]:-JobID: kafkajson2gp is started
- Stop the job named kafkajson2gp. For example:
gpmaster$ gpsscli stop kafkajson2gp --gpss-port 50007
20181214:22:51:21.960 gpsscli:gpadmin:e11517afb6f6:075781-[INFO]:-Stop a job: kafkajson2gp, status Stopped
- Examine the gpss command output and log file, looking for
messages that identify the number of
rows inserted/rejected. For example:
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
- View the contents of the Greenplum Database target table json_from_kafka:
gpmaster$ psql -d testdb
testdb=# SELECT * FROM json_from_kafka
             WHERE customer_id='1313131' ORDER BY amount_paid;
 customer_id | month | amount_paid
-------------+-------+-------------
     1313131 |    11 |      368.27
     1313131 |    10 |      492.83
     1313131 |    12 |     1313.13
(3 rows)
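You can cross-check this result against the nine sample records published to the topic. The following standalone Python sketch (hypothetical, not part of the load pipeline) replays the WHERE and ORDER BY logic over the sample data:

```python
# The nine sample records as (cust_id, month, expenses) tuples.
sample = [
    (1313131, 12, 1313.13), (3535353, 11, 761.35), (7979797, 10, 4489.00),
    (7979797, 11, 18.72), (3535353, 10, 6001.94), (7979797, 12, 173.18),
    (1313131, 10, 492.83), (3535353, 12, 81.12), (1313131, 11, 368.27),
]

# WHERE customer_id = 1313131 ORDER BY amount_paid
rows = sorted(
    [(month, amount) for cust_id, month, amount in sample if cust_id == 1313131],
    key=lambda r: r[1],
)
assert [amount for _, amount in rows] == [368.27, 492.83, 1313.13]
```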