5. Graph like data using Neo4j Graph Database

This is the fifth post in the series Full Stack: Remastering Master Data Management into graph like data. I hope you enjoy the series and find it useful!

Introduction

In today's world, people don't just need to manage larger volumes of data; they need to generate insight from the data they already have. For that, the relationships between data points matter more than the individual points themselves.

In order to leverage data relationships, organizations need a database technology that stores relationship information as a first-class entity. That technology is a graph database.

Ironically, legacy relational database management systems (RDBMS) are poor at handling data relationships. Their rigid schemas make it difficult to add new kinds of connections or adapt to new business requirements.

Not only do graph databases effectively store data relationships; they’re also flexible when expanding a data model or conforming to changing business needs.

Advantages of using a Graph database

We live in a connected world! There are no isolated pieces of information, but rich, connected domains all around us. Only a database that natively embraces relationships is able to store, process, and query connections efficiently. While other databases compute relationships at query time through expensive JOIN operations, a graph database stores connections alongside the data in the model.

Accessing nodes and relationships in a native graph database is an efficient, constant-time operation and allows you to quickly traverse millions of connections per second per core.

Independent of the total size of your dataset, graph databases excel at managing highly-connected data and complex queries. With only a pattern and a set of starting points, graph databases explore the neighboring data around those initial starting points — collecting and aggregating information from millions of nodes and relationships — and leaving any data outside the search perimeter untouched.
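To make the "pattern plus starting points" idea concrete, here is a minimal sketch in plain Java. It is not how Neo4j works internally; it is only a toy adjacency map with a breadth-first walk limited to a number of hops. The class name, node names and the `reachable` helper are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Arrays;

public class NeighborhoodTraversal {

    // Breadth-first walk from one starting node that stops after maxHops.
    // Only nodes inside that perimeter are ever looked at.
    public static Set<String> reachable(Map<String, List<String>> adjacency, String start, int maxHops) {
        Set<String> visited = new LinkedHashSet<>();
        visited.add(start);
        List<String> frontier = Collections.singletonList(start);
        for (int hop = 0; hop < maxHops && !frontier.isEmpty(); hop++) {
            List<String> next = new ArrayList<>();
            for (String node : frontier) {
                for (String neighbor : adjacency.getOrDefault(node, Collections.emptyList())) {
                    if (visited.add(neighbor)) { // add() returns false for nodes already seen
                        next.add(neighbor);
                    }
                }
            }
            frontier = next;
        }
        return visited;
    }

    public static void main(String[] args) {
        // Hypothetical data: each key lists the nodes it points to.
        Map<String, List<String>> adjacency = new HashMap<>();
        adjacency.put("alice", Arrays.asList("bob", "acct-1"));
        adjacency.put("bob", Arrays.asList("carol"));
        adjacency.put("carol", Arrays.asList("dave"));
        adjacency.put("zed", Arrays.asList("yan")); // unrelated part of the graph

        System.out.println(reachable(adjacency, "alice", 2)); // [alice, bob, acct-1, carol]
    }
}
```

Note that "zed" and "yan" are never touched, no matter how many unrelated nodes the map contains. The work depends on the size of the neighborhood, not on the size of the dataset.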

What is Neo4j?

Neo4j is an open-source, NoSQL, native graph database that provides an ACID-compliant transactional backend for your applications. It is referred to as a native graph database because it efficiently implements the property graph model down to the storage level. This means that the data is stored exactly as you whiteboard it, and the database uses pointers to navigate and traverse the graph. In contrast to graph processing or in-memory libraries, Neo4j also provides full database characteristics, including ACID transaction compliance, cluster support, and runtime failover – making it suitable to use graphs for data in production scenarios.

The following features make Neo4j popular among developers, architects, and DBAs:

  • Cypher, a declarative query language similar to SQL, but optimized for graphs.
  • Constant-time traversals in big graphs for both depth and breadth, thanks to an efficient representation of nodes and relationships. This enables scale-up to billions of nodes on moderate hardware.
  • Flexible property graph schema that can adapt over time, making it possible to materialize and add new relationships later to shortcut and speed up the domain data when the business needs change.
  • Drivers for popular programming languages, including Java, JavaScript, .NET, Python, and many more.

How to define a Graph?

A graph is composed of two elements: a node and a relationship.

Each node represents an entity (a person, place, thing, category or other piece of data), and each relationship represents how two nodes are associated. This general-purpose structure allows you to model all kinds of scenarios – from a system of roads, to a network of devices, to a population’s medical history or anything else defined by relationships.
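As a rough illustration of these two building blocks, the sketch below models nodes and relationships as plain Java objects. This is only a toy in-memory model, not the Neo4j API. The class names and sample ids are hypothetical, and the relationship types are borrowed from the ones used later in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PropertyGraphSketch {

    // A node: one label plus arbitrary key/value properties.
    static final class Node {
        final String label;
        final Map<String, Object> properties = new LinkedHashMap<>();
        final List<Relationship> outgoing = new ArrayList<>();

        Node(String label, String id) {
            this.label = label;
            this.properties.put("id", id);
        }
    }

    // A relationship: a typed, directed link between two nodes.
    static final class Relationship {
        final String type;
        final Node start;
        final Node end;

        Relationship(String type, Node start, Node end) {
            this.type = type;
            this.start = start;
            this.end = end;
            start.outgoing.add(this); // kept with the start node, so following it needs no lookup
        }
    }

    // Follows outgoing relationships of one type and returns the ids of the nodes they point to.
    static List<String> targetIds(Node start, String type) {
        List<String> ids = new ArrayList<>();
        for (Relationship relationship : start.outgoing) {
            if (relationship.type.equals(type)) {
                ids.add(String.valueOf(relationship.end.properties.get("id")));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        Node employee = new Node("User", "u1");
        Node manager = new Node("User", "u2");
        Node account = new Node("Account", "a1");
        new Relationship("PersonaHasAccount", employee, account);
        new Relationship("PersonaReportsToPersona", employee, manager);

        System.out.println(targetIds(employee, "PersonaHasAccount"));       // [a1]
        System.out.println(targetIds(employee, "PersonaReportsToPersona")); // [u2]
    }
}
```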

With the basics in place, let's see how this can solve our primary use case: creating a data lake for Master Data Management.

We will use the Neo4j Java driver together with our Quarkus project from the previous posts. Quarkus provides an extension for Neo4j, and that is what we will use here.

You can use Neo4j Sandbox or run a quick instance in Docker. In my case, I am running a Docker instance locally.

Dependencies

So first things first, let's add the dependency to get started.

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-neo4j</artifactId>
</dependency>

Model

Let's define a simple User model like below:

public class User {

    private String id;
    private String type;
    private long startTime;
    private long endTime;
    private String firstName;
    private String middleName;
    private String lastName;
    private String homePhone;
    private String mobilePhone;
    private String officePhone;
    private String notes;
    private String location;
    private String email;
    private String photo;
    private String employeeId;
    private String employeeTitle;
    private String employeeType;
    private String employeeStatus;
    private String employeeOrganization;

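    // Maps a Neo4j node to a User. Assumes an all-args constructor (not shown)
    // that takes the fields in declaration order.
    public static User from(Node node) { // org.neo4j.driver.types.Node;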
        return new User(node.get("id").asString(),
                node.get("type").asString(),
                node.get("startTime").asLong(),
                node.get("endTime").asLong(),
                node.get("firstName").asString(),
                node.get("middleName").asString(),
                node.get("lastName").asString(),
                node.get("homePhone").asString(),
                node.get("mobilePhone").asString(),
                node.get("officePhone").asString(),
                node.get("notes").asString(),
                node.get("location").asString(),
                node.get("email").asString(),
                node.get("photo").asString(),
                node.get("employeeId").asString(),
                node.get("employeeTitle").asString(),
                node.get("employeeType").asString(),
                node.get("employeeStatus").asString(),
                node.get("employeeOrganization").asString());
    }

}

Let's define a simple Relation model like below:

public class Relation {

    private String id;
    private String type;
    private long startTime;
    private long endTime;
    private String lhsMappingId;
    private String lhsMappingType;
    private String rhsMappingId;
    private String rhsMappingType;

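    // Maps a Neo4j relationship (org.neo4j.driver.types.Relationship) to a Relation.
    // Assumes an all-args constructor (not shown) that takes the fields in declaration order.
    public static Relation from(Relationship relation) {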
        return new Relation(relation.get("id").asString(),
                relation.get("type").asString(),
                relation.get("startTime").asLong(),
                relation.get("endTime").asLong(),
                relation.get("lhsMappingId").asString(),
                relation.get("lhsMappingType").asString(),
                relation.get("rhsMappingId").asString(),
                relation.get("rhsMappingType").asString());
    }
}

Controllers

Let's define a simple User controller with a POST method as below:

@Inject
Driver driver;

@Path("/neo/user")
@POST
public CompletionStage<Response> create(User user) {
    AsyncSession session = driver.asyncSession();
    return session
        .writeTransactionAsync(tx -> tx
            // MERGE creates the node only if no User with exactly these properties exists.
            .runAsync("MERGE (u:User {id: $id,"
                + " type: $type,"
                + " startTime: $startTime,"
                + " endTime: $endTime,"
                + " firstName: $firstName,"
                + " middleName: $middleName,"
                + " lastName: $lastName,"
                + " homePhone: $homePhone,"
                + " mobilePhone: $mobilePhone,"
                + " officePhone: $officePhone,"
                + " notes: $notes,"
                + " location: $location,"
                + " email: $email,"
                + " photo: $photo,"
                + " employeeId: $employeeId,"
                + " employeeTitle: $employeeTitle,"
                + " employeeType: $employeeType,"
                + " employeeStatus: $employeeStatus,"
                + " employeeOrganization: $employeeOrganization}) RETURN u",
                Values.parameters("id", user.getId(),
                    "type", user.getType(),
                    "startTime", user.getStartTime(),
                    "endTime", user.getEndTime(),
                    "firstName", user.getFirstName(),
                    "middleName", user.getMiddleName(),
                    "lastName", user.getLastName(),
                    "homePhone", user.getHomePhone(),
                    "mobilePhone", user.getMobilePhone(),
                    "officePhone", user.getOfficePhone(),
                    "notes", user.getNotes(),
                    "location", user.getLocation(),
                    "email", user.getEmail(),
                    "photo", user.getPhoto(),
                    "employeeId", user.getEmployeeId(),
                    "employeeTitle", user.getEmployeeTitle(),
                    "employeeType", user.getEmployeeType(),
                    "employeeStatus", user.getEmployeeStatus(),
                    "employeeOrganization", user.getEmployeeOrganization()))
            .thenCompose(fn -> fn.singleAsync())
        )
        .thenApply(record -> User.from(record.get("u").asNode()))
        // Close the session before building the response.
        .thenCompose(persistedUser -> session.closeAsync().thenApply(signal -> persistedUser))
        .thenApply(persistedUser -> Response
            .created(URI.create("/neo/users/" + persistedUser.getId()))
            .build()
        );
}
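One thing to keep in mind with this query: MERGE matches on the whole property pattern, so a user whose phone number changed would be created as a second node, and as far as I know MERGE also refuses null property values. A possible variation, sketched below under my own assumptions, is to MERGE on the id only and copy the remaining properties from a map parameter. The `UserParameters` class, the `props` helper and the query string are hypothetical and not part of the project. The driver does have a `runAsync` overload that accepts a `Map<String, Object>` of parameters, so the query could be run with a map holding `id` and `props`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UserParameters {

    // Assumed Cypher for this sketch: match on the key only, then copy the other properties.
    static final String UPSERT_USER = "MERGE (u:User {id: $id}) SET u += $props RETURN u";

    // Builds a property map from alternating key/value pairs, skipping null values.
    public static Map<String, Object> props(Object... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("expected key/value pairs");
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (int i = 0; i < keyValues.length; i += 2) {
            if (keyValues[i + 1] != null) {
                result.put((String) keyValues[i], keyValues[i + 1]);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample values; middleName is missing on purpose.
        Map<String, Object> props = props(
            "firstName", "Ada",
            "middleName", null,
            "lastName", "Lovelace",
            "startTime", 0L);
        System.out.println(props); // {firstName=Ada, lastName=Lovelace, startTime=0}
    }
}
```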

and GET methods as below:

@Inject
Driver driver;

@Path("neo/users/all")
@GET
public CompletionStage<Response> get() {
    AsyncSession session = driver.asyncSession(); 
    return session
        .runAsync("MATCH (u:User) RETURN u")  
        .thenCompose(cursor ->  
            cursor.listAsync(record -> User.from(record.get("u").asNode()))
        )
        .thenCompose(users ->  
            session.closeAsync().thenApply(signal -> users)
        )
        .thenApply(Response::ok) 
        .thenApply(ResponseBuilder::build);
}

@GET
@Path("neo/users/{id}")
public CompletionStage<Response> getSingle(@PathParam("id") String id) {
    AsyncSession session = driver.asyncSession();
    return session
        .readTransactionAsync(tx -> tx
            .runAsync("MATCH (u:User) WHERE u.id = $id RETURN u", Values.parameters("id", id))
            .thenCompose(fn -> fn.singleAsync())
    )
    .handle((record, exception) -> {
        if(exception != null) {
            Throwable source = exception;
            if(exception instanceof CompletionException) {
                source = ((CompletionException)exception).getCause();
            }
            Status status = Status.INTERNAL_SERVER_ERROR;
            if(source instanceof NoSuchRecordException) {
                status = Status.NOT_FOUND;
            }
            return Response.status(status).build();
        } else  {
            return Response.ok(User.from(record.get("u").asNode())).build();
        }
    })
    .thenCompose(response -> session.closeAsync().thenApply(signal -> response));
}
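The unwrapping of CompletionException in the handler can look odd at first. The reason is that a stage derived from a failed stage receives the failure wrapped in a CompletionException, while the failed stage itself sees the raw exception. Here is a small driver-free sketch of the same pattern using only the JDK. `NoSuchElementException` stands in for the driver's `NoSuchRecordException`, and the class and method names are made up for this example.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class StatusMapping {

    // Unwraps the CompletionException added by dependent stages to reach the original failure.
    public static Throwable rootCause(Throwable exception) {
        if (exception instanceof CompletionException && exception.getCause() != null) {
            return exception.getCause();
        }
        return exception;
    }

    // Maps the outcome of a lookup to an HTTP status code: 200, 404 or 500.
    public static CompletionStage<Integer> toStatus(CompletionStage<String> lookup) {
        return lookup.handle((value, exception) -> {
            if (exception == null) {
                return 200;
            }
            return rootCause(exception) instanceof NoSuchElementException ? 404 : 500;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> missing = new CompletableFuture<>();
        missing.completeExceptionally(new NoSuchElementException("no record"));
        // thenApply creates a dependent stage, so here the failure arrives wrapped.
        CompletionStage<String> wrapped = missing.thenApply(String::trim);

        System.out.println(toStatus(missing).toCompletableFuture().join()); // 404
        System.out.println(toStatus(wrapped).toCompletableFuture().join()); // 404
        System.out.println(toStatus(CompletableFuture.completedFuture("u1")).toCompletableFuture().join()); // 200
    }
}
```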

Similarly, we can define the controllers for relations as below:

@Inject
Driver driver;

@Path("neo/relation")
@POST
public CompletionStage<Response> create(Relation relation) {
    logger.info(relation.toString());
    AsyncSession session = driver.asyncSession();
    String neoQuery;
    switch (relation.getType()) {
    case "PersonaHasAccount":
        neoQuery = "MATCH (u:User), (a:Account) WHERE u.id = $lhsMappingId AND a.id = $rhsMappingId"
                + " AND u.startTime <= $startTime AND a.startTime <= $startTime"
                + " MERGE (u)-[r:PersonaHasAccount {id: $id, type: $type, startTime: $startTime,"
                + " endTime: $endTime, lhsMappingId: $lhsMappingId, lhsMappingType: $lhsMappingType,"
                + " rhsMappingId: $rhsMappingId, rhsMappingType: $rhsMappingType}]->(a) RETURN r";
        break;

    case "PersonaReportsToPersona":
        neoQuery = "MATCH (u1:User), (u2:User) WHERE u1.id = $lhsMappingId AND u2.id = $rhsMappingId"
                + " AND u1.startTime <= $startTime AND u2.startTime <= $startTime"
                + " MERGE (u1)-[r:PersonaReportsToPersona {id: $id, type: $type, startTime: $startTime,"
                + " endTime: $endTime, lhsMappingId: $lhsMappingId, lhsMappingType: $lhsMappingType,"
                + " rhsMappingId: $rhsMappingId, rhsMappingType: $rhsMappingType}]->(u2) RETURN r";
        break;

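    default:
        // Unsupported relation type: close the session and reject the request
        // instead of running a null query.
        return session.closeAsync()
                .thenApply(signal -> Response.status(Status.BAD_REQUEST).build());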
    }

    return session
            .writeTransactionAsync(tx -> tx.runAsync(neoQuery, Values.parameters("id", relation.getId(), "type",
                    relation.getType(), "startTime", relation.getStartTime(), "endTime", relation.getEndTime(),
                    "lhsMappingId", relation.getLhsMappingId(), "lhsMappingType", relation.getLhsMappingType(),
                    "rhsMappingId", relation.getRhsMappingId(), "rhsMappingType", relation.getRhsMappingType()))
                    .thenCompose(fn -> fn.singleAsync()))
            .thenApply(record -> Relation.from(record.get("r").asRelationship()))
            .thenCompose(persistedRelation -> session.closeAsync().thenApply(signal -> persistedRelation))
            .thenApply(persistedRelation -> Response
                    .created(URI.create("/neo/relations/" + persistedRelation.getId())).build());

}
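The switch exists because Cypher parameters cannot stand in for node labels or relationship types, so those parts of the query have to be written into the string. If the number of relation types grows, one option is to build the query from a small whitelist instead of adding a case per type. The sketch below is only one way to do that. The `RelationQueries` class, the `queryFor` helper and the `lhs`/`rhs` variable names are my own inventions, and the relationship keeps the alias `r` so the rest of the controller would still find it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class RelationQueries {

    // Hypothetical whitelist: relation type -> {label of the left node, label of the right node}.
    private static final Map<String, String[]> ENDPOINTS = new LinkedHashMap<>();
    static {
        ENDPOINTS.put("PersonaHasAccount", new String[] {"User", "Account"});
        ENDPOINTS.put("PersonaReportsToPersona", new String[] {"User", "User"});
    }

    // Returns the Cypher for a known relation type, or empty for anything not whitelisted.
    // Only whitelisted strings are concatenated; every value still travels as a $parameter.
    public static Optional<String> queryFor(String relationType) {
        String[] labels = ENDPOINTS.get(relationType);
        if (labels == null) {
            return Optional.empty();
        }
        return Optional.of(
            "MATCH (lhs:" + labels[0] + "), (rhs:" + labels[1] + ")"
            + " WHERE lhs.id = $lhsMappingId AND rhs.id = $rhsMappingId"
            + " AND lhs.startTime <= $startTime AND rhs.startTime <= $startTime"
            + " MERGE (lhs)-[r:" + relationType + " {id: $id, type: $type, startTime: $startTime,"
            + " endTime: $endTime, lhsMappingId: $lhsMappingId, lhsMappingType: $lhsMappingType,"
            + " rhsMappingId: $rhsMappingId, rhsMappingType: $rhsMappingType}]->(rhs) RETURN r");
    }

    public static void main(String[] args) {
        System.out.println(queryFor("PersonaHasAccount").orElse("unsupported"));
        System.out.println(queryFor("SomethingElse").orElse("unsupported")); // unsupported
    }
}
```

An empty result maps naturally to a 400 response in the controller, and adding a new relation type becomes a one-line change to the whitelist.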

@GET
@Path("neo/relation/{id}")
public CompletionStage<Response> getSingle(@PathParam("id") String id) {
    AsyncSession session = driver.asyncSession();
    return session.readTransactionAsync(
            tx -> tx.runAsync("MATCH ()-[r]->() WHERE r.id = $id RETURN r", Values.parameters("id", id))
                    .thenCompose(fn -> fn.singleAsync()))
            .handle((record, exception) -> {
                if (exception != null) {
                    Throwable source = exception;
                    if (exception instanceof CompletionException) {
                        source = ((CompletionException) exception).getCause();
                    }
                    Status status = Status.INTERNAL_SERVER_ERROR;
                    if (source instanceof NoSuchRecordException) {
                        status = Status.NOT_FOUND;
                    }
                    return Response.status(status).build();
                } else {
                    return Response.ok(Relation.from(record.get("r").asRelationship())).build();
                }
            }).thenCompose(response -> session.closeAsync().thenApply(signal -> response));
}

Configuration Properties

We can define the configuration properties to connect to the Neo4j instance as below:

#
# Quarkus :: Neo4j
#
quarkus.neo4j.uri = bolt://127.0.0.1:7687
quarkus.neo4j.authentication.username = neo4j
quarkus.neo4j.authentication.password = password

That's it! Now we can start the application by running:

./mvnw compile quarkus:dev

Then you can perform a POST or GET call to insert or fetch basic User data.
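If you prefer to try the endpoints from code rather than from a REST client, here is a minimal sketch using the JDK's built-in HTTP client (Java 11+). It rests on a few assumptions that you should check against your own setup: the application listens on Quarkus' default port 8080, the resource class has no class-level @Path prefix, a JSON extension is installed, and User has a no-arg constructor and setters so the payload can be deserialized. The sample user is invented. All fields are sent because, as far as I know, MERGE rejects null property values.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UserApiClient {

    // Assumption: Quarkus dev mode on its default port.
    static final String BASE_URL = "http://localhost:8080";

    // Hypothetical sample payload covering every field of the User model.
    static final String SAMPLE_USER = "{"
        + "\"id\":\"u1\",\"type\":\"User\",\"startTime\":0,\"endTime\":0,"
        + "\"firstName\":\"Ada\",\"middleName\":\"\",\"lastName\":\"Lovelace\","
        + "\"homePhone\":\"\",\"mobilePhone\":\"\",\"officePhone\":\"\","
        + "\"notes\":\"\",\"location\":\"\",\"email\":\"ada@example.com\",\"photo\":\"\","
        + "\"employeeId\":\"e1\",\"employeeTitle\":\"\",\"employeeType\":\"\","
        + "\"employeeStatus\":\"\",\"employeeOrganization\":\"\""
        + "}";

    // Builds the POST request for the create endpoint.
    public static HttpRequest createUserRequest(String json) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/neo/user"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(json))
            .build();
    }

    // Builds the GET request for a single user.
    public static HttpRequest getUserRequest(String id) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/neo/users/" + id))
            .GET()
            .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> created =
                client.send(createUserRequest(SAMPLE_USER), HttpResponse.BodyHandlers.ofString());
            System.out.println("POST status: " + created.statusCode());

            HttpResponse<String> fetched =
                client.send(getUserRequest("u1"), HttpResponse.BodyHandlers.ofString());
            System.out.println("GET status: " + fetched.statusCode());
            System.out.println(fetched.body());
        } catch (IOException e) {
            System.out.println("Request failed, is the application running? " + e);
        }
    }
}
```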

Source Code

Let's try to add some more entities and relationships. You can find the source code in: