Sunday, September 15, 2019

Apache Spark GraphX with Scala and Neo4j (I)

    Based on Carol McDonald's article "How to Get Started Using Apache Spark GraphX with Scala", I have built a Neo4j database from the data in the CSV file attached to that page under the Software section.


GraphX Property Graph


GraphX extends the Spark RDD with a Resilient Distributed Property Graph.

The Property Graph is a directed multigraph, which can have multiple edges in parallel. Every vertex and edge has user-defined properties associated with it. Parallel edges allow multiple relationships between the same vertices.
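
To make the idea of parallel edges concrete, here is a minimal plain-Scala sketch (no Spark involved). The `FlightEdge` type and the carrier codes are illustrative assumptions, not part of the original data model.

```scala
// Hypothetical edge type: two flights on the same route become two parallel edges.
final case class FlightEdge(src: Long, dst: Long, carrier: String)

val parallelEdges = Seq(
  FlightEdge(1L, 2L, "AA"),
  FlightEdge(1L, 2L, "UA"), // parallel to the edge above: same source and destination
  FlightEdge(2L, 3L, "AA")
)

// Count edges per ordered (src, dst) pair; a count above 1 means parallel edges.
val edgesPerRoute: Map[(Long, Long), Int] =
  parallelEdges.groupBy(e => (e.src, e.dst)).map { case (route, es) => route -> es.size }
```

A simple graph would have to merge the two 1 → 2 edges into one; a multigraph keeps both, each with its own properties.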


We will use GraphX to analyze the flight data.

Analyzing Real Flight Data with GraphX


The Scenario


Our data comes from https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time The flight information is for January 2015. For each flight we have the following information:

Field | Description | Example Value
dOfM (String) | Day of month | 1
dOfW (String) | Day of week | 4
carrier (String) | Carrier code | AA
tailNum (String) | Unique identifier for the plane - tail number | N787AA
flnum (Int) | Flight number | 21
org_id (String) | Origin airport ID | 12478
origin (String) | Origin airport code | JFK
dest_id (String) | Destination airport ID | 12892
dest (String) | Destination airport code | LAX
crsdeptime (Double) | Scheduled departure time | 900
deptime (Double) | Actual departure time | 855
depdelaymins (Double) | Departure delay in minutes | 0
crsarrtime (Double) | Scheduled arrival time | 1230
arrtime (Double) | Actual arrival time | 1237
arrdelaymins (Double) | Arrival delay in minutes | 7
crselapsedtime (Double) | Elapsed time | 390
dist (Int) | Distance | 2475

We start by analyzing three flights. For each one we have the following information:

Originating Airport | Destination Airport | Distance
SFO | ORD | 1800 miles
ORD | DFW | 800 miles
DFW | SFO | 1400 miles

In this scenario, airports are represented as vertices and routes as edges. Our graph has three vertices, each representing an airport. The distance between two airports is a property of the route, as shown below:


Vertex Table for Airports


We define airports as vertices. Vertices have an Id and can have associated properties or attributes. Each vertex consists of:
  • Vertex Id → Id
  • Vertex property → name

ID | Property
1 | SFO
2 | ORD
3 | DFW


Edge Table for Routes


Edges are the routes between two airports. An edge must have a source and a destination, and can have properties. In our example, an edge consists of:
  • Edge origin Id → src
  • Edge destination Id → dest
  • Edge distance property → distance

SrcId | DestId | Property
1 | 2 | 1800
2 | 3 | 800
3 | 1 | 1400
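
The two tables above can be modelled in a few lines of plain Scala, without Spark, to see how vertices and edges fit together. This is only a sketch: the `Airport` and `Route` case classes are hypothetical stand-ins. In GraphX itself the same data would live in an RDD of (VertexId, String) pairs and an RDD of Edge[Int].

```scala
// Hypothetical stand-ins for the vertex and edge tables (no Spark required).
final case class Airport(id: Long, name: String)
final case class Route(src: Long, dest: Long, distance: Int)

val airportTable = Seq(Airport(1L, "SFO"), Airport(2L, "ORD"), Airport(3L, "DFW"))
val routeTable = Seq(Route(1L, 2L, 1800), Route(2L, 3L, 800), Route(3L, 1L, 1400))

// Lookup table from vertex id to airport code.
val nameById: Map[Long, String] = airportTable.map(a => a.id -> a.name).toMap

// Join each edge with its two endpoints, similar in spirit to a triplet view.
val described: Seq[String] = routeTable.map { r =>
  s"${nameById(r.src)} -> ${nameById(r.dest)}: ${r.distance} miles"
}

// Routes longer than 1000 miles.
val longRoutes: Seq[Route] = routeTable.filter(_.distance > 1000)
```

Printing `described` line by line reproduces the three flights listed earlier. `longRoutes` keeps only the SFO → ORD and DFW → SFO routes.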


Software


This tutorial can be run on the MapR Sandbox, which includes Spark.
The Scala code has been updated and is listed up to and including the PageRank section. We first define the Flight schema as follows,

case class Flight(dofM: String, dofW: String, carrier: String, tailnum: String,
                  flnum: Int, org_id: Long, origin: String, dest_id: Long, dest: String,
                  crsdeptime: Double, deptime: Double, depdelaymins: Double, crsarrtime: Double,
                  arrtime: Double, arrdelay: Double, crselapsedtime: Double, dist: Int)


Next we create the application. In the first part we write the code that reads the CSV file. The function below parses each line of the CSV file into the Flight class,

def parseFlight(line: Array[String]): Flight = {
    Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5).toLong,
    line(6), line(7).toLong, line(8), line(9).toDouble, line(10).toDouble,
    line(11).toDouble, line(12).toDouble, line(13).toDouble, line(14).toDouble, line(15).toDouble, line(16).toInt)
  }


Each line in the data file corresponds to a different Flight object.

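import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks.{break, breakable}
import org.apache.spark.rdd.RDD

// each row is an array of strings (the columns in the csv file)
  val rows = ArrayBuffer[Array[String]]()
  val temp = ArrayBuffer[Array[String]]()

  val bufferedSource = scala.io.Source.fromFile("rita2014jan.csv")
  var textRDDPage : RDD[Array[String]] = null
  var endProgram = false

  breakable {
    for (line <- bufferedSource.getLines()) {
      // split on commas, trim each field and keep only the non-empty ones
      val cols = line.split(",").map(_.trim)
      val row = ArrayBuffer[String]()
      for (col <- cols if !col.isEmpty) {
        row += col
      }

      // stop reading at the first line that does not have all 17 columns
      if (row.size < 17)
        break

      rows += row.toArray
    }
  }
  // release the file handle once reading is done
  bufferedSource.close()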
 
We read the data file line by line, checking that each line is not empty and has 17 columns. Each column is appended to the ArrayBuffer[String] 'row'. Finally, each row is added to the set of rows, defined as an ArrayBuffer[Array[String]]. Reading stops at the first line that is empty or has fewer than 17 non-empty columns. We will use one RDD for the flights and another for the airports, deriving the latter from the flights.
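
The reading logic described above can also be written without `breakable`, as a small pair of functions. This is a sketch under the same rules (trim, drop empty fields, stop at the first row with fewer than 17 columns). The names `ExpectedColumns`, `splitRow` and `readRows` are hypothetical and do not appear in the original program.

```scala
// Expected number of non-empty columns per CSV row (one per field of Flight).
val ExpectedColumns = 17

// Split a line on commas, trim each field and drop the empty ones.
// Returns None when the row does not have enough columns.
def splitRow(line: String): Option[Array[String]] = {
  val cols = line.split(",").map(_.trim).filter(_.nonEmpty)
  if (cols.length < ExpectedColumns) None else Some(cols)
}

// Read rows until the first incomplete one, mirroring the `break` in the loop.
def readRows(lines: Iterator[String]): Vector[Array[String]] =
  lines.map(splitRow).takeWhile(_.isDefined).collect { case Some(cols) => cols }.toVector
```

Calling `readRows` on the iterator returned by `getLines()` yields the same rows as the loop. Either way, the collected rows are what gets parallelized into RDDs next.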

  var flightsRDD : RDD[Flight] = null
  var airports : RDD[(Long, String)] = null
  if (rows.size > 0) {
    textRDDPage = sc.parallelize(rows)
    println(textRDDPage.count() + " rows gotten")
    rows.clear()

    //Create RDD with the January 2014 data
    flightsRDD = textRDDPage.map(parseFlight).cache()
    airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct

    val items = airports.collectAsMap()
    val keys = items.keySet
    if (keys.size > 0) {
      println("Hello World, " + keys.size + " keys")
      // iterate over (id, name) pairs directly; items.get(key) would print Some(name)
      for ((airportId, airportName) <- items) {
        println("id: " + airportId +
          ", name: " + airportName)
      }
    }
  }

Finally, we verify the result by listing the airports.


Saturday, March 12, 2016

Spring Data Neo4j - Cineasts.net

Cineasts.net


Cineasts.net is a project originally written by Luanne Misquitta and based on Spring Data Neo4j. I have added a few features to that project to make it more functional and easier to use, one of them being the possibility to list all the movies.

The most relevant feature added to the project is a Neo4j driver (REST API client) for Node.js, which initializes the web application's database if it does not exist yet. Here is the code,

var http = require('http');
var neo4j = require('neo4j');

var db = new neo4j.GraphDatabase('http://neo4j:neo@localhost:7474');

db.cypher({
    query: 'MATCH (user:User {login: {login}}) RETURN user',
    params: {
        login: 'micha',
    },
}, callback);

function callback(err, results) {
    if (err) {
        throw err;
    }
    var result = results[0];
    if (!result) {
        console.log('Database not found,');
        console.log('creating database...');

        http.get('http://localhost:8080/database', (res) => {
            console.log(`Got response: ${res.statusCode}`);
            console.log('run http://localhost:8080');
            // consume response body
            res.resume();
        }).on('error', (e) => {
            console.log(`Got error: ${e.message}`);
        });


    } else {
        //var user = result['user'];
        //console.log(user);
        console.log('Database found...');
        console.log('run http://localhost:8080');
    }
};

The question is whether the database exists. To answer it, I run a Cypher query for the user micha, who is the administrator and is created together with the database. If the query returns no result, a GET request is sent to the URL 'http://localhost:8080/database'. See the Java code below,

@Controller
public class DatabaseController {
    
    @Autowired
    private DatabasePopulator populator;

    @RequestMapping(value = "/database", method = RequestMethod.GET)
    public String populateDatabase(Model model) {
        Collection<Movie> movies = populator.populateDatabase();
        model.addAttribute("movies", movies);
        return "index";
    }
}

where the code of the populateDatabase() method of the DatabasePopulator class is as follows,
@Transactional
public List<Movie> populateDatabase() {
    importService.importImageConfig();
    User me = userRepository.save(new User("micha", "Micha", "password", User.SecurityRole.ROLE_ADMIN, User.SecurityRole.ROLE_USER));
    User ollie = new User("ollie", "Olliver", "password", User.SecurityRole.ROLE_USER);
    me.addFriend(ollie);
    userRepository.save(me);
    List<Integer> ids = asList(19995 , 194, 600, 601, 602, 603, 604, 605, 606, 607, 608, 609, 13, 20526, 11, 1893, 1892,
              1894, 168, 193, 200, 157, 152, 201, 154, 12155, 58, 285, 118, 22, 392, 5255, 568, 9800, 497, 101, 120, 121, 122);
    List<Movie> result = new ArrayList<Movie>(ids.size());
    for (Integer id : ids) {
        result.add(importService.importMovie(String.valueOf(id)));
    }

    /*me.rate(movieRepository.findById("13"), 5, "Inspiring");
    final Movie movie = movieRepository.findById("603");
    me.rate(movie, 5, "Best of the series");*/
    
    return result;
}

You can get my complete project on GitHub - my-sdn4-cineasts.

Thursday, June 18, 2015

Neo4j and REST API from Java when authentication is required

Based on section 7.1, "How to use the REST API from Java", of "The Neo4j v2.3.0-M01 Manual", I've developed the same example program using the Jersey library (2.19). The goal of this example is to use the REST API from Java when authentication is required.

The Jersey Client API reuses many aspects of the JAX-RS and the Jersey implementation. To utilize the client API it is first necessary to build an instance of a Client using one of the static ClientBuilder factory methods. Once you have a Client instance you can create a WebTarget from it.


Client client = ClientBuilder.newClient();
WebTarget webTarget = client.target("http://example.com/rest/");

The uri passed to the method as a String is the URI of the targeted web resource that represents a context root of a RESTful application. If there is a resource exposed on the URI "http://example.com/rest/resource", a WebTarget instance can be used to derive other web targets.

WebTarget resourceWebTarget = webTarget.path("resource");

The resourceWebTarget now points to the resource at the URI "http://example.com/rest/resource".

In our example program, once the server is started, we’ll check the connection to it.
 
Authentication takes place in the following code snippet,
private static WebTarget testDatabaseAuthentication()
{
    // START SNIPPET: testAuthentication
    Client client = ClientBuilder.newClient();

    HttpAuthenticationFeature authFeature =
        HttpAuthenticationFeature.basic(username, password);
    
    client.register(authFeature);

    WebTarget target = client.target(SERVER_ROOT_URI);

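    // Ask for JSON via the Accept header (needs javax.ws.rs.core.MediaType);
    // header("application/xml", "true") only set a header with that name
    Response response = target
            .request(MediaType.APPLICATION_JSON)
            .get();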

    String entity = response.readEntity(String.class);

    System.out.println( String.format(
            "GET, status code [%d], returned data: "
                    + System.getProperty( "line.separator" ) + "%s",
            response.getStatus(), entity ) );

    response.close();
    return target;
    // END SNIPPET: testAuthentication
}

To enable HTTP authentication support in the Jersey client, register the HttpAuthenticationFeature. For our example I've used basic pre-emptive authentication. I keep the 'target' object throughout the whole program so that I don't need to log in again in each of the different methods.
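
For reference, basic pre-emptive authentication means the client sends an `Authorization: Basic ...` header with the first request instead of waiting for a 401 challenge. The header value is simply the Base64 encoding of `username:password`. Here is a minimal sketch in Scala using only the JDK. `basicAuthHeader` is a hypothetical helper for illustration and is not part of Jersey.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Build the value of the HTTP Authorization header for Basic authentication.
def basicAuthHeader(username: String, password: String): String = {
  val credentials = s"$username:$password".getBytes(StandardCharsets.UTF_8)
  "Basic " + Base64.getEncoder.encodeToString(credentials)
}
```

Because Base64 is an encoding and not encryption, the credentials are readable by anyone who can see the request, so Basic authentication is normally combined with HTTPS.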

The main method should look as follows,

private static final String SERVER_ROOT_URI = "http://localhost:7474/db/data/";

private static final String username = "neo4j_username";
private static final String password = "neo4j_password";

public static void main( String[] args ) //throws URISyntaxException
{
    WebTarget target = testDatabaseAuthentication();

    // START SNIPPET: nodesAndProps
    URI firstNode = createNode( target );
    addProperty( target, firstNode, "name", "Joe Strummer" );
    URI secondNode = createNode( target );
    addProperty( target, secondNode, "band", "The Clash" );
    // END SNIPPET: nodesAndProps

    // START SNIPPET: addRel
    URI relationshipUri = addRelationship( target, firstNode, secondNode, "singer",
    "{ \"from\" : \"1976\", \"until\" : \"1986\" }" );
    // END SNIPPET: addRel

    // START SNIPPET: addMetaToRel
    addMetadataToProperty( target, relationshipUri, "stars", "5" );
    // END SNIPPET: addMetaToRel

    // START SNIPPET: queryForSingers
    findSingersInBands( target, firstNode );
    // END SNIPPET: queryForSingers

    sendTransactionalCypherQuery( target, "MATCH (n) WHERE has(n.name) RETURN n.name AS name" );
}

If you want, you can download the code from my GitHub.

All the best,
José