You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
FRANCOIS PELLETIER ff228dd012 ajout projet initial 1 month ago
.gitignore Initial commit 1 month ago
LICENSE Initial commit 1 month ago
README.md ajout projet initial 1 month ago
address.csv ajout projet initial 1 month ago
person.csv ajout projet initial 1 month ago
person_address.csv ajout projet initial 1 month ago

README.md

Importation de données depuis un format CSV dans JanusGraph

Dans ce tutoriel, j’explique comment partir de données au format CSV contenant deux tables de propriétés et une table d’adjacence et les insérer dans une base de données JanusGraph.

Les fichiers exemples sont:

  • address.csv
  • person_address.csv
  • person.csv

{.align-center}

Résumé

L’insertion se fait selon les étapes suivantes:

  • Préparer les fichiers de configuration pour chacune des formes qu’occupera le graphe:
    • CSV/HDFS
    • Gryo
    • JanusGraph (Cassandra/Elasticsearch)
  • Importer les données dans HDFS
  • Préparer les données en une seule table de relations avec Spark
  • Convertir les données dans un graphe au format Kryo (Gryo)
  • Créer le schéma du graphe Janusgraph contenant les propriétés, les sommets et les arêtes
  • Créer les différentes indexations
  • Copier le graphe Kryo dans la base de données JanusGraph

Fichiers de configuration

  • Script Groovy pour lire les fichiers CSV (gremlin.hadoop.scriptInputFormat.script)
# csv-script-input.groovy
def parse(line, factory) {
    def (vlabel, vid, props, adjacent) = line.split(/:/)
    def v1 = factory.vertex(vlabel + ":" + vid, vlabel)
    switch (vlabel) {
      case "person":
        def (first, last) = props.split(/,/)
        v1.property("firstname", first)
        v1.property("lastname", last)
        def v2 = factory.vertex("address:" + adjacent)
        factory.edge(v1, v2, "livesIn")
        break
      case "address":
        def (country, region) = props.split(/,/)
        v1.property("country", country)
        v1.property("region", region)
        def v2 = factory.vertex("person:" + adjacent)
        factory.edge(v2, v1, "livesIn")
        break
    }
    return v1
}
  • Format CSV
<!-- -->
# csv-script.properties

## Gremlin Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph

## Hadoopp Graph Configuration
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=/home/pef011/demo/input
gremlin.hadoop.scriptInputFormat.script=/home/pef011/demo/csv-script-input.groovy
gremlin.hadoop.outputLocation=/home/pef011/demo/output

## SparkGraphComputer Configuration
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
  • Format Hadoop/Gryo
<!-- -->
# hadoop-graph/hadoop-load.properties

## Hadoop Graph Configuration
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.inputLocation=/home/pef011/demo/csv-graph.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true

## SparkGraphComputer Configuration
spark.master=local[*]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
  • Format JanusGraph(Cassandra/Elasticsearch)
<!-- -->
# janusgraph-cassandra-es.properties
gremlin.graph=org.janusgraph.core.JanusGraphFactory
storage.backend=cql
storage.hostname=10.2.0.4
storage.username=cassandra
storage.password=9YoUW7wBBgFS
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.25
index.search.backend=elasticsearch
index.search.hostname=10.2.0.5

Transformation des fichiers CSV en une table de relations

On exécute ce script Spark en local (ou dans HDFS) pour produire la table de relations.

val address = sc.textFile("/home/pef011/demo/address.csv").map(_.split(",")).keyBy(a => a(0)).cache()
val person = sc.textFile("/home/pef011/demo/person.csv").map(_.split(",")).keyBy(p => p(0)).cache()
val outE = sc.textFile("/home/pef011/demo/person_address.csv").map(_.split(",")).keyBy(e => e(0)).cache()
val inE = outE.map(x => Array(x._2(1), x._2(0))).keyBy(e => e(0)).cache()
val addressOutE = address.join(inE).mapValues(x => (x._1.slice(1,3), x._2(1)))
val personInE = person.join(outE).mapValues(x => (x._1.slice(1,3), x._2(1)))
val addressLines = addressOutE.map(a => Array("address", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
val personLines = personInE.map(a => Array("person", a._1, a._2._1.mkString(","), a._2._2).mkString(":"))
addressLines.union(personLines).saveAsTextFile("/home/pef011/demo/input")

Il faut ensuite charger les fichiers dans HDFS si on a travaillé en local, incluant le script Groovy nécessaire pour lire la table de relations.

hadoop fs -copyFromLocal csv-script-input.groovy
hadoop fs -copyFromLocal input/ .

Table de relations

On utilise ensuite Gremlin pour transformer la table de relations en graphe Tinkergraph sauvegardé au format Gryo.

/* Graph instance */
csvGraph = GraphFactory.open('/home/pef011/demo/csv-script.properties')

/* CSV -> Gryo */
writeGraphConf = new BaseConfiguration()
writeGraphConf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph")
writeGraphConf.setProperty("gremlin.tinkergraph.graphFormat", "gryo")
writeGraphConf.setProperty("gremlin.tinkergraph.graphLocation", "/home/pef011/demo/csv-graph.kryo")

/* Transformation */
blvp = BulkLoaderVertexProgram.build().bulkLoader(OneTimeBulkLoader).writeGraph(writeGraphConf).create(csvGraph)
csvGraph.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
/* Fermeture de l'instance */
csvGraph.close()

Traversée du graphe Gryo

On peut vérifier que le graphe est bien écrit dans le fichier Gryo en effectuant une traversée et en listant les sommets.

gryoGraph = GraphFactory.open(writeGraphConf)
gt = gryoGraph.traversal()
gt.V().valueMap(true)

Création du schéma du graphe Janusgraph

/* Graph instance */
jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')

/* Propriétés */
mgmt=jgraph.openManagement()
livesIn = mgmt.makeEdgeLabel("livesIn").multiplicity(MANY2ONE).make()
firstname = mgmt.makePropertyKey("firstname").dataType(String.class).cardinality(Cardinality.SINGLE).make()
lastname = mgmt.makePropertyKey("lastname").dataType(String.class).cardinality(Cardinality.SINGLE).make()
country = mgmt.makePropertyKey("country").dataType(String.class).cardinality(Cardinality.SINGLE).make()
region = mgmt.makePropertyKey("region").dataType(String.class).cardinality(Cardinality.SINGLE).make()
blid = makePropertyKey("bulkLoader.vertex.id").dataType(Long.class).make()
person = mgmt.makeVertexLabel('person').make()
address = mgmt.makeVertexLabel('address').make()
mgmt.commit()
jgraph.close()

Indexation

On fait ensuite l’indexation selon la documentation

/* Graph instance */
jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')

/* Indexation */
livesIn = mgmt.getEdgeLabel("livesIn")
firstname =  = mgmt.getPropertyKey("firstname")
lastname =  = mgmt.getPropertyKey("lastname")
country = mgmt.getPropertyKey("country")
region = mgmt.getPropertyKey("region")
blid = mgmt.getPropertyKey("bulkLoader.vertex.id")
person = mgmt.getVertexLabel('person')
address = mgmt.getVertexLabel('address')

livesInCountryIndex = mgmt.buildEdgeIndex(livesIn, "livesInCountry", Direction.BOTH, Order.decr, country)
livesInRegionIndex = mgmt.buildEdgeIndex(livesIn, "livesInRegion", Direction.BOTH, Order.decr, region)
nameIndex = mgmt.buildIndex("nameIdx"), Vertex.class).addKey(firstname).addKey(lastname).buildCompositeIndex()
placeIndex = mgmt.buildIndex("placeIdx"), Vertex.class).addKey(country).addKey(region).buildCompositeIndex()
personIndex = mgmt.buildIndex("personIdx", Vertex.class).addKey(blid).indexOnly(person).buildCompositeIndex()
addressIndex = mgmt.buildIndex("addressIdx", Vertex.class).addKey(blid).indexOnly(address).buildCompositeIndex()

mgmt.commit()
jgraph.close()

Chargement depuis Gryo vers JanusGraph

/* Instanciation du graphe */
gryoGraph = GraphFactory.open('conf/hadoop-graph/hadoop-load.properties')
/* Écrire dans Cassandra/Elasticsearch */
blvp = BulkLoaderVertexProgram.build().writeGraph('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties').create(gryoGraph)
gryoGraph.compute(SparkGraphComputer).program(blvp).submit().get()
gryoGraph.close()

Réindexer après l’importation des données

jgraph = JanusGraphFactory.open('/home/pef011/janusgraph-0.2.0-hadoop2/conf/janusgraph-cassandra-es.properties')
mgmt=jgraph.openManagement()
mgmt.updateIndex(mgmt.getGraphIndex("livesInCountry"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("livesInRegion"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("nameIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("placeIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("personIdx"), SchemaAction.REINDEX).get()
mgmt.updateIndex(mgmt.getGraphIndex("addressIdx"), SchemaAction.REINDEX).get()
mgmt.commit()

Traversée du graphe JanusGraph

jt = jgraph.traversal()
jt.V().valueMap(true)
jt.E().valueMap(true)