Create and Configure Pipelines

You can create one or more load or export pipelines. When you create a pipeline, you use parameters and set pipeline attributes to configure the pipeline.

The options for creating and configuring a pipeline follow:

Create and Configure a Pipeline for Loading Data

You can create a pipeline to load data from external files in object store to tables in Autonomous Database.

A load pipeline consumes data placed on object store and loads it to a table in Autonomous Database. When you create a load pipeline, the pipeline runs at regular intervals to consume data placed on object store; when new data files arrive, the pipeline loads the new data. You can also use a pipeline to reliably copy files, with resume and retry capabilities, from object store to a table in your database.

With a load pipeline, the pipeline package uses DBMS_CLOUD.COPY_DATA to load data.

On your Autonomous Database, either use an existing table or create the database table where you are loading data. For example:

CREATE TABLE EMPLOYEE
            (name     VARCHAR2(128),
             age      NUMBER,
             salary   NUMBER);
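
Because a load pipeline uses DBMS_CLOUD.COPY_DATA, each pipeline run is roughly equivalent to a manual COPY_DATA call such as the following sketch (the credential name and object store URI are placeholders that match the examples in this section); the pipeline adds scheduling, tracking of already-processed files, and retry on top of this single load:

BEGIN
     DBMS_CLOUD.COPY_DATA(
        table_name      => 'EMPLOYEE',
        credential_name => 'OBJECT_STORE_CRED',
        file_uri_list   => 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/*.json',
        format          => '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}'
     );
END;
/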
  1. Create a pipeline to load data from object store.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name => 'MY_PIPE1',
            pipeline_type => 'LOAD',
            description   => 'Load metrics from object store into a table'
      );
    END;
    /

    See CREATE_PIPELINE Procedure for more information.

  2. Create a credential object to access the object store that contains the files you are loading.

    You specify the credential for the pipeline source location with the attribute credential_name. If you do not supply a credential_name in the next step, the credential_name value is set to NULL. You can use the default NULL value when the location attribute is a public or pre-authenticated URL.

    See CREATE_CREDENTIAL Procedure for more information.
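
    For example, a minimal sketch that creates the credential using username and auth token authentication (the user name and token shown are placeholders):

    BEGIN
         DBMS_CLOUD.CREATE_CREDENTIAL(
            credential_name => 'OBJECT_STORE_CRED',
            username        => 'user@example.com',
            password        => 'auth_token_value'
         );
    END;
    /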

  3. Set the pipeline attributes, including the required attributes: location, table_name, and format.
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'MY_PIPE1',
           attributes    => JSON_OBJECT(
                'credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'employee',
                'format' VALUE '{"type":"json", "columnpath":["$.NAME", "$.AGE", "$.SALARY"]}',
                'priority' VALUE 'HIGH',
                'interval' VALUE '20')
      );
    END;
    /

    The following attributes must be set to run a load pipeline:

    • location: Specifies the source file location on object store.

    • table_name: Specifies the table in your database where you are loading data. Each pipeline loads data into a single table_name from its specified location.

    • format: Describes the format of the data you are loading.

      See DBMS_CLOUD Package Format Options for more information.

    The credential_name is the credential you created in the previous step.

    The priority value determines the number of files loaded in parallel. A pipeline with a higher priority consumes more database resources and completes each run faster, compared to running at a lower priority.

    The interval value specifies the time interval in minutes between consecutive runs of a pipeline job. The default interval is 15 minutes.

    See DBMS_CLOUD_PIPELINE Attributes for details on the pipeline attributes.
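
    To verify the attributes you set, one option is to query the USER_CLOUD_PIPELINE_ATTRIBUTES view:

    SELECT pipeline_name, attribute_name, attribute_value
       FROM user_cloud_pipeline_attributes
       WHERE pipeline_name = 'MY_PIPE1';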

    As an alternative, to set the format for JSON you could use the following form of SET_ATTRIBUTE, which sets a single attribute by name:

    BEGIN
        DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
            pipeline_name   => 'MY_PIPE1',
            attribute_name  => 'format',
            attribute_value => JSON_OBJECT('type' VALUE 'json', 'columnpath' VALUE '["$.NAME", "$.AGE", "$.SALARY"]')
        );
    END;
    /

    After you create a pipeline, you can test the pipeline (see Test Pipelines) or start the pipeline.
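
    For example, to start the pipeline as a scheduled job that runs at the configured interval:

    BEGIN
         DBMS_CLOUD_PIPELINE.START_PIPELINE(
            pipeline_name => 'MY_PIPE1'
         );
    END;
    /

    See START_PIPELINE Procedure for more information.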

Create and Configure a Pipeline for Export with Timestamp Column

You can create an export pipeline to automatically export time-series data from your Autonomous Database to object store.

Using this export pipeline option, you specify a table or SQL query and a column with a timestamp that the pipeline uses to keep track of the time of the last upload. You can use an export pipeline to share data for consumption by other applications or to save data to object store.

With an export pipeline, the pipeline package uses DBMS_CLOUD.EXPORT_DATA to export data.

When you create an export pipeline, the pipeline runs at regular intervals and places data on object store.
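
The steps below assume a table with a timestamp column, such as the following illustrative definition (the metric_table and metric_time names match the examples that follow):

CREATE TABLE METRIC_TABLE
            (metric_name    VARCHAR2(128),
             metric_value   NUMBER,
             metric_time    TIMESTAMP);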

  1. Create a pipeline to export data to object store.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name => 'EXP_PIPE1',
            pipeline_type => 'EXPORT',
            description   => 'Export time series metrics to object store'
         );
    END;
    /

    See CREATE_PIPELINE Procedure for more information.

  2. Create a credential object to access the destination object store location where you are exporting data files.

    You specify the credential for the pipeline destination location with the attribute credential_name. If you do not supply a credential_name in the next step, the credential_name value is set to NULL. You can use the default NULL value when the location attribute is a public or pre-authenticated URL.

    See CREATE_CREDENTIAL Procedure for more information.

  3. Set the export pipeline attributes.

    When you specify a table_name parameter, table rows are exported to object store. When you specify a query parameter, the query specifies a SELECT statement so that only the required data is exported to object store.

    • Using a table_name parameter:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                'table_name' VALUE 'metric_table',
                'key_column' VALUE 'metric_time',
                'format' VALUE '{"type": "json"}',
                'priority' VALUE 'MEDIUM',
                'interval' VALUE '20')
        );
      END;
      /
    • Using a query parameter:

      BEGIN
           DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
             pipeline_name => 'EXP_PIPE1',
             attributes    => JSON_OBJECT('credential_name' VALUE 'OBJECT_STORE_CRED',
                 'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
                 'query' VALUE 'SELECT * from metrics_table',
                 'key_column' VALUE 'metric_time',
                 'format' VALUE '{"type": "json"}',
                 'priority' VALUE 'MEDIUM',
                 'interval' VALUE '20')
        );
      END;
      /

    The credential_name is the credential you created in the previous step.

    The following attributes must be set to run an export pipeline:

    • location: Specifies the destination object store location. The location you specify is for one table_name per pipeline.

    • table_name: Specifies the table in your database that contains the data you are exporting (either the table_name parameter or the query parameter is required).

    • query: Specifies the query to run in your database that provides the data you are exporting (either the table_name parameter or the query parameter is required).

    • format: Describes the format of the data you are exporting.

      See DBMS_CLOUD Package Format Options for EXPORT_DATA for more information.

    The key_column value identifies the timestamp or date column, in the specified table or query, that the pipeline uses to track the last uploaded rows (metric_time in these examples).

    The priority value determines the degree of parallelism for fetching data from the database.

    The interval value specifies the time interval in minutes between consecutive runs of a pipeline job. The default interval is 15 minutes.

    See DBMS_CLOUD_PIPELINE Attributes for details on the pipeline attributes.

    After you create a pipeline, you can test the pipeline (see Test Pipelines) or start the pipeline.

Create and Configure a Pipeline to Export Query Results (Without a Timestamp)

You can create an export pipeline to automatically export data from your Autonomous Database to object store. Using this export pipeline option, you specify a SQL query that the pipeline runs periodically to export data to object store. You can use this export option to share the latest data from your Autonomous Database on object store, where other applications can consume it.

When you create an export pipeline, the pipeline runs at regular intervals and places data on object store.

  1. Create a pipeline to export data to object store.
    BEGIN
         DBMS_CLOUD_PIPELINE.CREATE_PIPELINE(
            pipeline_name => 'EXP_PIPE2',
            pipeline_type => 'EXPORT',
            description   => 'Export query results to object store.'
         );
    END;
    /

    See CREATE_PIPELINE Procedure for more information.

  2. Create a credential object to access the destination object store location where you are exporting data files.

    You specify the credential for the pipeline destination location with the attribute credential_name. If you do not supply a credential_name in the next step, the credential_name value is set to NULL. You can use the default NULL value when the location attribute is a public or pre-authenticated URL.

    See CREATE_CREDENTIAL Procedure for more information.

  3. Set the export pipeline attributes.
    BEGIN
         DBMS_CLOUD_PIPELINE.SET_ATTRIBUTE(
           pipeline_name => 'EXP_PIPE2',
           attributes    => JSON_OBJECT(
              'credential_name' VALUE 'OBJECT_STORE_CRED',
              'location' VALUE 'https://objectstorage.us-phoenix-1.oraclecloud.com/n/namespace-string/b/bucketname/o/',
              'query' VALUE 'SELECT * FROM table_name',
              'format' VALUE '{"type": "json"}',
              'priority' VALUE 'MEDIUM',
              'interval' VALUE '20')
      );
    END;
    /

    The credential_name is the credential you created in the previous step.

    The following attributes must be set to run an export pipeline:

    • location: Specifies the destination object store location.

    • query: Specifies the query to run in your database that provides the data you are exporting.

    • format: Describes the format of the data you are exporting.

      See DBMS_CLOUD Package Format Options for EXPORT_DATA for more information.

    The priority value determines the degree of parallelism for fetching data from the database.

    The interval value specifies the time interval in minutes between consecutive runs of a pipeline job. The default interval is 15 minutes.

    See DBMS_CLOUD_PIPELINE Attributes for details on the pipeline attributes.

    After you create a pipeline, you can test the pipeline (see Test Pipelines) or start the pipeline.

Test Pipelines

Use RUN_PIPELINE_ONCE to run a pipeline once, on demand, without creating a scheduled job.

RUN_PIPELINE_ONCE is useful for testing a pipeline before you start it. After you run a pipeline once and verify that it works as expected, use RESET_PIPELINE to return the pipeline to its state before you ran RUN_PIPELINE_ONCE.

  1. Create a pipeline.
  2. Run a pipeline once to test the pipeline.
    BEGIN
        DBMS_CLOUD_PIPELINE.RUN_PIPELINE_ONCE(
            pipeline_name => 'MY_PIPE1'
        );
    END;
    /

    See RUN_PIPELINE_ONCE Procedure for more information.

  3. Perform any required checks to verify the pipeline is operating as expected.

    See Monitor and Troubleshoot Pipelines for more information.
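
    For example, for the load pipeline created earlier, one simple check (a sketch) is to confirm that rows arrived in the target table and to review the run status in the USER_CLOUD_PIPELINE_HISTORY view:

    -- Confirm that rows were loaded into the target table
    SELECT COUNT(*) FROM EMPLOYEE;

    -- Review the status of the pipeline runs
    SELECT pipeline_name, status, error_message
       FROM user_cloud_pipeline_history
       WHERE pipeline_name = 'MY_PIPE1';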

  4. Reset the pipeline.
    BEGIN
        DBMS_CLOUD_PIPELINE.RESET_PIPELINE(
            pipeline_name => 'MY_PIPE1',
            purge_data    => TRUE
        );
    END;
    /

    See RESET_PIPELINE Procedure for more information.