ストリーミング・ソースとのコネクタの作成

コネクタ・ハブでコネクタを作成し、ストリーミング・サービスからターゲット・サービスにストリーム・データを転送します。

ストリーミング・サービスの詳細は、「ストリーミング」を参照してください。

ストリーミング・ソースおよび(オプション)ファンクション・タスクで定義されたコネクタでは、ファンクション、ログ・アナリティクス、オブジェクト・ストレージおよびストリーミングのターゲットがサポートされます。Notificationsターゲットは、ファンクション・タスクが使用されていない場合にのみサポートされます。

コネクタ・ハブ・ワークフローの例は、「コネクタ・ハブの概要」を参照してください。ソースとしてMonitoringを使用するコネクタの例については、シナリオ: オブジェクト・ストレージへのメトリックの送信を参照してください。

ノート

  • ストリーム入力スキーマについては、メッセージ参照を参照してください。
  • ストリーミング・ソースを使用するNotificationsターゲットの場合、すべてのメッセージはRAW JSON BLOBとして送信されます。

保持期間: ストリーミング・ソース

コネクタ・ハブのストリーミング・ソースの保持期間は顧客定義です。ストリーミング・リソースの制限 を参照してください。搬送の詳細は、「搬送詳細」を参照してください。

保存期間とともに、ストリーミング・ソースの読取り位置によって、ストリーム内のデータの移動を開始する場所が決まります。ソース接続を指定するときに、読取り位置を指定します。

    1. 「コネクタ」リスト・ページで、「コネクタの作成」を選択します。リスト・ページの検索に関するヘルプが必要な場合は、コネクタのリストを参照してください。
    2. 「コネクタの作成」ページで、新しいコネクタのわかりやすい名前と説明(オプション)を入力します。機密情報を入力しないでください。
    3. 新しいコネクタを格納するコンパートメントを選択します。
    4. 「コネクタの構成」「ソース」で、「ストリーミング」を選択します。
    5. 「ターゲット」で、ストリーム・データの転送先のサービスを選択します:
      • ファンクション: ストリーム・データをファンクションに送信します。
      • ログ・アナリティクス: ストリーム・データをログ・グループに送信します。
      • 通知: ストリーム・データをトピックに送信します。通知は、ファンクション・タスクが定義されていない場合にのみサポートされます。
      • オブジェクト・ストレージ: ストリーム・データをバケットに送信します。
      • ストリーミング: ストリーム・データをストリームに送信します。
    6. (オプション)新しいコネクタのサービス・ログを有効にするには、「ログ」スイッチを選択し、次の値を指定します:
      • ログ・カテゴリ: 値「コネクタ・トラッキング」が自動的に選択されます。
      • コンパートメント: コネクタのサービス・ログを格納するコンパートメントを選択します。
      • ログ・グループ: サービス・ログを格納するログ・グループを選択します。新しいログ・グループを作成するには、「新規グループの作成」を選択し、名前を入力します。
      • ログ名: オプションで、ログの名前を入力します。
      • 拡張オプションの表示:
        • ログ保持: オプションで、サービス・ログを保持する期間を指定します(デフォルト: 30日)。
    7. 「ソース接続の構成」で、ソース・ストリームを選択します:
      • コンパートメント: 必要なストリームを含むコンパートメントを選択します。
      • ストリーム・プール: 必要なストリームを含むストリーム・プールを選択します。

        プライベート・エンドポイント構成はサポートされていません。ストリーム・プール構成の詳細は、ストリーム・プールの作成を参照してください。

      • ストリーム: データを受信するストリームの名前を選択します。
      • 読取り位置: ストリームの読取りを開始するカーソル位置を指定します。
        • Latest: コネクタの作成後に公開されたメッセージの読取りを開始します。
          • この構成で新しいコネクタを初めて実行すると、コネクタの作成時間からデータが移動します。最初の実行が失敗した場合(ポリシーが欠落している場合など)、解決後、コネクタはコネクタの作成時間からデータを移動するか、作成時間が保存期間外の場合はストリーム内で使用可能な最も古いデータを移動します。たとえば、保存期間が2時間のストリームについて午前10時に作成されたコネクタを考えてみます。失敗した実行が午前11時に解決された場合、コネクタは午前10時からデータを移動します。失敗した実行が午後1時に解決された場合、コネクタはストリーム内で使用可能な最も古いデータを移動します。
          • その後、ストリームの次の位置からデータを移動します。後で実行が失敗した場合、解決後、コネクタはストリームの次の位置またはストリーム内の最も古い使用可能なデータから、ストリームの保存期間に応じてデータを移動します。
        • トリム範囲: ストリーム内で使用可能な最も古いメッセージから読取りを開始します。
          • この構成で新しいコネクタを初めて実行すると、ストリーム内で使用可能な最も古いデータからデータが移動します。最初の実行が失敗した場合(ポリシーの欠落など)、解決後、コネクタはストリームの保存期間に関係なく、ストリーム内で使用可能な最も古いデータを移動します。
          • その後、ストリームの次の位置からデータを移動します。後で実行が失敗した場合、解決後、コネクタはストリームの次の位置またはストリーム内の最も古い使用可能なデータから、ストリームの保存期間に応じてデータを移動します。
    8. (オプション)「ファンクション・タスクの構成」で、ファンクション・サービスを使用してストリーム・データを処理するファンクション・タスクを構成します:
      • タスクの選択: 「関数」を選択します。
      • コンパートメント: 必要なファンクションを含むコンパートメントを選択します。
      • ファンクション・アプリケーション: 必要なファンクションが含まれるファンクション・アプリケーションの名前を選択します。
      • ファンクション: ソースから取得したデータの処理に使用するファンクションの名前を選択します。

        タスクとしてコネクタによって使用される場合は、次のいずれかのレスポンスを返すようにファンクションを構成する必要があります:

        • JSONエントリのリスト(レスポンス・ヘッダーContent-Type=application/jsonを設定する必要があります)
        • 単一のJSONエントリ(レスポンス・ヘッダーContent-Type=application/jsonを設定する必要があります)
        • 単一のバイナリ・オブジェクト(レスポンス・ヘッダーContent-Type=application/octet-streamを設定する必要があります)
      • 追加オプションの表示: このリンクを選択し、ファンクションに送信されるデータのバッチごとに制限を指定します。手動設定を使用するには、バッチ・サイズ制限(KB)およびバッチ時間制限(秒)の値を指定します。

      ファンクション・タスクの考慮事項:

      • コネクタ・ハブは、ファンクション・タスクの出力を分析しません。ファンクション・タスクの出力は、そのままターゲットに書き込まれます。たとえば、ファンクション・タスクで通知ターゲットを使用する場合、すべてのメッセージはRAW JSON BLOBとして送信されます。
      • ファンクションは、1回の呼出しで6MBのデータと同期するように呼び出されます。データが6MBを超えると、コネクタが再びファンクションを呼び出して制限を超えるデータを移動します。このような呼出しは順次処理されます。
      • ファンクションは最大5分間実行できます。Delivery Detailsを参照してください。
      • 関数タスクはスカラー関数に制限されます。
    9. ターゲットとして「ファンクション」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるファンクションを構成します。ステップ15にスキップします。
      • コンパートメント: 必要なファンクションを含むコンパートメントを選択します。
      • ファンクション・アプリケーション: 必要なファンクションを含むファンクション・アプリケーションの名前を選択します。
      • ファンクション: データの送信先のファンクションの名前を選択します。
      • 追加オプションの表示: このリンクをクリックし、ファンクションに送信されるデータのバッチごとに制限を指定します。手動設定を使用するには、バッチ・サイズ制限(KBまたはメッセージ数)およびバッチ時間制限(秒)の値を指定します。

        たとえば、5,000KBまたは10メッセージを選択して、バッチ・サイズを制限します。バッチ時間制限の例は5秒です。

      ファンクション・ターゲットの考慮事項:

      • コネクタはソース・データをJSONリストとしてバッチでフラッシュします。最大バッチ(ペイロード)サイズは6MBです。
      • ファンクションは、1回の呼出しで6MBのデータと同期するように呼び出されます。データが6MBを超えると、コネクタが再びファンクションを呼び出して制限を超えるデータを移動します。このような呼出しは順次処理されます。
      • ファンクションは最大5分間実行できます。Delivery Detailsを参照してください。
      • ファンクション・ターゲットからコネクタにデータを返さないでください。コネクタ・ハブは、ファンクション・ターゲットから返されたデータを読み取りません。
    10. ターゲットとして「ログ・アナリティクス」を選択した場合は、「ターゲットの構成」で、ログ・データを送信するログ・グループを構成します。ステップ15にスキップします。
      • コンパートメント: 必要なログ・グループを含むコンパートメントを選択します。
      • ログ・グループ: 必要なログ・グループを選択します。
      • ログ・ソース識別子(ストリーミング・ソースの場合のみ): ログ・ソースを選択します。
    11. ターゲットとして「通知」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるトピックを構成します。ステップ15にスキップします。
      • コンパートメント: 目的のトピックを含むコンパートメントを選択します。
      • トピック: データの送信先のトピックの名前を選択します。ストリーミング・ターゲットの場合、メッセージはRAW JSON BLOBとして送信されます。

      通知ターゲットの考慮事項:

      • Notificationsターゲットは、ファンクション・タスクが使用されていない場合にのみ、ストリーミング・ソースでサポートされます。
      • Notificationsターゲットの最大メッセージ・サイズは128KBです。最大サイズを超えるメッセージは削除されます。
      • SMSメッセージでは、特定のコネクタ構成で予期しない結果が生成されます。この問題は、指定されたコネクタ構成でSMSサブスクリプションを含むトピックに限定されます。詳細は、1つの通知に対する複数のSMSメッセージを参照してください。
    12. ターゲットとして「オブジェクト・ストレージ」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるバケットを構成します。ステップ15にスキップします。
      • コンパートメント: 目的のバケットを含むコンパートメントを選択します。
      • バケット: データの送信先のバケットの名前を選択します。
      • オブジェクト名の接頭辞: オプションで、接頭辞値を入力します。
      • 追加オプションの表示: このリンクを選択し、オプションでバッチ・サイズ(MB)およびバッチ時間(ミリ秒)の値を入力します。

      オブジェクト・ストレージ・ターゲットに関する考慮事項:

      • バッチ・ロールオーバーの詳細:

        • バッチ・ロールオーバー・サイズ: 100MB
        • バッチ・ロールオーバー時間: 7分
      • オブジェクト・ストレージに保存されるファイルは、gzipを使用して圧縮されます。

    13. ターゲットとして「ストリーミング」を選択した場合は、「ターゲットの構成」で、ログ・データの送信先となるストリームを構成します。
      • コンパートメント: 必要なストリームを含むコンパートメントを選択します。
      • ストリーム: データの送信先のストリームの名前を選択します。

      プライベート・エンドポイント構成はサポートされていません。ストリーム・プール構成の詳細は、ストリーム・プールの作成を参照してください。

    14. デフォルト・ポリシーを受け入れるには、各デフォルト・ポリシーに用意されている「作成」リンクを選択します。

      デフォルト・ポリシーは、このコネクタがソース、タスクおよびターゲット・サービスにアクセスするために必要な認可のために提供されます。

      この認可は、このようなデフォルト・ポリシーまたはグループベースのポリシーから取得できます。デフォルト・ポリシーは、コンソールを使用してコネクタを作成または編集する際に必ず提供されます。唯一の例外は、IAMに正確なポリシーがすでに存在する場合です。このケースではデフォルト・ポリシーは提供されません。この認可要件の詳細は、認証と認可を参照してください。

      • デフォルト・ポリシーを受け入れる権限がない場合は、管理者に連絡してください。
      • 自動的に作成されたポリシーは、コネクタが削除されてもそのままです。ベスト・プラクティスとしては、コネクタを削除するときに関連するポリシーを削除してください。

      新しく作成したポリシーを確認するには、関連付けられたビュー・リンクを選択します。

    15. (オプション)コネクタにタグを割り当てます。「拡張オプションの表示」をクリックし、タグ付けフィールドの値を指定します。
      • タグ・ネームスペース: 定義済タグを追加するには、既存のネームスペースを選択します。解放元タグを追加するには、値を空白のままにします。
      • タグ・キー: 定義済タグを追加するには、既存のタグ・キーを選択します。フリーフォーム・タグを追加するには、必要なキー名を入力します。
      • タグ値: 必要なタグ値を入力します。
      • タグの追加: クリックして別のタグを追加します。
    16. 「作成」をクリックします。
    作成プロセスが開始され、その進行状況が表示されます。完了すると、コネクタの詳細ページが開きます。
  • ストリーミング・ソースでコネクタを作成するには、oci sch service-connector createコマンドと必要なパラメータを使用します:

    oci sch service-connector create --display-name "<display_name>" --compartment-id <compartment_OCID> --source [<stream_source_in_JSON>] --target [<target_in_JSON>]

    CLIコマンドのパラメータおよび値の完全なリストは、CLIコマンド・リファレンスを参照してください。

  • CreateServiceConnector操作を実行してコネクタを作成します。

    ストリーミング・ソースを含むコネクタを作成するには、リクエスト(CreateServiceConnectorDetails)のsourceにストリーミングの詳細を移入します。例については、StreamingSourceDetailsを参照してください。

新規コネクタによるデータの移動の確認

コネクタを作成したら、データが移動していることを確認します。

  • コネクタでデータ・フローの詳細を取得するためのログの有効化
  • ターゲット・サービスで予想される結果を確認します。

データが移動されていることを確認すると、自動非アクティブ化を回避できます。これは、コネクタに長時間障害が発生した場合に発生します。