Esta página ha sido traducida por una máquina.

Integración de Data Flow con Data Science

Con Data Flow, puede configurar Notebooks de Data Science para ejecutar aplicaciones de forma interactiva con Data Flow.

Data Flow utiliza blocs de notas de Jupyter totalmente gestionados para permitir a los científicos de datos e ingenieros de datos crear, visualizar, colaborar y depurar aplicaciones de ingeniería de datos y ciencia de datos. Puede escribir estas aplicaciones en Python, Scala y PySpark. También puede conectar una sesión de bloc de notas de Data Science a Data Flow para ejecutar aplicaciones. Los núcleos y las aplicaciones de Data Flow se ejecutan en Oracle Cloud Infrastructure Data Flow.

Apache Spark es un sistema informático distribuido diseñado para procesar datos a escala. Soporta SQL a gran escala, el procesamiento por lotes y de flujos, y tareas de Machine Learning. Spark SQL proporciona soporte similar a una base de datos. Para consultar datos estructurados, utilice Spark SQL. Se trata de una implantación de SQL estándar ANSI.

Las sesiones de Data Flow soportan las capacidades de cluster de escala automática de Data Flow. Para obtener más información, consulte Ampliación automática en la documentación de Data Flow.

Las sesiones de Data Flow soportan el uso de entornos conda como entornos de tiempo de ejecución de Spark personalizables.

Un bloc de notas de Data Science utiliza Data Flow Magic para enviar solicitudes a Data Flow mediante las API NotebookSession para ejecutar código de Spark en un servidor de Data Flow.
Limitaciones
  • Las sesiones de Data Flow duran hasta 7 días o 10 080 minutos (maxDurationInMinutes).

  • Las sesiones de Data Flow tienen un valor de timeout de inactividad por defecto de 480 minutos (8 horas) (idleTimeoutInMinutes). Puede configurar un valor diferente.
  • La sesión de Data Flow solo está disponible a través de una sesión de bloc de notas de Data Science.
  • Solo están soportadas las versiones 3.5.0 y 3.2.1 de Spark.
Consejo

Vea el vídeo de tutorial sobre el uso de Data Science con Data Flow Studio. Consulte también la documentación del SDK de Oracle Accelerated Data Science para obtener más información sobre la integración de Data Science y Data Flow.

Instalación del entorno conda

Siga estos pasos para utilizar Data Flow con Data Flow Magic.

  1. Cree o abra una sesión de bloc de notas en Data Science.
    • La sesión de Notebook debe estar en la región en la que el servicio se haya activado para el arrendamiento.
    • La sesión de bloc de notas debe estar en el compartimento del grupo dinámico de sesiones de bloc de notas.
  2. Instale y active el entorno conda pyspark32_p38_cpu_v3 en la sesión de Notebook:
    copy odsc conda install -s pyspark32_p38_cpu_v3
    Nota

    Recomendamos instalar el entorno conda actualizado pyspark32_p38_cpu_v3, ya que incluye soporte para reiniciar automáticamente sesiones que se han parado porque han excedido la duración máxima de timeout.

    El entorno conda estable anterior era pyspark32_p38_cpu_v2.

  3. Active el entorno conda pyspark32_p38_cpu_v3:
    source activate /home/datascience/conda/pyspark32_p38_cpu_v3

Uso de Data Flow con Data Science

Siga estos pasos para ejecutar una aplicación utilizando Data Flow con Data Science.

  • Asegúrese de que tiene las políticas configuradas para utilizar un Notebook con Data Flow.

  • Asegúrese de que tiene las políticas de Data Science configuradas correctamente.

  • Para obtener una lista de todos los comandos soportados, utilice el comando %help.
  • Los comandos de los siguientes pasos se aplican tanto a Spark 3.5.0 como a Spark 3.2.1. En los ejemplos se utiliza Spark 3.5.0. Defina el valor de sparkVersion según la versión de Spark utilizada.
  1. Configure la autenticación en ADS.
    • El SDK de ADS se utiliza para controlar el tipo de autenticación utilizado en Data Flow Magic.
    • La autenticación de clave de API se utiliza por defecto. Para cambiar el tipo de autenticación, utilice el comando ads.set_auth("resource_principal"):
      import ads
      ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
  2. Cargar la extensión de Data Flow Magic
    %load_ext dataflow.magics
  3. (Opcional) Cree una sesión de Data Flow mediante el comando mágico, %create_session:
    Sesión común
    En este ejemplo se muestra cómo crear una nueva sesión en unidades flexibles:
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>/",
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sesión con URI de archivo
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "archiveUri": <oci://<bucket>@<namespace>/archive.zip"
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sesión con un entorno conda personalizado
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sesión con Metastore
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": oci://<bucket>@<namespace>",
        "metastoreId": "<ocid1.datacatalogmetastore.oc1.iad...>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
  4. (Opcional) Utilice una sesión de Data Flow existente:
    Utilice el comando %use_session. Copie el OCID de la consola.
    %use_session -s <session_OCID> -r <region_name>
  5. Configure la sesión con el comando %config_session:
    • Para ver la configuración actual:
      %config
    • Para cambiar los controladores y los ejecutores:
      %configure_session -i '{"driverShape": "VM.Standard2.1", "executorShape": "VM.Standard2.1", "numExecutors": 1}'
    • Para aplicar la Escala automática:
      %configure_session -i '{"driverShape": "VM.Standard2.1",\
      "executorShape": "VM.Standard2.1", "numExecutors": 16,\
      "sparkVersion":"3.5.0",\
      "configuration":{"spark.dynamicAllocation.enabled":"true",\
      "spark.dynamicAllocation.shuffleTracking.enabled":"true",\
      "spark.dynamicAllocation.minExecutors":"16",\
      "spark.dynamicAllocation.maxExecutors":"54",\
      "spark.dynamicAllocation.executorIdleTimeout":"60",\
      "spark.dynamicAllocation.schedulerBacklogTimeout":"60",\
      "spark.dataflow.dynamicAllocation.quotaPolicy":"max"} \
      }'
      También puede aplicar una política de escala automática al crear una sesión.
  6. (Opcional) Para activar una sesión existente, utilice el comando %activate_session:
    %activate_session -l python -c 
    '{"compartmentId": "<Compartment_OCID>",
        "displayName": "<Name>",
        "applicationId": "<Application_OCID>"
    }'
  7. Para parar una sesión, utilice el comando %stop_session:
    %stop_session

Personalización de un entorno de Spark de Data Flow con un entorno conda

Puede utilizar un entorno conda publicado como entorno de tiempo de ejecución.

  1. Instale Spark 3.5.0 y Data Flow en la sesión de Notebook:
    odsc conda install -s pyspark32_p38_cpu_v3
  2. Instale las bibliotecas mediante conda.
  3. Publique el entorno conda:
    odsc conda publish -s pyspark32_p38_cpu_v3
  4. Inicie el cluster de Data Flow, por ejemplo:
    %create_session -l python -c '{"compartmentId":"<your-compartment-ocid>", \
    "displayName":"<your-display-name>",\
    "sparkVersion":"3.5.0", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda"}}'
    Nota

    La configuración de la sesión debe incluir los siguientes parámetros:
    • "sparkVersion":"3.5.0"
    • "language":"PYTHON"
    • "configuration" con una ruta de acceso de spark.archives al entorno conda en el almacenamiento de objetos.

Ejecución de spark-nlp en Data Flow

Siga estos pasos para instalar Spark-nlp y ejecutarlo en Data Flow.

Debe haber completado los pasos 1 y 2 de Personalización de un entorno de Spark de Data Flow con un entorno conda. La biblioteca spark-nlp está preinstalada en el entorno conda pyspark32_p38_cpu_v2.

  1. Instale los modelos y pipelines spark-nlp previamente entrenados.

    Si necesita modelos spark-nlp previamente entrenados, descárguelos y descomprímalos en la carpeta del entorno conda. Data Flow aún no soporta la salida a la red pública de internet. No puede descargar dinámicamente modelos entrenados previamente desde AWS S3 en Data Flow.

    Puede descargar modelos entrenados previamente desde el hub de modelos como archivos zip. Descomprima el modelo en la carpeta del entorno conda. El modelo de ejemplo es https://nlp.johnsnowlabs.com/2021/03/23/explain_document_dl_en.html:
    mkdir /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models
    unzip explain_document_dl_en_3.0.0_3.0_1616473268265.zip -d /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models/
  2. Publique el entorno conda. Consulte el paso 3 de Personalización de un entorno de Spark de Data Flow con un entorno conda.
  3. Inicie el cluster de Data Flow.

    En una celda de bloc de notas que se ejecuta en el núcleo pyspark30_p37_cpu_v5 de la sesión de bloc de notas, compruebe dos veces la

    parámetro spark.jars.packages. Refleja la versión de spark-nlp que ha instalado.
    %create_session -l python -c '{"compartmentId":" <your-compartment-ocid>", \
    "displayName":"sparknlp",\
    "sparkVersion":"3.2.1", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda",\
    "spark.jars.ivy":"/opt/spark/work-dir/conda/.ivy2",\
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0"}\
    }'
  4. Pruébelo con un fragmento de código del repositorio GitHub de spark-nlp:
    %%spark
     
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    import sparknlp
     
    # Start SparkSession with Spark NLP
    # start() functions has 3 parameters: gpu, m1, and memory
    # sparknlp.start(gpu=True) will start the session with GPU support
    # sparknlp.start(m1=True) will start the session with macOS M1 support
    # sparknlp.start(memory="16G") to change the default driver memory in SparkSession
    spark = sparknlp.start()
     
    # Download a pre-trained pipeline
    pipeline = PretrainedPipeline('explain_document_dl', lang='en', disk_location="/opt/spark/work-dir/conda/sparknlp-models/")
     
     
    # Your testing dataset
    text = """
    Lawrence Joseph Ellison (born August 17, 1944) is an American business magnate and investor who is the co-founder,
    executive chairman, chief technology officer (CTO) and former chief executive officer (CEO) of the
    American computer technology company Oracle Corporation.[2] As of September 2022, he was listed by
    Bloomberg Billionaires Index as the ninth-wealthiest person in the world, with an estimated
    fortune of $93 billion.[3] Ellison is also known for his 98% ownership stake in Lanai,
    the sixth-largest island in the Hawaiian Archipelago.[4]
    """
     
    # Annotate your testing dataset
    result = pipeline.annotate(text)
     
    # What's in the pipeline
    print(list(result.keys()))
     
    # Check the results
    print(result['entities'])
    La siguiente salida está en la celda del bloc de notas:
    ['entities', 'stem', 'checked', 'lemma', 'document', 'pos', 'token', 'ner', 'embeddings', 'sentence']
    ['Lawrence Joseph Ellison', 'American', 'American', 'Oracle Corporation', 'Bloomberg Billionaires Index', 'Ellison', 'Lanai', 'Hawaiian Archipelago']

Ejemplos

A continuación, se muestran algunos ejemplos del uso de datos FlowMagic.

PySpark

La variable sc representa la instancia de Spark y está disponible cuando se utiliza el comando mágico %%spark. La siguiente celda es un ejemplo sencillo de cómo utilizar sc en una celda FlowMagic de datos. La celda llama al método .parallelize(), que crea un RDD, numbers, a partir de una lista de números. Se imprime información sobre el RDD. El método .toDebugString() devuelve una descripción del RDD.
%%spark
print(sc.version)
 
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

Spark SQL

El uso de la opción -c sql permite ejecutar comandos Spark SQL en una celda. En esta sección, se utiliza el juego de datos de Citi Bike. La siguiente celda lee el juego de datos en un marco de datos de Spark y lo guarda como una tabla. Este ejemplo se utiliza para mostrar Spark SQL.

El juego de datos de Citibike se carga en el almacén de objetos. Puede que tenga que hacer lo mismo en su dominio.
%%spark
df_bike_trips = spark.read.csv("oci://dmcherka-dev@ociodscdev/201306-citibike-tripdata.csv", header=False, inferSchema=True)
df_bike_trips.show()
df_bike_trips.createOrReplaceTempView("bike_trips")

En el siguiente ejemplo, se utiliza la opción -c sql para indicar a los datos FlowMagic que el contenido de la celda es SparkSQL. La opción -o <variable> toma los resultados de la operación Spark SQL y los almacena en la variable definida. En este caso,

df_bike_trips es un marco de datos de Pandas que está disponible para su uso en el bloc de notas.
%%spark -c sql -o df_bike_trips
SELECT _c0 AS Duration, _c4 AS Start_Station, _c8 AS End_Station, _c11 AS Bike_ID FROM bike_trips;
Imprima las primeras filas de datos:
df_bike_trips.head()
Del mismo modo, puede utilizar sqlContext para consultar la tabla:
%%spark
df_bike_trips_2 = sqlContext.sql("SELECT * FROM bike_trips")
df_bike_trips_2.show()
Por último, puede describir la tabla:
%%spark -c sql
SHOW TABLES

Widget de visualización automática

Los datos FlowMagic incluyen autovizwidget que permite la visualización de marcos de datos de Pandas. La función display_dataframe() toma un marco de datos de Pandas como parámetro y genera una interfaz gráfica de usuario interactiva en el bloc de notas. Tiene separadores que muestran la visualización de los datos de varias formas, como gráficos tabulares, gráficos circulares, trazos de dispersión, y gráficos de área y barras.

La siguiente celda llama a display_dataframe() con el marco de datos df_people creado en la sección Spark SQL del bloc de notas:
from autovizwidget.widget.utils import display_dataframe
display_dataframe(df_bike_trips)

Matplotlib

Una tarea común que realizan los científicos de datos es visualizar sus datos. Con juegos de datos grandes, generalmente no es posible y casi siempre es preferible no extraer los datos del cluster de Spark de Data Flow a la sesión de Notebook. En este ejemplo se muestra cómo utilizar los recursos del servidor para generar un gráfico e incluirlo en el bloc de notas.

El marco de datos df_bike_trips se define en la sesión y se vuelve a utilizar. Para producir una Matplotlib, incluya las bibliotecas necesarias y genere el gráfico. Utilice el comando mágico %matplot plt para mostrar el gráfico en el bloc de notas, aunque se represente en el servidor:
%%spark
import matplotlib.pyplot as plt
df_bike_trips.groupby("_c4").count().toPandas().plot.bar(x="_c4", y="count")
%matplot plt

Ejemplos adicionales

Hay más ejemplos disponibles de GitHub con ejemplos de Data Flow y ejemplos de Data Science.