Example - Loading CSV Data from the File System

In this example, you use the Greenplum Connector for Apache NiFi to load CSV-format data into Greenplum Database.

The CSV data represents department expense records, and includes department identifier (integer), month (integer), and expenses (decimal) fields. For example, a record for a department with identifier 123 that spent $456.78 in the month of September follows:

"123","09","456.78"

A record with the same department identifier and month identifies a new expense total for the month, replacing the previous amount.

You will use the Apache NiFi user interface to create a dataflow between the GetFile and PutGreenplumRecord processors.

In this flow:

  • The GetFile processor reads CSV files from the /tmp/gcan_data directory on the NiFi system and generates record-based FlowFiles.
  • The PutGreenplumRecord processor writes the data that it receives to a Greenplum Database table named gcan_dept_expense located in the public schema of a database named testdb.
  • The department identifier and month fields together uniquely identify a table row.
  • The write to Greenplum uses the MERGE operation type: if no row exists for the department/month, the write inserts a new row into the table; if a row for the department/month already exists, the write replaces its expenses amount with the new value.
  • You will explicitly specify the input FlowFile schema.
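The MERGE behavior described above amounts to "last record wins" for each department/month key. Once you have created the sample files in Step 1, you can predict what gcan_dept_expense should contain by replaying the files in order. The following bash/awk sketch does that; it models only the expected result, not how the Connector or the Greenplum Streaming Server actually performs the merge, and the function name expected_merge is hypothetical.

```shell
# Hypothetical helper: replay CSV files in the order given and keep only the
# most recent expenses value for each (dept_id, month) key, mirroring the
# MERGE behavior described above. Assumes the simple quoted, comma-separated
# format of this example (no commas or quotes inside field values).
expected_merge() {
  awk -F',' '
    FNR == 1 { next }                              # skip the header row of each file
    {
      for (i = 1; i <= NF; i++) gsub(/"/, "", $i)  # strip the quotes
      month = $2 + 0                               # 09 and 9 are the same int
      key = $1 SUBSEP month
      if (!(key in amount)) { n++; keys[n] = key; dept[n] = $1; mon[n] = month }
      amount[key] = $3                             # a later record replaces an earlier one
    }
    END {
      for (k = 1; k <= n; k++) printf "%s|%s|%s\n", dept[k], mon[k], amount[keys[k]]
    }
  ' "$@" | sort -t'|' -k1,1n -k2,2n                # same order as ORDER BY dept_id, month
}
```

For example, running expected_merge gcan_work/sample1.csv gcan_work/sample2.csv prints ten dept_id|month|expenses lines in the same order as the final query in Step 9, including 1313131|11|555.55 and 7979797|10|5555.55; passing only sample1.csv gives the nine rows expected after the first copy.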

Prerequisites

Before you start this procedure, ensure that you:

  • Have access to a running Greenplum Database cluster.
  • Have access to a running Greenplum Streaming Server instance, or the privileges required to start an instance.
  • Have met the Prerequisites identified in the Loading topic.

For simplicity, this example assumes that Apache NiFi, Greenplum Database, and the Greenplum Streaming Server are running on the same host.

Process

Step 1: Prepare the Example Environment

Step 2: Add and Configure the GetFile Processor

Step 3: Configure a GreenplumGPSSAdapter Controller Service

Step 4: Identify the Input Data Source, Format, and Schema

Step 5: Configure a Record Reader Controller Service

Step 6: Add and Configure the PutGreenplumRecord Processor

Step 7: Connect and Start the Processors

Step 8: Create the Greenplum Database and Table

Step 9: Trigger the Flow and Check Results

Step 1: Prepare the Example Environment

In this step, you create the sample data files and the input directory, and open the Apache NiFi user interface.

  1. Log in to your Apache NiFi client system.

    $ ssh user@nifihost
    user@nifihost$ 
    
  2. Create a working directory. For example:

    user@nifihost$ mkdir gcan_work
    user@nifihost$ cd gcan_work
    
  3. Prepare some sample data:

    1. Write some data into a CSV file named sample1.csv:

      user@nifihost$ echo '"dept_id","month","expenses"
      "1313131","12","1313.13"
      "3535353","11","761.35"
      "7979797","10","4489.00"
      "7979797","11","18.72"
      "3535353","10","6001.94"
      "7979797","12","173.18"
      "1313131","10","492.83"
      "3535353","12","81.12"
      "1313131","11","368.27"' > sample1.csv
      
    2. Write some data into a CSV file named sample2.csv:

      user@nifihost$ echo '"dept_id","month","expenses"
      "1313131","11","555.55"
      "7979797","10","5555.55"
      "2222222","12","22.22"' > sample2.csv
      

      This file contains an expense record for a new department (2222222), and updated expense values for two existing department/month combinations (1313131/11 and 7979797/10).

  4. Create an input directory and set the appropriate permissions:

    user@nifihost$ mkdir /tmp/gcan_data
    user@nifihost$ chmod a+rwx /tmp/gcan_data
    

    You will copy the sample data files to the input directory later in this procedure.

  5. Start the Apache NiFi user interface. For example, if your NiFi server is running on the local host on port number 9050, enter the following in a web browser window:

    http://localhost:9050
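If you prefer to script this preparation, the commands in items 2 through 4 can be collected into a single bash function. This is a sketch only: the function name prepare_gcan_env is hypothetical, and it writes the same sample1.csv and sample2.csv contents shown above using heredocs rather than multi-line echo commands.

```shell
# Hypothetical helper that reproduces items 2 through 4 of this step.
# Usage: prepare_gcan_env <work_dir> <input_dir>
prepare_gcan_env() {
  local work_dir=${1:?work directory required}
  local input_dir=${2:?input directory required}

  mkdir -p "$work_dir" || return 1

  # sample1.csv: header row plus nine initial expense records
  cat > "$work_dir/sample1.csv" <<'EOF'
"dept_id","month","expenses"
"1313131","12","1313.13"
"3535353","11","761.35"
"7979797","10","4489.00"
"7979797","11","18.72"
"3535353","10","6001.94"
"7979797","12","173.18"
"1313131","10","492.83"
"3535353","12","81.12"
"1313131","11","368.27"
EOF

  # sample2.csv: one new department and two updated department/month totals
  cat > "$work_dir/sample2.csv" <<'EOF'
"dept_id","month","expenses"
"1313131","11","555.55"
"7979797","10","5555.55"
"2222222","12","22.22"
EOF

  # Input directory polled by GetFile; world-writable as in item 4
  mkdir -p "$input_dir" && chmod a+rwx "$input_dir"
}
```

For example, prepare_gcan_env gcan_work /tmp/gcan_data creates the same files and directory as items 2 through 4.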
    

Step 2: Add and Configure the GetFile Processor

Perform the following steps to add and configure a GetFile processor instance:

  1. Click the Processors icon in the Apache NiFi components toolbar and drag it to the canvas.

    This action opens the Add Processor dialog.

  2. Search for the GetFile processor by typing GetFile in the Filter field.

  3. Click Add.

    This action adds a GetFile processor component to the canvas.

  4. Right-click on the component and select Configure from the context menu.

    This action displays the Configure Processor dialog.

  5. Select the PROPERTIES tab.

    1. Locate the Input Directory property and set the Value to the directory that you created in Step 1, /tmp/gcan_data.
    2. Click OK.
    3. APPLY the Configure Processor changes.

Step 3: Configure a GreenplumGPSSAdapter Controller Service

Perform the steps below to configure an instance of the GreenplumGPSSAdapter controller service named GreenplumGPSSAdapter-testdb:

  1. Click on an empty area in the Apache NiFi canvas.

  2. Click on the configure icon in the Operate Palette.

    This action opens the NiFi Flow Configuration dialog.

  3. Select the CONTROLLER SERVICES tab.

  4. Click the + icon to add a new controller service.

    This action opens the Add Controller Service dialog.

  5. Type Greenplum in the Filter field, select the GreenplumGPSSAdapter entry, and click ADD.

    This action adds a GreenplumGPSSAdapter row to the table of currently defined controller services, and selects this row.

  6. Click on the configure icon in the last column of the table to configure the service.

    This action opens the Configure Controller Service dialog.

  7. Select the SETTINGS tab, locate the Name field, and set the name to GreenplumGPSSAdapter-testdb.

  8. Select the PROPERTIES tab, locate the properties identified in the table below, and set each Value as specified:

    Property Name                       Value      Comments
    Greenplum Streaming Server Host     localhost  Enter your host
    Greenplum Streaming Server Port     5000       Retain the default
    Greenplum Database Master Host      localhost  Enter your Greenplum master host
    Greenplum Database Master Port      5432       Retain the default
    Greenplum Database Name             testdb
    Greenplum Database User Name        gpadmin    You can choose a different Greenplum user
    Greenplum Database User Password    changeme   Enter the password

  9. APPLY the Configure Controller Service changes.

  10. Click the thunderbolt icon in the GreenplumGPSSAdapter-testdb row to enable the controller service.

    The Enable Controller Service dialog displays.

    1. Click the ENABLE button.
    2. Click the CLOSE button.
  11. Click X in the upper right corner of the dialog to close the NiFi Flow Configuration window.
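Before you enable the service, or if the processor later reports connection errors, you may want to confirm that the host and port values in the table are reachable from the NiFi host. The following bash sketch tries to open a TCP connection to each host:port pair. It assumes bash (for its /dev/tcp redirection) and the coreutils timeout command, uses the example values localhost:5000 and localhost:5432, and the helper name check_port is hypothetical. A successful connection only shows that something is listening, not that it is the Greenplum Streaming Server or the Greenplum master.

```shell
# Hypothetical helper: succeed if a TCP connection to host:port can be opened
# within the given number of seconds (default 3). Relies on bash's /dev/tcp
# redirection and the coreutils timeout command.
check_port() {
  local host=$1 port=$2 secs=${3:-3}
  timeout "$secs" bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}

# Check the GPSS (5000) and Greenplum master (5432) values entered above.
for target in localhost:5000 localhost:5432; do
  if check_port "${target%:*}" "${target#*:}"; then
    echo "reachable:   $target"
  else
    echo "unreachable: $target"
  fi
done
```

Substitute your own Greenplum Streaming Server and master host names if they differ from localhost.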

Step 4: Identify the Input Data Source, Format, and Schema

The source of the data is the GetFile processor, and the data format is CSV.

Because the CSV file includes a header row, you could choose to have Apache NiFi infer the schema. For this exercise, you will explicitly define and specify the schema.

As described above, the CSV data represents department expense records, and includes department identifier (integer), month (integer), and expenses (decimal) fields:

"123","09","456.78"

The schema that corresponds to records of this format follows:

{
    "name": "dept_expense_record",
    "namespace": "nifi_csv_example",
    "type": "record",
    "fields": [
        { "name": "dept_id",  "type": ["int", "null"] },
        { "name": "month",    "type": ["int", "null"] },
        { "name": "expenses", "type": {"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 2 } }
    ]
}

You will specify this schema when you configure a record reader controller service for a PutGreenplumRecord processor instance.
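Every record must conform to this schema; in Step 9 you will see that a single bad value prevents the whole file from being written. As a rough pre-check before copying a file to the input directory, the following bash/awk sketch flags records whose fields do not fit the schema types: whole numbers for dept_id and month, and a decimal with at most 9 integer digits and 2 fractional digits for expenses (precision 11 minus scale 2). The function name validate_dept_expense_csv is hypothetical, and the check only approximates what the record reader does: it does not enforce the 32-bit range of the Avro int type, and it assumes the simple quoted format used in this example.

```shell
# Hypothetical pre-check for the dept_expense_record schema.
# Prints each offending line and returns non-zero if any record does not fit:
#   dept_id, month -> int; expenses -> decimal(precision 11, scale 2)
validate_dept_expense_csv() {
  awk -F',' '
    FNR == 1 { next }                              # skip the header row
    {
      line = $0
      bad = (NF != 3)                              # expect exactly three fields
      for (i = 1; i <= NF; i++) gsub(/"/, "", $i)  # strip CSV quoting
      if ($1 !~ /^-?[0-9]+$/ || $2 !~ /^-?[0-9]+$/) bad = 1
      if ($3 !~ /^-?[0-9]+(\.[0-9]+)?$/) {
        bad = 1
      } else {
        n = split($3, part, "[.]")
        whole = part[1]; sub(/^-/, "", whole)
        # precision 11, scale 2 leaves at most 9 digits before the point
        if (length(whole) > 9 || (n == 2 && length(part[2]) > 2)) bad = 1
      }
      if (bad) { printf "%s:%d: %s\n", FILENAME, FNR, line; errors++ }
    }
    END { exit (errors > 0 ? 1 : 0) }
  ' "$@"
}
```

For example, running validate_dept_expense_csv on the sample3.csv file created in Step 9 reports line 3, the record with month zz, and returns a non-zero status.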

Step 5: Configure a Record Reader Controller Service

Perform the steps below to configure an instance of a CSV record reader controller service named CSVReader-dept-expenses:

  1. Click on an empty area in the Apache NiFi canvas.

  2. Click on the configure icon in the Operate Palette.

    This action opens the NiFi Flow Configuration dialog.

  3. Select the CONTROLLER SERVICES tab.

  4. Click the + icon to add a new controller service.

    This action opens the Add Controller Service dialog.

  5. Type CSV in the Filter field, select the CSVReader entry, and click ADD.

    This action adds a CSVReader row to the table of currently defined controller services, and selects this row.

  6. Click on the configure icon in the last column of the table to configure the service.

    This action opens the Configure Controller Service dialog.

  7. Select the SETTINGS tab, locate the Name field, and set the name to CSVReader-dept-expenses.

  8. Select the PROPERTIES tab, locate the properties identified in the table below, and set each Value as specified:

    Property Name                 Value                       Comments
    Schema Access Strategy        Use 'Schema Text' Property  The Schema Text property value will specify the schema definition
    Treat First Line as Header    true                        The first line of the file is the header

  9. Locate the Schema Text property, and copy/paste the schema definition below into the Value field:

    {
      "name": "dept_expense_record",
      "namespace": "nifi_csv_example",
      "type": "record",
      "fields": [
        { "name": "dept_id",  "type": ["int", "null"] },
        { "name": "month",    "type": ["int", "null"] },
        { "name": "expenses", "type": {"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 2 } }
      ]
    }
    
  10. Retain the default values for the other properties.

  11. APPLY the Configure Controller Service changes.

  12. Click the thunderbolt icon in the CSVReader-dept-expenses row to enable the controller service.

    The Enable Controller Service dialog displays.

    1. Click the ENABLE button.
    2. Click the CLOSE button.
  13. Click X in the upper right corner of the dialog to close the NiFi Flow Configuration window.
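A stray or missing character introduced while pasting the schema into the Schema Text property can cause errors when the reader parses the schema. One low-effort safeguard is to keep the schema in a file and confirm that it parses as JSON before you paste it. The sketch below assumes python3 is installed (it uses only the standard json and pathlib modules); the file name dept_expense_schema.avsc and the helper name schema_field_names are hypothetical. It checks JSON syntax and lists the field names, which you can compare with the gcan_dept_expense columns; it does not validate Avro-specific rules.

```shell
# Save the schema definition from item 9 to a file (hypothetical file name).
cat > dept_expense_schema.avsc <<'EOF'
{
  "name": "dept_expense_record",
  "namespace": "nifi_csv_example",
  "type": "record",
  "fields": [
    { "name": "dept_id",  "type": ["int", "null"] },
    { "name": "month",    "type": ["int", "null"] },
    { "name": "expenses", "type": {"type": "bytes", "logicalType": "decimal", "precision": 11, "scale": 2 } }
  ]
}
EOF

# Hypothetical helper: parse the file as JSON (fails on a syntax error) and
# print the record field names, separated by spaces.
schema_field_names() {
  python3 -c 'import json, pathlib, sys; fields = json.loads(pathlib.Path(sys.argv[1]).read_text())["fields"]; print(" ".join(f["name"] for f in fields))' "$1"
}

schema_field_names dept_expense_schema.avsc
```

If the command prints the three field names, the file is well-formed JSON and you can paste its contents into the Schema Text property.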

Step 6: Add and Configure the PutGreenplumRecord Processor

Perform the following steps to add and configure a PutGreenplumRecord processor instance:

  1. Click the Processors icon in the Apache NiFi components toolbar and drag it to the canvas.

    This action opens the Add Processor dialog.

  2. Search for the PutGreenplumRecord processor by typing PutGreenplumRecord in the Filter field.

  3. Click Add.

    This action adds a PutGreenplumRecord processor component to the canvas.

  4. Right-click on the component and select Configure from the context menu.

    This action displays the Configure Processor dialog.

  5. Select the SETTINGS tab.

  6. Automatically terminate all relationships by checking the failure, retry, and success checkboxes.

  7. Select the PROPERTIES tab.

  8. Locate the Record Reader property. Click in the Value field, then select CSVReader-dept-expenses from the drop-down menu, and click OK.

  9. Locate the Greenplum Adapter property. Click in the Value field, select GreenplumGPSSAdapter-testdb from the drop-down menu, and click OK.

  10. Locate the properties identified in the table below and set each Value as specified:

    Property Name                Value                      Comments
    Schema Name                  public                     Retain the default
    Table Name                   gcan_dept_expense          You will create this table in Step 8
    Operation Type               MERGE                      Merge can both insert and update a table row
    Match Columns                dept_id, month             A table row is uniquely identified by these column values
    Translate Field Names        true                       Retain the default
    Unmatched Field Behavior     Ignore Unmatched Fields    Retain the default
    Unmatched Column Behavior    Warn on Unmatched Columns  Log a warning message
    Rollback On Failure          false                      Retain the default
    Maximum Record Batch Size    100

  11. APPLY the Configure Processor changes.

Step 7: Connect and Start the Processors

In this step, you create a connection between the GetFile and PutGreenplumRecord processors on the canvas, and then start the processors.

  1. Hover over the GetFile component on the canvas.
  2. Click the arrow icon and drag over to the PutGreenplumRecord component.

    This action displays the Create Connection dialog.

  3. No configuration is required; click ADD to create the connection.

    A line/box that represents the connection is displayed on the NiFi canvas.

  4. Right-click on the GetFile component and select Start from the context menu to start the processor.

    The icon next to the processor name changes to a green sideways triangle.

  5. Right-click on the PutGreenplumRecord component and select Start from the context menu to start the processor.

    The icon next to the processor name changes to a green sideways triangle.

Step 8: Create the Greenplum Database and Table

In this step, you create the Greenplum database testdb if it does not yet exist, and create the target Greenplum table.

  1. Open a new terminal window, log in to the Greenplum Database master host as the gpadmin administrative user, and set up your Greenplum environment. For example:

    $ ssh gpadmin@gpmaster
    gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
    
  2. Create a database named testdb if one does not already exist:

    gpadmin@gpmaster$ createdb testdb
    
  3. Start the psql subsystem:

    gpadmin@gpmaster$ psql -d testdb
    
  4. You must register the Greenplum Streaming Server extension (gpss) in the database before you can use the Connector. Register the extension as follows:

    testdb=# CREATE EXTENSION IF NOT EXISTS gpss;
    

    This command registers the extension only if it has not been previously registered.

  5. Create the target Greenplum Database table named gcan_dept_expense:

    testdb=# CREATE TABLE gcan_dept_expense( dept_id int8, month int8, expenses decimal(11,2) );
    

    The columns of this table correspond to the fields of the input data schema that you specified for the record reader in Step 5.

  6. Remain in the psql session; you will use it again in Step 9.
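If you need to repeat this setup, for example to recreate the example from scratch, items 2, 4, and 5 can also be run non-interactively. The following bash sketch assumes the createdb and psql client programs from the Greenplum environment that you set up in item 1; the helper names gcan_setup_sql and setup_testdb are hypothetical. gcan_setup_sql only prints the SQL so that you can review it, and setup_testdb pipes it to psql with ON_ERROR_STOP set so that psql stops at the first failing statement.

```shell
# Hypothetical helper: print the SQL from items 4 and 5 of this step.
gcan_setup_sql() {
  cat <<'SQL'
CREATE EXTENSION IF NOT EXISTS gpss;
CREATE TABLE gcan_dept_expense( dept_id int8, month int8, expenses decimal(11,2) );
SQL
}

# Hypothetical helper: create testdb only if psql does not already list it,
# then apply the SQL. Run as gpadmin after sourcing greenplum_path.sh.
setup_testdb() {
  if ! psql -lqt | cut -d '|' -f 1 | grep -qw testdb; then
    createdb testdb || return 1
  fi
  gcan_setup_sql | psql -d testdb -v ON_ERROR_STOP=1
}
```

Because the CREATE TABLE statement is unchanged from item 5, it fails if gcan_dept_expense already exists; drop the table first if you are starting over.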

Step 9: Trigger the Flow and Check Results

You will individually copy the sample data files to /tmp/gcan_data on the Apache NiFi system to trigger the flow. You will check the results by observing the Apache NiFi user interface and querying the Greenplum table.

You will also generate a sample file with bad data, trigger the flow, and check the results.

  1. Copy the sample1.csv data file to the input directory:

    user@nifihost$ cp gcan_work/sample1.csv /tmp/gcan_data/
    
  2. Examine the GetFile and PutGreenplumRecord processor components on the NiFi canvas, and notice when their statistics update.

  3. Examine the contents of the Greenplum Database table. Enter the following command in the psql terminal session that you used earlier:

    testdb=# SELECT * FROM gcan_dept_expense ORDER BY dept_id, month;
     dept_id | month | expenses 
    ---------+-------+----------
     1313131 |    10 |   492.83
     1313131 |    11 |   368.27
     1313131 |    12 |  1313.13
     3535353 |    10 |  6001.94
     3535353 |    11 |   761.35
     3535353 |    12 |    81.12
     7979797 |    10 |  4489.00
     7979797 |    11 |    18.72
     7979797 |    12 |   173.18
    (9 rows)
    
  4. Copy the sample2.csv data file to the input directory:

    user@nifihost$ cp gcan_work/sample2.csv /tmp/gcan_data/
    
  5. Wait until the flow between the GetFile and PutGreenplumRecord processor components is triggered.

  6. Query the table again:

    testdb=# SELECT * FROM gcan_dept_expense ORDER BY dept_id, month;
     dept_id | month | expenses 
    ---------+-------+----------
     1313131 |    10 |   492.83
     1313131 |    11 |   555.55
     1313131 |    12 |  1313.13
     2222222 |    12 |    22.22
     3535353 |    10 |  6001.94
     3535353 |    11 |   761.35
     3535353 |    12 |    81.12
     7979797 |    10 |  5555.55
     7979797 |    11 |    18.72
     7979797 |    12 |   173.18
    (10 rows)
    

    Notice the new row for department 2222222, and the updated expenses values for department 1313131, month 11 and department 7979797, month 10.

  7. Write a sample file with bad input data directly to the input directory:

    user@nifihost$ echo '"dept_id","month","expenses"
    "1313131","12","12222.22"
    "7979797","zz","5555.55"' > /tmp/gcan_data/sample3.csv
    

    This data includes the value zz in what should be an int field.

  8. Observe the NiFi canvas and wait for the flow to trigger. Notice that the PutGreenplumRecord processor canvas component eventually displays a red box in the right-hand corner. Hover over the red box and view the warning message. The processor generates a NumberFormatException when attempting to write the second record to the Greenplum table.

  9. Query the table again. In this query, filter on the department identifier in the first record of the sample3.csv data file to display only the table rows associated with that department:

    testdb=# SELECT * FROM gcan_dept_expense WHERE dept_id=1313131 ORDER BY month;
     dept_id | month | expenses 
    ---------+-------+----------
     1313131 |    10 |   492.83
     1313131 |    11 |   555.55
     1313131 |    12 |  1313.13
    (3 rows)
    

    Notice that the first record in sample3.csv, even though correctly formatted, was not written to the table. The Connector must process all records in the FlowFile successfully before it will write the batch to Greenplum Database.
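Items 1, 4, and 7 rely on watching the canvas to know when a file has been picked up. If you want to script the copies instead, and assuming GetFile keeps its default Keep Source File value of false (so that it deletes each file after reading it), you can copy a file and wait for it to disappear from the input directory. The bash sketch below does that; the helper name drop_and_wait is hypothetical. It copies under a hidden name and then renames the file, on the assumption that the default GetFile File Filter skips files whose names begin with a dot, so that GetFile never reads a partially written file. A file disappearing only means that GetFile has read it, not that PutGreenplumRecord has finished writing the records, so still check the processor statistics and query the table.

```shell
# Hypothetical helper: copy a sample file into the GetFile input directory and
# wait (up to a timeout) until GetFile has picked it up and removed it.
# Usage: drop_and_wait <file> <input_dir> [timeout_seconds]
drop_and_wait() {
  local src=$1 dir=$2 limit=${3:-60}
  local name waited=0
  name=$(basename "$src")

  # Copy under a hidden name first, then rename within the same directory,
  # so GetFile never sees a partially written file.
  cp "$src" "$dir/.$name.part" || return 1
  mv "$dir/.$name.part" "$dir/$name" || return 1

  # Poll once per second until the file is gone or the timeout is reached.
  while [ -e "$dir/$name" ]; do
    if [ "$waited" -ge "$limit" ]; then
      echo "timed out after ${limit}s waiting for $dir/$name" >&2
      return 1
    fi
    sleep 1
    waited=$((waited + 1))
  done
  echo "picked up: $name"
}
```

For example, run drop_and_wait gcan_work/sample1.csv /tmp/gcan_data, then run the SELECT query from item 3.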