パイプラインの使用

パイプラインを使用すると、一連のタスクを順序または並行して接続して、データ処理を調整できます。

パイプラインを作成することで、複雑なタスク依存関係グラフを構築し、タスクのワークロード全体を自動化できます。タスクは公開する必要があり、現在のワークスペースまたは別のワークスペースにあるアプリケーションから公開済タスクを追加できます。

このチュートリアルでは:

  • パイプラインで並行して実行する2つのデータ・ローダー・タスクを作成します。
  • 通知サービスを使用して電子メール通知を送信するRESTタスクを作成します。
  • パイプラインを作成し、データ・ローダー・タスク、マージ、統合タスクおよびRESTタスクの演算子を追加します。
  • パイプライン・タスクを作成して、パイプラインのランタイム・コンテキストを構成します。
  • パイプライン・タスクを公開し、パイプラインを実行します。
  • パイプライン実行をモニターします。

開始する前に

このチュートリアルを完了するには:

1. 収益データのデータ・ローダー・タスクの作成

Load Revenue Data into Data Warehouseタスクを複製して、収益データをロードおよび上書きする新しいタスクを作成します。

  1. DI Labプロジェクトの詳細ページで、サブメニューの「タスク」をクリックします。
  2. 「タスク」リストで、Load Revenue Data into Data Warehouseを見つけます。
  3. 「アクション」メニュー(「アクション」メニュー)をクリックし、「複製」を選択します。
  4. 「タスクの複製」ダイアログで、新しい名前にRevenue Data Loadと入力し、「複製」をクリックします。

    「識別子」の値は、指定した名前に基づいて自動的に生成されます。生成された値を変更できますが、新しいタスクの保存後は識別子の更新は許可されません。

  5. 「タスク」リストで、「収益データ・ロード」をクリックします。

    複製されたデータ・ローダー・タスク・ページが新しいタブで開きます。

  6. 「ターゲット」ステップのアイコンをクリックします。
  7. 「ターゲット・データ・エンティティのロード設定」で、「既存のデータ・エンティティの使用」をクリックします。
  8. 「統合戦略」メニューから、「上書き」を選択します。
  9. 「使用可能なデータ・エンティティ」で、REVENUE_TARGETのチェック・ボックスを選択し、「ターゲットとして設定」をクリックします。

    REVENUE_TARGETという名前が「選択したデータ・エンティティ」の横に表示されます。

  10. 「保存」をクリックしてタスクを保存し、編集を続行します。
  11. 「次」をクリックして、「変換」ステップに移動します。

    以前にSRC_ORDER_NUMBERに適用されたNull埋込変換は削除しないでください。

  12. 「次へ」をクリックして、「属性マッピング」ステップに移動します。

    すべてのソース属性とターゲット属性は自動的にマップされます。

  13. 「次へ」をクリックして、「確認および検証」ステップに移動します。

    タスクの検証が自動的に開始されます。

    各ステップの詳しい構成がブロックにまとめて表示されます。ステップの構成を変更した場合は、「確認および検証」ステップに移動してタスクを再度検証します。

    タスク検証の結果は、最後のブロック、「検証」に表示されます。

  14. 検証に成功したら、「保存して閉じる」をクリックします。

2. 顧客データのデータ・ローダー・タスクの作成

新しいターゲット・データ・エンティティを作成して、顧客データをデータ・ウェアハウスにロードするためのデータ・ローダー・タスクを作成します。

  1. DI Labプロジェクトの詳細ページで、サブメニューの「タスク」をクリックします。
  2. 「タスクの作成」をクリックし、「データ・ローダー」を選択します。

    「データ・ローダー・タスクの作成」ページが新しいタブで開きます。上部にある番号と名前が付けられたステップに従って構成を進めることができます。ステップを構成すると、ステップ・アイコンにチェック・マークが表示されます。ステップ間に移動するには、「次」または「前」をクリックします。アイコンを選択して、構成済ステップに直接移動することもできます。

  3. 「データ・ローダー・タスクの作成」ページの「基本情報」ステップで、次を選択します:
    項目選択
    ソース・タイプ ファイル・ストレージ
    ターゲット・タイプ データベース
    ロード・タイプ 1つのデータ・エンティティ
  4. タスクの「名前」に、Customer Data Loadと入力します。その後、「次」をクリックして次のステップに移動します。

    ステップを構成すると、「基本情報」ステップ・アイコンにチェック・マークが表示されます。

  5. 「ソース」ステップで、次を選択します:
    項目選択
    データ・アセット Data_Lake
    接続 デフォルトの接続
    コンパートメント サンプル・データ・ファイル(CUSTOMERS.JSON)をアップロードしたバケットを含むコンパートメント
    バケット サンプルJSONファイルを含むオブジェクト・ストレージ・バケット
  6. 「ファイル設定」で、次を選択します:
    項目選択
    ファイル・タイプ JSON
    圧縮タイプ 自動(デフォルト)
    エンコーディング: UTF-8

    残りのフィールドは、デフォルト設定のままにしておくことができます。

  7. 「使用可能なデータ・エンティティ」で、CUSTOMERS.JSONのチェック・ボックスを選択し、「ソースとして設定」をクリックします。

    CUSTOMERS.JSONという名前が、「選択したデータ・エンティティ」の横に表示されます。

  8. 「作成」をクリックしてタスクを保存し、編集を続行します。
  9. 「次」をクリックして「ターゲット」ステップに進み、次を選択します:
    項目選択
    データ・アセット Data_Warehouse
    接続 デフォルトの接続
    スキーマ BETA
  10. 「ステージングの場所」では、ターゲット・データ・アセットの作成時に設定されたデフォルトのステージングの場所を使用できます。

    または、チェック・ボックスの選択を解除して、別のObject Storageバケットを選択できます。

  11. 「ターゲット・データ・エンティティのロード設定」で、「新規データ・エンティティの作成」をクリックします。
  12. 「ターゲット・データ・エンティティ名オプション」で、「エンティティ名の指定」を選択します。次に、「エンティティ名」フィールドにCUSTOMER_JSON_TARGETと入力します。
  13. 「保存」をクリックしてタスクを保存し、編集を続行します。
  14. 「確認および検証」ステップをクリックし、オプションの変換ステップをスキップします。

    タスクの検証が自動的に開始されます。

    各ステップの詳しい構成がブロックにまとめて表示されます。ステップの構成を変更した場合は、「確認および検証」ステップに移動してタスクを再度検証します。

    タスク検証の結果は、最後のブロック、「検証」に表示されます。

  15. 検証に成功したら、「保存して閉じる」をクリックします。

3. 通知を送信するためのRESTタスクの作成

RESTタスクを使用して、パイプライン内でREST APIエンドポイントを実行できます。このチュートリアルでは、データ統合RESTタスクの通知サービスAPIを使用して、パイプライン内から電子メールを公開します。

このステップでRESTタスクを作成するには、次のものがすでにある必要があります:
  • 通知サービスで作成されたトピックおよび電子メール・サブスクリプション。

  • 作成したトピックのOCID。OCIDは、通知サービスのトピック詳細ページの「トピック情報」セクションで使用できます。

  • 通知REST APIを起動するデータ統合タスクを実行できる次のポリシー・ステートメント:

    allow any-user to use notification-family in tenancy where ALL {request.principal.type='disworkspace'}

次に、データ統合で、通知サービスAPIを使用してEメールを公開するRESTタスクを作成します。

  1. DI Labプロジェクトの詳細ページで、サブメニューの「タスク」をクリックします。
  2. 「タスクの作成」をクリックし、「REST」を選択します。

    「RESTタスクの作成」ページが表示されます。

  3. 「名前」に、Notify by Emailと入力します。

    「識別子」の値は、指定した名前に基づいて自動的に生成されます。生成された値を変更できますが、新しいタスクの保存後は識別子の更新は許可されません。

  4. 「REST API詳細」セクションで、「構成」をクリックします。

    「REST API詳細の構成」ページが表示されます。上部にある番号と名前が付けられたステップに従って構成を進めることができます。ステップを構成すると、ステップ・アイコンにチェック・マークが表示されます。ステップ間に移動するには、「次」または「前」をクリックします。アイコンを選択して、構成済ステップに直接移動することもできます。

  5. 「HTTPメソッド」で、「POST」を選択します。
  6. 「URL」フィールドに次を入力して[Enter]を押します。
    https://notification.us-ashburn-1.oci.oraclecloud.com/20181201/topics/${TOPICID}/messages
    ノート

    通知サービスに適切なリージョン識別子を使用していることを確認してください。

    URLの入力後に[Enter]を押すと、データ統合によって${}パラメータ構文が文字列URLパラメータに変換されます。

  7. 表内の新規追加URLパラメータTOPICIDの行で、「アクション」メニュー(「アクション」メニュー)から「編集」を選択します。
  8. 「値」フィールドに、作成した通知トピックのOCIDを入力し、「保存」をクリックします。
  9. 次に、次のステップに従ってヘッダーを追加します。
    1. 「ヘッダー」をクリックします。
    2. 「ヘッダーの追加」をクリックします。
    3. 「キー」フィールドにconと入力し、リストから「コンテンツ・タイプ」を選択します。
    4. 「値」フィールドにappと入力し、リストからapplication/jsonを選択します。
    5. 「追加」をクリックします。
  10. 次のステップに従って、リクエスト本文を追加します。
    1. 「リクエスト」をクリックします。
    2. エディタで、次のように入力します。
      {"title": "Put your title here", "body": "Put your email body here."}
    3. 「追加」をクリックします。
  11. 「次へ」「構成」の順にクリックします。
  12. 認証を提供するには、次を実行します。
    1. 認証」セクションで、「編集」をクリックして「認証の構成」パネルを表示します。
    2. 「認証」メニューから、「OCIリソース・プリンシパル」を選択します。
    3. 「認証ソース」で、「ワークスペース」を選択します。
    4. 「構成」をクリックします。
  13. オプションの「タスクの検証」セクションで、「検証」をクリックします。
  14. 検証に成功したら、「作成してクローズ」をクリックします。

4. データ・ローダーおよびRESTタスクの公開

  1. DI_Labプロジェクトの詳細ページで、サブメニューの「タスク」をクリックします。
  2. タスクのリストから、「収益データ・ロード」「顧客データ・ロード」および「Eメールで通知」の横にあるチェック・ボックスを選択します。
  3. 「アプリケーションにパブリッシュ」をクリックします。
  4. 「アプリケーションに公開」ダイアログで、「Lab Application」を選択し、「Publish」をクリックします。

    通知メッセージが、公開済のタスクを表示するためのアプリケーションへのリンクとともに表示されます。

  5. 通知で「アプリケーションの表示」を選択します。次に、「X」を選択して通知を閉じます。

    アプリケーションの詳細ページの「パッチ」リストが表示されます。公開しているタスクに対して1つのパッチ・エントリが作成されます。

  6. 「パッチ」リストで、パッチ・ステータスをモニターできます。「リフレッシュ」をクリックして、最新のステータス更新を取得します。

    パッチのステータスが「成功」に変更されると、アプリケーションの詳細ページの「タスク」リストに3つの公開済タスク・エントリが作成されます。

  7. ラボ・アプリケーションの詳細ページで「タスク」をクリックします。

    「収益データ・ロード」「顧客データ・ロード」および「Eメールによる通知」の公開済タスクがタスク・リストに表示されます。

5. パイプラインの作成

  1. タブ・バーで、「開く」タブ(プラス・アイコン)、「プロジェクト」の順に選択します。
  2. 「プロジェクト」ページで、DI_Labをクリックします。
  3. DI_Labプロジェクトの詳細ページで、左側のサブメニューの「パイプライン」をクリックし、「パイプラインの作成」をクリックします。

    パイプライン・デザイナが新しいタブで開きます。キャンバス上に開始演算子と終了演算子が配置されます。

  4. パイプラインの「プロパティ」パネルで、「名前」としてAnalyze Revenueと入力します。

    「識別子」の値は、パイプライン名に入力した値に基づいて自動的に生成されます。生成された値を変更できますが、パイプラインの保存後は識別子の更新は許可されません。

  5. 「作成」をクリックします。

    デザイナは開いたままになっており、編集を続行できます。

6. パイプライン演算子の追加

パイプラインでオーケストレーションする公開済タスクを指定するタスク演算子を追加します。

パイプライン演算子の詳細を参照してください。

  1. 「演算子」パネルから、データ・ローダー演算子をキャンバスにドロップし、開始演算子と終了演算子の間に配置します。

    「プロパティ」パネルに、バインドされていないデータ・ローダー・タスク演算子の詳細が表示されるようになりました。

  2. 「プロパティ」パネルの「詳細」タブで、「選択」をクリックします。

    公開されたデータ・ローダー・タスクを選択するための「データ・ローダー・タスクの選択」パネルが表示されます。

  3. 「ラボ・アプリケーション」から、「収益データ・ロード」(収益データをデータ・ウェアハウスにロードするタスク)を選択し、「選択」をクリックします。

    演算子アイコンの名前が、選択したタスクの名前に戻ります。

  4. 開始演算子を収益データ・ローダー・タスクに接続します。
  5. パイプラインを保存して編集を続行するには、「保存」をクリックします。
  6. ステップを繰り返して、2番目のデータ・ローダー演算子を追加します。今回は、「顧客データ・ロード」(顧客データをロードするタスク)を選択します。次に、開始演算子を顧客データ・ローダー・タスクに接続します。
  7. 次に、「マージ」演算子をキャンバスにドロップし、2つのデータ・ローダー・タスクの後に配置します。
  8. 各データ・ローダー・タスクをマージ演算子に接続します。
  9. マージ演算子の「プロパティ」パネルの「詳細」タブで、「マージ条件」メニューから「すべての成功」を選択します。

    これは、次のダウンストリーム操作を続行する前に、アップストリームにリンクされたパラレル操作が完了し、成功する必要があることを指定します。

  10. 「演算子」パネルから、統合演算子をキャンバスにドロップし、マージ演算子の後に配置します。
  11. 「プロパティ」パネルの「詳細」タブで、「選択」をクリックします。
  12. 「統合タスクの選択」パネルで、「Load Customers Lab」タスクを選択し、「選択」をクリックします。
  13. マージ演算子を統合タスク演算子に接続します。
  14. 次に、REST演算子をキャンバスにドロップし、統合タスクの後ろに配置します。
  15. 「プロパティ」パネルの「詳細」タブで、「選択」をクリックします。
  16. 「RESTタスクの選択」パネルで、「電子メールで通知」タスクを選択し、「選択」をクリックします。
  17. RESTタスク演算子の「プロパティ」パネルの「詳細」タブで、「受信リンク条件」メニューから「前の演算子の成功時に実行」を選択します。
  18. RESTタスクを終了演算子に接続します。
  19. キャンバス・ツールバーで「検証」をクリックします。

    「グローバル検証」パネルが表示され、警告またはエラーを確認できます。

  20. パイプラインを保存するには、「保存して閉じる」をクリックします。

7. パイプライン・タスクの作成

  1. タブ・バーで、「開く」タブ(プラス・アイコン)、「プロジェクト」の順に選択します。
  2. 「プロジェクト」ページで、DI_Labをクリックします。
  3. DI_Labプロジェクトの詳細ページで、左側のサブメニューの「タスク」をクリックします。
  4. 「タスクの作成」をクリックし、「パイプライン」を選択します。

    「パイプライン・タスクの作成」ページが表示されます。

  5. 「パイプライン・タスクの作成」ページで、「名前」Analyze Revenue Labに変更します。

    説明の入力はオプションです。「識別子」フィールドの値は、「名前」に入力した値に基づいて自動的に生成されます。生成された値は変更できますが、タスクの保存後は識別子の更新は許可されません。

  6. 「パイプライン」セクションで、「選択」をクリックします。
  7. 「パイプラインの選択」パネルで、「収益の分析」を選択し、「選択」をクリックします。

    パイプラインの検証が自動的に開始されます。

  8. 「作成してクローズ」をクリックします。

8. パイプライン・タスクの公開および実行

  1. DI_Labプロジェクトの詳細ページで、サブメニューの「タスク」をクリックします。
  2. 「タスク」リストで、「収益ラボの分析」「アクション」メニュー(「アクション」メニュー)をクリックし、「アプリケーションに公開」を選択します。
  3. 「アプリケーションに公開」ダイアログで、「Lab Application」を選択し、「Publish」をクリックします。

    通知メッセージが、公開済のタスクを表示するためのアプリケーションへのリンクとともに表示されます。

  4. 「ラボ・アプリケーション」の詳細ページに移動し、左側のサブメニューの「パッチ」をクリックして、タスク・パッチの詳細を表示します。

    パッチには、アプリケーションの公開済タスクに対する更新が含まれています。タスクを公開すると、公開パッチが作成されます。パッチの詳細を参照してください。

  5. 「パッチ」リストで、パッチ・ステータスをモニターできます。「リフレッシュ」をクリックして、最新のステータス更新を取得します。

    パッチのステータスが「成功」に変更されると、アプリケーションの詳細ページの「タスク」リストに公開タスク・エントリが作成されます。

  6. ラボ・アプリケーションの詳細ページで「タスク」をクリックします。

    パイプライン公開済タスク「収益ラボの分析」がタスク・リストに表示されます。

  7. パイプライン・タスクの「アクション」メニュー(「アクション」メニュー)をクリックし、「実行」を選択します。

    成功メッセージが表示されます。タスクを実行すると、タスク実行が作成されます。「実行」ページが自動的に表示され、すべてのタスク実行とそのステータスを表示できます。パイプライン実行の初期ステータスはNot startedです。

  8. 「ラボ・アプリケーション」詳細ページの「実行」リストで「リフレッシュ」をクリックして、最新のタスク実行ステータス更新を取得します。

    パイプラインの実行には、実行エンジンが実際のパイプライン実行を開始する前の、前処理、承認および検証のステップが含まれていることに注意してください。

    ステータスが「実行中」になるまで、数回「リフレッシュ」をクリックします。

  9. パイプライン・タスクが実行されているときに、タスク実行名をクリックします。

    「実行の詳細」ページが表示され、「パイプライン・グラフ」でパイプライン実行の進行状況をモニターできます。各ノードのステータスは、アイコンとラベルで示されます。たとえば、完了したノードの緑色のチェック・マーク、実行中のタスクのラベルRunning、実行を待機しているダウンストリーム・タスクのラベルWaitingなどです。

    パイプライン全体の実行ステータスの「成功」が表示されるまで、数回「リフレッシュ」をクリックします。

    「概要」をクリックして、パイプライン実行の詳細を表示することもできます。

  10. パイプライン実行が成功したら、「ラボ・アプリケーション」詳細ページの「実行」リストに移動し、パイプライン・タスク実行のメイン実行エントリを展開します。

    パイプライン内の4つの個別タスクの実行詳細を表示できます。

    また、通知サービスから電子メールを受信しました。