AWS Neptune for analysing event ticket sales between users - Part 1
This is the first of a two-part blog series in which we walk through the setup for using AWS Neptune to analyse a property graph modelled from the Worldwide Event Attendance dataset from AWS Marketplace Data Exchange, which is free to subscribe to. The dataset contains user ticket purchases and sales for fictional daily events (operas, plays, pop concerts etc.) across 2008 in the USA. The data is accessible from Redshift, so part of this setup involves unloading the data in the required format from Redshift to an S3 bucket and then loading it into a Neptune DB instance for running queries and generating visualisations in Part 2.
Setting up the Neptune Cluster and Notebook
First we will need to create the Neptune cluster and database instance. I have configured this from the AWS console by following the steps in the docs, but this could also be automated via one of the CloudFormation templates here.
- For the Engine options, select provisioned mode and the latest version of Neptune
- For Settings, select the Development and testing option rather than Production, as this will give us the option to select the cheaper burstable class (db.t3.medium).
- We will not create any Neptune replicas in different availability zones so click No for Multi-AZ Deployment.
- For the Connectivity option, I have selected my default VPC for which I already have the security group configured with an inbound rule to allow access to any port in range with existing security group id as source. Alternatively you could add another custom rule to only allow inbound traffic to the specific default port for Neptune (8182).
- You can also choose to create a new VPC and new security group if you do not want to use the existing ones.
- We will configure the notebook separately after creating the cluster, so skip the Notebook configuration option.
- You can either skip the Additional configuration option and accept the defaults, which enable deletion protection, encryption at rest and auto minor version upgrades or disable the options you do not want.
We will now configure a Neptune graph notebook to access the cluster, so we can run queries and generate interactive visualisations. Neptune Workbench allows users to run a fully managed Jupyter notebook environment in Sagemaker with the latest release of the open-source Neptune graph notebook project. This has the benefit of offering built-in capabilities such as query visualisation.
- Click Notebooks from the navigation pane on the left and select Create notebook.
- In the Cluster list, choose your Neptune DB cluster. If you don't yet have a DB cluster, choose Create cluster to create one.
- For Notebook instance type, select ml.t3.medium which should be sufficient for this example.
- Under IAM role name, select create an IAM role for the notebook, and enter a name for the new role.
Finally, we need to create an IAM role for Neptune to assume so that it can load data from S3. Also, since the Neptune DB instance is within a VPC, we need to create an S3 gateway endpoint to allow access to S3. This can be achieved by following the steps in the IAM prerequisites for the Neptune Bulk Loader.
Redshift Serverless Data Query and Unload
In this previous blog, I have described how to configure AWS Redshift Serverless with access to the AWS Marketplace Worldwide Events Dataset. Follow the steps to configure a datashare to access this database from Redshift.
We will model the users and events as nodes and the relationship between each user and event as an edge. For example, a seller (node) would list (relationship) a ticket for a given event (node), for which one or many buyers (node(s)) would purchase (relationship(s)) tickets (or, unluckily, no one may purchase from the seller).
Open the query editor from the navigation pane in the Redshift Serverless console. We will first create a view which filters the all_users view in the worldwide events datashare down to the columns describing whether a user likes theatre, concerts and opera. The additional constraint is that we only keep rows that have no nulls in any of these boolean columns.
CREATE VIEW user_sample_vw AS
SELECT userid, username, city, state, liketheatre, likeconcerts, likeopera FROM
"worldwide_event_data_exchange"."public"."all_users"
WHERE (liketheatre IS NOT NULL AND likeconcerts IS NOT NULL AND likeopera IS NOT NULL)
with no schema binding;
Let's also create another view containing a snapshot of events and related transactions between selected buyers and sellers in our user_sample_vw for the first quarter of the year (the query below filters on D.qtr = 1). We also need to pull in additional columns corresponding to venue, event, and ticket purchase and listing details (e.g. number of tickets and price), hence we need to join to the respective tables.
Note: we only want records where neither the buyer nor the seller is NULL, and all users must be from the subset we sampled in user_sample_vw.
CREATE OR REPLACE VIEW network_vw AS
SELECT *
FROM
(
SELECT S.saletime, L.sellerid, L.listtime, S.buyerid, E.eventid, E.eventname,
V.venuename , C.catname , V.venuecity, V.venuestate, pricepaid ,qtysold, D.caldate,
priceperticket AS listprice ,numtickets AS listtickets
FROM "worldwide_event_data_exchange"."public"."date" D
JOIN "worldwide_event_data_exchange"."public"."sales" S
ON D.dateid = S.dateid
JOIN "worldwide_event_data_exchange"."public"."listing" L
ON S.listid = L.listid
JOIN "worldwide_event_data_exchange"."public"."event" E
ON E.eventid = S.eventid
JOIN "worldwide_event_data_exchange"."public"."category" C
ON E.catid = C.catid
JOIN "worldwide_event_data_exchange"."public"."venue" V
ON E.venueid = V.venueid
JOIN "dev"."public"."user_sample_vw" U
ON S.buyerid = U.userid
WHERE D.qtr = 1
) A
JOIN "dev"."public"."user_sample_vw" B
ON A.sellerid = B.userid
with no schema binding;
You should see the network_vw view visible if you refresh the dev database and expand the view dropdown in the tree. A sample of the rows and columns of the view looks like below. We will use this later to simplify the creation of edge records for our csv to export to S3. We will also use the eventid and related properties to create nodes csv.
We will need to generate two csv files (one containing all the nodes and the other containing all the relationship records) in the S3 bucket. This is a requirement because we will subsequently use the Neptune Bulk Loader to load the data into Neptune using the openCypher-specific csv format (since we will be using openCypher to query the graph data). In addition, the openCypher load format requires system column headers in node and relationship files, as detailed in the docs. Any column that holds the values for a particular property needs to use a property column header of the form propertyname:type.
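As a rough illustration of the header rules described above, the following Python sketch checks a header row before anything is uploaded. The function name `check_header`, the set of type names (limited to those used in this post) and the choice of which system columns count as required are my assumptions from reading the load format docs, so treat it as a sanity check rather than a full validator.

```python
import csv
import io

# Property types used in this post; the load format supports more (see the Neptune docs).
KNOWN_TYPES = {"String", "Bool", "Int", "Double", "DateTime"}
NODE_SYSTEM = {":ID", ":LABEL"}
EDGE_SYSTEM = {":ID", ":START_ID", ":END_ID", ":TYPE"}


def check_header(header_line, kind):
    """Return a list of problems found in a CSV header row; an empty list means none found."""
    columns = next(csv.reader(io.StringIO(header_line)))
    system = NODE_SYSTEM if kind == "node" else EDGE_SYSTEM
    # Assumption: nodes need :ID; relationships need :START_ID, :END_ID and :TYPE.
    required = {":ID"} if kind == "node" else {":START_ID", ":END_ID", ":TYPE"}
    problems = [f"missing system column {c}" for c in sorted(required - set(columns))]
    for col in columns:
        if col in system:
            continue
        name, sep, type_name = col.partition(":")
        if not name or not sep:
            problems.append(f"{col!r} is not in propertyname:type form")
        elif type_name not in KNOWN_TYPES:
            problems.append(f"{col!r} uses a type not in KNOWN_TYPES")
    return problems
```

For example, `check_header(':START_ID,:END_ID,price:Double', 'edge')` reports the missing `:TYPE` column.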
We will need to create a role to associate with redshift serverless endpoint so it can unload data into S3.
In the Redshift Serverless console, go to Namespace configuration and select the namespace. Then go to Security and Encryption Tab and click on Manage IAM roles under the Permissions section. Click the Create IAM role option in the Manage IAM roles dropdown. This will create an IAM role as the default with AWS managed policy AmazonRedshiftAllCommandsFullAccess attached. This includes permissions to run SQL commands to COPY, UNLOAD, and query data with Amazon Redshift Serverless.
Select the option Specific S3 buckets and select the S3 bucket created for unloading the nodes and relationship data to. Then click Create IAM role as default.
This default role created does allow permissions to run select statements on other services besides S3, including Sagemaker, Glue etc. The policy attached to the new role created would need to be updated from IAM if you want to limit permissions to fewer services.
If you navigate back to the Namespace, you should see the IAM role and the associated arn (highlighted in yellow) which you will need to specify when running commands to unload data to S3.
We will use the UNLOAD command to unload the results of the queries above to S3 in csv format. We need to add the following options below.
- CSV DELIMITER AS: to use csv format with delimiter as ','
- HEADER: specify first row as header row
- CLEANPATH: to remove any existing files under the S3 path before unloading the new query results
- PARALLEL OFF: turn off parallel writes as we want a single CSV file rather than multiple partitions.
unload ('<query>')
to <s3://object-path/name-prefix>
iam_role <your role-arn>
CSV DELIMITER AS ','
HEADER
cleanpath
parallel off;
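Because the query is passed to UNLOAD as a single-quoted string, any single quotes inside it have to be doubled (which is why the queries in this post write ''u'' rather than 'u'). A minimal Python sketch of filling in the template is shown here; `build_unload` is a hypothetical helper, and the bucket path and role ARN in the usage example are placeholders.

```python
def build_unload(query, s3_path, role_arn):
    """Wrap a SELECT in the UNLOAD template, doubling single quotes inside the query."""
    escaped = query.replace("'", "''")
    return (
        f"unload ('{escaped}')\n"
        f"to '{s3_path}'\n"
        f"iam_role '{role_arn}'\n"
        "CSV DELIMITER AS ','\n"
        "HEADER\n"
        "cleanpath\n"
        "parallel off;"
    )


# Usage with placeholder values (not a real bucket or role):
print(build_unload("SELECT CONCAT('u', userid) FROM t",
                   "s3://my-bucket/nodes",
                   "arn:aws:iam::123456789012:role/example"))
```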
The query below will unload the results for all the user and event node records to an S3 bucket s3://redshift-worldwide-events with object name prefix nodes. Replace the IAM role ARN with your own role ARN. The first line forces the column names to keep the same case as used in the query (by default all column names are overridden to lowercase).
SET enable_case_sensitive_identifier TO true;
unload (
'SELECT DISTINCT *
FROM
(
SELECT CONCAT(''u'', A.buyerid) AS ":ID", B.username AS "name:String",
B.liketheatre AS "liketheatre:Bool", B.likeconcerts AS "likeconcerts:Bool", B.likeopera AS "likeopera:Bool",
NULL AS "venue:String", NULL AS "category:String", B.city AS "city:String", B.state AS "state:String", ''user'' AS ":LABEL"
FROM "dev"."public"."network_vw" A
JOIN user_sample_vw B
ON A.buyerid = B.userid
)
UNION
(
SELECT CONCAT(''u'', A.sellerid) AS ":ID", B.username AS "name:String",
B.liketheatre AS "liketheatre:Bool", B.likeconcerts AS "likeconcerts:Bool", B.likeopera AS "likeopera:Bool",
NULL AS "venue:String", NULL AS "category:String", B.city AS "city:String", B.state AS "state:String", ''user'' AS ":LABEL"
FROM "dev"."public"."network_vw" A
JOIN user_sample_vw B
ON A.sellerid = B.userid
)
UNION
(
SELECT CONCAT(''e'', eventid) AS ":ID", eventname AS "name:String",
NULL AS "liketheatre:Bool", NULL AS "likeconcerts:Bool", NULL AS "likeopera:Bool",
venuename AS "venue:String", catname AS "category:String", venuecity AS "city:String", venuestate AS "state:String", ''event'' AS ":LABEL"
FROM "dev"."public"."network_vw" B
)')
to 's3://redshift-worldwide-events/nodes'
iam_role '<your-iam-role>'
CSV DELIMITER AS ','
HEADER
cleanpath
parallel off
If it ran successfully, we should see a warning saying that 239 rows were unloaded successfully.
Let's break down the query and see what it's doing. The first and second subqueries create records for buyer and seller nodes respectively, by aliasing the column names to the openCypher format and setting the event property columns to NULL. We need to join network_vw (which contains the list of seller and buyer pairs) and user_sample_vw (which contains the properties of all users) to select additional information per user, like username, city and whether they like concerts, theatre and/or opera. The final subquery creates the records for the event nodes from network_vw, similarly aliasing the column names to the required format and setting the values for the columns corresponding to the user nodes to NULL. We then UNION the separate subqueries to combine them into the same result set, which also removes duplicate rows.
We can similarly run a query for unloading the edge records result set. Here the S3 location option is slightly modified to use an object name prefix 'edges'.
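To make the id handling in the node query concrete, here is a small Python sketch of what the three subqueries and the UNION do to the id columns. The tuples are made-up sample rows, and `node_ids` is a hypothetical helper. The 'u' and 'e' prefixes presumably keep a user id and an event id with the same number from colliding once both end up in the same :ID column.

```python
def node_ids(rows):
    """Collect distinct node ids from (buyerid, sellerid, eventid) tuples, as the UNION does."""
    ids = set()
    for buyerid, sellerid, eventid in rows:
        # Prefix users with 'u' and events with 'e', mirroring CONCAT in the SQL.
        ids.update({f"u{buyerid}", f"u{sellerid}", f"e{eventid}"})
    return sorted(ids)


# Buyer 7 appears twice and event 7 shares a number with user 7, yet all ids stay distinct.
print(node_ids([(7, 12, 7), (7, 30, 8)]))
```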
SET enable_case_sensitive_identifier TO true;
unload (
'SELECT ROW_NUMBER() OVER() AS ":ID",":START_ID",":END_ID", ":TYPE", "price:Double", "quantity:Int",
"date:DateTime"
FROM
(
(
SELECT CONCAT(''u'', sellerid) AS ":START_ID",
CONCAT(''e'', eventid) AS ":END_ID",''TICKETS_LISTED_FOR'' AS ":TYPE",
pricepaid AS "price:Double" ,qtysold AS "quantity:Int", caldate AS "date:DateTime"
FROM "dev"."public"."network_vw"
)
UNION
(
SELECT CONCAT(''e'', eventid) AS ":START_ID",
CONCAT(''u'', buyerid) AS ":END_ID",''TICKET_PURCHASE'' AS ":TYPE",
pricepaid AS "price:Double" ,qtysold AS "quantity:Int" , caldate AS "date:DateTime"
FROM "dev"."public"."network_vw"
)
)')
to 's3://redshift-worldwide-events/edges'
iam_role '<your-iam-role>'
CSV DELIMITER AS ','
HEADER
cleanpath
parallel off
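Notice that we have also used the ROW_NUMBER() window function to give every edge record a unique :ID. Ranking the edge records for the same node ids by date, so that only the latest transaction between the same pair of users is kept, needs a PARTITION BY on the node ids and an ORDER BY on the date; the ROW_NUMBER() OVER() call in the query as shown does neither.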
The screenshot below shows the edge records where there are multiple transactions between same buyer and seller on different dates. We will only keep the latest record.
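The "keep only the latest record" rule can be sketched in Python as follows. This illustrates the logic only and is not the query used in this post; the dictionary keys (`start`, `end`, `type`, `date`) are hypothetical names for the corresponding edge columns.

```python
from datetime import date


def latest_edges(edges):
    """Keep, for each (start, end, type) triple, only the edge with the most recent date."""
    latest = {}
    for edge in edges:
        key = (edge["start"], edge["end"], edge["type"])
        if key not in latest or edge["date"] > latest[key]["date"]:
            latest[key] = edge
    return list(latest.values())


sample = [
    {"start": "u1", "end": "e5", "type": "TICKETS_LISTED_FOR", "date": date(2008, 1, 3)},
    {"start": "u1", "end": "e5", "type": "TICKETS_LISTED_FOR", "date": date(2008, 1, 9)},
    {"start": "e5", "end": "u2", "type": "TICKET_PURCHASE", "date": date(2008, 1, 9)},
]
print(len(latest_edges(sample)))  # two edges remain
```

In SQL the same effect would typically come from ranking rows within each pair by date and keeping the first-ranked row.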
If the query has loaded successfully, check that the two objects are visible in the S3 bucket.
Loading S3 Data into Neptune
Now we will load the data from the S3 bucket to the Neptune cluster. To do this, we will open the notebook we configured in Sagemaker to access the Neptune cluster.
- Go to the Sagemaker console, Notebook tab and select Notebook instances.
- You should see the Notebook instance status in service if the create notebook task ran successfully.
- Under Actions, click on Open Jupyter or Jupyter lab.
You should see a number of subfolders containing sample notebooks on various topics, one level below the Neptune parent folder. Either open one of the existing notebooks or start a blank new one.
First we will check that the notebook configuration is as we expect. Graph notebook offers a number of magic extensions in the ipython3 kernel to run specific tasks in a cell, such as running a query in a specific language (cypher, gremlin), checking the status of a load job or query, viewing configuration settings, setting visualisation options etc.
In a new cell, use the magic command %graph_notebook_config and execute. This should return a json payload containing connection information for the Neptune host instance the notebook is connected to.
If we want to override any of these (for example, if we have set a port different to 8182), we can copy the json output from the previous cell and modify the required value. Run the cell with the magic command %%graph_notebook_config to set the configuration to the new setting.
Check that the status of the Neptune cluster endpoint is showing as healthy using the %status magic extension.
We can use the Neptune loader command to send a post request to the Neptune endpoint as described here. For the request parameters we will use the following:
source : "s3://redshift-worldwide-events/",
format : "opencypher",
iamRoleArn :
region : "us-east-1",
failOnError : "FALSE",
parallelism : "MEDIUM",
updateSingleCardinalityProperties : "FALSE",
queueRequest : "FALSE"
This will output a loadId in the payload.
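For readers who prefer to assemble the request in Python rather than by hand, here is a minimal sketch that builds the loader URL and JSON body from the parameters listed above. The helper name `loader_request` and the endpoint and role ARN in the usage lines are placeholders; the body would then be sent as a POST with a Content-Type of application/json, for example via curl or urllib.request from inside the VPC.

```python
import json


def loader_request(endpoint, port, source, role_arn, region="us-east-1"):
    """Return (url, body) for a bulk load request using the parameters from this post."""
    url = f"https://{endpoint}:{port}/loader"
    body = {
        "source": source,
        "format": "opencypher",
        "iamRoleArn": role_arn,  # the role Neptune assumes to read from S3
        "region": region,
        "failOnError": "FALSE",
        "parallelism": "MEDIUM",
        "updateSingleCardinalityProperties": "FALSE",
        "queueRequest": "FALSE",
    }
    return url, json.dumps(body)


# Placeholder endpoint and role ARN, for illustration only.
url, body = loader_request("your-neptune-endpoint", 8182,
                           "s3://redshift-worldwide-events/",
                           "arn:aws:iam::123456789012:role/example")
print(url)
print(body)
```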
Then we can check the load status by using the loader get status request, replacing your-neptune-endpoint, port and loadId with your own values in the command: curl -G https://your-neptune-endpoint:port/loader/loadId
If successful, you should see an output similar to the payload below. This returns one or more loader feed codes. If the load was successful you should see only a LOAD_COMPLETED code.
If there is an issue with one or both csvs then you may see a LOAD_FAILED code or one of the other codes listed here. In the next section, we will investigate some options to diagnose the errors. Also if one of the loads is still in progress, you will see a LOAD_IN_PROGRESS key with the value corresponding to the number of S3 object loads which are still in progress. Running the curl command to check the load status again, should hopefully update the code to LOAD_COMPLETED or one of the error codes, if there was an error.
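If you capture the status response as JSON, a short Python sketch like the one below can summarise it. The response shape used here (a `payload` object with a `feedCount` list and an `overallStatus.status` field) is my reading of the loader status output and should be checked against what your cluster actually returns; `summarise_load` is a hypothetical helper.

```python
def summarise_load(response):
    """Return (overall status, feed code counts, True if everything completed)."""
    payload = response["payload"]
    counts = {}
    # feedCount is assumed to be a list of single-entry dicts such as {"LOAD_COMPLETED": 2}.
    for entry in payload.get("feedCount", []):
        counts.update(entry)
    status = payload["overallStatus"]["status"]
    done = status == "LOAD_COMPLETED" and set(counts) <= {"LOAD_COMPLETED"}
    return status, counts, done


# Illustrative response, not real output from a cluster.
sample = {
    "status": "200 OK",
    "payload": {
        "feedCount": [{"LOAD_COMPLETED": 2}],
        "overallStatus": {"status": "LOAD_COMPLETED"},
    },
}
print(summarise_load(sample))
```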
Check that you can access some data by submitting an openCypher query to the openCypher HTTPS endpoint using curl, as explained in the docs. In this case, we will just return a single pair of connected nodes from the database by passing the query MATCH (n)-[r]-(p) RETURN n,r,p LIMIT 1 as the value of the query attribute, as in the screenshot below.
Note the endpoint is in the format HTTPS://(cluster endpoint):(the port number)/openCypher. Your cluster endpoint will be different to mine in the screenshot below, so you will need to copy it from the Neptune dashboard for the database cluster identifier.
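The same request can be prepared in Python. This sketch only builds the URL and the form-encoded body (a `query` field, as passed with curl); the helper name and the endpoint in the usage lines are placeholders, and sending the request still has to happen from somewhere with network access to the cluster.

```python
from urllib.parse import urlencode


def opencypher_request(endpoint, port, query):
    """Return (url, form body) for a query against the openCypher HTTPS endpoint."""
    url = f"https://{endpoint}:{port}/openCypher"
    return url, urlencode({"query": query})


# Placeholder endpoint, for illustration only.
url, body = opencypher_request("your-neptune-endpoint", 8182,
                               "MATCH (n)-[r]-(p) RETURN n,r,p LIMIT 1")
print(url)
print(body)
```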
Debugging Neptune data load errors
Running the check loader status command can sometimes return errors. To further diagnose the errors, we can run this curl command with additional query parameters, replacing neptune-endpoint, port and loadId with your values. This will give a more detailed response with an errorLogs object listing the errors encountered, as shown in the screenshot below. Here, the load failed because some of the node ids in the edge records in the relationship csv file were missing from the node csv file.
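As a sketch of what the more detailed status URL might look like, the helper below appends the `details` and `errors` query parameters that the loader status request accepts. The function name and the endpoint and load id values are placeholders.

```python
from urllib.parse import urlencode


def status_url(endpoint, port, load_id, details=False, errors=False):
    """Build the loader status URL, optionally asking for details and error logs."""
    url = f"https://{endpoint}:{port}/loader/{load_id}"
    params = {}
    if details:
        params["details"] = "true"
    if errors:
        params["errors"] = "true"
    return f"{url}?{urlencode(params)}" if params else url


# Placeholder values; pass the printed URL to curl (in quotes, because of the & character).
print(status_url("your-neptune-endpoint", 8182, "your-load-id", details=True, errors=True))
```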
The next screenshot below shows a cardinality violation error because some of the edge record ids in the original data are duplicated.
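Both of these errors can be caught before loading by checking the two csv files locally. The Python sketch below is one possible pre-load check, assuming the header names used in this post; `preload_checks` is a hypothetical helper and the sample data is made up.

```python
import csv
import io
from collections import Counter


def preload_checks(nodes_csv, edges_csv):
    """Return (node ids referenced by edges but missing from nodes, duplicated edge ids)."""
    node_ids = {row[":ID"] for row in csv.DictReader(io.StringIO(nodes_csv))}
    edges = list(csv.DictReader(io.StringIO(edges_csv)))
    referenced = {row[":START_ID"] for row in edges} | {row[":END_ID"] for row in edges}
    edge_id_counts = Counter(row[":ID"] for row in edges)
    duplicates = sorted(i for i, n in edge_id_counts.items() if n > 1)
    return sorted(referenced - node_ids), duplicates


nodes = ":ID,name:String,:LABEL\nu1,alice,user\ne5,Hamlet,event\n"
edges = ":ID,:START_ID,:END_ID,:TYPE\n1,u1,e5,TICKETS_LISTED_FOR\n1,e5,u2,TICKET_PURCHASE\n"
print(preload_checks(nodes, edges))
```

In the sample, node u2 is referenced by an edge but absent from the node file, and edge id 1 is used twice, so both kinds of problem are reported.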
We can also reset the db and remove any existing data in it by using the magic command %db_reset. This will prompt you to tick an acknowledgement option and click Delete. You will then get a checking status message. Wait for this to complete, and then you should get a database has been reset message.
We are now set up for running more complex queries to generate insights from our data. Part 2 of this blog will run a number of openCypher queries to explore the property graph containing the model of the worldwide events network.