Scenario: Streaming Messages from a Normalized Data Queue

Create a database queue to stream normalized IoT data.

After you set up your device and data is sending and receiving, then you can stream messages. This example shows how to stream messages from normalized data. You can use a similar approach to subscribe to the raw data or rejected data queues. For more information, see Introduction to Transactional Event Queues and Advanced Queuing.

Before you begin

Using database queues requires configuring a direct database connection.

Note

Maximum retention period for public queues 24 hours.

Step 1: Add a Subscriber

Use this SQL database command to register a subscriber on the queue to receiving messages. Specify the queue name and a unique subscriber name to allow this subscriber or consumer to dequeue messages.

For more information, see DBMS_AQADM.ADD_SUBSCRIBER.

BEGIN
    dbms_aqadm.add_subscriber(
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
END;
/

Step 2: Bulk Dequeue Messages

Use this bulk dequeuing SQL database command to process many messages at once. This example processes or consumes up to 30 messages at a time for the specific subscriber.

For more information, see DBMS_AQ.DEQUEUE_ARRAY.

DECLARE
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_array_t;
    l_normalized_data_records           JSON;
    msgid_array        dbms_aq.msgid_array_t;
    retval             PLS_INTEGER; 
    l_json_size NUMBER;
BEGIN
    dequeue_options.consumer_name := 'test_public_subscriber';   -- Filter messages for this subscriber
    dequeue_options.wait          := 10;                        -- Wait up to 10 seconds for messages
    dequeue_options.dequeue_mode  := dbms_aq.remove;             -- Remove after dequeueing
    dequeue_options.navigation    := dbms_aq.first_message;      -- Get oldest message first
    
    retval := dbms_aq.dequeue_array(
        queue_name               => '&&domain.__IOT.NORMALIZED_DATA',
        dequeue_options          => dequeue_options,
        array_size               => 30,                         -- Try to fetch up to 30 messages at once
        message_properties_array => message_properties,
        payload_array            => l_normalized_data_records,  -- The JSON data
        msgid_array              => msgid_array
    );
    
    l_json_size := json_value(json_query( l_normalized_data_records, '$.size()'), '$');
    dbms_output.put_line('Successfully dequeued! ' || l_json_size);  -- Output number of messages
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        dbms_output.put_line('Error Code: ' || sqlcode);
        dbms_output.put_line('Error Message: ' || sqlerrm);
        ROLLBACK;
        raise_application_error(-20001, 'Unable to dequeue: ' || sqlerrm);
END;
/

Optional Step: Remove the Subscriber

After processing the information, use this SQL database command to remove the subscriber. Removing the subscriber is a best practice when the subscriber is no longer needed, preventing unwanted message retention. For more information, see DBMS_AQADM.REMOVE_SUBSCRIBER.

BEGIN
    DBMS_AQADM.REMOVE_SUBSCRIBER (
        queue_name => '&&domain.__IOT.NORMALIZED_DATA',
        subscriber => sys.aq$_agent('test_public_subscriber', NULL, NULL)
    );
    COMMIT;
EXCEPTION
WHEN OTHERS THEN 
    NULL;   -- Ignore errors (in case the subscriber wasn't present)
END;
/