import io import json import logging import oci import base64 from fdk import response from fdk import context from oci.ai_anomaly_detection.models import DataItem from oci.ai_anomaly_detection.models import InlineDetectAnomaliesRequest encoding = "utf-8" # Model Id, which will be used for all requests. model_id = def create_ad_request(stream_messages): """ This method is used preparing the Anomaly Detection Service request from the messages received from the stream. For this example, we assume that each message is single JSON object which represent a single row of data. As requirement for AD service, if timestamp field is present in the json object, then all json objects must have this field and all timestamps must be in increasing order and should not contain any duplicates. :param stream_messages: List of messages :class:`oci.streaming.models.Message` from stream in json format :return: A :class:`~oci.ai_anomaly_detection.models.InlineDetectAnomaliesRequest` object """ logging.info(F"Messages received {len(stream_messages)}") data = [] signal_names = None for stream_message in stream_messages: logging.info(F'Message offset: {stream_message["offset"]}') payload = json.loads(base64.b64decode( stream_message["value"]).decode(encoding)) if not signal_names: signal_names = [x for x in payload.keys() if x != "timestamp"] data_item = DataItem() if "timestamp" in payload: data_item.timestamp = payload["timestamp"] del payload["timestamp"] data_item.values = list(payload.values()) data.append(data_item) request = InlineDetectAnomaliesRequest() request.model_id(model_id) request.signl_names = signal_names return request def make_ad_request(ad_request): """ This method calls the AD service and returns the response back to the caller :param ad_request: A :class:`~oci.ai_anomaly_detection.models.InlineDetectAnomaliesRequest` object :return: A :class:`~oci.response.Response` object with data of type :class:`~oci.ai_anomaly_detection.models.AnomalyDetectResult` """ signer = oci.auth.signers.get_resource_principals_signer() ad_client = oci.ai_anomaly_detection.AnomalyDetectionClient({}, signer=signer) return ad_client.detect_anomalies(ad_request) def handler(ctx, data: io.BytesIO = None): stream_messages = json.loads(data.getvalue()) result = [] try: request = create_ad_request(stream_messages) response = make_ad_request(request) logging.info(F"Status Code {response.status_code}") if response.status_code == 200: result = json.loads(response.text) return response.Response( ctx, response_data=response.text, status_code=200, headers={"Content-Type": "application/json"}) else: return response.Response( ctx, response_data=response.text, status_code=response.status_code, headers={"Content-Type": "text/plain"}) except Exception as e: logging.error("Error occured", e) return response.Response( ctx, response_data=e, status_code=500, headers={"Content-Type": "text/plain"})