Este workshop pretende trabajar con el alumnado del curso de especialización de IA y Big Data las técnicas de acceso básico y uso del clúster Hadoop de Tartanga.

Comprender y diseñar una política de planificación de recursos de ejecución es una tarea que habrá que afrontar una vez realizado el despliegue del clúster. El objetivo final de esta tarea es prestar a los usuarios finales los recursos de ejecución necesarios para sus trabajos, de acuerdo a la capacidad de nuestro clúster. Como sabemos, el gestor de recursos de procesamiento en Hadoop es YARN y por lo tanto deberemos comprender y aplicar las funciones básicas de su planificador de recursos.

Recursos en YARN: contenedores

YARN proporciona dos tipos de recurso a las aplicaciones: memoria y núcleos procesadores. Estos dos recursos se integran en unidades llamadas contenedores. Un nodo worker normalmente dispone de una colección de contenedores. Estos contenedores tendrán un tamaño mínimo en cuanto a RAM y número de núcleos y un tamaño máximo hasta el que puede crecer cada contenedor. Un contenedor es supervisado por el NodeManager y planificado por el ResourceManager de YARN.

Cada Aplicación inicia un proceso llamado ApplicationMaster que se ejecuta en un contenedor (contenedor 0). Una vez iniciado, el ApplicationMaster debe negociar con el ResourceManager para obtener más contenedores. Las peticiones y liberaciones de contenedores pueden tener lugar de forma dinámica en tiempo de ejecución. Por ejemplo, un trabajo MapReduce puede solicitar cierta cantidad de contenedores para el mapeo y a medida que finalizan esas tareas de mapeo, puede liberar esos contenedores y solicitar e iniciar más contenedores para la reducción.

 

Planificación de recursos: colas

La gestión de recursos en YARN se realiza mediante colas (queues). A estas colas se les asignan recursos en forma un porcentaje de contenedores sobre la capacidad total del clúster. Cada cola puede tener asignados ciertos porcentajes de uso mínimos y máximos diferentes. Podremos, por lo tanto, ejecutar las aplicaciones en diferentes colas de acuerdo a la política de administración de recursos que queramos implementar entre los diferentes, usuarios, grupos u organizaciones que utilicen el clúster.  La estructura de colas es jerárquica. La capacidad se asigna a las colas como un porcentaje mínimo y máximo de los recursos de la cola padre en la jerarquía. La capacidad mínima es la cantidad de recursos que la cola debería tener disponibles si el clúster está a tope de uso. La capacidad máxima es una capacidad elástica que permite a las colas hacer uso de recursos que no se están utilizando para alcanzar la demanda de capacidad mínima en dichas colas.

 

Parámetros fundamentales para la planificación

Existe toda una familia de propiedades (yarn.scheduler.capacity…) del “scheduler” de YARN que permiten la definición y control de nuestras políticas de administración de recursos de ejecución. Las fundamentales, en una primera aproximación, son:

  • yarn.scheduler.capacity.root.<queue_hierarchy_route>. capacity: Capacidad mínima de la cola en % sobre la capacidad de la cola padre.
  • yarn.scheduler.capacity.root.<queue_hierarchy_route>. maximum-capacity: Capacidad máxima de la cola en % sobre la capacidad de la cola padre.
  • yarn.scheduler.capacity.root.<queue_hierarchy_route>. minimum-user-limit-percent:  Límite sobre la menor cantidad de recursos a los que un sólo usuario debería tener acceso cuando solicita recursos. Por ejemplo, un valor del 10% significa que 10 usuarios obtendrán, cada uno de ellos, un 10% de los recursos totales de la cola, suponiendo que están solicitándolos; este valor no es rígido en el sentido que si uno de los usuarios pide menos cantidad de recursos se pueden emplazar más usuarios en la cola.
  • yarn.scheduler.capacity.root.<queue_hierarchy_route>. user-limit-factor: Es la forma de controlar la máxima cantidad de recursos que un sólo usuario puede consumir. Se establece como un múltiplo de la capacidad mínima de la cola, donde un valor de 1 significa que el usuario puede consumir toda la capacidad mínima de la cola. Si el valor es mayor que 1 el usuario puede crecer en su uso de recursos hasta el máximo de la cola y si es valor es menor de 1, por ejemplo 0.5, el usuario solo podrá obtener la mitad de la capacidad mínima de la cola.

Ejemplo de configuración para nuestro clúster

La configuración de las capacidades para el planificador de YARN de nuestro clúster se ha hecho con el objetivo inicial de proporcionar suficientes recursos a 20 usuarios simultáneos utilizando el 100% de los recursos del clúster.

  • Colas

Nuestro clúster dispone de dos colas:  iabd y project. La cola iabd está dimensionada para su uso por al menos 20 usuarios simultáneos; su propósito es proporcionar recursos para el mayor número de usuarios posibles. La política de asignación de recursos a aplicaciones en esta cola es FAIR. La cola project está pensada para proporcionar la mayor cantidad de recursos posibles a uno o dos usuarios como máximo.La política de asignación de recursos a aplicaciones en esta cola es FIFO. Los parámetros de cada cola se muestran en la siguiente imagen:

 

  • Contenedores

Nuestro clúster dispone de tres nodos worker con 192GB disponibles para Hadoop, lo que hace un total de 576GB de memoria para el clúster. Se han definido 144 contenedores con los tamaños mínimo y máximo que se observan en la siguiente figura:

  • Mapeo de colas (Default queue mapping)
    • Existe la posibilidad de asignar colas de YARN a usuarios y/o grupos. En nuestro clúster existen los grupos iabd y project a los que se les ha asignado la cola de igual nombre en YARN:
      Además mediante las propiedades acl_administer_queue y acl_submit_applications se puede controlar quien (usuarios y/o grupos) puede administrar y enviar trabajos a las colas:
      NOTA: Hay que tener en cuenta que en los trabajos MapReduce (MR) la cola de ejecución viene determinada por la propiedad mapreduce.job.queuename que tiene un único valor por defecto, en nuestro caso iabd. Si un usuario ejecuta un trabajo MR este se enviará a dicha cola aunque el usuario no tenga permisos en la acl_submit de YARN. Para enviar el trabajo a otra cola (project) el usuario debe especificarlo al lanzar el trabajo. Por ejemplo:
  • Pruebas de carga

En las siguientes imágenes puede verse la prueba de carga realizada con 20 usuarios simultáneos ejecutando cada uno el mismo trabajo Spark sobre la cola iabd. Nótese que se ha habilitado la Preemption y la Intra-queue Preemption. Puede observarse que con estos valores se consigue asignar recursos a las aplicaciones de los 20 usuarios (7 contenedores/usuario), con un 95% de uso del clúster y un 118% de la cola iabd.

 


 

Relación de YARN con instancias de ejecución de otros servicios: Spark y YARN

Los contenedores de YARN contendrán los procesos de otros servicios del clúster que se apoyan en YARN. Estos contenedores deben ser capaces de alojar las instancias de esos otros servicios. Por ello, los tamaños de las instancias deben definirse de forma adecuada al tamaño de los contenedores. Por ejemplo, para Spark, que es uno de los servicios más utilizados en un clúster de este tipo, los tamaños del driver y los executors deben ser apropiados.  La siguiente figura muestra la relación de estos elementos en una ejecución de Spark en modo clúster.

En nuestro caso se han definido para Spark un total de 144 instancias para drivers o executors con un tamaño máximo de 3584MB de RAM y 1 vCore.

+info.

La charla se desarrolló de acuerdo a los siguientes puntos:
 
  • Introducción a Big Data y Hadoop,
  • Consideraciones técnicas y problemas de despliegue,
  • Documentación del proceso,
  • Servicios del clúster,
  • Uso del clúster: ejecución de trabajos,
  • Coloquio.

Configuración de red

La red de datos que conecta los nodos del clúster es una red de alta velocidad en la que cada nodo dispone de dos NIC Ethernet agregadas (“bonded”) mediante LACP. Este enlace agregado proporciona el doble de ancho de banda y tolerancia a fallos, en caso de que uno de los enlaces individuales falle. Cada enlace individual estará conectado a sendos switches CISCO Nexus  de la serie 3000 (concretamente N3K-C3064PQ-10GX) que componen un dominio vPC (virtual PortChannel). En la siguiente figura puede observarse esta estructura, destacada en azul, entre varias configuraciones típicas de conexionado vPC:

Virtual Port Channel structure for NX5000 series

Diagrama de red detallado

Para implementar el Host Port Channel en los servers se han de agregar mediante “bonding” las dos NIC estableciendo los siguientes parámetros:

  • MODE= 802.3ad
  • XMIT Hash Policy= layer2 (default)
  • LACP rate= fast

Configuración switches vPC (virtual PortChannel)

Vamos a configurar la agregación de switches para nuestra red de alta velocidad. Tenga en cuenta que hay que realizar las configuraciones en los dos switches agregados (agg1 y agg2)

Los pasos a realizar son:

  1. Configuraciónes iniciales
Configurar la IP del puerto mgmt0, para gestión remota del switch (la IP que se observa es para el switch agg1):
  1. Habilitar características
  1. Configurar keep-alive en el puerto mgmt0
  1. Dominio vPC
Las IPs que se observan en la definición de peer-keepalive han de intercambiarse para el switch agg2.

  1. Peer-link (el port-channel 15 será el peer link)
  1. Member ports
  1. Mostrar el resumen de configuración

El primer paso es instalar los paquetes binarios de PostgreSQL:

A continuación, configuramos las direcciones en la que escucha PostgreSQL. Hacemos que escuche en todas las direcciones del host:
Ahora hemos de permitir los accesos de los nodos del clúster a este todas las BDs del servidor (nótese la línea referida al clúster Hadoop dónde aparece samenet:
Creamos un usuario para Hive, que será utilizado en el paso correspondiente del asistente de despliegue del clúster:
Creamos una base de datos para Hive y probamos a acceder con el usuario hive:

El nodo administrador tendrá la función fundamental de alojar el servidor Ambari que es la herramienta que vamos a utilizar para desplegar y, posteriormente, administrar y monitorizar el clúster. Tendrá menos requisitos de hardware ya que no ejecutará los servicios que forman parte de Hadoop. Lo utilizaremos también como nodo “frontera” para dar servicios adicionales de acceso al clúster.

 

 

Servidores

El  servidor elegido para la implementación de este nodo es:

 

 

  • Servidor DL380 Gen10 con chasis de 2U, con capacidad para 8 discos SFF, equipado cada uno con:
  •  2 procesadores Intel Xeon Silver 4114 (10 cores, 2,20Ghz, 13,75 MB cache L3)
  •  64GB de memoria RAM (8x32GB) DDR4 
  •  Controladora de discos Smart Array P822 24xPCIe 8xSAS
  •  Adaptador ethernet de 4 puertos Gigabit  integrado
  •  Tarjeta PCIe Ethernet de 2 puertos 10Gb/s SFP+ HP 530FLR
  •  Doble fuente de alimentación redundante de 800W

 

 Discos

Discos duros 960GB SFF (2,5″)

  • 2 discos SSD de 960GB SATA Read Intensive SFF

Configuración de almacenamiento

Los dos discos SSD se configurarán en RAID1 con dos particiones para el directorio raíz y el directorio de arranque:

A continuación puede observarse como han quedado las particiones, sistemas de archivo y su montaje después de la instalación:

Configuración de red

La red de datos que conecta los nodos del clúster es una red de alta velocidad en la que cada nodo dispone de dos NIC Ethernet agregadas (“bonded”) mediante LACP. Este enlace agregado proporciona el doble de ancho de banda y tolerancia a fallos, en caso de que uno de los enlaces individuales falle. Cada enlace individual estará conectado a sendos switches CISCO Nexus  de la serie 3000 que componen un dominio vPC (virtual PortChannel). En la siguiente figura puede observarse esta estructura, destacada en azul, entre varias configuraciones típicas de conexionado vPC:

Virtual Port Channel structure for NX5000 series

 

Para implementar el Host Port Channel en los servers se han de agregar mediante “bonding” las dos NIC estableciendo los siguientes parámetros:

  • MODE= 802.3ad
  • XMIT Hash Policy= layer2 (default)
  • LACP rate= fast

 
El archivo de configuración de red queda como sigue:

Una vez realizada la personalización de los servicios a desplegar, se revisa un resumen del despliegue a realizar:

 

Podemos almacenar esta configuración en forma de “blueprint” pulsando en el botón correspondiente.

Ahora es el momento de desplegar pulsando en el botón “Deploy”. Veremos entonces una interfaz en la que se muestra el progreso del despliegue en cada nodo del clúster:

Una vez finalizada la instalación e inicio de servicios, el clúster estará desplegado y al web UI de Ambari nos proporcionará una vista de administración de nuestro clúster, donde podremos monitorizar y re-configurar tanto servicios como nodos:

Podemos observar como habrá servicios no activos que necesiten de nuestra atención y corrección. Estos defectos nos los muestra Ambari mediante un sistema de alertas muy detallado accesible en la UI:

Una vez sean debidamente atendidas estas alertas, podremos visualizar el clúster en un estado estable:

Resolución de errores de despliegue

Durante el despliegue, es decir, durante la instalación e inicio de servicios en los nodos, se pueden producir errores que la interfaz nos mostrará de la siguiente forma:

Estos errores se pueden consultar de forma más detallada para su corrección:
Mediante la interfaz anterior se puede visualizar un log que nos indicará el error en detalle:
stderr:
Traceback (most recent call last):
...
File "/usr/lib/ambari-agent/lib/resource_management/core/sudo.py", line 136, in unlink
os.unlink(path)
OSError: [Errno 21] Is a directory: '/usr/lib/flink/log'
stdout:
...
2024-02-27 07:36:36,670 - Directory['/var/log/flink'] {'mode': 0767}
2024-02-27 07:36:36,670 - Changing permission for /var/log/flink from 755 to 767
2024-02-27 07:36:36,670 - Link['/usr/lib/flink/log'] {'to': '/var/log/flink'}
2024-02-27 07:36:36,671 - Link['/usr/lib/flink/log'] replacing old symlink to /usr/lib/flink/log
Command failed after 1 tries

En este caso  se producía un error al intentar reemplazar el enlace simbólico /usr/lib/flink/log ya que era un directorio y no un link. La solución fue borrar dicho directorio y crear el enlace simbólico a /var/log/flink.

 

Una vez iniciado el servidor ambari podemos entrar a la UI web que es la herramienta que utilizaremos para definir y realizar el despliegue del clúster. Esta aplicación web está disponible en el puerto 8080 del nodo administrador: http://hadoop-admin1.tartangalh.eus:8080/

Una vez dentro, iniciamos un proceso de configuración paso a paso mediante asistente del despliegue a realizar del clúster. A continuación detallamos los diferentes pasos de este asistente.

    1. Lanzamos el asistente de instalación del clúster:
    2. Establecemos el nombre del clúster:
    3. En el siguiente paso vamos a subir el archivo de definición del stack (VDF) de Hadoop elegido:
      Dicho archivo refiere los servicios disponibles en la pila de Hadoop a desplegar, así como la URL del repositorio a utilizar para los sistemas operativos elegidos. Tiene el siguiente contenido:
      <?xml version="1.0"?>
      <repository-version xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="version_definition.xsd">
      <release>
      <type> STANDARD </type>
      <stack-id> BGTP-1.0 </stack-id>
      <version> 1.0 </version>
      <build> 1 </build>
      <release-notes> https://bigtop.apache.org/release-notes.html </release-notes>
      <display> BGTP-3.1.1 </display>
      </release>
      <manifest>
      <service id="AMBARI-METRICS" name="AMBARI-METRICS" version="Bigtop+3.2"/>
      <service id="FLINK-321" name="FLINK" version="Bigtop+3.2"/>
      <service id="HBASE-321" name="HBASE" version="Bigtop+3.2"/>
      <service id="HDFS-321" name="HDFS" version="Bigtop+3.2"/>
      <service id="HIVE-321" name="HIVE" version="Bigtop+3.2"/>
      <service id="KAFKA-321" name="KAFKA" version="Bigtop+3.2"/>
      <service id="SOLR-321" name="SOLR" version="Bigtop+3.2"/>
      <service id="SPARK-321" name="SPARK" version="Bigtop+3.2"/>
      <service id="TEZ-321" name="TEZ" version="Bigtop+3.2"/>
      <service id="YARN-321" name="YARN" version="Bigtop+3.2"/>
      <service id="ZEPPELIN-321" name="ZEPPELIN" version="Bigtop+3.2"/>
      <service id="ZOOKEEPER-321" name="ZOOKEEPER" version="Bigtop+3.2"/>
      </manifest>
      <available-services/>
      <repository-info>
      <os family="ubuntu18">
      <repo>
      <baseurl> http://repos.bigtop.apache.org/releases/3.1.1/ubuntu/18.04/$(ARCH) </baseurl>
      <repoid> BGTP-3.1.1 </repoid>
      <reponame> BGTP </reponame>
      </repo>
      </os>
      </repository-info>
      </repository-version>

      Una vez cargado el archivo VDF, revisamos la información en la web UI y finalizamos la selección de la versión (Nótese que marcamos la opción para evitar la validación de la URL del repositorio):
    4. En el siguiente paso, “Install Options”, introducimos los nombres DNS de los nodos que van a componer el clúster, así como el valor de la clave privada SSH para el acceso remoto a dichos nodos (recordar finalizar el contenido del campo para la clave en la siguiente línea a la última con contenido de la clave):
    5. Al pulsar “Register and Confirm” , el servidor Ambari contactará con cada uno de los host y realizará una serie de acciones para comprobar que se puede comunicar correctamente con dichos hosts y que podrá ejecutar las tareas de instalación y configuración durante el despliegue. En este paso, se transfiere a los hosts el agente ambari y se realiza alguna configuración sobre los sistemas de dichos hosts. Si se produce algún error, se nos mostrará en la propia web UI y habremos de corregirlo antes de que el asistente nos permita avanzar. Es decir, para continuar, en este paso todos los hosts tienen que aparecer con status “Success”.
      En esta misma página podemos visualizar un detalle del chequeo que se ha realizado en cada host, con un desglose de problemas detectados:
    6. En el siguiente paso, “Choose Services”,  vemos los servicios que se van a desplegar en el clúster, pudiendo elegir una selección de ellos. En nuestro caso vamos a instalar todos los definidos en la versión seleccionada anteriormente:
    7. Seguidamente, asignamos todos los servicios de tipo “Master ” al que será nodo máster de nuestro clúster: hadoop-master1
    8. A continuación, asignamos los servicios de tipo Client o Slave a los nodos trabajadores: hadoop-worker1, hadoop-worker2 y hadoop-worker3
    9. El siguiente paso es el más laborioso del proceso de configuración pre-despliegue ya que mediante una serie de páginas hemos de personalizar los parámetros que nos interesen de los diferentes servicios. Téngase en cuenta que la inmensa mayoría de parámetros de los servicios se pueden reconfigurar una vez el clúster esté desplegado, lo que forma parte del mantenimiento del clúster. No obstante, hay parámetros fundamentales, como los puntos de montaje de los sistemas de archivo o los parámetros de las bases de datos de soporte, por ejemplo, que han de ser ajustados en este momento para un exitoso despliegue del clúster. Los pasos de esta personalización son:
      • Credenciales: establecemos las credenciales para usuarios de Grafana y Hive:
      • Bases de Datos: definimos que servidor de base de datos , base de datos y usuario vamos a utilizar para Hive. Nótese que si no está instalado y configurado el sistema gestor de base de datos en el nodo máster, hemos de instalarlo y configurarlo previamente (vea el Procedimiento de instalación y configuración de PostgreSQL en nodo máster)
      • HDFS: Establecemos o revisamos parámetros generales del sistema de archivos distribuido, como el factor de replicación de bloques o el espacio reservado para uso no-DFS, así como las ubicaciones de los sistemas de archivo (puntos de montaje) en los nodos máster y workers.
      • YARN: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • MapReduce: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Tez: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Hive: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • HBase: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Zookeeper: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Ambari Metrics: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Kafka: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Spark: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Zeppelin: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Flink :Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
      • Solr: Revisamos los valores de los parámetros sin necesidad, en principio, de modificarlos.
    10. Por último y antes de pasar a la tarea de revisar y desplegar el clúster, revisamos y establecemos, en su caso, las cuentas de usuario que se van a utilizar para los diferentes servicios:

Durante el despliegue del clúster, los nodos deben acceder a algún repositorio de software del que obtener los paquetes que se van a instalar para cada servicio que va a ejecutar. Este repositorio contiene los paquetes que forman parte del “stack” de aplicaciones/servicios Hadoop elegidos para el clúster. Existen diversos stacks de Hadoop pero la mayoría de ellos son propietarios y pocos accesibles en repositorios públicos. El stack de Hadoop elegido para nuestro clúster es BigTop, concretamente la versión 3.1.1.

Para configurar dicho repositorio se realizan los siguientes pasos:

  •  Añadir la clave del repositorio para el gestor de paquetes (apt):
  •  Configurar en todos los nodos una fuente de software que apunte al repositorio :