SDK for .NETストリーミング・クイックスタート
このクイックスタートでは、Oracle Cloud Infrastructure (OCI) SDK for .NETおよびOracle Cloud Infrastructure Streamingを使用して、メッセージを公開および消費する方法を示します。これらの例では、C#言語を使用しています。
主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。OCI SDKの使用の詳細は、SDKガイドを参照してください。
前提条件
このクイックスタートでは、Visual Studio Codeおよび.NET CLIを使用して、単純な.NETコンソール・アプリケーションを作成して実行します。プロジェクトの作成、コンパイル、実行などのプロジェクト・タスクは、.NET CLIを使用して行います。必要に応じて、別のIDEでこのチュートリアルに従い、ターミナルでコマンドを実行できます。
-
SDK for .NETを使用するには、次が必要です:
- Oracle Cloud Infrastructureアカウント。
- そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。このユーザーは、自分自身であるか、またはAPIをコールする必要がある別の個人/システムです。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
- APIリクエストの署名に使用されるキー・ペア(公開キーがOracleにアップロードされている)。秘密キーはAPIをコールするユーザーのみが所有する必要があります。詳細は、SDK構成ファイルを参照してください。
- ストリームのメッセージ・エンドポイントおよびOCIDを収集します。ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。このクイックスタートの目的では、ストリームでパブリック・エンドポイントを使用し、Oracleで暗号化を管理する必要があります。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。
- .NET 5.0 SDK以降をインストールします。
dotnet
がPATH
環境変数に設定されていることを確認します。 -
C#拡張機能がインストールされたVisual Studio Code (推奨)。Visual Studio Codeに拡張機能をインストールする方法の詳細は、VS Code Extension Marketplaceを参照してください。
- 有効なSDK構成ファイルがあることを確認します。本番環境では、インスタンス・プリンシパル認可を使用する必要があります。
メッセージの生成
- 空の作業ディレクトリ
wd
から、Visual Studio Codeなどのお気に入りのエディタを開きます。 - ターミナルを開き、
wd
ディレクトリにcd
で移動します。 -
ターミナルで次のコマンドを実行して、C# .NETコンソール・アプリケーションを作成します:
dotnet new console
アプリケーションが作成されたことを示すメッセージが表示されます:
The template "Console Application" was created successfully.
これにより、単純な"HelloWorld"アプリケーションのC#コードを含む
Program.cs
ファイルが作成されます。 -
IAM基本認証およびストリーミング用のOCI SDKパッケージをC#プロジェクトに次のように追加します:
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
-
wd
ディレクトリのProgram.cs
のコードを次のコードに置き換えます。次のコード・スニペットの変数configurationFilePath
、profile
、ociStreamOcid
およびociMessageEndpoint
の値は、テナンシに適用可能な値で置き換えてください。using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Oci.Common.Auth; using Oci.Common.Waiters; using Oci.StreamingService; using Oci.StreamingService.Models; using Oci.StreamingService.Requests; using Oci.StreamingService.Responses; namespace OssProducer { class Program { public static async Task Main(string[] args) { Console.WriteLine("Starting example for OSS Producer"); string configurationFilePath = "<config_file_path>"; string profile = "<config_file_profile_name>"; string ociStreamOcid = "<stream_OCID>"; string ociMessageEndpoint = "<stream_message_endpoint>"; try { var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile); StreamClient streamClient = new StreamClient(provider); streamClient.SetEndpoint(ociMessageEndpoint); await PublishExampleMessages(streamClient, ociStreamOcid); } catch (Exception e) { Console.WriteLine($"Streaming example failed: {e}"); } } private static async Task PublishExampleMessages(StreamClient streamClient, string streamId) { // build up a putRequest and publish some messages to the stream List<PutMessagesDetailsEntry> messages = new List<PutMessagesDetailsEntry>(); for (int i = 0; i < 100; i++) { PutMessagesDetailsEntry detailsEntry = new PutMessagesDetailsEntry { Key = Encoding.UTF8.GetBytes($"messagekey-{i}"), Value = Encoding.UTF8.GetBytes($"messageValue-{i}") }; messages.Add(detailsEntry); } Console.WriteLine($"Publishing {messages.Count} messages to stream {streamId}"); PutMessagesDetails messagesDetails = new PutMessagesDetails { Messages = messages }; PutMessagesRequest putRequest = new PutMessagesRequest { StreamId = streamId, PutMessagesDetails = messagesDetails }; PutMessagesResponse putResponse = await streamClient.PutMessages(putRequest); // the putResponse can contain some useful metadata for handling failures foreach (PutMessagesResultEntry entry in putResponse.PutMessagesResult.Entries) { if (entry.Error != null) { Console.WriteLine($"Error({entry.Error}): {entry.ErrorMessage}"); } else { Console.WriteLine($"Published message to partition {entry.Partition}, offset {entry.Offset}"); } } } } }
-
wd
ディレクトリから、次のコマンドを実行します:dotnet run
- コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。
メッセージの消費
- 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
- 空の作業ディレクトリ
wd
から、Visual Studio Codeなどのお気に入りのエディタを開きます。 -
ターミナルで次のコマンドを実行して、C# .NETコンソール・アプリケーションを作成します:
dotnet new console
アプリケーションが作成されたことを示すメッセージが表示されます:
The template "Console Application" was created successfully.
これにより、単純な"HelloWorld"アプリケーションのC#コードを含む
Program.cs
ファイルが作成されます。 -
IAM基本認証およびストリーミング用のOCI SDKパッケージをC#プロジェクトに次のように追加します:
dotnet add package OCI.DotNetSDK.Common
dotnet add package OCI.DotNetSDK.Streaming
-
wd
ディレクトリのProgram.cs
のコードを次のコードに置き換えます。次のコード・スニペットの変数configurationFilePath
、profile
、ociStreamOcid
およびociMessageEndpoint
の値は、テナンシに適用可能な値で置き換えてください。using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Oci.Common.Auth; using Oci.Common.Waiters; using Oci.StreamingService; using Oci.StreamingService.Models; using Oci.StreamingService.Requests; using Oci.StreamingService.Responses; namespace OssConsumer { class Program { public static async Task Main(string[] args) { Console.WriteLine("Starting example for OSS Consumer"); string configurationFilePath = "<config_file_path>"; string profile = "<config_file_profile_name>"; string ociStreamOcid = "<stream_OCID>"; string ociMessageEndpoint = "<stream_message_endpoint>"; try { var provider = new ConfigFileAuthenticationDetailsProvider(configurationFilePath, profile); StreamClient streamClient = new StreamClient(provider); streamClient.SetEndpoint(ociMessageEndpoint); // A cursor can be created as part of a consumer group. // Committed offsets are managed for the group, and partitions // are dynamically balanced amongst consumers in the group. Console.WriteLine("Starting a simple message loop with a group cursor"); string groupCursor = await GetCursorByGroup(streamClient, ociStreamOcid, "exampleGroup", "exampleInstance-1"); await SimpleMessageLoop(streamClient, ociStreamOcid, groupCursor); } catch (Exception e) { Console.WriteLine($"Streaming example failed: {e}"); } } private static async Task<string> GetCursorByGroup(StreamClient streamClient, string streamId, string groupName, string instanceName) { Console.WriteLine($"Creating a cursor for group {groupName}, instance {instanceName}"); CreateGroupCursorDetails createGroupCursorDetails = new CreateGroupCursorDetails { GroupName = groupName, InstanceName = instanceName, Type = CreateGroupCursorDetails.TypeEnum.TrimHorizon, CommitOnGet = true }; CreateGroupCursorRequest createCursorRequest = new CreateGroupCursorRequest { StreamId = streamId, CreateGroupCursorDetails = createGroupCursorDetails }; CreateGroupCursorResponse groupCursorResponse = await streamClient.CreateGroupCursor(createCursorRequest); return groupCursorResponse.Cursor.Value; } private static async Task SimpleMessageLoop(StreamClient streamClient, string streamId, string initialCursor) { string cursor = initialCursor; for (int i = 0; i < 10; i++) { GetMessagesRequest getMessagesRequest = new GetMessagesRequest { StreamId = streamId, Cursor = cursor, Limit = 10 }; GetMessagesResponse getResponse = await streamClient.GetMessages(getMessagesRequest); // process the messages Console.WriteLine($"Read {getResponse.Items.Count}"); foreach (Message message in getResponse.Items) { string key = message.Key != null ? Encoding.UTF8.GetString(message.Key) : "Null"; Console.WriteLine($"{key} : {Encoding.UTF8.GetString(message.Value)}"); } // getMessages is a throttled method; clients should retrieve sufficiently large message // batches, as to avoid too many http requests. await Task.Delay(1000); // use the next-cursor for iteration cursor = getResponse.OpcNextCursor; } } } }
-
wd
ディレクトリから、次のコマンドを実行します:dotnet run
-
次のようなメッセージが表示されます:
Starting example for OSS Consumer Starting a simple message loop with a group cursor Creating a cursor for group exampleGroup, instance exampleInstance-1 Read 10 messagekey-0 : messageValue-0 messagekey-1 : messageValue-1 messagekey-2 : messageValue-2 messagekey-3 : messageValue-3 messagekey-4 : messageValue-4 messagekey-5 : messageValue-5 messagekey-6 : messageValue-6 messagekey-7 : messageValue-7 messagekey-8 : messageValue-8 messagekey-9 : messageValue-9 Read 10
次のステップ
詳細は、次のリソースを参照してください: