SPARK + SCALA
DEFINICIONES
RDD à
Resilient Distributed Dataset
Dataset à
coleccion de datos. Puede ser una lista de cadena, de enteros o incluso numero
de filas en una BBDD relacional
RDD à
puede contener cualquier tipo de objeto, incluso clases definidas por usuario
En Spark todo el trabajo se expresa en la creacion de nuevos
RDD, transformacion de RDD existentes o operaciones en RDD para sacar un
resultado.
Spark automaticamente distribuira los trabajos de los RDD a
traves de los clusters y paralelizara
las operaciones que se necesitan realizar sobre ellos.
QUE PODEMOS HACER EN RDD
TRANSFORMACIONES
-
Filtrar un RDD solo con los que contienen la
palabra “Friday”
Val lines = sc.textFile(“in/uppercase.text”)
Val linesWithFriday = lines.filter(line
=> line.contains(“Friday”))
ACCIONES
-
Selecciona solo el primer registro
Val lines = sc.textFile(“in/uppercase.text”)
Val linesWithFriday = lines.first
El flujo general de Spark RDD es:
-
Generar un RDD con la informacion de un dataset
externo
-
Aplicar las transformaciones pertinentes
-
Lanzar acciones
CREACION DE RDD
1-
Coger una coleccion existente en el programa y
pasarlas al metodo parallelize en SparkContext.
Val inputIntegers = List(1,2,3,4,5,6,7)
Val integerRdd =
sc.parallelize(inputIntegers)
SparkContest es la conexion con el cluster.
Usando parallelize los dato de la lista se
convierten en un dataset que se va a paralelizar.
2-
Cargar RDD desde una Fuente externa usando el
metodo textFile de SparkContext
Val sc = new SparkContext(conf)
Val lines = sc.textFile(“in/uppercases.txt”)
La Fuente externa suele ser un dataset
distribuido recuperado de Amazon S3 o HDFS.
Hay otros datasets que pueden ser
incorporados en Spark como conexiones jdbc, Cassandra, Elasticsearch, etc..
Spark JDBC driver:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html
Spark Cassandra connector:
http://www.datastax.com/dev/blog/kindling-an-introduction-to-spark-with-cassandra-part-1
Spark Elasticsearch connector:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
TRANSFORMACIONES
Las transformaciones generan un Nuevo RDD
en vez de transformar el RDD con el que se esta trabajando.
Las transformaciones mas comunes son FILTER
y MAP.
FILTER
Val cleanedLines = lines.filter (lines
=> !lines.isEmpty)
MAP
EJEMPLOS:
package com.sparkTutorial.rdd.airports import com.sparkTutorial.commons.Utils import org.apache.spark.{SparkConf, SparkContext} object AirportsInUsaSolution { def main(args: Array[String]) { val conf = new SparkConf().setAppName("airports").setMaster("local[2]")
// setAppName --> nombre que le damos a la aplicacion SPARK
// setMaster --> Master Cluster donde se ejecutara SPARk. En este ejemplo se ejecuta en
// el cluster local. local[2] --> dos cores, local[*] --> todos los cores,
// local --> 1 core
val sc = new SparkContext(conf)
// Se hace la conexion con SPARK usando la configuracion anterior val airports = sc.textFile("in/airports.text")
//Se cargar el fichero airports.text en el RDD airports val airportsInUSA = airports.filter(line => line.split(Utils.COMMA_DELIMITER)(3) == "\"United States\"") //Se genera un filtro por el tercer campo del fichero donde sea igual United States val airportsNameAndCityNames = airportsInUSA.map(line => { val splits = line.split(Utils.COMMA_DELIMITER) splits(1) + ", " + splits(2) })
//Para cada linea separa las palabras por comas y muestra el primer campo y el segundo
airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
}
}
package com.sparkTutorial.rdd.airports import com.sparkTutorial.commons.Utils import org.apache.spark.{SparkConf, SparkContext} object AirportsByLatitudeProblem { def main(args: Array[String]) { /* Create a Spark program to read the airport data from in/airports.text, find all the airports whose latitude are bigger than 40. Then output the airport's name and the airport's latitude to out/airports_by_latitude.text. Each row of the input file contains the following columns: Airport ID, Name of airport, Main city served by airport, Country where airport is located, IATA/FAA code, ICAO Code, Latitude, Longitude, Altitude, Timezone, DST, Timezone in Olson format Sample output: "St Anthony", 51.391944 "Tofino", 49.082222 ... */ val conf = new SparkConf().setAppName("airports").setMaster("local[2]") val sc = new SparkContext(conf) val airports = sc.textFile("in/airports.text") val airportsInUSA = airports.filter(line => line.split(Utils.COMMA_DELIMITER)(6).toFloat > 40) val airportsNameAndCityNames = airportsInUSA.map(line => { val splits = line.split(Utils.COMMA_DELIMITER) splits(1) + ", " + splits(6) }) airportsNameAndCityNames.saveAsTextFile("out/airports_by_latitude.text") } }
CUANDO USAR FLATMAP o MAP
MAP --> cuando tenemos relaciones 1 a 1.
FLATMAP --> cuando tenemos relaciones 1 a N.
SET OPERATIONS
Operaciones para hacer en un RDD
- Sample: genera ejemplos del RDD original
- Distinct: objetos unicos
Operaciones que conllevan dos RDD
- Union
- Intersection
- Substract
- Cartesian product
Ejemplo Union:
package com.sparkTutorial.rdd.nasaApacheWebLogs import com.sparkTutorial.commons.Utils import com.sparkTutorial.rdd.nasaApacheWebLogs.UnionLogsSolution.isNotHeaderimport org.apache.spark.{SparkConf, SparkContext} object UnionLogProblem { def main(args: Array[String]) { /* "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's apache server for July 1st, 1995. "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995 Create a Spark program to generate a new RDD which contains the log lines from both July 1st and August 1st, take a 0.1 sample of those log lines and save it to "out/sample_nasa_logs.tsv" file. Keep in mind, that the original log files contains the following header lines. host logname time method url response bytes Make sure the head lines are removed in the resulting RDD. */ val conf = new SparkConf().setAppName("nasaFile1").setMaster("local[*]") val sc = new SparkContext(conf) val nasaFile1 = sc.textFile("in/nasa_19950701.tsv") val nasaFile2 = sc.textFile("in/nasa_19950801.tsv") val nasaUnion = nasaFile1.union(nasaFile2) val cleanLogLines = nasaUnion.filter(line => isNotHeader(line)) cleanLogLines.saveAsTextFile("out/sample_nasa_logs.tsv") } def isNotHeader(line:String):Boolean = !(line.startsWith("host") && line.contains("bytes")) }
Ejemplo Interception:
package com.sparkTutorial.rdd.nasaApacheWebLogs import org.apache.spark.{SparkConf, SparkContext} object SameHostsProblem { def main(args: Array[String]) { /* "in/nasa_19950701.tsv" file contains 10000 log lines from one of NASA's apache server for July 1st, 1995. "in/nasa_19950801.tsv" file contains 10000 log lines for August 1st, 1995 Create a Spark program to generate a new RDD which contains the hosts which are accessed on BOTH days. Save the resulting RDD to "out/nasa_logs_same_hosts.csv" file. Example output: vagrant.vf.mmc.com www-a1.proxy.aol.com ..... Keep in mind, that the original log files contains the following header lines. host logname time method url response bytes Make sure the head lines are removed in the resulting RDD. */ val conf = new SparkConf().setAppName("nasaFile1").setMaster("local[1]") val sc = new SparkContext(conf) val nasaFile1 = sc.textFile("in/nasa_19950701.tsv") val nasaFile2 = sc.textFile("in/nasa_19950801.tsv") val nasaFile1Split = nasaFile1.map(line => line.split("\t")(0)) val nasaFile2Split = nasaFile2.map(line => line.split("\t")(0)) val nasaIntersection = nasaFile1Split.intersection(nasaFile2Split) val cleanLogLines = nasaIntersection.filter(host => host != "host") cleanLogLines.saveAsTextFile("out/nasa_logs_same_hosts.csv") } }
ASPECTOS IMPORTANTES RDD:
1- RDD es distribuido.
2- RDD es inmutable.
3- RDD es Resistente (Resilient).
Un aspecto importante de Spark es que hasta que no se ejecuta una Accion no se ejecutan los comandos empleados anteriormente. Es lo que se denomina Lazy Evaluation.
Cuando se realiza una Transformacion se obtiene un RDD, cuando se realiza una Accion se obtiene cualquier otro tipo de dato.
SPARK - GUARDAR INFORMACION EN MEMORIA O EN DISCO
Si se necesita reutilizar un RDD en distintas acciones es bueno crear un RDD persistente. Esto se hace utilizando el comando persist().
Cuando se crea persistencia de un RDD, cuando se realiza la primera accion sobre este RDD este permanecera persistente en todos los nodos.
En el siguiente ejemplo el RDD permanecera persistente en MEMORIA.
package com.sparkTutorial.rdd.persist import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.storage.StorageLevel object PersistExample { def main(args: Array[String]) { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName("reduce").setMaster("local[*]") val sc = new SparkContext(conf) val inputIntegers = List(1, 2, 3, 4, 5) val integerRdd = sc.parallelize(inputIntegers) integerRdd.persist(StorageLevel.MEMORY_ONLY) integerRdd.reduce((x, y) => x * y) integerRdd.count() } }
Utilizar el comando .cache() es equivalente a utilizar persist(StorageLevel.MEMORY_ONLY).
Las diferentes configuraciones que se puede tener son:
MEMORY_ONLY --> solo en memoria
MEMORY_AND_DISK --> en memoria y cuando no cabe se almacena en disco accediendo a esta cuando es necesario
MEMORY_ONLY_SER --> guarda en memoria como un objeto java deserializado
MEMORY_AND_DISK_SER --> guarda en memoria o en disco, la parte que no cabe en memoria, un objeto java deserializado.
DISK_ONLY --> RDD se guarda en disco
Las opciones que mas rapido tratan los datos serian MEMORY_ONLY or MEMORY_ONLY_SER.
ARQUITECTURA SPARK
Se trata de una arquitectura distribuida MASTER - SLAVE.
En un cluster Spark normalmente hay un MASTER node y un numero determinado de nodos de TRABAJO.
En el nodo maestro se encuentra el programa DRIVER que es donde corre el metodo principal.
Cada ejecucion de un DRIVER se conoce como un JOB.
DRIVER es el responsable de convertir los jobs en tareas que envia a los nodos de trabajo para cuando terminen reenviar el resultado al DRIVER.
COMPONENTES SPARK
MOTOR (engine):
- Execution Model
- The Shuffle
- Caching
UI/API:
- Spark SQL: se pueden escribir sentencias parecidas a SQL para manipular datos RDD.
- Spark Streaming: proceso de datos casi tiempo real.
- Spark MLib (Machine Learning): clasificacion, regresion, clustering, etc..
- GraphX
Hay dos perfiles que utilizan SPARK:
Data Scientists (cientifico de datos):
- Identifica patrones, tendencias, riesgos y oportunidades
- Utiliza el analisis de datos para responder preguntas o comprender los datos
- Analisis a peticion
Data Processing Application Engineers:
- Construyen aplicaciones para el analisis de datos con cooperacion de los cientificos de datos.
PAIR RDD:
- Muchos de los juegos de datos que usamos tienen la estructura KEY-VALUE (clave, valor).
- Pair RDD es un tipo especifico de RDD con estructura calve valor.
CREACION DE PAIR RDD:
Creacion de Tuplas en Scala:
var tuple = ("Lily",23)
var name = tuple._1
var edad = tuple._2
En Spark, tuple_1 seria la Clave, tuple_2 seria el valor.
Ejemplo de clave valor a partir de la generacion de una tupla en Spark:
package com.sparkTutorial.pairRdd.create import org.apache.spark.{SparkConf, SparkContext} object PairRddFromTupleList { def main(args: Array[String]) { val conf = new SparkConf().setAppName("create").setMaster("local[1]") val sc = new SparkContext(conf) val tuple = List(("Lily", 23), ("Jack", 29), ("Mary", 29), ("James", 8)) val pairRDD = sc.parallelize(tuple) pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_tuple_list") } }
Tambien se puede crear PAIR RDD desde otro RDD usando MAP de la siguiente forma:
package com.sparkTutorial.pairRdd.create import org.apache.spark.{SparkConf, SparkContext} object PairRddFromRegularRdd { def main(args: Array[String]) { val conf = new SparkConf().setAppName("create").setMaster("local[1]") val sc = new SparkContext(conf) val inputStrings = List("Lily 23", "Jack 29", "Mary 29", "James 8") val regularRDDs = sc.parallelize(inputStrings) val pairRDD = regularRDDs.map(s => (s.split(" ")(0), s.split(" ")(1))) pairRDD.coalesce(1).saveAsTextFile("out/pair_rdd_from_regular_rdd") } }
TRANSFORMACIONES EN PAIR RDD:
- Se utiliza como si fuera un RDD normal. En teste caso se utiliza map para crear una tupla clave valor utilizando la coma como separador.
- Utilizando Filter se busca que el valor (keyValue._2 (posicion 2 de keyValue)) sea igual a Estados Unidos.
package com.sparkTutorial.pairRdd.filter import com.sparkTutorial.commons.Utils import org.apache.spark.{SparkConf, SparkContext} object AirportsNotInUsaSolution { def main(args: Array[String]) { val conf = new SparkConf().setAppName("airports").setMaster("local") val sc = new SparkContext(conf) val airportsRDD = sc.textFile("in/airports.text") val airportPairRDD = airportsRDD.map(line => (line.split(Utils.COMMA_DELIMITER)(1), line.split(Utils.COMMA_DELIMITER)(3))) val airportsNotInUSA = airportPairRDD.filter(keyValue => keyValue._2 != "\"United States\"") airportsNotInUSA.saveAsTextFile("out/airports_not_in_usa_pair_rdd.text") } }
Funciones MAP y MAPVALUES.
- La funcion MAP funciona correctamente con PAIR RDD y puede ser usado para convertir un RDD en otro RDD.
- La mayoria de las veces que trabajamos con PAIR RDD solo se quiere transformar el valor y no la clave, por lo que SPARK provee una funcion para hacer este tipo de mapeos llamada MAPVALUES.
- La funcion MAPVALUES aplicara a cada par CLAVE-VALOR convirtiendo los valores VALOR, pero no cambiando el valor de la CLAVE.
En el siguiente ejemplo se cambia el valor countryName (nombre del pais) a mayusculas, siendo la tupla nombreaeropuerto (key, posicion 1 del fichero), nombrepais(valor, posicion 3 del fichero).
package com.sparkTutorial.pairRdd.mapValues import com.sparkTutorial.commons.Utils import org.apache.spark.{SparkConf, SparkContext} object AirportsUppercaseSolution { def main(args: Array[String]) { val conf = new SparkConf().setAppName("airports").setMaster("local") val sc = new SparkContext(conf) val airportsRDD = sc.textFile("in/airports.text") val airportPairRDD = airportsRDD.map((line: String) => (line.split(Utils.COMMA_DELIMITER)(1), line.split(Utils.COMMA_DELIMITER)(3))) val upperCase = airportPairRDD.mapValues(countryName => countryName.toUpperCase) upperCase.saveAsTextFile("out/airports_uppercase.text") } }
AGREGACIONES de PAIR RDD.
- Para hacer agregaciones en un RDD normal se utilizaria reduce(). Para el caso de PAIR RDD se utilizaria la funcion reduceByKey()
No hay comentarios:
Publicar un comentario