SDK for Go Streaming Quickstart

This quickstart shows you how to use the Oracle Cloud Infrastructure (OCI) SDK for Go and Oracle Cloud Infrastructure Streaming to publish and consume messages.

Refer to the Overview of Streaming for key concepts and more Streaming details. For more information about using the OCI SDKs, see the SDK Guides.

Prerequisites

  1. To use the SDK for Go, you must have the following:

    • An Oracle Cloud Infrastructure account.
    • A user created in that account, in a group with a policy that grants the required permissions. This user can be yourself, or another person/system that needs to call the API. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a list of typical policies you may want to use, see Common Policies.
    • A key pair used for signing API requests, with the public key uploaded to Oracle. Only the user calling the API should possess the private key. For more information, see SDK configuration file.
  2. Collect the Messages endpoint and OCID of a stream. See Listing Streams and Stream Pools for instructions on viewing stream details. For the purposes of this quickstart, the stream should use a public endpoint and let Oracle manage encryption. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream.
  3. Go installed locally. Follow these instructions if necessary. Ensure that go is in your PATH.
  4. Visual Studio Code (recommended) or any other integrated development environment (IDE) or text editor.

  5. Ensure that you have a valid SDK configuration file. For production environments, you should use instance principal authorization.

Producing Messages

  1. Open your favorite editor, such as Visual Studio Code, from the empty working directory wd.
  2. Create a file named Producer.go in this directory.
  3. Add the following code to Producer.go. Replace values of variables ociConfigFilePath, ociProfileName,ociStreamOcid, and ociMessageEndpoint in the following code snippet with the values applicable for your tenancy.

    package main
    
    import (
    	"context"
    	"fmt"
    	"strconv"
    
    	"github.com/oracle/oci-go-sdk/v36/common"
    	"github.com/oracle/oci-go-sdk/v36/example/helpers"
    	"github.com/oracle/oci-go-sdk/v36/streaming"
    )
    
    const ociMessageEndpoint = "<stream_message_endpoint>"
    const ociStreamOcid = "<stream_OCID>"
    const ociConfigFilePath = "<config_file_path>"
    const ociProfileName = "<config_file_profile_name>"
    
    func main() {
    	fmt.Println("Go oci oss sdk example producer")
    	putMsgInStream(ociMessageEndpoint, ociStreamOcid)
    }
    
    func putMsgInStream(streamEndpoint string, streamOcid string) {
    	fmt.Println("Stream endpoint for put msg api is: " + streamEndpoint)
    
    	provider, err := common.ConfigurationProviderFromFileWithProfile(ociConfigFilePath, ociProfileName, "")
    	helpers.FatalIfError(err)
    
    	streamClient, err := streaming.NewStreamClientWithConfigurationProvider(provider, streamEndpoint)
    	helpers.FatalIfError(err)
    
    	// Create a request and dependent object(s).
    	for i := 0; i < 5; i++ {
    		putMsgReq := streaming.PutMessagesRequest{StreamId: common.String(streamOcid),
    			PutMessagesDetails: streaming.PutMessagesDetails{
    				// we are batching 2 messages for each Put Request
    				Messages: []streaming.PutMessagesDetailsEntry{
    					{Key: []byte("key dummy-0-" + strconv.Itoa(i)),
    						Value: []byte("value dummy-" + strconv.Itoa(i))},
    					{Key: []byte("key dummy-1-" + strconv.Itoa(i)),
    						Value: []byte("value dummy-" + strconv.Itoa(i))}}},
    		}
    
    		// Send the request using the service client
    		putMsgResp, err := streamClient.PutMessages(context.Background(), putMsgReq)
    		helpers.FatalIfError(err)
    
    		// Retrieve value from the response.
    		fmt.Println(putMsgResp)
    	}
    
    }
  4. Save Producer.go.
  5. Open the terminal and cd to the wd directory, run the following commands, in order:

    1. This command creates the go.mod file in the wd directory:

      go mod init oss_producer_example/v0
    2. This command installs the OCI SDK for Go and for Streaming:

      go mod tidy
    3. This command runs the example:

      go run Producer.go
  6. Use the Console to see the latest messages sent to the stream to verify that production was successful.

Consuming Messages

  1. First, ensure that the stream you want to consume messages from contains messages. You could use the Console to produce a test message, or use the stream and messages we created in this quickstart.
  2. Add the following code to Consumer.go. Replace values of variables ociConfigFilePath, ociProfileName,ociStreamOcid, and ociMessageEndpoint in the following code snippet with the values applicable for your tenancy.

    package main
    
    import (
    	"context"
    	"fmt"
    
    	"github.com/oracle/oci-go-sdk/v36/common"
    	"github.com/oracle/oci-go-sdk/v36/example/helpers"
    	"github.com/oracle/oci-go-sdk/v36/streaming"
    )
    
    const ociMessageEndpoint = "<stream_message_endpoint>"
    const ociStreamOcid = "<stream_OCID>"
    const ociConfigFilePath = "<config_file_path>"
    const ociProfileName = "<config_file_profile_name>"
    
    func main() {
    	fmt.Println("Go oci oss sdk example for consumer")
    	getMsgWithGroupCursor(ociMessageEndpoint, ociStreamOcid)
    }
    
    func getMsgWithGroupCursor(streamEndpoint string, streamOcid string) {
    	client, err := streaming.NewStreamClientWithConfigurationProvider(common.DefaultConfigProvider(), streamEndpoint)
    	helpers.FatalIfError(err)
    
    	grpCursorCreateReq0 := streaming.CreateGroupCursorRequest{
    		StreamId: common.String(streamOcid),
    		CreateGroupCursorDetails: streaming.CreateGroupCursorDetails{Type: streaming.CreateGroupCursorDetailsTypeTrimHorizon,
    			CommitOnGet:  common.Bool(true),
    			GroupName:    common.String("Go-groupname-0"),
    			InstanceName: common.String("Go-groupname-0-instancename-0"),
    			TimeoutInMs:  common.Int(1000),
    		}}
    
    	// Send the request using the service client
    	grpCursorResp0, err := client.CreateGroupCursor(context.Background(), grpCursorCreateReq0)
    	helpers.FatalIfError(err)
    	// Retrieve value from the response.
    	fmt.Println(grpCursorResp0)
    
    	simpleGetMsgLoop(client, streamOcid, *grpCursorResp0.Value)
    }
    
    func simpleGetMsgLoop(streamClient streaming.StreamClient, streamOcid string, cursorValue string) {
    
    	for i := 0; i < 5; i++ {
    		getMsgReq := streaming.GetMessagesRequest{Limit: common.Int(3),
    			StreamId: common.String(streamOcid),
    			Cursor:   common.String(cursorValue)}
    
    		// Send the request using the service client
    		getMsgResp, err := streamClient.GetMessages(context.Background(), getMsgReq)
    		helpers.FatalIfError(err)
    
    		// Retrieve value from the response.
    		if len(getMsgResp.Items) > 0 {
    			fmt.Println("Key : " + string(getMsgResp.Items[0].Key) + ", value : " + string(getMsgResp.Items[0].Value) + ", Partition " + *getMsgResp.Items[0].Partition)
    		}
    		if len(getMsgResp.Items) > 1 {
    			fmt.Println("Key : " + string(getMsgResp.Items[1].Key) + ", value : " + string(getMsgResp.Items[1].Value) + ", Partition " + *getMsgResp.Items[1].Partition)
    		}
    		cursorValue = *getMsgResp.OpcNextCursor
    
    	}
    }
  3. Save Consumer.go.
  4. Open the terminal and cd to the wd directory, run the following commands, in order:

    1. This command creates the go.mod file in the wd directory:

      go mod init oss_consumer_example/v0
    2. This command installs the OCI SDK for Go and for Streaming:

      go mod tidy
    3. This command runs the example:

      go run Consumer.go
  5. You should see messages similar to the following:

    Go oci oss sdk example for consumer
    { RawResponse={200 OK 200 HTTP/1.1 1 1 map[Access-Control-Allow-Credentials:[true] ... }
    Key : , value : Example Test Message 0, Partition 0
    Key : , value : Example Test Message 0, Partition 0
    Key : , value : Example Test Message 0, Partition 0
    Key : , value : Example Test Message 0, Partition 0
    Key : , value : Example Test Message 0, Partition 0
    Note

    If you used the Console to produce a test message, the key for each message is Null