シングルトン・パイプを使用したメッセージのキャッシュ

シングルトン・パイプは、カスタム・メッセージをキャッシュおよび取得し、同時読取りで複数のデータベース・セッション間でメッセージを共有できるDBMS_PIPEパッケージの追加機能です。

シングルトン・パイプを使用したメッセージのキャッシュについて

DBMS_PIPEパッケージには、シングルトン・パイプをサポートするためのAutonomous Databaseの拡張機能があります。

DBMS_PIPEのシングルトン・パイプ:

  • シングルトン・パイプ・メッセージを使用して、カスタム・データのインメモリー・キャッシュを提供します。

  • 最大32,767バイトのカスタム・メッセージをキャッシュおよび取得する機能をサポートします。

  • 複数のデータベース・セッション間でキャッシュされたメッセージを同時読取りと共有できます。これにより、高スループットを実現し、データベース・セッション間でのメッセージの同時読取りをサポートします。

  • 読取り専用データベースおよび読取り/書込みデータベースをサポートします。

  • 複数のキャッシュ無効化メソッドをサポートします。

    • ユーザーによって制御される明示的なキャッシュ無効化。
    • ユーザーが指定した時間間隔(秒)後のキャッシュ無効化。この無効化メソッドは、メッセージ・リーダーではなくshelflifeパラメータを使用して、メッセージ・センダーによって制御されます。これにより、リーダーによるキャッシュの不正使用による一般的な落とし穴が回避されます。

標準パイプとシングルトンパイプについて

DBMS_PIPEパッケージを使用すると、2つ以上のデータベース・セッションがインメモリー・メッセージを使用して通信できます。パイプ機能には、外部サービス・インタフェース、デバッグ、独立したトランザクション、アラートなどの複数のアプリケーションがあります。

database-pipe-messages-singleton-pipes.epsの説明が続きます
図database-pipe-messages-singleton-pipes.epsの説明

シングルトン・パイプには、サポートされているDBMS_PIPEタイプのいずれかを指定できます。

  • 暗黙的パイプ: DBMS_PIPE.SEND_MESSAGE関数を使用して不明なパイプ名でメッセージを送信すると、自動的に作成されます。
  • 明示的パイプ:ユーザー指定のパイプ名を持つDBMS_PIPE.CREATE_PIPE関数を使用して作成されます。
  • パブリック・パイプ: DBMS_PIPEパッケージに対するEXECUTE権限を持つユーザーがアクセスできます。
  • プライベート・パイプ:パイプ作成者と同じユーザーを持つセッションからアクセスできます。

シングルトン・パイプを使用すると、Autonomous Databaseインスタンスのメモリーに単一のメッセージをキャッシュできます。

次に、シングルトンパイプを使用する一般的なワークフローを示します。

singleton-pipe-workflow.epsの説明が続きます
図singleton-pipe-workflow.epsの説明

シングルトンパイプの概要と機能

  • シングルトン・メッセージ

    • シングルトンパイプは、パイプに1つのメッセージをキャッシュできるため、「シングルトン」という名前になります。
    • シングルトン・パイプのメッセージは、最大32,767バイトのメッセージ・サイズまで、複数のフィールドで構成できます。
    • DBMS_PIPEは、DBMS_PIPE.PACK_MESSAGEプロシージャを使用することで、メッセージ内に複数の属性をパックする機能をサポートしています。
    • パブリック・シングロン・パイプのメッセージは、DBMS_PIPEパッケージに対する実行権限がある任意のデータベース・セッションから受信できます。
    • プライベートシングルトン・パイプの場合、メッセージはシングルトン・パイプの作成者と同じユーザーのセッションが受信できるようになります。
  • 読取りの高いメッセージ・スループット
    • シングルトン・パイプでは、無効化されるかパージされるまでパイプにメッセージをキャッシュします。データベース・セッションは、シングルトン・パイプからメッセージを同時に読み取ることができます。
    • シングルトン・パイプからメッセージを受信することは、非ブロッキング操作です。
  • メッセージ・キャッシュ
    • メッセージは、DBMS_PIPE.SEND_MESSAGEを使用してシングルトン・パイプにキャッシュされます。
    • シングルトン・パイプにキャッシュされた既存のメッセージがある場合、DBMS_PIPE.SEND_MESSAGEは前のメッセージを上書きして、シングルトン・パイプに1つのメッセージのみを保持します。
  • メッセージ無効化
    • 明示的無効化: プロシージャDBMS_PIPE.PURGEを使用してパイプをパージするか、DBMS_PIPE.SEND_MESSAGEを使用してメッセージを上書きします。
    • 自動無効化: 指定されたshelflife時間が経過すると、メッセージを自動的に無効化できます。
  • データベース・メモリーからの削除なし
    • シングルトン・パイプはOracle Databaseのメモリーから削除されません。
    • 明示的なシングルトン・パイプは、DBMS_PIPE.REMOVE_PIPEを使用して削除されるか、データベースが再起動されるまで、データベース・メモリーに常駐します。
    • 暗黙的シングルトン・パイプは、パイプにキャッシュされたメッセージが1つになるまでデータベース・メモリーに存在します。

シングルトンパイプ操作

工程 DBMS_PIPEファンクションまたはプロシージャ

明示的なシングルトン・パイプの作成

CREATE_PIPEファンクション

シングルトン・パイプにメッセージをキャッシュします。

PACK_MESSAGEプロシージャSEND_MESSAGEファンクション

シングルトン・パイプからキャッシュされたメッセージを読み取る

RECEIVE_MESSAGEファンクションUNPACK_MESSAGEプロシージャ

シングルトン・パイプのメッセージの削除

PURGEプロシージャ

明示的なシングルトン・パイプの削除

REMOVE_PIPEファンクション

キャッシュ関数を使用したキャッシュされたメッセージの自動リフレッシュ

DBMS_PIPEパッケージを使用すると、ユーザー定義のキャッシュ・ファンクションを使用してシングルトン・パイプ・メッセージを自動的に移入できます。

デフォルトでは、シングルトン・パイプの明示的な無効化または暗黙的な無効化でメッセージが無効化された後、後続のDBMS_PIPE.RECEIVE_MESSAGEではメッセージを受信しません。パイプに新しいメッセージを追加するには、DBMS_PIPE.SEND_MESSAGEをコールしてメッセージを明示的にキャッシュする必要があります。このケースを回避するために、シングルトン・パイプから読み取るときにメッセージを使用できないように、キャッシュ関数を定義できます。キャッシュ・ファンクションを定義すると、次のシナリオでメッセージを受信すると、キャッシュ・ファンクションが自動的に起動されます。

  • シングルトンパイプが空の場合。
  • shelflife時間が経過したためにシングルトン・パイプのメッセージが無効である場合。

キャッシュ・ファンクションを使用するには、キャッシュ・ファンクションを定義し、DBMS_PIPE.RECEIVE_MESSAGEcache_funcパラメータを含めます。ユーザー定義キャッシュ機能には、次の機能があります。

  • キャッシュ・ファンクションは、DBMS_PIPE.RECEIVE_MESSAGEを使用してシングルトン・パイプからメッセージを読み取るときに指定できます。
  • シングルトン・パイプにメッセージがない場合、DBMS_PIPE.RECEIVE_MESSAGEはキャッシュ関数を呼び出します。
  • メッセージのshelflife時間が経過すると、データベースはシングルトン・パイプに新しいメッセージを自動的に移入します。

キャッシュ・ファンクションを使用すると、シングルトン・パイプの使用が簡単になります。空のパイプからメッセージを受信するための障害ケースを処理する必要はありません。また、キャッシュ・ファンクションは、シングルトン・パイプからメッセージを読み取るときにキャッシュ・ミスがないことを確認し、キャッシュされたメッセージを最大限に活用します。

自動キャッシュリフレッシュ・キャッシュfunction.epsの説明が続きます
図automatic-cache-refresh-cache-function.epsの説明

キャッシュ・ファンクションを定義する場合、ファンクション名は所有者スキーマで完全修飾する必要があります。

  • OWNER.FUNCTION_NAME
  • OWNER.PACKAGE.FUNCTION_NAME

次のシグネチャを使用してキャッシュ関数を定義します。

CREATE OR REPLACE FUNCTION cache_function_name(
       pipename  IN VARCHAR2
) RETURN INTEGER;

キャッシュ・ファンクションでの一般的な操作は次のとおりです。

  • DBMS_PIPE.CREATE_PIPEを使用して、明示的なパイプ用のシングルトン・パイプを作成します。
  • シングルトン・パイプラインにキャッシュするメッセージの作成
  • キャッシュ・ファンクションで指定されたパイプにメッセージを送信し、オプションで暗黙的なメッセージにshelflifeを指定します。

キャッシュ・ファンクションを使用するには、DBMS_PIPE.RECEIVE_MESSAGEを起動する現在のセッション・ユーザーに、キャッシュ・ファンクションの実行に必須の権限が必要です。

キャッシュ・ファンクションの定義の詳細は、RECEIVE_MESSAGEファンクションを参照してください。

明示的なシングルトン・パイプの作成

指定されたパイプ名(明示的シングルトンパイプ)を持つシングルトンパイプを作成する手順について説明します。

最初に、この例では、DBMS_PIPE.RECEIVE_MESSAGEを繰り返しコールするreceive_messageヘルパー関数を作成します。これにより、シングルトンパイプ機能をテストできます。

CREATE OR REPLACE FUNCTION msg_types AS
       TYPE t_rcv_row IS RECORD (c1 VARCHAR2(32767), c2 NUMBER);
       TYPE t_rcv_tab IS TABLE OF t_rcv_row;
END;


CREATE OR REPLACE FUNCTION receive_message(
      pipename    IN VARCHAR2,
      rcv_count   IN NUMBER DEFAULT 1,
      cache_func  IN VARCHAR2 DEFAULT NULL)
   RETURN msg_types.t_rcv_tab pipelined
    AS
       l_msg    VARCHAR2(32767);
       l_status NUMBER;
 BEGIN
      FOR i IN 1..rcv_count LOOP
           l_status := DBMS_PIPE.RECEIVE_MESSAGE(
            pipename   => pipename,
            cache_func => cache_func,
            timeout    => 1);
         IF l_status != 0 THEN
              raise_application_error(-20000,
             'Message not received for attempt: ' || to_char(i) || ' status: ' ||
            l_status);
         END IF;

         DBMS_PIPE.UNPACK_MESSAGE(l_msg);
             pipe row(msg_types.t_rcv_row(l_msg));
     END LOOP;
 RETURN;
 END;
  1. shelflifeパラメータを3600 (秒)に設定して、PIPE_TESTという名前の明示的なシングルトン・パイプを作成します。
    DECLARE
      l_status INTEGER;
    BEGIN
      l_status := DBMS_PIPE.CREATE_PIPE(
                    pipename => 'MY_PIPE1',
                    private => TRUE,
                    singleton => TRUE,
                    shelflife => 3600);
    END;
    /

    詳細は、CREATE_PIPEファンクションを参照してください。

  2. シングルトン・パイプが作成されていることを確認します。
    SELECT name, singleton, type
         FROM v$db_pipes WHERE name= '&pipename' ORDER BY 1;
    
    NAME                 SINGLETON  TYPE
    -------------------- ---------- -------
    PIPE_TEST            YES        PRIVATE
  3. シングルトンパイプにメッセージをパックして送信します。
    
    EXEC DBMS_PIPE.PACK_MESSAGE('This is a real message that you can get multiple times');
    
    SELECT DBMS_PIPE.SEND_MESSAGE(pipename => '&pipename') status FROM DUAL;
    
    STATUS
    ----------
    0

    詳細は、PACK_MESSAGEプロシージャおよびSEND_MESSAGEファンクションを参照してください。

  4. シングルトンパイプからメッセージを受信します。
    SELECT * FROM receive_message(
        pipename => '&pipename',
        rcv_count => 2);
    
    MESSAGE
    --------------------------------------------------------------------------------
    This is a real message that you can get multiple times
    This is a real message that you can get multiple times

    receive_message関数は、DBMS_PIPE.RECEIVE_MESSAGEをコールするヘルパー関数です。

  5. メッセージを削除し、パイプを削除します。
    EXEC DBMS_PIPE.PURGE('&pipename');
    SELECT DBMS_PIPE.REMOVE_PIPE('&pipename') status FROM DUAL;

キャッシュ関数を使用した明示的シングルトン・パイプの作成

指定したパイプ名、明示的シングルトン・パイプを使用してシングルトン・パイプを作成し、キャッシュ関数を提供するステップについて説明します。キャッシュ・ファンクションを使用すると、メッセージをシングルトン・パイプに自動的に移入できます。

  1. シングルトン・パイプのキャッシュ関数test_cache_messageを作成します。
    CREATE OR REPLACE FUNCTION test_cache_message(
         pipename IN VARCHAR2) return NUMBER 
    AS 
       l_status NUMBER;
       l_data VARCHAR2(4000);
    BEGIN
       l_status := DBMS_PIPE.CREATE_PIPE(
              pipename => pipename,
              private => TRUE,
              singleton => true,
              shelflife => 600);
       IF l_status != 0 THEN RETURN l_status;
       END IF;
     
       DBMS_PIPE.PACK_MESSAGE('This is a placeholder cache message for an empty pipe');
       l_status := DBMS_PIPE.SEND_MESSAGE(pipename => pipename);
       RETURN l_status;
     END;
    /
    ノート

    DBMS_PIPE.RECEIVE_MESSAGEを起動する現行セッション・ユーザーには、キャッシュ・ファンクションの実行に必要な権限が必要です。
  2. キャッシュ関数で受信し、メッセージがパイプに移入されていることを確認します。パイプは、キャッシュ・ファンクションで作成されたプライベート・パイプとして存在する必要があります。
    SELECT * FROM receive_message(
         pipename => '&pipename',
         rcv_count => 1,
         cache_func => 'TEST_CACHE_MESSAGE');
    
    MESSAGE
    ---------------
    This is a placeholder cache message for an empty pipe
    

    receive_message関数は、DBMS_PIPE.RECEIVE_MESSAGEをコールするヘルパー関数です。receive_message定義については、明示的なシングルトン・パイプの作成を参照してください。

    詳細は、CREATE_PIPEファンクションを参照してください。

  3. メッセージがパイプ内に保持されていることを確認するために、キャッシュ関数なしで受信します。
    SELECT * FROM receive_message(
         pipename => '&pipename',
         rcv_count => 2);
    
    MESSAGE
    ---------------
    This is a placeholder cache message for an empty pipe
    This is a placeholder cache message for an empty pipe

    receive_message関数は、DBMS_PIPE.RECEIVE_MESSAGEをコールするヘルパー関数です。receive_message定義については、明示的なシングルトン・パイプの作成を参照してください。

    詳細は、CREATE_PIPEファンクションを参照してください。