ストリーミング・ソースを使用したコネクタの作成

コネクタ・ハブにコネクタを作成して、ストリーミング・サービスからターゲット・サービスにストリーム・データを転送します。

ストリーミング・サービスの詳細は、ストリーミングを参照してください。

ストリーミング・ソースおよび(オプション)ファンクション・タスクで定義されたコネクタは、ファンクションログ・アナリティクスオブジェクト・ストレージおよびストリーミングのターゲットをサポートします。通知ターゲットは、ファンクション・タスクが使用されていない場合にのみサポートされます。

コネクタ・ハブのワークフローの例は、コネクタ・ハブの概要を参照してください。モニタリングをソースとして使用するコネクタの例は、シナリオ: オブジェクト・ストレージへのメトリックの送信を参照してください。

コネクタ・ハブストリーミング・ソースの保持期間は顧客定義です。ストリーミング・リソースの制限を参照してください。配信の詳細は、配信の詳細を参照してください。

ストリーミング・ソースの読取り位置は、保持期間とともに、ストリーム内のデータの移動を開始する場所を決定します。ソース接続を指定するときに、読取り位置を指定します。

ノート

  • ストリーム入力スキーマについては、メッセージ参照を参照してください。
  • 通知ターゲットおよびストリーミング・ソースの場合、すべてのメッセージはRAW JSON BLOBとして送信されます。
    1. ナビゲーション・メニューを開き、「アナリティクスとAI」をクリックします「メッセージング」で、「コネクタ・ハブ」をクリックします。
    2. 「コネクタ」ページで、コンパートメントを選択します。
    3. 「コネクタの作成」をクリックします。
    4. 「コネクタの作成」ページで、新しいコネクタのわかりやすい名前とオプションの説明を入力します。機密情報を入力しないでください。
    5. 新しいコネクタを格納するコンパートメントを選択します。
    6. 「コネクタの構成」「ソース」で、「ストリーミング」を選択します。
    7. 「ターゲット」で、ストリーム・データの転送先のサービスを選択します:
      • ファンクション: ストリーム・データをファンクションに送信します。
      • ログ・アナリティクス: ストリーム・データをログ・グループに送信します。
      • 通知: ストリーム・データをトピックに送信します。通知は、関数タスクが定義されていない場合にのみサポートされます。
      • オブジェクト・ストレージ: ストリーム・データをバケットに送信します。
      • ストリーミング: ストリーム・データをストリームに送信します。
    8. (オプション)新しいコネクタのサービス・ログを有効にするには、「ログ」スイッチをクリックし、次の値を指定します:
      • ログ・カテゴリ: 値「コネクタ・トラッキング」が自動的に選択されます。
      • コンパートメント: コネクタのサービス・ログを格納するコンパートメントを選択します。
      • ログ・グループ: サービス・ログを格納するログ・グループを選択します。新しいログ・グループを作成するには、「新規グループの作成」をクリックし、名前を入力します。
      • ログ名: オプションで、ログの名前を入力します。
      • 拡張オプションの表示:
        • ログ保持: オプションで、サービス・ログを保持する期間を指定します(デフォルト: 30日)。
    9. 「ソース接続の構成」で、ソース・ストリームを選択します:
      • コンパートメント: 必要なストリームを含むコンパートメントを選択します。
      • ストリーム・プール: 必要なストリームを含むストリーム・プールを選択します。

        プライベート・エンドポイント構成はサポートされていません。ストリーム・プール構成の詳細は、ストリーム・プールの作成を参照してください。

      • ストリーム: データを受信するストリームの名前を選択します。
      • 読取り位置: ストリームの読取りを開始するカーソル位置を指定します。
        • Latest: コネクタの作成後に公開された読取りメッセージを開始します。
          • この構成で新しいコネクタを初めて実行すると、コネクタの作成時間からデータが移動します。最初の実行が失敗した場合(ポリシーが欠落している場合など)、解決後、コネクタはコネクタの作成時間からデータを移動するか、作成時間が保存期間外の場合はストリーム内で使用可能な最も古いデータを移動します。たとえば、保存期間が2時間のストリームについて午前10時に作成されたコネクタを考えてみます。失敗した実行が午前11時に解決された場合、コネクタは午前10時からデータを移動します。失敗した実行が午後1時に解決された場合、コネクタはストリーム内で使用可能な最も古いデータを移動します。
          • その後、ストリームの次の位置からデータを移動します。後で実行が失敗した場合、解決後、コネクタはストリームの次の位置またはストリーム内の最も古い使用可能なデータから、ストリームの保存期間に応じてデータを移動します。
        • 範囲の切捨て: ストリーム内で使用可能な最も古いメッセージからの読取りを開始します。
          • この構成で新しいコネクタを初めて実行すると、ストリーム内で使用可能な最も古いデータからデータが移動します。最初の実行が失敗した場合(ポリシーの欠落など)、解決後、コネクタはストリームの保存期間に関係なく、ストリーム内で使用可能な最も古いデータを移動します。
          • その後、ストリームの次の位置からデータを移動します。後で実行が失敗した場合、解決後、コネクタはストリームの次の位置またはストリーム内の最も古い使用可能なデータから、ストリームの保存期間に応じてデータを移動します。
    10. (オプション)「ファンクション・タスクの構成」で、ファンクション・サービスを使用してストリーム・データを処理するファンクション・タスクを構成します:
      • タスクの選択: 「関数」を選択します。
      • コンパートメント: 必要なファンクションを含むコンパートメントを選択します。
      • ファンクション・アプリケーション: 必要なファンクションが含まれるファンクション・アプリケーションの名前を選択します。
      • ファンクション: ソースから取得したデータの処理に使用するファンクションの名前を選択します。

        タスクとしてコネクタによって使用される場合は、次のいずれかのレスポンスを返すようにファンクションを構成する必要があります:

        • JSONエントリのリスト(レスポンス・ヘッダーContent-Type=application/jsonを設定する必要があります)
        • 単一のJSONエントリ(レスポンス・ヘッダーContent-Type=application/jsonを設定する必要があります)
        • 単一のバイナリ・オブジェクト(レスポンス・ヘッダーContent-Type=application/octet-streamを設定する必要があります)
      • 追加オプションの表示: このリンクをクリックして、関数に送信されるデータのバッチごとに制限を指定します。手動設定を使用する場合は、バッチ・サイズ制限(KB)およびバッチ時間制限(秒)の値を指定します。

      ファンクション・タスクの考慮事項:

      • コネクタ・ハブは、ファンクション・タスクの出力を解析しません。ファンクション・タスクの出力は、そのままターゲットに書き込まれます。たとえば、ファンクション・タスクで通知ターゲットを使用する場合、すべてのメッセージはRAW JSON BLOBとして送信されます。
      • ファンクションは、1回の呼出しで6MBのデータと同期するように呼び出されます。データが6MBを超えると、コネクタが再びファンクションを呼び出して制限を超えるデータを移動します。このような呼出しは順次処理されます。
      • ファンクションは最大5分間実行できます。
      • 関数タスクはスカラー関数に制限されます。
    11. ターゲットとして「ファンクション」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるファンクションを構成します。ステップ15にスキップします。
      • コンパートメント: 必要なファンクションを含むコンパートメントを選択します。
      • ファンクション・アプリケーション: 必要なファンクションを含むファンクション・アプリケーションの名前を選択します。
      • ファンクション: データの送信先のファンクションの名前を選択します。
      • 追加オプションの表示: このリンクをクリックして、ファンクションに送信されるデータのバッチごとに制限を指定します。手動設定を使用する場合は、バッチ・サイズ制限(KBまたはメッセージ数)およびバッチ時間制限(秒)の値を指定します。

        たとえば、5,000KBまたは10メッセージを選択して、バッチ・サイズを制限します。バッチ時間制限の例は5秒です。

      ファンクション・ターゲットに関する考慮事項:

      • コネクタはソース・データをJSONリストとしてバッチでフラッシュします。最大バッチ(ペイロード)サイズは6MBです。
      • ファンクションは、1回の呼出しで6MBのデータと同期するように呼び出されます。データが6MBを超えると、コネクタが再びファンクションを呼び出して制限を超えるデータを移動します。このような呼出しは順次処理されます。
      • ファンクションは最大5分間実行できます。
      • ファンクション・ターゲットからコネクタにデータを返しないでください。コネクタ・ハブは、ファンクション・ターゲットから返されたデータを読み取りません。
    12. ターゲットとして「ログ・アナリティクス」を選択した場合は、「ターゲットの構成」で、ログ・データを送信するログ・グループを構成します。ステップ15にスキップします。
      • コンパートメント: 必要なログ・グループを含むコンパートメントを選択します。
      • ログ・グループ: 必要なログ・グループを選択します。
      • ログ・ソース識別子(ストリーミング・ソースの場合のみ): ログ・ソースを選択します。
    13. ターゲットとして「通知」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるトピックを構成します。ステップ15にスキップします。
      • コンパートメント: 目的のトピックを含むコンパートメントを選択します。
      • トピック: データの送信先のトピックの名前を選択します。ストリーミング・ターゲットの場合、メッセージはRAW JSON BLOBとして送信されます。

      通知ターゲットに関する考慮事項:

      • 通知ターゲットは、ファンクション・タスクが使用されていない場合にのみ、ストリーミング・ソースでサポートされます。
      • 通知ターゲットの最大メッセージ・サイズは128KBです。最大サイズを超えるメッセージは削除されます。
      • SMSメッセージでは、特定のコネクタ構成の場合に予期しない結果が生成されます。この問題は、指定されたコネクタ構成でSMSサブスクリプションを含むトピックに限定されます。詳細は、1つの通知に複数のSMSメッセージを参照してください。
    14. ターゲットとして「オブジェクト・ストレージ」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるバケットを構成します。ステップ15にスキップします。
      • コンパートメント: 目的のバケットを含むコンパートメントを選択します。
      • バケット: データの送信先のバケットの名前を選択します。
      • オブジェクト名の接頭辞: オプションで、接頭辞値を入力します。
      • 追加オプションの表示: このリンクをクリックし、オプションでバッチ・サイズ(MB)およびバッチ時間(ミリ秒)の値を入力します。

      オブジェクト・ストレージ・ターゲットに関する考慮事項:

      • バッチ・ロールオーバーの詳細:

        • バッチ・ロールオーバー・サイズ: 100MB
        • バッチ・ロールオーバー時間: 7分
      • オブジェクト・ストレージに保存されるファイルは、gzipを使用して圧縮されます。

    15. ターゲットとして「ストリーミング」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるストリームを構成します。
      • コンパートメント: 必要なストリームを含むコンパートメントを選択します。
      • ストリーム: データの送信先のストリームの名前を選択します。

      プライベート・エンドポイント構成はサポートされていません。ストリーム・プール構成の詳細は、ストリーム・プールの作成を参照してください。

    16. デフォルト・ポリシーを受け入れるには、各デフォルト・ポリシーに用意されている「作成」リンクをクリックします。

      デフォルト・ポリシーは、このコネクタがソース、タスクおよびターゲット・サービスにアクセスするために必要な認可のために提供されます。

      この認可は、このようなデフォルト・ポリシーまたはグループベースのポリシーから取得できます。デフォルト・ポリシーは、コンソールを使用してコネクタを作成または編集する際に必ず提供されます。唯一の例外は、一致するポリシーがIAMにすでに存在する場合です。この場合、デフォルト・ポリシーは提供されません。この認可要件の詳細は、認証と認可を参照してください。

      • デフォルト・ポリシーを受け入れる権限がない場合は、管理者に連絡してください。
      • 自動的に作成されたポリシーは、コネクタが削除されてもそのままです。ベスト・プラクティスとしては、コネクタを削除するときに関連するポリシーを削除してください。

      新しく作成したポリシーを確認するには、関連するビュー・リンクをクリックします。

    17. (オプション)コネクタにタグを割り当てます。「拡張オプションの表示」をクリックし、タグ付けフィールドの値を指定します。
      • タグ・ネームスペース: 定義済タグを追加するには、既存のネームスペースを選択します。free-fromタグを追加する場合は、空白のままにします。
      • タグ・キー: 定義済タグを追加するには、既存のタグ・キーを選択します。フリーフォーム・タグを追加するには、必要なキー名を入力します。
      • タグ値: 必要なタグ値を入力します。
      • タグの追加: クリックして別のタグを追加します。
    18. 「作成」をクリックします。
  • oci sch service-connector createコマンドおよび必須パラメータを使用して、ストリーミング・ソースでコネクタを作成します:

    oci sch service-connector create --display-name "<display_name>" --compartment-id <compartment_OCID> --source [<stream_source_in_JSON>] --target [<target_in_JSON>]

    CLIコマンドのパラメータおよび値の完全なリストは、CLIコマンド・リファレンスを参照してください。

  • CreateServiceConnector操作を実行してコネクタを作成します。

    ストリーミング・ソースを使用してコネクタを作成するには、リクエストのsource (CreateServiceConnectorDetails)にストリーミングの詳細を移入します。例については、StreamingSourceDetailsを参照してください。

新規コネクタによるデータの移動の確認

コネクタを作成したら、データが移動していることを確認します。

  • コネクタでデータ・フローの詳細を取得するためのログの有効化
  • ターゲット・サービスで予想される結果を確認します。

データが移動されていることを確認すると、自動非アクティブ化を回避できます。これは、コネクタに長時間障害が発生した場合に発生します。