Translate

miércoles, 6 de junio de 2018

Spark - Teoria


Spark





Que es un dataframe en SQL SPARK?

Un dataframe es basicamente un fichero organizado en columnas. Conceptualmente es como una tabla en SQL.

En JAVA y SCALA un dataframe es representado como un fichero (dataset) de columnas.


Los dataframes se pueden crear:
1- desde un RDD
2- desde una tabla HIVE
3- desde otras fuentes de datos SPARK


Que es un fichero PARQUET en SPARK?

Apache PARQUET es un formato de guardado de datos en columnas disponible en el ecosistema HADOOP.
Cualquier framework, modelo de datos o lenguaje de programacion puede usarlo.

Es un formato comprimido, eficiente y codificado comun en los proyectos HADOOP.

SPARK SQL soporta la lectura y escritura de ficheros PARQUET, el cual preserva el esquema de datos original.

Durante la operacion de escritura todas las columnas son convertidas automaticamente para que puedan albergar tipo de datos NULL, asi en caso de que una columna no contenga datos se pondra automaticamente como NULL.


Cual es la diferencia entre APACHE SPARK y APACHE HADOOP MAPREDUCE?

- Velocidad: SPARK es de diez hasta cien veces mas rapido que Hadoop debido a que los procesos se hacen en MEMORIA.
- Memoria: SPARK guarda los datos en Memoria, mientras que Hadoop los guarda en disco duro.
- RDD: SPARK utiliza RDD que garantiza la tolerancia a fallos. Hadoop utiliza la replicacion de datos en multiples copias para conseguir la tolerancia a fallos.
- Streaming: Apache Spark soporta streaming de una forma mas facil que la usada por Hadoop.
- API: Spark soporta multiples fuentes de datos y diferentes lenguajes de programacion (Scala, Java, Python). Es mas completa que la proporcionada por Hadoop.

Cuales son los principales lenguajes de programacion soportados por SPARK?

- Scala: para poder usar Spark con Scala se tiene que generar un objeto SparkContext.
- Java: Se usa javaSparkContext como libreria.
- Python: se usa con SparkContext.
- R: se necesita cargar el modulo SparkR.
- SQL: se utiliza SparkSQL.

Cuales son los sistemas de ficheros soportados por Spark?

- HDFS
- S3
- Local File System
- Cassandra
- OpenStack File System
- MapR File System

Que es Spark Driver?

Spark Driver es un programa que se ejecuta en el nodo Maestro. Cuida de la declaracion de cualquier operacion de transformacion o accion en un RDD.









lunes, 28 de mayo de 2018

Unity 3d - UNET Conceptos basicos

COMO CREAR UN SENCILLO JUEGO EN RED (PING PONG)


Manager


Network Manager. Maneja la configuracion de la red.
Para comenzar hay que incluir la informacion del GameObject que se va a instanciar en red. En este caso un prefba llamado Player1.
En Player Spawn Method se ha elegido Round Robin, que significa que de los puntos (Spawn Points) en los que se instanciaran los jugadores se haran de forma secuencial. Si hay dos puntos y tres jugadores primero se instanciaran en el punto 1, despues en el punto 2 y el tercer jugador se volvera a instanciar en el punto 1.
Para poder instanciar GameObjects que no sea el objeto inicial del jugador se introduciran los objetos en "Registered Spawnble Prefabs".

Network Manager HUD. Con este script se va a activar los botones superiores para la conexion en red.




Player1 y Pelota PREFAB:
La siguiente configuración es la que tiene que tener minimamente un objeto para se visualizado remotamente.



Para el prefab Player1 (izquierda):

Network Identity: Este script le asigna un numero secuencial que identifica el objeto en la red. En este caso tiene seleccionado que es un objeto que va a ser utilizado por el jugador local y que solo el tiene autoridad para manejarlo ("local player authority").

Network Transform: Este script permite enviar la localizacion del objeto a los demas jugadores. En este caso al utiizar la propiedad KINEMATIC en el objeto solo se pasaria la informacion del Transform del objeto "Transform Sync Mode".


En el caso del objeto Pelota (configuración de la derecha), el cual si que se deja que la fisica de Unity actúe sobre el objeto el script Network Transform cambia ligeramente y se le pide que envíe toda la información relativa al RIGIDBODY (que es el script que aplica toda la física en Unity).


SpawnPoints:

Los spawn points son los objetos donde se van a instanciar los jugadores.


En el caso de PING PONG solo se necesitan dos spawn points, uno a la izquierda y el otro a la derecha.

El único componente de red que necesita este objeto es Network Start Position y Network Identity. En el Transform pondremos las coordenadas y la rotación en la que queramos que aparezcan los jugadores.














Scripts:

Para poder utilizar Scripts en objetos en red es necesario cambiar MONOBEHAVIOR por NETWORBEHAVIOUR. También es necesario añadir UnityEngine.Networking (ambos en rojo).

En este script debido a que todo el movimiento se esta controlando con el componente Network Transform no se envía ninguna información a la red directamente. El script se comporta igual que si estuviera en un juego normal (como si se utilizara MonoBehaviour).

Player.cs

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using UnityEngine.Networking;


public class Player : NetworkBehaviour {
    [SerializeField][Range(1,20)]
    public float speed = 10;
    public Vector3 targetPosition;
    private Vector3 targetPositionTemp;
    private bool isMoving;

public int direccion;
public float posiAnterior;
public float posInicial = -8.0f;

    const int LEFT_MOUSE_BUTTON = 0;
const int ARRIBA = 2;
const int ABAJO = 0;
const int IGUAL = 1;

    private Rigidbody rb;

public override void OnStartLocalPlayer()
{
GetComponent<MeshRenderer>().material.color = Color.blue;
}

    void Start()
    {
posInicial = this.transform.position.x;
        targetPosition = transform.position;
        isMoving = false;
posiAnterior = targetPosition.z;
    }

    void Update()
    {
if (!isLocalPlayer)
return;
        if (Input.GetMouseButton(LEFT_MOUSE_BUTTON))
            SetTargetPosition();
        if (isMoving) MovePlayer();
    }

void OnTriggerEnter(Collider other){

}

    void SetTargetPosition()
    {
Plane plane = new Plane(Vector3.up,new Vector3(posInicial,transform.position.y,transform.position.z));
        Ray ray = Camera.main.ScreenPointToRay(Input.mousePosition);
        float point = 0f;

        if (plane.Raycast(ray, out point))
            targetPositionTemp = ray.GetPoint(point);

targetPosition = new Vector3 (posInicial, targetPositionTemp.y, targetPositionTemp.z);
        isMoving = true;

    }
    void MovePlayer() {
transform.position = Vector3.MoveTowards(new Vector3(posInicial,transform.position.y,transform.position.z), targetPosition, speed * Time.deltaTime);
        if (transform.position == targetPosition) isMoving = false;
if (transform.position.z > posiAnterior) {
direccion = ARRIBA;
} else {
if (transform.position.z < posiAnterior) {
direccion = ABAJO;
} else {
direccion = IGUAL;
}
}
posiAnterior = targetPosition.z;

    }


Sin embargo al utilizar un objeto distinto al Player Prefab instanciado por cada jugador como ese el caso de la pelota si que tenemos que enviar la informacion al Servidor, en primer lugar, y despues desde el servidor a todos los clientes.
Para poder hacer esto se utilizan los comandos [Command], que envia la informacion al servidor, y [ClientRpc] que envia la informacion desde el servidor a los distintos clientes conectados. De esta forma se replica el comportamiento de las pelota en todos los clientes.

Pelota.cs

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using UnityEngine.Networking;
using UnityEngine.UI;

public class Pelota : NetworkBehaviour {

public float velocidadInicial = 600f;
public float velocidadBumper = 200f;

private Rigidbody rb;
public bool direcDerecha;
public int valor;
public float fuerza;
public float velocidadPelota;

public GameObject pelotaPrefab;

const int ARRIBA = 2;
const int ABAJO = 0;
const int IGUAL = 1;



public int puntuacionLocal;
public int puntuacionForaneo;



void Awake(){
if (isServer)
this.gameObject.SetActive (false);


    valor = Random.Range (1, 2);
if (valor == 1) {
direcDerecha = true;
} else {
direcDerecha = false;
}
rb = GetComponent<Rigidbody> ();
fuerza = 0;

}
// Use this for initialization
void Start () {
Direccion (fuerza);
}
// Update is called once per frame
void Update () {
}

void PorSiHaceFalta(){

//Mathf.Approximately(velocidadPelota,0)
}

void LateUpdate(){
if (Mathf.Approximately(rb.velocity.z,0.2f))
DireccionContinua (fuerza);
//rb.velocity = velocidadInicial * (rb.velocity.normalized);

}
void OnCollisionEnter(Collision other){
if (other.gameObject.tag == "Player") {
//Debug.Log (other.gameObject.name + " : Mas fuerza"); 
if (other.gameObject.GetComponent<Player> ().direccion == ARRIBA) {
fuerza = -100f;


} else {
if (other.gameObject.GetComponent<Player> ().direccion == ABAJO) {
fuerza = 100f;

} else {
fuerza = 0f;

}
}
Direccion (fuerza);

}

if (other.gameObject.tag == "Bumper") {
//Debug.Log ("Choca con un bumper");
rb.AddForce (rb.velocity.normalized * velocidadBumper);
}

}

public void OnTriggerEnter (Collider other){
if (other.gameObject.tag == "DetectorGol") {
if (other.gameObject.name == "BumperTrasero") {
GameObject.FindGameObjectWithTag ("ManagerJuego").GetComponent<Manager> ().AddGolLocal();
//puntuacionLocal++;
//GameObject.FindGameObjectWithTag ("Manager").GetComponent<Manager> ().puntuacionLocal = puntuacionLocal;
} else {
GameObject.FindGameObjectWithTag ("ManagerJuego").GetComponent<Manager> ().AddGolForaneo ();
//puntuacionForaneo++;
//GameObject.FindGameObjectWithTag ("Manager").GetComponent<Manager> ().puntuacionForaneo = puntuacionForaneo;
}

Invoke ("DestruirPelota", 2.0f);
}
}
public void MasFuerza(float valorZ){
rb.AddForce(new Vector3(velocidadInicial,velocidadInicial,valorZ));
}
public void MasFuerza2(float valorZ){
rb.AddForce(new Vector3(-1*velocidadInicial,velocidadInicial,valorZ));
}

public void DestruirPelota(){

Cmd_Pelota ();
Destroy (this.gameObject);
}

[Command]
public void Cmd_Pelota(){
GameObject nuevaPelota = Instantiate (pelotaPrefab);
nuevaPelota.transform.position = new Vector3 (0f, 0.25f, 0f);
nuevaPelota.name = this.name;
NetworkServer.Spawn (nuevaPelota);
Rpc_Pelota (nuevaPelota.transform.position);
}
[ClientRpc]
public void Rpc_Pelota(Vector3 posicion){
this.transform.position = posicion;
}
public void Direccion(float valorZ){
if (!direcDerecha) {
MasFuerza (valorZ); 

} else {
MasFuerza2 (valorZ);
}
direcDerecha = !direcDerecha;
}

public void DireccionContinua(float valorZ){
if (!direcDerecha) {
MasFuerza (valorZ); 

} else {
MasFuerza2 (valorZ);
}
}

}

Por ultimo tenemos el script manager que incrustado en el objeto ManaJuego. En este script se controla, desde el numero de jugadores conectados, a la puntuacion de los marcadores mostrada en los clientes.

Manager.cs

using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using UnityEngine.Networking;
using UnityEngine.UI;

public class Manager : NetworkBehaviour {

public GameObject pelota;
public Transform pelotaPosInicio;
public bool comienzoJuego = false;

public GameObject panelBoton;
public GameObject titulo;

public int puntuacionLocal;
public int puntuacionForaneo;
public Text marcadorLocal;
public Text marcadorForaneo;
// Use this for initialization
void Start () {
if (!isServer)
return;

}
// Update is called once per frame
void Update () {
if (!isServer)
return;
if (NetworkManager.singleton.numPlayers == 2 && !comienzoJuego) {
comienzoJuego = true;
puntuacionLocal = 0;
puntuacionForaneo = 0;
Cmd_Inicio ();
Cmd_SpawnPelota ();

}
//Debug.Log (NetworkManager.singleton.numPlayers);
}

[Command]
void Cmd_SpawnPelota(){
GameObject go = Instantiate (pelota, pelotaPosInicio);
NetworkServer.Spawn (go);
}

public void AddGolLocal(){
puntuacionLocal++;
Cmd_Puntuacion ();
}

public void AddGolForaneo(){
puntuacionForaneo++;
Cmd_Puntuacion ();
}

[Command]
void Cmd_Puntuacion(){

marcadorLocal.text = puntuacionLocal.ToString ();
marcadorForaneo.text = puntuacionForaneo.ToString ();
Rpc_Puntuacion (puntuacionLocal, puntuacionForaneo);
}

[ClientRpc]
void Rpc_Puntuacion(int pLocal, int pForaneo){
marcadorLocal.text = pLocal.ToString ();
marcadorForaneo.text = pForaneo.ToString ();

}

[Command]
void Cmd_Inicio(){

panelBoton.SetActive (false);
titulo.SetActive (false);
Rpc_Inicio();
}

[ClientRpc]
void Rpc_Inicio(){
panelBoton.SetActive (false);
titulo.SetActive (false);

}

public void Salir(){
Application.Quit ();
}

}

viernes, 4 de mayo de 2018

Spark + Scala



SPARK + SCALA

DEFINICIONES
RDD à Resilient Distributed Dataset
Dataset à coleccion de datos. Puede ser una lista de cadena, de enteros o incluso numero de filas en una BBDD relacional
RDD à puede contener cualquier tipo de objeto, incluso clases definidas por usuario

En Spark todo el trabajo se expresa en la creacion de nuevos RDD, transformacion de RDD existentes o operaciones en RDD para sacar un resultado.

Spark automaticamente distribuira los trabajos de los RDD a traves de los clusters y paralelizara  las operaciones que se necesitan realizar sobre ellos.


QUE PODEMOS HACER EN RDD

TRANSFORMACIONES
-       Filtrar un RDD solo con los que contienen la palabra “Friday”
Val lines = sc.textFile(“in/uppercase.text”)
Val linesWithFriday = lines.filter(line => line.contains(“Friday”))

ACCIONES
-       Selecciona solo el primer registro
Val lines = sc.textFile(“in/uppercase.text”)
Val linesWithFriday = lines.first


El flujo general de Spark RDD es:

-       Generar un RDD con la informacion de un dataset externo
-       Aplicar las transformaciones pertinentes
-       Lanzar acciones


CREACION DE RDD

1-    Coger una coleccion existente en el programa y pasarlas al metodo parallelize en SparkContext.

Val inputIntegers = List(1,2,3,4,5,6,7)
Val integerRdd = sc.parallelize(inputIntegers)

SparkContest es la conexion con el cluster.
Usando parallelize los dato de la lista se convierten en un dataset que se va a paralelizar.

2-    Cargar RDD desde una Fuente externa usando el metodo textFile de SparkContext

Val sc = new SparkContext(conf)
Val lines = sc.textFile(“in/uppercases.txt”)

La Fuente externa suele ser un dataset distribuido recuperado de Amazon S3 o HDFS.

Hay otros datasets que pueden ser incorporados en Spark como conexiones jdbc, Cassandra, Elasticsearch, etc..

Spark JDBC driver:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

Spark Cassandra connector:
http://www.datastax.com/dev/blog/kindling-an-introduction-to-spark-with-cassandra-part-1

Spark Elasticsearch connector:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html


TRANSFORMACIONES

Las transformaciones generan un Nuevo RDD en vez de transformar el RDD con el que se esta trabajando.

Las transformaciones mas comunes son FILTER y MAP.

FILTER
Val cleanedLines = lines.filter (lines => !lines.isEmpty)

MAP


EJEMPLOS:
package com.sparkTutorial.rdd.airports

import com.sparkTutorial.commons.Utils
import org.apache.spark.{SparkConf, SparkContext}

object AirportsInUsaSolution {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("airports").setMaster("local[2]")
// setAppName --> nombre que le damos a la aplicacion SPARK
// setMaster --> Master Cluster donde se ejecutara SPARk. En este ejemplo se ejecuta en
//               el cluster local. local[2] --> dos cores, local[*] --> todos los cores,
//               local --> 1 core
    val sc = new SparkContext(conf)
// Se hace la conexion con SPARK usando la configuracion anterior

    val airports = sc.textFile("in/airports.text")
//Se cargar el fichero airports.text en el RDD airports
    val airportsInUSA = airports.filter(line => line.split(Utils.COMMA_DELIMITER)(3) == "\"United States\"")
//Se genera un filtro por el tercer campo del fichero donde sea igual United States
    val airportsNameAndCityNames = airportsInUSA.map(line => {
      val splits = line.split(Utils.COMMA_DELIMITER)
      splits(1) + ", " + splits(2)
    })
//Para cada linea separa las palabras por comas y muestra el primer campo y el segundo
    airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
  }
}


package com.sparkTutorial.rdd.airports

import com.sparkTutorial.commons.Utils
import org.apache.spark.{SparkConf, SparkContext}

object AirportsByLatitudeProblem {

  def main(args: Array[String]) {

    /* Create a Spark program to read the airport data from in/airports.text,  find all the airports whose latitude are bigger than 40.       Then output the airport's name and the airport's latitude to out/airports_by_latitude.text.
       Each row of the input file contains the following columns:       Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code,       ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format
       Sample output:       "St Anthony", 51.391944       "Tofino", 49.082222       ...     */
    val conf = new SparkConf().setAppName("airports").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val airports = sc.textFile("in/airports.text")
    val airportsInUSA = airports.filter(line => line.split(Utils.COMMA_DELIMITER)(6).toFloat > 40)

    val airportsNameAndCityNames = airportsInUSA.map(line => {
      val splits = line.split(Utils.COMMA_DELIMITER)
      splits(1) + ", " + splits(6)
    })
    airportsNameAndCityNames.saveAsTextFile("out/airports_by_latitude.text")

  }
}


CUANDO USAR FLATMAP o MAP

MAP --> cuando tenemos relaciones 1 a 1.
FLATMAP --> cuando tenemos relaciones 1 a N.


SET OPERATIONS

Operaciones para hacer en un RDD
- Sample: genera ejemplos del RDD original
- Distinct: objetos unicos

Operaciones que conllevan dos RDD

- Union
- Intersection
- Substract
- Cartesian product


Ejemplo Union:

package com.sparkTutorial.rdd.nasaApacheWebLogs

import com.sparkTutorial.commons.Utils
import com.sparkTutorial.rdd.nasaApacheWebLogs.UnionLogsSolution.isNotHeaderimport org.apache.spark.{SparkConf, SparkContext}

object UnionLogProblem {

  def main(args: Array[String]) {

    /* "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's apache server for July 1st, 1995.       "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995       Create a Spark program to generate a new RDD which contains the log lines from both July 1st and August 1st,       take a 0.1 sample of those log lines and save it to "out/sample_nasa_logs.tsv" file.
       Keep in mind, that the original log files contains the following header lines.       host  logname  time method url  response bytes
       Make sure the head lines are removed in the resulting RDD.     */
    val conf = new SparkConf().setAppName("nasaFile1").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val nasaFile1 = sc.textFile("in/nasa_19950701.tsv")
    val nasaFile2 = sc.textFile("in/nasa_19950801.tsv")
    val nasaUnion = nasaFile1.union(nasaFile2)

    val cleanLogLines = nasaUnion.filter(line => isNotHeader(line))

    cleanLogLines.saveAsTextFile("out/sample_nasa_logs.tsv")
  }
  def isNotHeader(line:String):Boolean = !(line.startsWith("host") && line.contains("bytes"))
}

Ejemplo Interception:

package com.sparkTutorial.rdd.nasaApacheWebLogs

import org.apache.spark.{SparkConf, SparkContext}

object SameHostsProblem {

  def main(args: Array[String]) {

    /* "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's apache server for July 1st, 1995.       "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995       Create a Spark program to generate a new RDD which contains the hosts which are accessed on BOTH days.       Save the resulting RDD to "out/nasa_logs_same_hosts.csv" file.
       Example output:       vagrant.vf.mmc.com       www-a1.proxy.aol.com       .....
       Keep in mind, that the original log files contains the following header lines.       host  logname  time method url  response bytes
       Make sure the head lines are removed in the resulting RDD.     */
    val conf = new SparkConf().setAppName("nasaFile1").setMaster("local[1]")
    val sc = new SparkContext(conf)

    val nasaFile1 = sc.textFile("in/nasa_19950701.tsv")
    val nasaFile2 = sc.textFile("in/nasa_19950801.tsv")

    val nasaFile1Split = nasaFile1.map(line => line.split("\t")(0))
    val nasaFile2Split = nasaFile2.map(line => line.split("\t")(0))


    val nasaIntersection = nasaFile1Split.intersection(nasaFile2Split)

    val cleanLogLines = nasaIntersection.filter(host => host != "host")

    cleanLogLines.saveAsTextFile("out/nasa_logs_same_hosts.csv")
  }


}


ASPECTOS IMPORTANTES RDD:

1- RDD es distribuido.
2- RDD es inmutable.
3- RDD es Resistente (Resilient).

Un aspecto importante de Spark es que hasta que no se ejecuta una Accion no se ejecutan los comandos empleados anteriormente. Es lo que se denomina Lazy Evaluation.

Cuando se realiza una Transformacion se obtiene un RDD, cuando se realiza una Accion se obtiene cualquier otro tipo de dato.

SPARK - GUARDAR INFORMACION EN MEMORIA O EN DISCO

Si se necesita reutilizar un RDD en distintas acciones es bueno crear un RDD persistente. Esto se hace utilizando el comando persist().

Cuando se crea persistencia de un RDD, cuando se realiza la primera accion sobre este RDD este permanecera persistente en todos los nodos.


En el siguiente ejemplo el RDD permanecera persistente en MEMORIA.

package com.sparkTutorial.rdd.persist

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistExample {

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("reduce").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val inputIntegers = List(1, 2, 3, 4, 5)
    val integerRdd = sc.parallelize(inputIntegers)

    integerRdd.persist(StorageLevel.MEMORY_ONLY)

    integerRdd.reduce((x, y) => x * y)
    integerRdd.count()
  }
}

Utilizar el comando .cache() es equivalente a utilizar persist(StorageLevel.MEMORY_ONLY).

Las diferentes configuraciones que se puede tener son:

MEMORY_ONLY --> solo en memoria
MEMORY_AND_DISK --> en memoria y cuando no cabe se almacena en disco accediendo a esta cuando es necesario
MEMORY_ONLY_SER --> guarda en memoria como un objeto java deserializado
MEMORY_AND_DISK_SER --> guarda en memoria o en disco, la parte que no cabe en memoria, un objeto java deserializado.
DISK_ONLY --> RDD se guarda en disco

Las opciones que mas rapido tratan los datos serian MEMORY_ONLY or MEMORY_ONLY_SER.

ARQUITECTURA SPARK

Se trata de una arquitectura distribuida MASTER - SLAVE.

En un cluster Spark normalmente hay un MASTER node y un numero determinado de nodos de TRABAJO.

En el nodo maestro se encuentra el programa DRIVER que es donde corre el metodo principal.

Cada ejecucion de un DRIVER se conoce como un JOB.

DRIVER es el responsable de convertir los jobs en tareas que envia a los nodos de trabajo para cuando terminen reenviar el resultado al DRIVER.


COMPONENTES SPARK


MOTOR (engine):

- Execution Model
- The Shuffle
- Caching

UI/API:
- Spark SQL: se pueden escribir sentencias parecidas a SQL para manipular datos RDD.
- Spark Streaming: proceso de datos casi tiempo real.
- Spark MLib (Machine Learning): clasificacion, regresion, clustering, etc..
- GraphX

Hay dos perfiles que utilizan SPARK:

Data Scientists (cientifico de datos):
- Identifica patrones, tendencias, riesgos y oportunidades
- Utiliza el analisis de datos para responder preguntas o comprender los datos
- Analisis a peticion

Data Processing Application Engineers:
- Construyen aplicaciones para el analisis de datos con cooperacion de los cientificos de datos.


PAIR RDD:

- Muchos de los juegos de datos que usamos tienen la estructura KEY-VALUE (clave, valor).
- Pair RDD es un tipo especifico de RDD con estructura calve valor.


CREACION DE PAIR RDD:

Creacion de Tuplas en Scala:

var tuple = ("Lily",23)
var name = tuple._1
var edad = tuple._2

En Spark, tuple_1 seria la Clave, tuple_2 seria el valor.

Ejemplo de clave valor a partir de la generacion de una tupla en Spark:

package com.sparkTutorial.pairRdd.create

import org.apache.spark.{SparkConf, SparkContext}

object PairRddFromTupleList {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("create").setMaster("local[1]")
    val sc = new SparkContext(conf)

    val tuple = List(("Lily", 23), ("Jack", 29), ("Mary", 29), ("James", 8))
    val pairRDD = sc.parallelize(tuple)

    pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_tuple_list")
  }
}



Tambien se puede crear PAIR RDD desde otro RDD usando MAP de la siguiente forma:

package com.sparkTutorial.pairRdd.create

import org.apache.spark.{SparkConf, SparkContext}

object PairRddFromRegularRdd {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("create").setMaster("local[1]")
    val sc = new SparkContext(conf)

    val inputStrings = List("Lily 23", "Jack 29", "Mary 29", "James 8")
    val regularRDDs = sc.parallelize(inputStrings)

    val pairRDD = regularRDDs.map(s => (s.split(" ")(0), s.split(" ")(1)))
    pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_regular_rdd")
  }
}

TRANSFORMACIONES EN PAIR RDD:

- Se utiliza como si fuera un RDD normal. En teste caso se utiliza map para crear una tupla clave valor utilizando la coma como separador.
- Utilizando Filter se busca que el valor (keyValue._2 (posicion 2 de keyValue)) sea igual a Estados Unidos.

package com.sparkTutorial.pairRdd.filter

import com.sparkTutorial.commons.Utils
import org.apache.spark.{SparkConf, SparkContext}

object AirportsNotInUsaSolution {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("airports").setMaster("local")
    val sc = new SparkContext(conf)

    val airportsRDD = sc.textFile("in/airports.text")

    val airportPairRDD = airportsRDD.map(line => (line.split(Utils.COMMA_DELIMITER)(1),      line.split(Utils.COMMA_DELIMITER)(3)))
    val airportsNotInUSA = airportPairRDD.filter(keyValue => keyValue._2 != "\"United States\"")

    airportsNotInUSA.saveAsTextFile("out/airports_not_in_usa_pair_rdd.text")
  }
}


Funciones MAP y MAPVALUES.

- La funcion MAP funciona correctamente con PAIR RDD y puede ser usado para convertir un RDD en otro RDD.
- La mayoria de las veces que trabajamos con PAIR RDD solo se quiere transformar el valor y no la clave, por lo que SPARK provee una funcion para hacer este tipo de mapeos llamada MAPVALUES.
- La funcion MAPVALUES aplicara a cada par CLAVE-VALOR convirtiendo los valores VALOR, pero no cambiando el valor de la CLAVE.


En el siguiente ejemplo se cambia el valor countryName (nombre del pais) a mayusculas, siendo la tupla nombreaeropuerto (key, posicion 1 del fichero), nombrepais(valor, posicion 3 del fichero).

package com.sparkTutorial.pairRdd.mapValues

import com.sparkTutorial.commons.Utils
import org.apache.spark.{SparkConf, SparkContext}

object AirportsUppercaseSolution {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("airports").setMaster("local")
    val sc = new SparkContext(conf)

    val airportsRDD = sc.textFile("in/airports.text")

    val airportPairRDD = airportsRDD.map((line: String) => (line.split(Utils.COMMA_DELIMITER)(1),      line.split(Utils.COMMA_DELIMITER)(3)))

    val upperCase = airportPairRDD.mapValues(countryName => countryName.toUpperCase)

    upperCase.saveAsTextFile("out/airports_uppercase.text")
  }
}

AGREGACIONES de PAIR RDD.

- Para hacer agregaciones en un RDD normal se utilizaria reduce(). Para el caso de PAIR RDD se utilizaria la funcion reduceByKey()