例: PythonファンクションへのOracle Resource Principal Providerの追加によるネットワーキング・サービスからのVCNのリストの取得
動的グループにファンクションを追加し、動的グループがテナンシ内のVCNをリストできるようにするポリシーを作成すると、次の例のようなコードを設定してネットワーキング・サービスからVCNのリストを取得できます。この例では、Oracle Resource Principal Providerを使用してRPSTトークンから資格証明を抽出します。
コピー
import io
import json
from fdk import response
import oci
def handler(ctx, data: io.BytesIO=None):
    """Function entry point: return a JSON listing of VCNs.

    Obtains a resource principals signer from the OCI SDK, passes it to
    ``do`` and wraps the JSON-encoded result in an FDK response.

    Args:
        ctx: Invocation context, passed through to ``response.Response``.
        data: Request body stream; not read by this handler.

    Returns:
        A ``response.Response`` whose body is the JSON-encoded result of
        ``do`` and whose Content-Type header is ``application/json``.
    """
    rp_signer = oci.auth.signers.get_resource_principals_signer()
    payload = json.dumps(do(rp_signer))
    return response.Response(
        ctx,
        response_data=payload,
        headers={"Content-Type": "application/json"},
    )
def do(signer):
    """List the VCNs in the signer's compartment.

    Args:
        signer: Signer used to authenticate the networking client; its
            ``compartment_id`` attribute selects the compartment to list.

    Returns:
        A dict with the single key ``"vcns"``. On success the value is a
        list of ``[id, display_name]`` pairs; if listing raises, the value is
        the exception rendered as a string instead.
    """
    # List VCNs --------------------------------------------------------
    network = oci.core.VirtualNetworkClient({}, signer=signer)
    try:
        # NOTE(review): a single list_vcns call is made; presumably this only
        # returns the first page of results - confirm against the SDK docs.
        listing = network.list_vcns(signer.compartment_id)
        result = [[vcn.id, vcn.display_name] for vcn in listing.data]
    except Exception as err:
        # Best effort: report the failure text in place of the list.
        result = str(err)
    return {"vcns": result}
#!/usr/bin/env python3
import base64
import email.utils
import hashlib
import httpsig_cffi.sign
import json
import logging
import os.path
import re
import requests.auth
import urllib.parse
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
# The following class is derived from the Python section in https://docs.cloud.oracle.com/iaas/Content/API/Concepts/signingrequests.htm
class SignedRequestAuth(requests.auth.AuthBase):
    """A requests auth instance that can be reused across requests.

    On construction one ``httpsig_cffi.sign.HeaderSigner`` is built per
    supported HTTP method; calling the instance with a prepared request fills
    in any missing headers that must be signed and then adds the signature
    headers produced by the matching signer.
    """

    # Headers signed for every supported method.
    generic_headers = [
        "date",
        "(request-target)",
        "host"
    ]
    # Extra headers signed for methods that carry a body.
    body_headers = [
        "content-length",
        "content-type",
        "x-content-sha256",
    ]
    # Headers to sign, keyed by lower-case HTTP method name.
    required_headers = {
        "get": generic_headers,
        "head": generic_headers,
        "delete": generic_headers,
        "put": generic_headers + body_headers,
        "post": generic_headers + body_headers,
    }

    def __init__(self, key_id, private_key):
        """Create one header signer per supported HTTP method.

        Args:
            key_id: Key identifier passed to each signer as ``key_id``.
            private_key: Private key passed to each signer as ``secret``.
        """
        # Build a httpsig_cffi.sign.HeaderSigner for each HTTP method's
        # required headers, remembering whether "host" is among them.
        self.signers = {}
        for method, headers in self.required_headers.items():
            signer = httpsig_cffi.sign.HeaderSigner(
                key_id=key_id, secret=private_key,
                algorithm="rsa-sha256", headers=headers[:])
            use_host = "host" in headers
            self.signers[method] = (signer, use_host)

    def inject_missing_headers(self, request, sign_body):
        """Add any headers that have to be signed but are not set yet.

        Args:
            request: The prepared request; its headers are updated in place.
            sign_body: When true, also ensure ``x-content-sha256`` and
                ``content-length`` are present.
        """
        # Inject date, content-type, and host if missing
        request.headers.setdefault(
            "date", email.utils.formatdate(usegmt=True))
        request.headers.setdefault("content-type", "application/json")
        request.headers.setdefault(
            "host", urllib.parse.urlparse(request.url).netloc)

        # Requests with a body need to send content-type,
        # content-length, and x-content-sha256
        if sign_body:
            # The body may be str or bytes. Work on the encoded bytes so that
            # hashing does not fail on a bytes body and the length is a byte
            # count rather than a character count.
            body = request.body or b""
            if isinstance(body, str):
                body = body.encode("utf-8")
            if "x-content-sha256" not in request.headers:
                digest = hashlib.sha256(body).digest()
                request.headers["x-content-sha256"] = (
                    base64.b64encode(digest).decode("utf-8"))
            # Header values are text, so store the length as a string.
            request.headers.setdefault("content-length", str(len(body)))

    def __call__(self, request):
        """Sign ``request`` and return it.

        Args:
            request: The prepared request to sign.

        Returns:
            The same request object; OPTIONS requests are returned unsigned.

        Raises:
            ValueError: If the request method has no configured signer.
        """
        verb = request.method.lower()
        # nothing to sign for options
        if verb == "options":
            return request
        signer, use_host = self.signers.get(verb, (None, None))
        if signer is None:
            raise ValueError(
                "Don't know how to sign request verb {}".format(verb))

        # Inject body headers for put/post requests, date for all requests
        sign_body = verb in ["put", "post"]
        self.inject_missing_headers(request, sign_body=sign_body)

        if use_host:
            host = urllib.parse.urlparse(request.url).netloc
        else:
            host = None

        signed_headers = signer.sign(
            request.headers, host=host,
            method=request.method, path=request.path_url)
        request.headers.update(signed_headers)
        return request
def rp_auther():
    """Build resource principal credentials from environment variables.

    Reads ``OCI_RESOURCE_PRINCIPAL_RPST`` and
    ``OCI_RESOURCE_PRINCIPAL_PRIVATE_PEM``. Each value is used directly
    unless it is an absolute path, in which case the file at that path is
    read and its contents are used instead.

    Returns:
        A ``(claims, auth)`` tuple: the claims dictionary returned by
        ``get_claims`` for the RPST, and a ``SignedRequestAuth`` whose key ID
        is ``"ST$"`` followed by the RPST.

    Raises:
        EnvironmentError: If ``OCI_RESOURCE_PRINCIPAL_VERSION`` is unset or
            is not ``"2.2"``.
        KeyError: If the RPST or private key variable is unset.
    """
    version_var = 'OCI_RESOURCE_PRINCIPAL_VERSION'
    # Use .get() so an unset variable produces the descriptive error below
    # rather than a bare KeyError.
    if os.environ.get(version_var) != "2.2":
        raise EnvironmentError('{} must be set to the value "2.2"'.format(version_var))

    rpst = os.environ['OCI_RESOURCE_PRINCIPAL_RPST']
    if os.path.isabs(rpst):
        with open(rpst) as f:
            rpst = f.read()
    # Drop surrounding whitespace (e.g. a trailing newline from the token
    # file) so it does not end up inside the key ID.
    rpst = rpst.strip()

    private_key = os.environ['OCI_RESOURCE_PRINCIPAL_PRIVATE_PEM']
    if os.path.isabs(private_key):
        with open(private_key) as f:
            private_key = f.read()

    return get_claims(rpst), SignedRequestAuth('ST${}'.format(rpst), private_key)
def get_claims(rpst):
    """Parse an RPST as a JWT; return a dictionary of claims.

    The claims that are important are: sub, res_compartment, and res_tenant.
    These carry the resource OCID together with its location.

    Only the payload (second dot-separated segment) is decoded; the
    signature is not verified here.

    Args:
        rpst: The token as a dot-separated string.

    Returns:
        The decoded JSON payload.
    """
    payload = rpst.split('.')[1]
    payload += "=" * (-len(payload) % 4)  # Pad to a multiple of 4 characters
    # JWT segments use the URL-safe base64 alphabet ("-" and "_"); the
    # standard decoder would silently discard those characters.
    return json.loads(base64.urlsafe_b64decode(payload).decode('utf-8'))
# Use the resource principal credentials to make a signed request: fetch the
# tenancy named by the res_tenant claim and print the JSON reply.
region = os.environ['OCI_RESOURCE_PRINCIPAL_REGION']
claims, rp_auth = rp_auther()
response = requests.get(
    "https://identity.{}.oraclecloud.com/20160918/tenancies/{}".format(
        region, claims['res_tenant']),
    auth=rp_auth,
)
print(response.json())