Featured

Postgres CDC with Redpanda in Six Easy Pieces, Part 1.0

Several lives ago, I spent a lot of time doing batch ETL. I got pretty good at it, in fact. But we discovered that it is inherently bad. Not so bad that you should avoid it at all costs, but just bad enough that it will introduce issues over time that are really hard to isolate, never mind the burden it puts on the source system. One day I’ll write about how using an incrementing ID or a “last touch date” will cause you to miss records eventually. But not today. Today is about a more foolproof method that ends up being much easier to set up anyway.

Enter Change Data Capture, or CDC. You can hardly swing a dead cat without finding a few hundred articles on CDC, so I won’t bore you with a long explanation of what it is and why it’s important; I’ll distill it down to one sentence: CDC is a real-time stream of all the stuff that happened to your database, in sequence, without any additional load being placed on the source system. Inserts, updates, deletes, schema changes, it’s all in that CDC stream. That leaves just two questions: (1) how do I create & populate this stream, and (2) how do I consume it? We will answer the first question right here and now. How to consume the stream will be covered in Part 2.0+.

But Why Redpanda?

If you’re at all familiar with change data capture, you probably know that the usual reference architecture centers around Apache Kafka to hold the stream of change records generated by the database. It’s a tried and true design, but it also arguably uses a bigger footprint than is necessary since you need to also run a Zookeeper cluster to keep track of your Kafka cluster. Eliminating Zookeeper from Kafka has been on the roadmap for some time, but it’s still a dependency as of January 2023. Apache Kafka is also JVM-based, which is great if you know Java and want to be a committer, but maybe not the optimal choice if you need to maximize performance. But despite all that, Apache Kafka is ubiquitous, and not for any bad reasons.

At the risk of minimizing what Redpanda is, for all intents and purposes it is simply Kafka. Interacting with Redpanda, you wouldn’t know it isn’t Kafka; same functionality & same API, but without the need for Zookeeper. Because it uses the same API, your existing tooling will play just as nicely with Redpanda as it does with Apache Kafka. It is written in C++ as a single binary which natively includes things like a schema registry to simplify your deployment. And being written in C++ rather than Java, Redpanda shows significantly improved performance metrics over “legacy” Kafka. But lest I sound like a shill for Redpanda, the truth is that I wanted to spend some time with a new toy and it is easy to get up and running, so that is “why Redpanda.”

The Setup

As per usual, we’re going to make heavy use of Docker, so you’ll want to be sure you’ve got that installed & working. Conventional wisdom would also dictate that you use Docker compose here, but I’m going to break ranks and spin up the necessary components individually. Credit where credit is due: much of this is spelled out across various Redpanda blog posts and documentation. That said, if you follow their blog posts to the letter, it won’t work (the Docker commands are missing some requirements, and some incorrect ports are referenced).

Setup Step 1: Docker Network

First things first, we need to create a docker network to allow our containers to talk to one another. We’ll call it rp because after all, this is all about using Redpanda. Every container we spin up will need to specify this network.

docker network create rp

Setup Step 2: Redpanda (aka Kafka)

Next we need to spin up a Redpanda “cluster.” It’s only going to be one node so it’s a little disingenuous to call it a cluster, but the point here isn’t to prove out the viability of distributed computing; the point is to demonstrate a CDC data pipeline using Redpanda. More sophisticated deployment options can be found in Redpanda’s docs.

docker run -d --rm --name=redpanda \
--net rp \
-p 9092:9092 \
docker.redpanda.com/vectorized/redpanda:latest \
redpanda start \
--set redpanda.auto_create_topics_enabled=true \
--overprovisioned \
--smp 1  \
--memory 1G \
--node-id 0 \
--reserve-memory 0M \
--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 \
--advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
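With the container up, you can sanity-check the broker using rpk, the CLI that ships inside the Redpanda image (no extra install needed). This is just a health check; the exact output format may vary by Redpanda version.

```shell
# A healthy single-node "cluster" should report one broker (node 0)
# listening on the addresses we advertised above.
docker exec -it redpanda rpk cluster info
```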

Setup Step 3: Redpanda Console

One of the common complaints around Kafka is something called “Kafka blindness.” Out of the box, Kafka doesn’t provide a UI to allow visibility into topics, messages, offsets, etc. There are plenty of tools you can use to restore that visibility, but Redpanda provides their Console tool to cover that gap. For more details and options, the Redpanda docs are going to be your bestie.

Readers of this blog will recall that I love Apache Nifi, which happens to run on port 8080. The Redpanda Console also runs on 8080, so we’ll remap that to 7777 just in case we want to use Nifi later on (spoiler alert?).

docker run -d --rm --name console \
--net rp \
-e KAFKA_BROKERS=redpanda:29092 \
-p 7777:8080 \
docker.redpanda.com/vectorized/console:latest

Once this container is up, point your browser to http://localhost:7777 (or whatever port you used) to verify you can get to the Redpanda console. Since we haven’t created any topics or published any messages, this may leave you with a slightly empty feeling. Welcome to post-modernism.
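If the emptiness is too much to bear, you can create a throwaway topic and push a message into it with rpk; the topic name here is arbitrary.

```shell
# Create a topic, then produce one message into it from stdin.
# Refresh the Console afterward and both should be visible.
docker exec redpanda rpk topic create hello-rp
echo 'hello from redpanda' | docker exec -i redpanda rpk topic produce hello-rp
```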

Setup Step 4: Postgres

We’ll obviously need a database to source some changes for our stream, so why not Postgres? In order for Postgres to allow for streaming its change log, we need to configure the Postgres write-ahead log (WAL) for logical replication. Put the below config into a new file called pgwal.conf saved in your current working directory. When we spin up our Postgres container, we will mount this file as a volume that the database will use for its configuration. Of course you could similarly modify the .conf file of an existing database and be up and running as well.

# CONNECTION
listen_addresses = '*'

# REPLICATION
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4

Feel free to change your Postgres user password if you’re into all that security stuff.

docker run -d --rm --name pg \
--net rp \
-v $PWD/pgwal.conf:/usr/share/postgresql/postgresql.conf.sample \
-e POSTGRES_PASSWORD=admin \
-p 5432:5432 \
postgres:latest
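To verify the WAL config was actually picked up, you can ask Postgres directly (give the container a few seconds to initialize first):

```shell
# Should print "logical"; if it prints "replica", the mounted
# config file wasn't used and logical replication won't work.
docker exec -it pg psql -U postgres -c "SHOW wal_level;"
```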

Setup Step 5: Kafka Connect/Debezium

Here is the first place we break from Redpanda components for our Kafka tools. Kafka Connect with Debezium is my favorite way to stream a database change log. Once configured, it will find your database (and optionally, specific tables) and start streaming the Postgres CDC stream into a Redpanda topic. We’re going to leverage Confluent’s Kafka Connect docker image, with one small modification to install the Debezium Postgres connector. Create a new file called Dockerfile in your current working directory.

FROM confluentinc/cp-kafka-connect-base:latest

RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:latest

And then build a new Docker image from this Dockerfile:

docker build --no-cache . -t kconnect-debezium:1.0

Once we have that docker image built, you simply won’t be able to stop yourself from spinning up a so-imaged container. I certainly won’t stand in your way.

docker run -it -d --rm \
--name connect \
--net rp \
-p 8083:8083 \
-e CONNECT_BOOTSTRAP_SERVERS=redpanda:29092 \
-e CONNECT_REST_PORT=8082 \
-e CONNECT_GROUP_ID="1" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_CONFIG_STORAGE_TOPIC="cdc-config-storage" \
-e CONNECT_OFFSET_STORAGE_TOPIC="cdc-offsets-storage" \
-e CONNECT_STATUS_STORAGE_TOPIC="cdc-status-storage" \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="connect" \
kconnect-debezium:1.0

These environment variables are pretty much standard settings, with the exception of the three “storage topic” items. Those will be the names of the administrative topics that hold the CDC metadata.
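Before moving on, it’s worth confirming that Kafka Connect is up and that the Debezium plugin actually got installed; Connect can take a minute or two to finish booting, so retry if this errors out at first.

```shell
# List the installed connector plugins and look for the
# Debezium Postgres connector class in the output.
curl -s http://localhost:8083/connector-plugins \
  | grep -o 'io.debezium.connector.postgresql.PostgresConnector'
```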

Setup Step 6: Register a Postgres/Debezium connector

In order for Kafka Connect to do something (which is sort of the point, after all), we need to first register a Debezium connector. It primarily consists of some database connectivity parameters and some options around which databases, tables, etc. you want to stream changes for. But that’s just the surface. You can do some crazy stuff with Debezium, like conditional row filtering, column transformations, and other ETL-esque tasks. But don’t take my word for it; see what Debezium has to say about it.

This particular connector points to our Postgres container, which we named “pg,” so that is the value of the database.hostname parameter. Since this is just a demonstration of the possible, we’ll be using the public schema under the postgres database, and authenticating as the postgres user. The name & database.server.name can be whatever you want. Regardless, save this JSON configuration into a file in your current working directory and name it pg-connector.json.

{
   "name":"cdc-connector",
   "config":{
      "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname":"pg",
      "plugin.name":"pgoutput",
      "tasks.max": "1",
      "database.port":"5432",
      "database.user":"postgres",
      "database.password":"admin",
      "database.dbname":"postgres",
      "schema.include.list":"public",
      "database.server.name":"test-cdc"
   }
}

To actually register the connector, we’ll pass that JSON file to our Kafka Connect container through a REST endpoint:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @pg-connector.json

which should return a confirmation with a 201 response and the JSON connector you registered.
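You can also ask Connect for the connector’s status at any time; both the connector and its single task should report RUNNING.

```shell
# Query the status endpoint for the connector we just registered.
curl -s http://localhost:8083/connectors/cdc-connector/status
```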

Optional: Debezium UI

Issuing curls might appeal to your CI/CD sensibilities, but sometimes it’s nice to have a UI to work within. Good news: Debezium now provides a web-based interface for you to view & manage your connectors, which you can run as a Docker container, because why not?

docker run -d --rm --name debezium-ui \
--net rp \
-p 7778:8080 \
-e KAFKA_CONNECT_URIS=http://connect:8083 \
debezium/debezium-ui:latest

Point your browser to http://localhost:7778 (since it also wants to run on 8080) and you should see your connector with some metrics and its current status.

now back to our story…

At this point Kafka Connect will have created the three topics we specified when we stood up the Kafka Connect container. Kafka-bros will want to flex on their knowledge of the Kafka API spec, but the true Redpanda Chad will just refresh the Redpanda Console to get the same info with fewer keystrokes.

From here, we are ready to start streaming CDC logs off Postgres over to Redpanda. So take a sip off your Mtn Dew and get ready to do something cool.

Make CDC, not War

In a production application you probably already have hundreds of tables, but in our POC we have a blank slate. What we need to do is create some tables and insert some rows. If you have a favorite data generator, that would work too. But let’s not worry about that; you want to see it work NOW. I’m partial to DBeaver, but whatever tool you prefer, run these SQL statements against your Postgres container.

create table postgres.public.test_table
(id int
,ts timestamp default now());

create table postgres.public.test_table_the_second
(id int
,ts timestamp default now());

insert into test_table (id) values (100);
insert into test_table_the_second (id) values(200);

Immediately, you should see two new topics show up in your Redpanda Console. I stress immediately, because this is a real-time solution as opposed to a batch or even micro-batch approach, which is what we initially set out to improve upon.

These topics are named for the individual tables that we inserted data into, prefixed with “test-cdc,” which the astute reader will recall is what we set as the database.server.name when we registered our Debezium connector. Clicking on one of those topics will allow us to see the actual CDC payloads. Those payloads are rich with information about whatever happened in the database. All of it is interesting in its own right, but of particular interest to us are the payload.before & payload.after items. These show us what the row looked like prior to the CDC event, and what it looked like after. In this case it was an insert, so the before is NULL. The complete documentation on the payload contents can be found on Debezium’s site.
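To make that concrete, here is a heavily trimmed stand-in for an insert payload (not a complete Debezium message) and one way to pick out the after-image with jq:

```shell
# For an insert, "op" is "c", "before" is null, and "after"
# holds the new row. Extract the after-image with jq.
echo '{"payload":{"before":null,"after":{"id":100},"op":"c"}}' \
  | jq -c '.payload.after'
# → {"id":100}
```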

The most important takeaway from this is that we have a complete log of database activity. It’s not compromised by multiple sessions, stale commits, cached sequence values, multiple mid-batch updates, schema changes, or any of the other overlooked use cases that plague batch ETL.

At the same time, the Redpanda console is pretty powerful, allowing you to configure retention policies on your topics, filtering messages to aid with troubleshooting, or even injecting messages directly into the topic without the hassle of building a producer or knowing the API syntax. The schema registry & Kafka Connect UI is also accessible here with a little bit of configuration, which we’ll leave as a topic for another time.

Great, So What’s Next?

These topics contain a sequenced queue of all the things that happened in the database. What you do with that is where the fun really starts. Do you want to use it to drive some event-based architecture? Or perhaps you want to query the stream in real time to identify trending behaviors in real time? Maybe push to a cloud object storage as part of your data lake? Or consume these messages and populate your Snowflake data warehouse? In part 2.0 and beyond of this series, I will explore some of these destinations. Maybe there’s something particularly cute you’d like to see done with a CDC stream? Holla back in the comments. The good news is that we’ve already done the hard part. Treat yourself to another Mtn Dew (I know I will).

Featured

The Time I Accidentally my Entire NiFi

Obligatory “this didn’t happen yesterday but let’s pretend it did because the faux recency gives my thoughts on the topic more gravity.” So I was working on a project/presentation for a job interview that included building out a decently complex flow in my favorite tool, Apache NiFi (sidebar: can the branding police please impose their will on the NiFi community? It’s “NiFi” on the Apache page but “nifi” in the actual UI. Won’t somebody think of the children?). Well, the flow was either complex, or I was green in my NiFi journey so every step was treacherous and fraught with googling. Come to think of it, little has changed. Regardless, it was complex enough that if I needed to re-create it, it would have been a challenge.

Of course you know where this is heading. My NiFi container crashed, and of course I hadn’t attached a volume so when the container went down, so did my hopes and dreams. I had some portions saved as templates, but some key pieces were missing, and I had no choice but to recreate them. But this isn’t the only problem I had. I also faced the problem of continually making changes to the flow, but often times wishing I hadn’t and wishing I could roll back those changes. Spoiler alert: I could have, if I had read this blog post back then.

NiFi Registry has entered the chat

It turns out there is a wonderful little tool called NiFi Registry that allows you to put process groups under version control, and integrates so cleanly with the NiFi UI that you can literally commit & rollback changes directly from the canvas faster than you can say ‘buncha munchy crunchy carrots.’ Let’s see how!

As usual, we’re going to spin up some Docker containers to get at these tools. In this case, we’ll have a container for NiFi and a second container for NiFi registry. There’s more than one way to attack this problem, and doing so a few different ways will give us a chance to explore some Docker features I haven’t got to use a whole lot yet.

NOTE: we’ll be using a slightly older version of the NiFi Docker image (tag 1.13.2) to avoid some security stuff that will only muddy the waters here. At the end I’ll touch on how to use the latest version.

The easy way!

The easy way is to just spin up each container from the command line as follows

docker run -d --name nifi -p 8080:8080 apache/nifi:1.13.2
docker run -d --name nifireg -p 18080:18080 apache/nifi-registry

These two containers don’t really know about each other, so the only way they can communicate is by way of the IP address assigned to the container. In our case, NiFi will need to know the IP address of the NiFi registry.

docker inspect nifireg | grep IPAddress

This address will be part of the default docker network that gets created when a container is first spun up. It’s usually of the form 172.17.0.x, but it’s always a private IP address. We’ll use this address here in a minute. The problem with this way is that if the container ever had to restart, the IP address could change, and that might break stuff.
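If you’d rather not eyeball grep output, docker inspect also accepts a Go template that pulls out just the address:

```shell
# Print only the container's IP address on its network(s).
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' nifireg
```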

The better way!!

The better way is to create a user-defined docker network, and have each container join that network when they spin up. Why is this better? Glad you asked. It’s better because containers which are on a user-defined network are able to communicate with each other through their name property. You know that --name switch? That acts like a hostname here. If you use the default network, this feature is not available. Fortunately creating a docker network is incredibly simple. I hope you’re sitting down.

docker network create nifi-net

And when we spin up the containers we’ll need to make sure they join the network, which they won’t do by default.

docker run -d --name nifi --net nifi-net -p 8080:8080 apache/nifi:1.13.2
docker run -d --name nifireg --net nifi-net -p 18080:18080 apache/nifi-registry

The only difference here is the addition of the --net <network name> switch. That’s literally all there is to it. From here on out, either container can communicate with the other container by their docker names, never again worrying about an IP address changing behind the scenes. So you can close that tab you opened about DNS and spin down your load balancer.

The best way!!!

The best way is to build on the docker networking we just learned about, but also to introduce reusability. Rather than remembering the exact set of switches you used and which image tag you used and how you set the environment variables, it might be nice if that were packaged up somehow in a reusable way.

Docker compose has entered the chat

Docker compose is a configuration file that tells Docker how you want to create containers (yes, multiple containers if you want) so that you can simply refer to this config whenever you want to spin something up or down. What docker compose can do is just about limitless. What we’re going to do here is about as simple as it gets, but sometimes that’s all you need. The other thing about docker compose is that it establishes a user-defined network by default, so you don’t even have to think about that. It’s like long division. You don’t need to know how to do it, but you need to appreciate that it exists.

It’s a YAML file that describes the desired parameters for each container. Again, this only scratches the surface, but you should be able to connect what we’ve done on the command line to what we have in the docker compose file. It’s worth mentioning that the level immediately below services corresponds to the --name switch on the command line, so nifireg and nifi are the container names.

version: '3'
services:
    nifireg:
      image: apache/nifi-registry
      ports:
        - 18080:18080

    nifi:
      image: apache/nifi:1.13.2
      ports:
        - 8080:8080

Save that as nifi-stack.yml and get ready to have fun by asking docker compose to bring up the stack.

docker compose -f nifi-stack.yml up

Immediately you’ll see logs fill your screen as docker compose follows your instructions and spins up those containers exactly as you desired.
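If you’d rather have your terminal back, add the -d flag to run the stack in the background, and use ps to check on it:

```shell
# Start the stack detached, then confirm both containers are up.
docker compose -f nifi-stack.yml up -d
docker compose -f nifi-stack.yml ps
```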

Setting Up NiFi Registry Buckets

Give it a minute or two for NiFi to fire up, and eventually you’ll be able to get to the UI at http://localhost:8080/nifi and also the NiFi Registry UI at http://localhost:18080/nifi-registry. Seems like a really good time to open up that UI.

NiFi Registry is, for all intents and purposes, the GitHub-for-dummies for your NiFi flows. The versions are stored in buckets in NiFi Registry, so let’s create a bucket. Click on the wrench in the upper right…

Then click on NEW BUCKET…

And call it whatever you like. I called mine Electricboogaloo Versioned Flows, because I like to stay on-brand whenever possible. There are two checkboxes in that dialog; you can leave both unchecked. At this point there is very little to see, so let’s move over to NiFi.

NiFi Registry Controller Service

Like just about every other external connection in NiFi, this one is managed through the controller settings. Under the “hamburger” menu in the upper right, go to Controller Settings.

From there we will need to add a new registry client by clicking on the Registry Clients tab, and then clicking the (+) button.

The name is how you’ll refer to it later, and could really be important if you end up with multiple NiFi registries. The URL will depend on how you spun up your containers. If you used the easy way, you’ll supply the IP address of the NiFi Registry container. If you created your own network or used docker compose, then you can use the name of the registry container, and remain insulated from any IP address changes.

Ok, now what?

Using NiFi Registry (finally)

At long last we finally get to actually use all the tools we took all this time to set up. The first thing to be aware of is that you can only version control process groups. So if you’re the type to put lots of processors on the root-level canvas, you’re going to have a bad time. Learn to love process groups. In fact, put one on the canvas now and name it whatever you like. Then right-click on it and check out something brand spanking new.

That was added when you created the registry client service. Now is no time to be shy, go ahead and click on it already.

Your registry service should appear in the top box, and the bucket you created earlier should show up in the bucket box. If you don’t see those two things, here are your troubleshooting steps:

  • Did you correctly type the URL when you created the registry client? http, not https, and the port is 18080
  • Did you get the name of the registry container correct? nifireg if you used my code.
  • If you did not use docker compose, did you set up the docker network? If not, you have to use the IP address

Full disclosure, in the course of writing this my version control dialog did not populate with my bucket. Turns out that after doing all this a zillion times, the containers I currently have up are the ones that use the default network, so using nifireg in the registry service URL would not work. Because of course.

From there, choose a name for your flow. You can put whatever you like in the description, and the Version Comments box is your commit message that will show up in the bucket. So hit save and flip back over to NiFi Registry, and click on the “NiFi Registry” header which is secretly a link. Yes, this navigation could be better. Be a committer on the project, what’s stopping you?

There you can see we have one version of Electricboogaloo Flow 1, and I did the initial commit after the kids went to bed.

Let’s do something simple in our process group, like generate a flow file, terminate on success, and hit apply. Now use the breadcrumb trail in the lower left to go back to the top level canvas. Right click on your process group to see even more new stuff!

You can commit those changes, revert them, or even show what those changes were. In the interest of time let’s commit those local changes. It is left as an exercise for the reader to first see the local uncommitted changes and then post-commit flip over to the registry & see that second commit.

Now let’s roll things back. Right-click on your process group one more time, and under Version you’ll see an option to change version, which brings up this dialog featuring the commit history for this flow.

Click on Version 1 (initial commit) and click CHANGE. Give it a second to work its magic (which could take a little bit if there were a lot of processors in the group), and eventually it will return control to you. Click into that process group and you’ll find that it looks like it did on the initial commit. Of course you can now just as easily change to the most recent version or any version in between.

It is important to note that each process group will have its own versioned flow. That is, they can all be in one NiFi Registry bucket, but each flow will be versioned independently. So if you have 15 process groups on your canvas, each process group would be committed/rolled back independently. You can go full Inception and have versioned process groups within versioned process groups, but at some point you begin to flirt with unstructured version space and limbo, and the last thing anybody needs is Christopher Nolan’s take on SDLC.

The last thing I’d like to demonstrate is how to use the registry to import specific versions directly to the canvas. If you add a new process group to the canvas, you’ll notice a little import button that wasn’t there yesterday.

Clicking on it brings up a familiar dialog, featuring all your buckets and flows. So you can simultaneously have multiple versions side by side on your canvas. That’s probably really handy for some reason. Let me know how you work this to your advantage!

The Denouement

Thus concludes our tour through the NiFi Registry. It’s simple to set up, and simple to use. So simple that there is no excuse for not using it, ever. Well, that is, once you know it exists and all that. I was more or less unaware at the time, and it almost cost me. I ended up rebuilding my flows from scratch, and I nailed the interview. As far as the job offer goes, that’s a fun story for another time when I’m a little further removed from the reality of it all. This story does have a happy ending, however.

Oh, I almost forgot. Once you’re done with those containers, docker compose makes it super easy to tear everything back down:

docker compose -f nifi-stack.yml down

Thanks again for your time, gentle reader. Hopefully you find this useful and can avoid accidentally’ing your entire NiFi flow. Let me know if you have any questions or if there’s a topic you’d like me to cover. It gets lonely in the comments.

P.S. Somebody tell Mr. Perkins from freshman English that I finally used “denouement” in a semi-professional context.

Epilogue

I promised to show how to deal with the latest NiFi Docker image and its default-enabled user security, and now I will make good on that promise.

Beginning with version 1.14, user authentication is enabled by default. There are tons of configuration options it supports, but our main objective here is simply to make it work. If you just pretend nothing is different, it’ll break, because secure NiFi runs on port 8443. So you’ll need to change the exposed port. Then you’ll realize it’s no longer http, but https. Then you’ll discover that it uses a self-signed certificate and you have to “proceed to the unsafe website,” only to discover that you need a username and password.

For the love. If you scroll back (or simply grep) through the docker logs, you’ll find a section for the generated username and password that it randomly sets up for you.
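A grep along these lines should surface them (the exact log wording varies a bit between NiFi versions, so adjust the pattern if it comes up empty):

```shell
# The generated single-user credentials are written to the
# application log once at startup.
docker logs nifi 2>&1 | grep -i 'generated'
```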

You can use that hot mess to log in, OR you can specify your own when you spin up the container using some environment variables. Note that the password must be at least 12 characters, or NiFi will silently ignore your supplied credentials and you’ll be left wondering why you can’t log in. Here’s what it looks like in a docker compose file:

version: '3'
services:
    nifireg:
      image: apache/nifi-registry
      ports:
        - 18080:18080

    nifi:
      image: apache/nifi:latest
      ports:
        - 8443:8443
      environment:
        - SINGLE_USER_CREDENTIALS_USERNAME=admin
        - SINGLE_USER_CREDENTIALS_PASSWORD=yourpass1234

The interaction between NiFi and the registry remains the same. NiFi now lives on port 8443, and you’ll see a new user authentication screen in the NiFi UI. I would anticipate NiFi Registry adding similar user authentication in an upcoming release, and expect it to work similarly.

Until next time…

Featured

The Time I Wanted to Rekognize Your Face

It’s no big secret that facial recognition is totally a thing these days, and it’s probably also not a big secret that cloud vendors like AWS have services that can make facial recognition amazingly easy to implement. So let’s go ahead and see just how easy it is, ok? There are plenty of tutorials out there for using the basic features of AWS Rekognition, so if I am going to the effort to write up something, it needs to cover ground that hasn’t already been trampled. In my mind, the point of facial recognition is to be able to tell you the name of somebody in a picture, and that seems to be somewhat uncharted territory in the world of blogs & tutorials.

Maybe we should build a small web app that uses your web cam to see if the person at the keyboard is someone we know, and what their name is. You could easily extend this to facial recognition for authentication, or anything that requires you to know who is in a picture. I’m just demonstrating the possible, you’re going to have to come up with the ideas for how to use it.

I should probably take a moment to reflect on the vast amounts of development & machine learning that is required to enable facial recognition. But I won’t because this isn’t a blog about how to build a facial recognition system from scratch, it’s about how to build something that leverages existing facial recognition tools. There is no shame in standing on the shoulders of giants, and there is no shame in not knowing how facial recognition works. I mean, nobody knows how magnets work, either. So let’s just get started.

Overview

At a high level, we’re going to need a few things, in no particular order:

Reference architecture for our application

Creating an S3 bucket for our images

Rekognition is a powerful thing, but it has to have something to compare against, and that thing is a bucket of S3 images. Create an S3 bucket in your AWS account (be aware of what region you create it in, it will matter later on). I called mine cnelson-facial-recognition. Remember bucket names have to be globally unique. This bucket will not require public access, so leave it blocked.

We’ll also need to allow Rekognition to have read access to that bucket, so we’ll have to add a bucket policy to allow it. From the AWS console, go into the bucket and open the Permissions tab. Go to the Bucket policy section and edit it with the following policy, being sure to use the ARN for your bucket instead of mine.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AWSRekognitionS3Bucket",
            "Effect": "Allow",
            "Principal": {
                "Service": "rekognition.amazonaws.com"
            },
            "Action": "*",
            "Resource": "arn:aws:s3:::cnelson-facial-recognition"
        },
        {
            "Sid": "AWSRekognitionS3BucketObjects",
            "Effect": "Allow",
            "Principal": {
                "Service": "rekognition.amazonaws.com"
            },
            "Action": "*",
            "Resource": "arn:aws:s3:::cnelson-facial-recognition/*"
        }
    ]
}

As a best practice (principle of least privilege), the permissions on this bucket should be tightened to allow just the necessary access, but that is left as an exercise for the reader.

We’re going to want to configure an event & listener for new uploads to our bucket, and while we could configure it from the S3 console, it ends up being somewhat easier to do it from the Lambda console. But let’s just hang onto that for the time being.

DynamoDB for image metadata

The S3 bucket will store the reference images, but we will need some way to understand & track just who exactly those people are. Rekognition is really good at identifying faces in a picture, and comparing faces in one picture to faces in another, but it is downright stupid in terms of knowing who a face belongs to (unless you’re using the celebrity detection feature and you and your friends all happen to be celebrities).

When Rekognition processes an image, it identifies the face and returns a Face ID that we can store and use as a reference later on. From the AWS console, let’s create a DynamoDB table called faces with a primary key of face_id, since that is how we will look up our faces later. The great thing about DynamoDB is that we don’t need to decide much about our data right now beyond the table name and the primary key. We could easily store this in a relational database, but DynamoDB is serverless so there is nothing to provision, it is really fast for key/value lookups like the ones we’ll need, and it is free at the scale we’re going to need.
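Concretely, each item in the table will be nothing more than a Face ID mapped to a name. A sketch of the item shape (the UUID here is made up; real IDs come back from Rekognition):

```python
# The entire data model: one item per indexed face.
item = {
    "face_id": "5aa8b9ab-1c3d-4e5f-9a7b-123456789abc",  # example UUID; real IDs come from Rekognition
    "name": "Deep F. Value",
}
print(sorted(item))  # ['face_id', 'name']
```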

You can do this from the console, but sometimes it’s just as easy to do it from the CLI:

aws dynamodb create-table \
    --table-name faces \
    --attribute-definitions \
        AttributeName=face_id,AttributeType=S \
    --key-schema AttributeName=face_id,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST

Setup Rekognition

Rekognition is also a serverless service, so there is nothing to provision. We will, however, have to set up some of the underpinnings to allow it to work for us. And the bad news for you, gentle reader, is that the AWS console doesn’t give you much in the way of help for setting these pieces up. Away we go to the command line!

Rekognition works by looking at faces that are stored in a collection, and when you actually use Rekognition you will need to tell it which collection to work against. Setting up a collection is easy from the CLI. The truly interested reader may find the Amazon Rekognition Developer Guide very useful here. In a flash of creativity, I called my collection “Collection” because yo dawg.

aws rekognition create-collection \
    --collection-id Collection

Once the collection has been created, we’ll need to add some faces to it. We could do this “the hard way” by writing a little python to upload images to S3 & then invoke a Rekognition API to process those images. But this isn’t 2019; we can do this in a much more cloud-native way.

Lambda has entered the chat.

Lambda functions & S3 event triggers

Once our image has been uploaded to S3, we know we want to do two things with it:

  • process the image through Rekognition
  • add an entry to DynamoDB that will connect the Face ID to the person’s name

If we configure an event trigger on our S3 bucket, we can use it to fire a Lambda function to do all that stuff whenever a new image is uploaded. The facial recognition stuff is cool, but this sort of automation flow is my jam.

From the Lambda console, we’ll need to create a new function called addFaces with a Runtime of Python 3.8. Under Permissions, you will want to let it create a new role with basic Lambda permissions. This gives it rights to log to CloudWatch, so you can see what your Lambda is doing once it’s up and running. We won’t need to deploy this into a VPC since none of the services we’re using are in a VPC. Once created, you’ll be taken to the configuration tab.

You can see above that I have already configured an S3 trigger, but doing so from the Lambda console is quite easy. Just click Add trigger, choose S3 as the trigger source, select your bucket, and use “All object create events” as the event type. You can be more selective with object prefixes & suffixes, but we won’t need any bucket filtering.

Also note that it indicates Lambda will add the necessary permissions for the S3 event to invoke your Lambda function, so that’s one less thing to worry about.

The code for this Lambda can be found on my GitHub, and can be simply copied/pasted into the Lambda console. (or visit this article where I use the CLI to upload code to a Lambda)

git clone https://github.com/supahcraig/facial_recognition.git

This Lambda breaks down into three main components, S3 operations, Rekognition operations, and DynamoDB operations. The event passed into the Lambda contains most of the information we’ll need to process the image, particularly the bucket name and the key, which is the name of the object in the bucket (which most humans would call a file, but I hear they revoke AWS certifications if you were to do such a thing).
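For reference, the slice of that event the Lambda actually reads looks roughly like this (a trimmed sketch of the real event shape; the bucket and key values are examples from earlier in this article):

```python
# A trimmed S3 put-event, showing only the fields the Lambda reads.
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "cnelson-facial-recognition"},
                "object": {"key": "dfv_face.jpg"},
            }
        }
    ]
}

bucket = sample_event["Records"][0]["s3"]["bucket"]["name"]
key = sample_event["Records"][0]["s3"]["object"]["key"]
print(bucket, key)  # cnelson-facial-recognition dfv_face.jpg
```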

Spoiler alert, we’re going to pass the name of the person in the photo as a metadata attribute when we upload the image to S3. Unfortunately, the event does not include the object metadata, so we have to make a call to head_object to get that information. User metadata attributes must be prefixed with x-amz-meta-, which is why our name attribute is found as x-amz-meta-name.

# inside the Lambda handler; s3 is a boto3 S3 client created outside the handler
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']

response = s3.head_object(Bucket=bucket, Key=key)
name = response['ResponseMetadata']['HTTPHeaders']['x-amz-meta-name']

To pull the face info out of the picture and add it to the faces collection we created earlier, we will use the Rekognition method index_faces. Now technically this returns a list of faces, but we’re going to make the bold assumption that we only have one face in our reference image. Otherwise it gets complicated mapping faces to names, and this seems like a sensible corner to cut, given our purposes. Rekognition returns an ID assigned to the face it found in your image.

    # client is a boto3 Rekognition client created outside the handler
    # identify the face & add it to the collection
    response = client.index_faces(CollectionId=collection_id,
                                  Image={'S3Object': {'Bucket': bucket, 'Name': key}},
                                  ExternalImageId=key,
                                  MaxFaces=1,
                                  QualityFilter="AUTO",
                                  DetectionAttributes=['ALL'])

    print('Results for ' + key)

    faceID = ''

    # we're making a bold assumption that only one face is present in the photo
    for faceRecord in response['FaceRecords']:
        print('Face ID = {}'.format(faceRecord['Face']['FaceId']))
        faceID = faceRecord['Face']['FaceId']

The final step in the process is to take that Face ID and persist it in our DynamoDB table along with the name we grabbed from the metadata.

    table = dynamodb.Table('faces')
    table.put_item(Item={'face_id': faceID, 'name': name})

Lambda Permissions

In order for our Lambda to actually be allowed to do all the stuff we just told it to do, we’re going to need to grant some permissions. When we created the Lambda, we told it to create a new IAM role for us. We’re going to need to attach a few policies to that role, but first we need to create one for S3 (yes, we could just attach S3 Full Access, but let’s prove we know how to do this the right way). From the IAM console, create a new policy that allows read permissions on our S3 bucket. If the metadata were contained in the S3 event, we wouldn’t need this policy at all; reading the object is the only functionality it allows for.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::cnelson-facial-recognition",
                "arn:aws:s3:::cnelson-facial-recognition/*"
            ]
        }
    ]
}
Next, find the role created by your Lambda and attach 3 policies:

  • the policy you just created above
  • AmazonDynamoDBFullAccess
    • our Lambda only needs rights to put an item into one specific table; we could write a custom policy that ensures exactly that, but this article is already long enough
  • AmazonRekognitionFullAccess
    • again, a best practice would be to use the policy of least privilege to allow minimal function to our Lambda alone

Upload an image

We set up some really tricky cloud stuff back there, so it probably makes sense to try it out. Take an image of your beautiful face and upload it to S3… and be sure to specify your name as a metadata item so all that Lambda & DynamoDB magic works correctly.

aws s3 cp dfv_face.jpg s3://cnelson-facial-recognition --metadata "name=Deep F. Value"

If all went well, you should see a new object in your S3 bucket and an item in your DynamoDB table. Go ahead, check. I’ll wait. Note that the metadata item was called “name” when I made the CLI call to cp. AWS automatically prepends the “x-amz-meta-” prefix when the API call is actually made, to avoid conflicts with other data in the header. So nice, those AWS fellas.
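That prefix is exactly why the Lambda earlier read x-amz-meta-name out of the head_object response. The lookup can be sketched in isolation with nothing but the standard library (the sample dict below mimics the relevant slice of a head_object response):

```python
def metadata_value(head_object_response, key):
    """Pull a user-metadata value out of a head_object-style response.
    S3 surfaces user metadata under HTTPHeaders with the x-amz-meta- prefix."""
    headers = head_object_response["ResponseMetadata"]["HTTPHeaders"]
    return headers["x-amz-meta-" + key]

# shape of the relevant slice of a head_object response (values are examples)
sample = {"ResponseMetadata": {"HTTPHeaders": {"x-amz-meta-name": "Deep F. Value"}}}
print(metadata_value(sample, "name"))  # Deep F. Value
```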

Ok, exhale. We did a lot, but it was all just laying the framework for what we actually want to do, which is make it work from a web front end. So do a lap around the house, grab you a Code Red and let’s get this across the finish line!

Set up Cognito for authentication

It’s time to peel another layer of the onion here. Our web page (which we haven’t even seen yet) is going to need to call some AWS services, which can be done in many ways. For the purposes of our POC, we’re going to do it through client-side JavaScript via the AWS SDK. This means our clients will need to authenticate somehow. AWS Cognito to the rescue! Cognito is a service that lets users & applications authenticate securely and limits what resources someone or something has access to. We will be setting up a Cognito identity pool to allow our app access to the minimal set of services required to function. The principles are the same as with IAM, but how we control it is slightly different.

AWS has an excellent guide for setting up Cognito for a very similar purpose (the example has them building a web interface to work with Amazon Polly), but we’ll go through the steps here as well, using Rekognition instead. From the AWS console, navigate to Cognito and create a new identity pool. Give it a lovely name, be sure to check “Enable access to unauthenticated identities”, and then click create.

From there you will have the opportunity to create roles which are necessary for Cognito to function. Make a note of the role names, because we will be attaching some policies to them before long. Click allow.

Next you will be given a snippet of code (change the platform to JavaScript) which includes your Cognito credentials your app will use to invoke the APIs through the SDK. Be sure to grab those, you’re going to need them soon.

Can you feel it? We’re *this* close to actually doing something useful.

Attach IAM policies to Cognito role

As it sits, Cognito won’t let you do much, so we will need to give it some privileges in the form of attaching IAM policies to the roles that were created when we built our identity pool earlier. Navigate to IAM, Roles, and search for the “unauth” role created above. Clicking into it gives you the summary for that IAM Role.

One inline policy already exists, which you may have seen when you were creating the identity pool. We now need to add policies to allow Cognito to call the services that make our thing do its thing: specifically Rekognition and DynamoDB. As a best practice you should grant only the minimal set of permissions, but sometimes we’re lazy and want to use a pre-existing policy that gets the job done. This is one of those times. Attach AmazonRekognitionFullAccess and AmazonDynamoDBReadOnlyAccess. This will allow our app to do just about anything on Rekognition, and to read from a DynamoDB table.

Again, it is left as an exercise to the reader to tighten the permissions to read just the faces table in DynamoDB, and to define the minimal set of permissions & collections for Rekognition.

Deploy the web application

Just as this isn’t an essay on how facial recognition works, this is also not a time to dive into the finer points of JavaScript & web development. I think it is evident I’m not a web developer. As such, I found a lovely writeup on how to enable the camera in a web page and leaned hard on the code found there to capture the image. Really what it comes down to is opening the camera, and then capturing a snapshot from the camera stream. You can find the code for the webpage & css on my GitHub.

git clone https://github.com/supahcraig/facial_recognition.git

Aside from the HTML, let’s break down the JavaScript that powers these particular fun and games. First up is Cognito. We set up Cognito earlier and it gave us some sample code, which we utilize here. You should put your Cognito identity pool ID in the placeholder spot. Any calls to AWS from this page will authenticate using these Cognito credentials.

AWS.config.region = 'us-east-1';
AWS.config.credentials = new AWS.CognitoIdentityCredentials({
    IdentityPoolId: 'us-east-1:YOUR-COGNITO-ID-HERE'
});

The next interesting part is where we grab the image and then turn it into a Blob which we can send to Rekognition. At the same time we’re building up a small parameters object that we’ll use in that call to Rekognition. I’m including the conversion to Blob here because I found other people struggling to do this while I was trying to figure it out.

function rekognize(imgSrc) {

    var params = {
        CollectionId: "Collection",
        Image: {
            Bytes: getBinary(imgSrc)
        }
    };

    // ...the call to Rekognition follows; the full function is shown below
};

function getBinary(encodedFile) {
    var base64Image = encodedFile.split("data:image/png;base64,")[1];
    var binaryImg = atob(base64Image);
    var length = binaryImg.length;
    var ab = new ArrayBuffer(length);
    var ua = new Uint8Array(ab);

    for (var i = 0; i < length; i++) {
        ua[i] = binaryImg.charCodeAt(i);
    };

    var blob = new Blob([ab], {
        type: "image/png"
    });

    return blob;
};

From there we can actually invoke Rekognition’s searchFacesByImage method, which accepts a collection and an image, either in the form of an S3 location or a bytestream/Blob. If it returns successfully, the response is an object containing an array of face match information. In our simplistic world we’re assuming just one match comes back, so we only look at the first element in the array and grab the Face ID. Each match also comes back with a confidence score (and a bunch of other information you might find handy), which would let you decide just how good the match was. Of course the API is much more sophisticated than what I’ve shown here; it is definitely worth a read to see what all is possible.

Once we have the matched Face ID, we call our getName function asynchronously to look up the name in our DynamoDB table, awaiting the response from the database. Once we have the name, we update the DOM to include a welcome message for the recognized user.

function rekognize(imgSrc) {

    var params = {
        CollectionId: "Collection",
        Image: {
            Bytes: getBinary(imgSrc)
        }
    };

    rekognition.searchFacesByImage(params, function(err, data) {
        if (err) {
            console.log("ERROR!!!");
            console.log(err, err.stack);
        } else {
            console.log("SUCCESS!!!");
            console.log(data);
            matchedFaceId = data.FaceMatches[0].Face.FaceId;

            async function f() {
                var userName = await getName(matchedFaceId);
                console.log(userName);

                var element = document.getElementById("authenticatedUser");
                element.innerHTML = "Welcome, " + userName;
            };
            f();
        };
    });
};

function getName(faceId) {
    return new Promise(function(resolve, reject) {

        var dbParams = {
            TableName: "faces",
            Key: {
                "face_id": faceId
            }
        };

        dbClient.get(dbParams, function(dbErr, dbData) {
            if (dbErr) {
                console.log(dbErr, dbErr.stack);

                reject("UNIDENTIFIED USER");

            } else {
                console.log(dbData);
                resolve(dbData.Item.name);
            };
        });
    });
};

Now that we’ve got all this sweet JavaScript & HTML, we need to host it somewhere. If this were 2016, you would probably spin up an EC2 instance to host a web server (being certain to enable TLS/HTTPS so you can enable the camera), but then you need to hassle with certificates and EC2 costs and potentially dynamic public IPs and the rising cost of toilet paper in the Czech Republic. It’s a lot to consider, really.

CloudFront has entered the chat.

Create CloudFront distribution

You may recall that S3 has the ability to host static web pages, which is exactly what we need. The problem is that S3 hosted websites do not allow for HTTPS, which our app requires due to the camera usage. CloudFront, however, can host a static website out of S3 using HTTPS.

First, we’ll need to create another S3 bucket to host the site. I called mine cnelson-facial-recognition-web. Using your preferred method, copy index.html & mystyle.css into your bucket.

From the CloudFront console, you’ll need to create a new distribution. For the Origin Domain Name, click in the box and it will become a dropdown showing all your S3 buckets. Find yours, and select it. You can use nearly all the default options, but changing Viewer Protocol Policy to “Redirect HTTP to HTTPS” is recommended. Depending on your location, you may want to change the Price Class to use only US, Canada, and Europe. You can get as sophisticated as you want with CloudFront, but the bare minimum will suffice today, and unless you expect a ton of traffic this service will be free.

You will need to enable public access on the bucket and on index.html & mystyle.css, or the site will not be viewable.

It can take 20 minutes or so for the CloudFront distribution to deploy, so patiently refresh that screen until it shows the distribution is complete.

As an aside, I found it easier to develop the solution in EC2 with Apache installed (configured for HTTPS, obviously) and only pushed to CloudFront when I was satisfied my development was done.

Rekognize your face

Now comes the moment of truth. Visit the URL provided in your CloudFront distribution. On your webpage, the “Start Streaming” button turns on the camera, and the “Identify Me!” button takes a frame from the stream and sends it through our facial recognition workflow. If all goes according to plan, you’ll be greeted with a warm welcome…..by name.

Truly amazing. What a time to be alive.

Parting Thoughts

It turns out facial recognition is really easy if you let AWS do all the work for you. Actually, all of it is pretty easy if you think about it from a cloud-native standpoint. When I set out to tackle this, I knew I would use Rekognition and some JavaScript. But I also had some local Python scripts to upload the images, index the faces, and add the names to the database, and I planned to use EC2 to host the website. But it didn’t really feel super modern. To really get the value from using a cloud provider, you sometimes have to re-think your approach to make use of the tools provided by the likes of AWS. Need to do something with an uploaded file? Trigger a Lambda with an S3 event! Need a static website? Try S3 with or without CloudFront!

And cloud-native can often mean fully serverless as well, which means that you don’t have to provision servers & worry about spinning them down when you’re done for the day. You only get charged for what you use, and most (if not all) services have a free tier if your usage is below certain thresholds. Below is the pricing if you happen to move beyond the free tier:

  • Amazon Rekognition – $0.001/image
  • Amazon DynamoDB – $0.25/1M read requests
  • Amazon S3 – $0.023/GB
  • AWS Lambda – first 1M requests are free, then $0.20/1M requests
  • Amazon CloudFront – $0.085/GB transferred
  • Amazon Cognito – first 50k monthly active users are free

So not free, per se, but neither am I running up my bill while I’m writing this. If nobody hits the site, I only pay for my S3 storage, which itself is extremely cheap.
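To put numbers on that, here is a quick back-of-envelope using the prices above (my arithmetic; Lambda and Cognito stay inside their free tiers at this volume). Identifying 10,000 visitors in a month runs about ten dollars, nearly all of it Rekognition:

```python
# Back-of-envelope monthly cost for 10,000 identification requests,
# using the per-unit prices listed above.
requests = 10_000

rekognition = requests * 0.001            # $0.001 per image searched
dynamodb = requests * (0.25 / 1_000_000)  # $0.25 per 1M read requests

# Lambda & Cognito sit inside their free tiers at this volume
total = rekognition + dynamodb
print(f"${total:.2f}")  # $10.00
```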

Please, drop me a note here and let me know what you think, or if there is something else you’d like to see done. I’m always up for a challenge!

Featured

The Time I Wanted to Try Serverless File Ingestion

In a recent past life, one of the things we did a whole lot of was taking in data from our clients in the form of csv or other flat files via sftp and then staging them in a database or data lake for further processing & analysis. There were a few too many moving parts, especially one particular 3rd party application that seemed to always break at the worst time, and typically without anyone even knowing. Discussion of replacing it was a constant topic, but these things can be hard to change.

As such, I had been thinking for a while about what a cloud-native solution might look like to take that same input data file and stage it in a database. There are probably as many ways to do this as there are ways to leave your lover, but I’m only going to tackle one of them today (if for no other reason than that I hope to build to something even more glorious in future writings): using AWS Lambda to read from S3 and write to an RDS database, triggered whenever a file is written to a particular S3 bucket. One nice thing about this solution is that aside from the destination database, there is no provisioning of servers or wrestling with capacity planning; there is only doing. And with Lambda there is only paying if there is doing, so there are some arguments to be made from a financial aspect as well. More on that later.

But this isn’t a dissertation on why serverless is cool, this is a guide on how to actually do something with it.

Overall, this isn’t terribly hard, but it does touch on a number of AWS services. If you weren’t familiar before, you sure will be when this is all said and done. At a high level, this is what we’re going to have to stitch together. It might seem like a lot. And maybe it is a lot. Or maybe it is the same effort as what we had been doing, only it was hidden behind the network engineers, DBAs, sysadmins, and devops teams.

  • S3
    • Where the files will land, and how the lambda will be triggered.
  • RDS & the brand new RDS Proxy
    • RDS for the database layer.
    • RDS Proxy as a connection pool layer to be used by Lambda.
  • Lambda
    • Serverless engine for actually running our code.
  • IAM Roles & Policies
    • To ensure we keep a tight lid on who can do what.
  • Secrets Manager
    • For database authentication.
  • VPC Endpoints
    • To keep certain network traffic confined to our VPC.
  • Security Groups
    • To ensure the correct network access exists.
  • CloudWatch
    • Logs, because it doesn’t always work on the first try.

The sequencing that follows is somewhat important, because certain steps will need a particular service to have already been created. I’ve tried to present the steps in the way that results in the most linear progression, but there is always another way to do something in AWS. I’ve also tried to name things in a way that probably won’t conflict with your existing objects, and will make it easy to tear back down once you’re done. The names will also factor into the IAM policies, so ignore my lead at your own risk. #ominousTone

Create Security Groups

This can be done (perhaps more simply) with a single security group, but as I was going through this, having the security groups somewhat more role-isolated ended up helping me understand just how everything was working. Everything should go into your favorite VPC.

Security Group 1

  • Name = electricboogaloo-LambdaTest-LAMBDA
  • Outbound allow all traffic on all ports on 0.0.0.0/0
  • Inbound allow all traffic on all ports from itself.

This may be somewhat confusing, but the primary purpose of this group is to allow other security groups to accept traffic from our Lambda function via this security group. Is it gauche to link to a Stackoverflow post in a how-to guide? I hope not. I found this writeup to be very helpful in understanding how to configure security groups for Lambda.

Security Group 2

  • Name = electricboogaloo-LambdaTest-RDS
  • Outbound allow all traffic on all ports on 0.0.0.0/0
  • Inbound allow
    • TCP on port 3306 (aka Aurora/MySQL) from your IP address to allow connection via the MySQL client.
    • All traffic on all ports from electricboogaloo-LambdaTest-LAMBDA

Create a MySQL RDS Instance

You may be tempted to use a different flavor of database here, but later on we will be utilizing the brand new RDS Proxy service for connectivity, and it is only available on MySQL & Aurora at this time. Create the RDS instance with these options:

  • Free tier is perfect
  • DB instance identifier = electricboogaloo-lambda-test-mysql
  • Master username/password – make sure you don’t forget these.
  • Use a db.t2.micro for the instance size, it will be plenty.
  • Connectivity
    • Choose the same VPC as where you put the security groups you created.
    • Select your subnet group
    • Make it publicly accessible
    • Select the electricboogaloo-LambdaTest-RDS security group
  • Use password authentication
  • Additional Configuration
    • Initial database name = lambda_test_db

Install MySQL Client

It will take a few minutes for that RDS instance to spin up. Take this time to install the MySQL client.

brew install mysql-client
echo 'export PATH="/usr/local/opt/mysql-client/bin:$PATH"' >> ~/.bash_profile

Once the database is up, we need to create a table in which to write our data. I am using a baseball stats archive, specifically the batting data, which dates back to the Grant administration.

Log into the RDS instance….

mysql -u YOUR_USER -p -h YOUR_RDS_ENDPOINT

then create the table via the MySQL client…

create table lambda_test_db.batting (
    playerID varchar(25),
    yearID int,
    stint int,
    teamID varchar(3),
    lgID varchar(2),
    G int,
    AB int,
    R int,
    H int,
    `2B` int,
    `3B` int,
    HR int,
    RBI int,
    SB int,
    CS int,
    BB int,
    SO int,
    IBB int,
    HBP int,
    SH int,
    SF int,
    GIDP int);

Lastly, download and extract the batting data…

wget https://github.com/chadwickbureau/baseballdatabank/archive/master.zip

unzip -p master.zip baseballdatabank-master/core/Batting.csv > Batting.csv
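The serverless load we’re building toward will essentially read this CSV out of S3 and turn it into rows for insertion. The parsing half takes nothing but the standard library (a sketch; the sample line mirrors Batting.csv’s layout, truncated to four columns for brevity):

```python
import csv
import io

def rows_from_csv(csv_text):
    """Parse Batting.csv-style text into (header, rows). Empty numeric
    fields come through as empty strings and can be loaded as NULLs."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    return header, list(reader)

# illustrative sample, truncated to the first four columns
sample = "playerID,yearID,stint,teamID\naardsda01,2004,1,SFN\n"
header, rows = rows_from_csv(sample)
print(header)   # ['playerID', 'yearID', 'stint', 'teamID']
print(rows[0])  # ['aardsda01', '2004', '1', 'SFN']
```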

Create a Secret

Loose lips sink ships. I guess data security are the ships in this analogy. Somewhere in this process something is going to need to connect to the database. Securing those credentials is usually an afterthought in a POC like this, but it is actually quite easy to keep the credentials out of your source code using the AWS Secrets Manager.

So let’s create a new secret. AWS has a few templates for different databases that handle key rotation and all that, but the “other type of secret” template is plenty for our purposes. Just add secret key/value pairs for username and password, and just for kicks add another key/value pair called db_name to hold the initial database name from our RDS instance. Use the default encryption key, and call the secret electricboogaloo-lambda-test-RDS-secret.

Now any service that wants to connect to the database can get the credentials out of this secret. Of course this needs to be locked down with an IAM policy, but that will come later. In light of that, make a note of the ARN for this secret. And yes, this could have been done via IAM, but I honestly just wanted to use Secrets Manager.
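When the Lambda eventually fetches this secret, the Secrets Manager GetSecretValue response carries our key/value pairs as a JSON string in its SecretString field, so retrieving the credentials is a fetch plus a json.loads. A sketch against a sample response shape (the credential values are placeholders):

```python
import json

def credentials_from_secret(get_secret_value_response):
    """Extract the key/value pairs stored in a Secrets Manager secret.
    GetSecretValue returns them as a JSON string under 'SecretString'."""
    return json.loads(get_secret_value_response["SecretString"])

# shape of a GetSecretValue response, trimmed to the field we use
sample = {
    "SecretString": '{"username": "admin", "password": "hunter2", "db_name": "lambda_test_db"}'
}
creds = credentials_from_secret(sample)
print(creds["db_name"])  # lambda_test_db
```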

Create an IAM Role

There are a lot of services that need to interact in specific ways, and the way to manage all that is through an IAM Role. In particular, Lambda functions need a specific set of permissions in order to create network interfaces and write logs to CloudWatch, and of course Amazon provides a policy that does just that.

  • Role name = electricboogaloo-LambdaTest-S3-RDS-Role
  • Assign policy AWSLambdaVPCAccessExecutionRole

Next go ahead and update the role’s Trust Policy to allow RDS & Lambda to assume the role:

{
 "Version": "2012-10-17",
 "Statement": [
  {
   "Sid": "",
   "Effect": "Allow",
   "Principal": {
    "Service": ["rds.amazonaws.com", "lambda.amazonaws.com"]
   },
   "Action": "sts:AssumeRole"
  }
 ]
}

Create an RDS Proxy

RDS Proxy is a brand new feature that is still in preview mode at the time of writing. You can think of RDS Proxy as an Amazon-managed connection pool for RDS. It is probably overkill for this example, but there is a lot of overhead in creating a database connection, and RDS Proxy removes that overhead by essentially intercepting the connection create and returning an existing pooled connection. What a time to be alive.

While you’re reflecting on the glory of all that, create an RDS Proxy with these options:

  • Name = electricboogaloo-lambda-test-RDS-proxy
  • Engine compatibility = MYSQL
  • TLS not required
  • Target Group Configuration
    • Pick your RDS instance (if it’s not there, the RDS instance may not be available yet)
  • Connectivity
    • Use the IAM role you just created
    • Have IAM authentication disabled
    • Pick your subnets
  • Additional Connectivity Configuration
    • Choose Existing VPC Security Groups
    • Remove default
    • Add electricboogaloo-LambdaTest-LAMBDA
  • Accept that this is a preview release and your enterprise will go down in flames if you deploy this into production.

This will also take a few minutes to become available.

The way you use the proxy is exactly how you would establish a connection directly to a database, the only difference is that instead of pointing to the RDS endpoint, you point to the proxy endpoint. The proxy then uses the secret we created to establish actual database connections. Our Lambda will make a call to create a database connection, but what actually happens is that the proxy returns an existing connection from the pool.
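In code, that swap is a single argument: the Lambda connects exactly as it would to MySQL directly, with the proxy endpoint as the host. A sketch of the connection parameters (the endpoint is a made-up placeholder, and I’m assuming the PyMySQL driver for the commented call):

```python
# Connection parameters for the Lambda; identical to a direct RDS
# connection except that `host` is the RDS Proxy endpoint.
conn_kwargs = {
    "host": "electricboogaloo-lambda-test-rds-proxy.proxy-XXXX.us-east-1.rds.amazonaws.com",  # placeholder proxy endpoint
    "user": "YOUR_USER",
    "password": "YOUR_PASSWORD",
    "database": "lambda_test_db",
    "port": 3306,
}

# with PyMySQL this would be:
#   import pymysql
#   conn = pymysql.connect(**conn_kwargs)
print(conn_kwargs["database"])
```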

Create VPC Endpoints for S3 & Secrets

If there was anything tricky about this whole thing, it was getting Lambda to access the secret. It turns out the Secrets Manager endpoints are only available on the public internet, but if your Lambda is in a VPC it won’t be able to get out to the internet unless you set up a NAT Gateway… or you can set up a VPC Endpoint to Secrets Manager. This allows Lambda to access the secret without leaving the comfy confines of your VPC. We must also do this for S3, but that is a common use case. (Dear AWS, why aren’t VPC endpoints enabled by default when you create a VPC?)

Create two new endpoints in your VPC for these services:

  • com.amazonaws.us-east-1.s3
  • com.amazonaws.us-east-1.secretsmanager

Secrets Manager will ask for a security group. You should use electricboogaloo-LambdaTest-LAMBDA, since Lambda is the service that will be calling the secrets endpoint.

And with that, you deserve a frosty beverage of your choice. But we’re only halfway done.

Featured

Loading Twitter Data into Snowflake the Easy Way

There I was, all proud of myself for finding a GitHub repository with a set of Snowflake processors so I could ingest all that Twitter data via Apache Nifi. It was good work, and it solved a problem. But something about it didn’t feel right. There were a lot of steps, and it seemed like pushing files to the Snowflake stage and then telling Snowpipe to process each file by name out of the stage was the long way around the block. Especially when you consider that a Snowflake stage is literally just an S3 bucket. Much like Frank Costanza on that fateful December day when he was moved to rain down blows upon the man who would endeavor to buy George’s doll, I realized there had to be another way.

That other way? Amazon S3 event notification.

Recap of the Hard Way

As I covered in painful detail in Part 4, the general strategy beyond actually getting the tweet data was to use a PutSnowflake processor to copy FlowFiles into an internal Snowflake stage. Snowflake stages are nothing more than S3 buckets that facilitate loading data, and internal stages are simply S3 buckets managed by Snowflake. Once the files were staged, we would then fire an ExecuteSnowPipe processor for each staged file, which would copy the data into the actual table.

Now to be fair, it wasn’t all that hard. The “trickiest” parts were that PutSnowflake uses the Put command fired over a JDBC connection, so we had to maintain that connectivity/authentication as well as the credentials to let Snowflake talk to the S3 bucket. And the Put step could optionally compress the file, which changes the filename extension, so you have to remember to account for that. Oh, and you had to land the flowfiles on the Nifi server, which means managing the filesystem becomes a thing you need to be concerned with. Again, easily overcome obstacles, but perhaps they weren’t necessary in the first place.

The Easy Way

The easy way involves setting up events on an S3 bucket to submit event notifications to an SQS queue whenever an object is created in that bucket, and then telling the pipe to automatically ingest whenever it sees a notification in that queue. So let’s do it. Fair warning– this is not your daddy’s file ingestion solution.

The External Stage

The PutSnowflake processor really does two things: it compresses files and copies them up to S3. Our example copied them to a Snowflake managed S3 bucket, but there is nothing stopping us from using an external stage pointed to our own S3 bucket.

create or replace stage nifi_twitter_stage 
URL='s3://cnelson-nifi-snowpipe/'
credentials=(aws_key_id='<YOUR_AWS_KEY>' aws_secret_key='<YOUR_AWS_SECRET_KEY>');

New IAM User & Group

For this example I created a new IAM group with the S3 Full Access policy and then created a new IAM user placed into that group. Under the Security Credentials tab I created a new access key & secret that was used in the create stage DDL above.

Create an Auto Ingest Pipe

You’ll recall that a Snowflake pipe is essentially an instruction to Snowflake to copy data from a stage and put it into a table. The pipes we had been using were manual ingest pipes, meaning that we needed to tell Snowflake to process files through the pipe, and we had to tell it to do so for each file. That’s why we needed the ExecuteSnowPipe processor. Aside from using the auto_ingest=true setting, this pipe will look very similar to the one we had been using.

I also created a new table called twitter_auto just to differentiate from my prior work:

create table twitter_auto
(tweet_payload variant);
create or replace pipe nifi_twitter_pipe_auto auto_ingest=true as
copy into twitter_auto
from @nifi_twitter_stage
file_format = (type=json);

You can see some metadata about the new pipe by running show pipes in the console. In my case, I have the pipe from my prior example as well as my new auto ingest pipe. Of particular interest is the notification_channel field, which has an AWS ARN for an SQS queue. When you create an auto ingest pipe, Snowflake creates an SQS queue for you. It was at this point that I decided to add Snowflake to my Christmas card list.

Copy that ARN to your clipboard; we’re about to use it somewhere else.

Event Notifications in S3

Navigating to your S3 bucket in the AWS console, under the Properties tab you’ll find Events in the Advanced Settings section. Click into that and add a new notification. The only event we’re interested in is “All object create events,” which will trigger whenever a new file is added to the bucket. We will be sending the notification to an SQS queue, and the SQS ARN is already on your clipboard, so paste it in now and save the event.
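If you’d rather script that console step, the same configuration can be applied with boto3. The bucket name and queue ARN below are placeholders; substitute the ARN Snowflake gave you in the notification_channel field:

```python
def notification_config(queue_arn):
    """Equivalent of the console's "All object create events" -> SQS setting."""
    return {
        "QueueConfigurations": [
            {
                "QueueArn": queue_arn,
                "Events": ["s3:ObjectCreated:*"],   # fires on every new object
            }
        ]
    }

def apply_notification(bucket, queue_arn, region="us-east-1"):
    """Attach the notification to the bucket (requires AWS credentials)."""
    import boto3
    s3 = boto3.client("s3", region_name=region)
    s3.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=notification_config(queue_arn),
    )
```

Because the SQS queue is owned by Snowflake, you don’t need to touch its permissions; Snowflake set that up when it created the pipe.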

And just like that, any file dropped into that S3 bucket will be automatically loaded to the Snowflake table defined in the copy portion of the pipe definition. Frank and I told you there was a better way!

But What about Nifi?

Nothing about all that changes the fact that I really like Apache Nifi, and there is still an important role for it to play in this flow since all we did was automate the loading from S3. We still need to get the data into S3, and Nifi is perfectly suited for that. A completed flow template will be available for download at the end of this example, but much of it is ground we’ve already covered: getting tweets from the Twitter endpoint, filtering deletes, and merging individual FlowFiles into fewer larger files. Nothing about all that is different from what we’ve been doing.

The difference is that we need to instead push those files to S3, and it probably wouldn’t kill us to compress them first. I always tell my kids “you can do hard things.” That lesson is completely invalid here because this is going to be a really easy thing. The only hard thing is understanding why companies pay people to do this. Having said that, I’m looking for a job doing this sort of thing so please hit me up. No, really.

CompressContent

Drop a CompressContent processor on the canvas and connect the output from the MergeContent processor to the input of CompressContent.

  • Mode
    • This processor can either compress or decompress
  • Compression Format
    • Many popular compression formats are supported, pick your bestie.
  • Compression Level
    • Only valid if you’re using gzip, see the man page for the finer points of this setting.
  • Update Filename
    • When true, the filename will be updated to reflect that it was compressed (e.g. filename.txt becomes filename.txt.gz). [or for decompression, will remove the extension].
    • When true, has the side effect of updating the flowfile attribute filename to be the new compressed filename.
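The renaming behavior is worth internalizing, since anything downstream that references the filename needs to account for the new extension. A toy sketch of what Update Filename does, assuming gzip:

```python
def compressed_filename(filename, fmt="gz"):
    """Mimic CompressContent's Update Filename on compress:
    filename.txt -> filename.txt.gz"""
    return f"{filename}.{fmt}"

def decompressed_filename(filename):
    """...and on decompress, strip the last extension back off."""
    return filename.rsplit(".", 1)[0]
```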

PutS3Object

Once the flowfiles are compressed, we can put them into our S3 bucket. Drop a PutS3Object processor on the canvas and connect its input to the output of CompressContent.

  • Object Key
    • The filename is a great choice here, so we can use Nifi’s expression language to grab the flowfile attribute: ${filename}.
  • Bucket
    • The name of our S3 bucket
  • Content Type
    • This can lead to interesting effects depending on how you set it, but since we have JSON data, application/json is a solid choice that won’t get you judged negatively by your peers.
  • Access Key ID/Secret Access Key
    • These will be from the Security tab for your IAM user. It can be the same key pair used in your stage definition from earlier.
  • Region
    • This is the AWS region where your S3 bucket was created.
  • The remainder of the options can be left as the default.

Once the flowfiles get pushed to S3 we won’t have any further use for them in Nifi, so this processor should be set to automatically terminate the flowfiles on success. Your full flow should look something like this.

Fire it up!

If you got this far, it’s time to press GO on your flow (protip: you can right-click on the canvas to start all your processors at once). You should start seeing files in your S3 bucket once the MergeContent processor reaches its trigger threshold, and before long you should start seeing Twitter data show up in your table.

I think you’ll agree that this was considerably easier than the method I initially used. There is a valuable lesson here, which is that a tried and true design pattern may break down when you introduce a disruptive technology into the mix. The old way may well still work, but it might not be optimal, and it might leverage too much of the old tech and not enough of the new tech. I suppose finding the right mix of tools & tasks is the real treasure. Well that, and all the friends we made along the way.

I hope you had as much fun as I did going through this. It is certainly not the only way to do this sort of thing. Leave a comment and let’s compare notes!

Nifi Templates Available on GitHub

The full flow template is available here: https://github.com/supahcraig/nifi_NoSQL/blob/master/NifiTwitterS3_template.xml

Featured

The Time I Wanted to Send Twitter Data into a Database Part 4: Snowflake

UPDATE: Turns out this is the hard way. It’s sort of like long division: you’ll learn lots along the way that makes the easy way make more sense, and you can tell your kids that when you were young you had to load data uphill. Both ways. In the snow(flake). But if you just want to do it the easy way, I’m here to educate, not judge.

The Easy Way: https://electricboogaloo.home.blog/2020/04/30/loading-twitter-data-into-snowflake-the-easy-way/

Now, for the truly brave, let’s tackle the hard way.

Yes, I know I’ve been saying NoSQL NoSQL NoSQL and Snowflake isn’t a NoSQL database, they say so right on their website. But it is at this point where I realized that the goal of this journey was simply to take some semi-structured data and do some analysis on it in several different databases. Whether or not those databases are “officially” NoSQL isn’t all that relevant. The point is to use the right tool for the job and Snowflake seems to fit the bill. What’s in a name, anyway? Would SQL by any other name still query as sweet? It might. It just might.

What sets Snowflake apart from the databases we’ve looked at in parts 1 (MongoDB), 2 (Couchbase), and 3 (CosmosDB) is that Snowflake bills itself as a data warehouse, so performing the sort of aggregate analysis we’ve been running is perfectly in their wheelhouse. Fans of those other products would almost certainly tell you that they weren’t designed to do OLAP style queries, and that is probably fair. Many of those systems don’t even have mechanisms to do so; MongoDB only recently added that capability. Those products were built to solve OLTP problems, and they do a great job at it (probably).

The problem with using OLAP type products for analyzing JSON data is that traditional relational databases have been slow to support semi-structured data (e.g. Oracle forces you to store it as VARCHAR2 or CLOB), and querying of complex structures isn’t quite as straightforward as you might hope. Snowflake goes a long way toward overcoming that, so let’s see it in action. Buckle in, this is going to be a bit of a ride.

Snowflake Setup

At www.snowflake.com, they have a “start for free” button in the upper right. This will take you through a setup process where you’ll create your user, choose your cloud provider & region, and will drop you into the Snowflake UI. It won’t take but a minute or two, I promise. I went with AWS in N. Virginia, but you should pick whatever works for you. There will be a few references to Amazon S3 later on, so translate accordingly if you go with Azure or Google Cloud.

The Strategy

Our plan is to use Snowflake’s Snowpipe feature to load the data into a table. The way this works is that we create a “pipe” which is essentially an instruction for Snowflake to take a file out of a “stage” and copy the data from that file into a table. Wait, so what is a stage? A stage is actually an Amazon S3 bucket where our data sits until a pipe is instructed to actually load it. So in short, data -> stage -> pipe -> table.

Snowflake Environment Setup

There is a good deal of setup we need to do within the Snowflake console to get all these things created, nearly all of which can be found on my GitHub, but I will quickly go through the steps here so we understand what we’re doing.

First things first, we need to create a database and tell Snowflake we want to use it. Once the database & schema are set, you won’t need to fully qualify your objects. The Snowflake UI is quite nice, but it doesn’t appear to have command completion & object context (when you hit the dot), so fully qualifying doesn’t do much more than help you hone your typing skills.

CREATE DATABASE twitter_db;
USE DATABASE twitter_db;
USE SCHEMA twitter_db.public;

Next we create the table used to store the tweets. Their DDL is similar to what you’re probably already familiar with, although they have some of their own datatypes. Perhaps most notable is the VARIANT datatype, which allows for storing of semi-structured data in a way that allows for easy querying via schema-on-read. Nothing is more irritating than having some complex JSON structure that you can’t even load without first defining the schema. The whole point is that this schema is complex and could evolve, so why are you making me put a stake in the ground by locking in the structure as the very first step? Clearly I have some unresolved hot sports opinions on this topic, but Snowflake handles this scenario absolutely beautifully.

CREATE OR REPLACE TABLE twitter
(tweet_payload VARIANT);

Next, we need to define the stage & pipe. It isn’t strictly required, but you can define a file format specification for your stage. This is where you’d specify things like ignore header row, escape characters, delimiters, etc. The Snowflake documentation is fantastic, and I would highly recommend spending a few minutes seeing what capabilities are out there.

create or replace file format tweet_payload_format type = JSON;

The stage creation is quite simple since we’re using an internal stage. You can use an external stage where your data is hosted in your own S3 bucket, or really just about anywhere. The nice thing about the internal stage is that Snowflake manages it, so we don’t need to worry about creating buckets or creating a keypair and all that. Whether or not that’s the right choice for your needs is a different topic. Regardless, we define the stage to use the file format we created above.

create or replace stage nifi_twitter_stage
file_format = tweet_payload_format;

Once the stage is created, the pipe can be created. It is defined by telling Snowflake where we want the data copied to and where we want it copied from. The @ operator is used to indicate that the object is a stage.

create or replace pipe nifi_twitter_pipe as
copy into twitter
from @nifi_twitter_stage
file_format = (type=json);

The final step is the heart of the Snowflake magic: the warehouse. The warehouse object is where you define how much computing horsepower you want to use (aka how fast do you want it and how much do you want to pay for that speed). Snowflake uses a pay-by-the-drink model, so this is where we define how big our mouth is. You can also define an auto-suspend timeframe (in seconds) where the warehouse will essentially shut down and stop costing you money. Make certain you set this.

create or replace warehouse compute_wh
warehouse_size = SMALL
auto_suspend=600;

Nifi Setup

As usual, we will start with our working nifi template which gets a stream of tweets from twitter & parses out the deletes. You can follow along step-by-step in Part 1, or just import the template from my GitHub. But if you’re not yet comfortable in nifi, you’re not going to like what happens next.

We’re going to be on a free trial Snowflake account with $400 in credits, so we need not be overly concerned about our storage & processing budget. Let’s treat this like a real data science initiative and load the whole thing, unadulterated. Goodbye jolt, we hardly knew ye!

Snowflake Processors

Currently (April 2020), nifi does not ship with any Snowflake processors. It is probably possible to use base-level JDBC processors to achieve our goal or maybe something sophisticated like using Amazon SNS/SQS, but we’ll be building our own nifi processors & controller service because why not. At this point I need to thank my friend Paul Gibeault, who did the work to actually code a set of nifi Snowflake processors and shared them on GitHub. Thanks again, sir!

Clone Repository & Build

The Snowflake processors can be found here: https://github.com/paulgibeault/SnowflakeNiFiProcessors

git clone https://github.com/paulgibeault/SnowflakeNiFiProcessors.git

Navigate into the folder created from the clone operation and then run the maven build.

cd ./SnowflakeNifiProcessors

mvn clean install

Where the built artifacts go will depend on how you have maven configured. For me they go into the ~/.m2 folder, but your mileage may vary. Once you find the artifact path, you’ll hopefully see a file called snowflake-nar-1.0.0.nar. A nar is a “nifi archive,” similar to a jar, and is how nifi processor functionality is packaged up. We’ll need to copy this file into our nifi container under /opt/nifi/nifi-current/lib, and then restart nifi for the new nar to be recognized.

cd ~/.m2/repository/com/snowflake/nifi/snowflake-nar/1.0.0

docker cp snowflake-nar-1.0.0.nar nifi:/opt/nifi/nifi-current/lib

docker restart nifi

Snowflake JDBC Driver

While you’re waiting for nifi to restart, we can download the drivers for snowflake and copy them into the container as well. The full version history of the Snowflake JDBC jar can be found at https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/ and I used the latest which is version 3.12.4. Where we put it doesn’t really matter; in a subsequent step we’re going to tell nifi where it is at anyway. A restart is not required after this step.

wget https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.12.4/snowflake-jdbc-3.12.4.jar

docker cp snowflake-jdbc-3.12.4.jar nifi:/var/tmp

Public/Private Key Authentication

Since we’re in the shell anyway, let’s go ahead and create our public & private keys which will be needed when we configure the Snowpipe processor. Not at all surprisingly, there is a fantastic writeup of this in the documentation. There are a few options along the way, but here’s what I did. Of course your key will be different. I hope I didn’t need to tell you that.

openssl genrsa -out mySnowflakeKey.pem 2048

openssl rsa -in mySnowflakeKey.pem -pubout > mySnowflakeKey.pub

cat mySnowflakeKey.pub
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyFn1jbULNi93misGhqhv
h9QVXV498fTi9sIegi19pbVt+EjLnmI85pw1rMvOURj+nyWWUDriNGudJzzjIeVU
LqLrFCl4Yn5vujE0QHVoKWlrEtITj4eo8Mtt+ewW9qzR8LRCnB6UoZ4cNISK5D2O
AIplZu5WyvqJ/6PiOscS9vmPCULb5WHuCIzzWTCZgSBMmMCDYhRDPKdHX+qw5Hxt
bmKaFpQlmVmM6GiBf1vsJBZUKArJad5S4PrKWw8PDmsU+Tq5i1TjqFwm1k47tu+W
Ee3Vpco95r8FSj5KANvVonWVj1EqgM3vf0EZfN2o0NIQORhVhGG08IGhbYZf4AW6
2wIDAQAB
-----END PUBLIC KEY-----

Moving back to the Snowflake UI for a moment, we need to add the public RSA fingerprint to our user. Nifi will get the private key.

alter user cnelson set rsa_public_key  = 'MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyFn1jbULNi93misGhqhv
h9QVXV498fTi9sIegi19pbVt+EjLnmI85pw1rMvOURj+nyWWUDriNGudJzzjIeVU
LqLrFCl4Yn5vujE0QHVoKWlrEtITj4eo8Mtt+ewW9qzR8LRCnB6UoZ4cNISK5D2O
AIplZu5WyvqJ/6PiOscS9vmPCULb5WHuCIzzWTCZgSBMmMCDYhRDPKdHX+qw5Hxt
bmKaFpQlmVmM6GiBf1vsJBZUKArJad5S4PrKWw8PDmsU+Tq5i1TjqFwm1k47tu+W
Ee3Vpco95r8FSj5KANvVonWVj1EqgM3vf0EZfN2o0NIQORhVhGG08IGhbYZf4AW6
2wIDAQAB';

And by the time you complete all that, nifi should be back up and running, ready to Snowflake its way into oblivion!

Using Snowflake Processors in Nifi

It will take a few steps to get data actually landed in our Snowflake table, and we’re going to use a few processors we haven’t used yet along the way, in the interest of optimizing the load a little bit.

  • MergeContent
    • This allows for multiple FlowFiles to be grouped into a single FlowFile. Since each FlowFile represents a single tweet, merging multiple tweets into a single FlowFile will make for less back and forth as we push this data into our Snowflake stage.
  • PutFile
    • This processor takes FlowFiles and persists them to disk as a normal flat file. In a perfect world, this wouldn’t be necessary, as we would simply leverage the native FlowFiles all the way to the destination. However our Snowflake processors are still a work in progress, and don’t yet handle the FlowFiles natively (if you’re looking to contribute to an open source project, this is your big chance!). Instead we need to operate on flat files, so this processor gets the data where we need it.
  • PutSnowflake
    • This is one of the ones we built earlier. It takes a flat file and pushes it to our Snowflake stage using JDBC connectivity, which is why we needed to load the Snowflake jar into nifi.
  • ExecuteSnowPipe
    • This is another processor from the Snowflake nar we built. It instructs Snowflake to run the Snowflake pipe on a specific file in the Snowflake stage. This step is what actually puts the data in our table.

MergeContent

There are a lot of ways to configure this processor to merge FlowFiles to your liking. One way I like to configure it is to leave nearly everything default, but set the maximum number of entries to a reasonably big value….like 100k. This means that every 100,000 inbound messages would be grouped into a single merged FlowFile. But since the source might take a while to produce 100k messages, I’ll update the scheduling to run every minute. The net result is that messages stack up until either 100k messages are collected or 1 minute has passed, at which point the queued FlowFiles are merged into one file and sent to the next processor.
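The either/or trigger logic can be modeled in a few lines of Python. This is just a toy illustration of the semantics described above (MergeContent itself is far more configurable), with the thresholds as parameters so they can be shrunk for readability:

```python
import time

class MergeBatcher:
    """Toy model of MergeContent configured with Maximum Number of Entries =
    100,000 and a 1-minute run schedule: flush when either threshold trips."""

    def __init__(self, max_entries=100_000, max_age_seconds=60):
        self.max_entries = max_entries
        self.max_age = max_age_seconds
        self.buffer = []
        self.started = None

    def add(self, flowfile):
        """Queue one inbound FlowFile; return the merged batch if it's time."""
        if self.started is None:
            self.started = time.monotonic()
        self.buffer.append(flowfile)
        return self.flush_if_ready()

    def flush_if_ready(self):
        age = 0 if self.started is None else time.monotonic() - self.started
        if len(self.buffer) >= self.max_entries or age >= self.max_age:
            merged, self.buffer, self.started = self.buffer, [], None
            return merged   # one merged FlowFile's worth of tweets
        return None
```

With max_entries=3, the third add() returns the merged batch; with real Twitter volume and the settings above, whichever of 100k messages or 60 seconds comes first wins.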

PutFile

The PutFile processor simply takes FlowFiles and writes them to the server at a location you define. What is interesting is that this one really does 2 things: it writes the files out, and then passes the actual FlowFiles on to the next processor. Make sure you catch that: it writes the files out, but doesn’t do anything with them. The next processor doesn’t receive those flat files, it receives the FlowFiles just like every processor we’ve used up to this point.

You can put them wherever you like, but /var/tmp/snowflake_files seemed like a good place. You can configure the processor to create missing directories, so if you don’t have that path, no problem. You can set the owner, group, and permissions as well. Left unset, the files will be created as nifi:nifi, which is perfect for our purposes. The filename will be identical to the name of the FlowFile, which will come in handy later.

PutSnowflake

Things get slightly more complicated from here. Since this one pushes the data to the Snowflake stage via JDBC, we need to configure a JDBC Connection pool, similar to the controller services we’ve configured for other databases. In this case we’ll use a DBCPConnectionPool, and go through the normal configuration & enable steps.

DBCPConnectionPool

Much of this information & more can be found in the Snowflake JDBC documentation.

  • Database Connection URL
    • This is your snowflake JDBC connection string. The hostname was sent to you when you completed your trial account setup, but it’s also the URL of the Snowflake console UI.
    • I had problems with the connection expiring, so I included a keep-alive in the connection string.
    • jdbc:snowflake://sta00821.us-east-1.snowflakecomputing.com/?CLIENT_SESSION_KEEP_ALIVE=true
  • Database Driver Class Name
    • net.snowflake.client.jdbc.SnowflakeDriver
  • Database Driver Location(s)
    • This is the location you copied the jar into earlier.
    • /var/tmp/snowflake-jdbc-3.12.4.jar
  • Database User/Password
    • This is the user/password you created as part of the trial account setup (or one you’ve created since).

PutSnowflake Processor

  • Input Directory
    • this is the location defined in the PutFile step. A best practice would be to introduce this value with an UpdateAttribute processor early on in the process and reference the attribute in our processors.
  • File Name
    • we can use nifi’s expression language to dynamically get the filename of the file we landed in the PutFile step based on the fact that that file has the same name as the flowfile.
    • ${filename}
      • This means use the FlowFile attribute named filename
  • Internal Stage
    • This is the name of the Snowflake stage we created earlier. This probably needs to be fully qualified since we did not specify the database or schema in our jdbc connection string.
    • twitter_db.public.nifi_twitter_stage
  • Auto Compression
    • Ask me how I discovered how this works, I dare you. Spoiler: the default is true, which means files are automatically compressed when staged.
    • What I also learned the hard way was that the default compression is gzip, and the .gz is added to the filename (not surprising unless you were surprised that they were compressed in the first place).
    • The zipped filename is exposed as a flowfile attribute named PutSnowflake.target_filename
  • Source Compression
    • You can choose your compression type here, if your files are already compressed. If your files are not already compressed you can pick None or use the default, which is None per the Snowflake docs. Ours were not compressed, so I left it as default.

ExecuteSnowPipe

Executing the Snowpipe is where we tell Snowflake to take a file out of the stage and actually load it using the COPY command specified in the pipe. As a quick reminder, this is what our pipe looks like (or you could just use the describe pipe command in the console).

create or replace pipe nifi_twitter_pipe as
copy into twitter
from @nifi_twitter_stage
file_format = (type=json);

So the pipe will run that copy statement, taking data from our stage and putting it into the twitter table, with json as the file format. The nuance here is that ExecuteSnowPipe operates on a specific staged file, but let’s not get too far ahead of ourselves. We first need to configure the processor….

Snowflake Connection Service

Much the same as every database processor, a controller service needs to be configured. In this case we won’t use JDBC, but instead a SnowflakeControllerService which is part of the nar we built. We’ll go through the usual configure & enable steps.

  • Snowflake URL
    • This is the hostname of your Snowflake connection.
    • <account_name>.<region>.snowflakecomputing.com
    • NOTE that it is just the hostname, no https:// or any other prefix.
  • Snowflake Port Number
    • I could not find this documented, but 443 works.
  • Snowflake Protocol
    • HTTPS is your ticket to success here.
  • Snowflake account name
    • you can grab this off the Snowflake URL or…
    • select current_account();
  • Snowflake username
    • This will probably be the same username from the PutSnowflake processor.
  • Private Key
    • Remember when we created the public & private keys earlier? The private key goes here.
    • Copy the private key out of mySnowflakeKey.pem and paste it into this field.

ExecuteSnowPipe

The remaining setup is pretty straightforward, but is honestly where I spent 98% of my time troubleshooting. The pipe is your fully qualified pipe name, but the File Name requires the tiniest bit of attention to detail and the astute reader will already know what the trouble was.

The general idea here is that we have already staged the flat files that we created in the PutFile step. Now we want to execute the Snowpipe for each staged file, recalling that each staged file is a mirror of a flowfile in the processing queue. So the flowfile itself is really just a sentinel that triggers an action to happen; we no longer care about the contents of the flowfile since those contents were staged to Snowflake by the prior step. All we really need is the name of the file that was staged.

One beautiful feature of the PutSnowflake processor is that it adds several flowfile attributes based on output from the Snowflake PUT command, including the revised filename in the event the file was compressed on the way to being staged (which ours was). The flowfile attribute in question is ${PutSnowflake.target_filename}, and with it we can tell the pipe which file it should load. If it takes you a while to wrap your head around that, you should bear no shame.

Connect them and Run!

When everything is said and done, you should have a nifi flow that looks something like this. As usual, I’ll provide a link to download the finished version of the template at the end.

Start all the processors and watch the magic unfold before your very eyes. Aside from watching the nifi flow, you can also run some queries in the Snowflake console to see what is happening. I let mine run for 10 minutes or so and collected about 200k tweets. The ExecuteSnowPipe step is clearly the bottleneck in the process, so that might be worth some future investigation. Cranking up the compute in our warehouse for the duration of the load would probably alleviate that pain.

To see the files that have been loaded to the stage:

LIST @nifi_twitter_stage;

To see the contents of the stage:

SELECT * FROM @nifi_twitter_stage;

And then you can use normal SQL to watch the row count grow:

SELECT COUNT(*) FROM twitter;

Run the Hashtag Frequency

Another nice thing about Snowflake being more of an OLAP style thing is that all the SQL you know from your relational roots carries right over. We just need to learn a little syntax to deal with the fact that we have semi-structured data in there. Snowflake makes it fairly easy to get at what you need, so you spend your time getting value out of the data and not learning arcane syntax. (I’m looking at you, MongoDB).

Since we didn’t apply any transformations to our tweet payload, what we have stored is the whole shebang. But the piece we’re interested in is inside the entities section, which is at the root level of the document. Let’s take a look just to refresh our memory.

"entities": {
    "hashtags": [
      {
        "indices": [
          16,
          29
        ],
        "text": "お家で全力ハレ晴レユカイ"
      },
      {
        "indices": [
          123,
          132
        ],
        "text": "stayhome"
      }
    ],
    "symbols": [],
    "urls": [
      {
        "display_url": "youtu.be/mTn-Pnl6Ft4",
        "expanded_url": "https://youtu.be/mTn-Pnl6Ft4",
        "indices": [
          82,
          105
        ],
        "url": "https://t.co/R5PkJct6RJ"
      }
    ],
    "user_mentions": [
      {
        "id": 1240342877844037633,
        "id_str": "1240342877844037633",
        "indices": [
          3,
          14
        ],
        "name": "AGRS_staff",
        "screen_name": "AGRS_staff"
      }
    ]
  }

What we need is inside the hashtags array, which itself is a document with the text key giving us the actual hashtag used. We can use a lateral view coupled with Snowflake’s flatten function to explode the array into multiple rows. Another cool thing Snowflake gives us is the ability to refer to columns & expressions by their numbered position in the Order By… and the Group By! That’s a welcome addition, and eliminates a lot of duplicated code when your SQL gets more complex. SnowSQL 1, ANSI SQL 0.

SELECT count(*)
     , htags.value:text
FROM twitter t
   , LATERAL FLATTEN(input => t.tweet_payload:entities:hashtags) htags
WHERE 1 = 1
GROUP BY 2
ORDER BY 1 DESC;

Well honestly, that couldn’t have been any easier. And whatever #ThalaAJITHBdayGalaCDP is, it’s even hotter than Hansel right now, and by a wide margin.

Conclusion & Next Steps

At the risk of honeymooning my way into full-on homerism, I love Snowflake. Aside from there being no context help in the console (if that’s a deal breaker, you can use something like DBeaver, which also works great with Snowflake), the UI is fantastic. Even their choice of “Snowflake blue” for the keyword highlighting makes the whole thing feel completely tight. But the chrome don’t get you home, as they say, and they have much more to offer than a chromed engine dress up kit; the engine actually goes vrrooom.

Getting up and running isn’t exactly obvious in some places, but they more than make up for it with absolutely fantastic documentation. Most documentation (like what you find for, say, Nifi) looks like it was written by a Java developer, for a Java developer. Snowflake docs look to have been written by actual humans who want other humans to learn how to use the tools, including examples of sophisticated integrations like Kafka and Amazon SNS, to name a few.

Their SQL implementation makes transitioning from the relational world super easy, but they have also allowed for easy integration with semi-structured data. The fact that you can use a schema-on-read paradigm for JSON data immediately makes the cost of entry low for semi-structured analysis, allowing you to delay decisions about data transformations until late in the game. All this and I haven’t even mentioned their scale up/down compute model, which lets you decide how much horsepower you need at runtime. I rate Snowflake as a perfect 5/7 on the Sullivan Rating Scale. I am very much looking forward to exploring Snowflake at a much deeper level.
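That runtime scaling is just plain SQL, by the way. A quick sketch, using a hypothetical warehouse name:

```sql
-- Resize a (hypothetical) warehouse at runtime; subsequent queries use the new size.
ALTER WAREHOUSE my_wh SET WAREHOUSE_SIZE = 'LARGE';

-- And back down when the heavy lifting is done.
ALTER WAREHOUSE my_wh SET WAREHOUSE_SIZE = 'XSMALL';
```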

As I mentioned at the top of the page, there is actually an easier way to achieve this that doesn’t involve maven builds and landing files on your Nifi server: https://electricboogaloo.home.blog/2020/04/30/loading-twitter-data-into-snowflake-the-easy-way/

Next Steps

From here, there are no bounds on where we can go. We’ve ingested twitter data and done semi-structured analysis on 4 different databases. There are probably 100 more database products that I could replicate this work into, but I feel like I’ve beaten this particular pattern into the ground at this point. I may take a break from Twitter & NoSQL for now and tackle a more design-focused topic like blue-green database upgrades.

Leave a comment and let me know what you think about all this. I’d love to hear from you.

Nifi Templates Available on GitHub

You can download the complete flow as a nifi template here:

https://github.com/supahcraig/nifi_NoSQL/blob/master/NifiTwitterSnowflake-full_template.xml

SQL worksheet for the work done in the Snowflake console:

https://github.com/supahcraig/nifi_NoSQL/blob/master/snowflake_twitter_hashtags.sql

Featured

The Time I Wanted to Send Twitter Data into a NoSQL Database Part 3: CosmosDB

E.B. White once famously said that “Everything in life is somewhere else, and you get there in the cloud.” Or something like that. Up to this point in our NoSQL adventure we’ve used containerized databases on our local machine, but in the year twenty-and-twenty, we have to increasingly gaze skyward as the cloud becomes more and more a part of everyday life. In Part 1 we went web scale on MongoDB, and in Part 2 we took a power nap on Couchbase. For Part 3 we are going to space cowboy our way into Azure’s CosmosDB. So let’s get started.

Overview

  1. Import GetTwitter nifi template
  2. Create CosmosDB account/instance
    1. Choose your API
    2. Create database and container
  3. Additional Nifi setup
    1. Add Jolt processor
    2. Add PutMongo processor
      1. Configure controller service
      2. Configure processor
  4. Explore data in the Azure console
    1. Open a shell & run queries
    2. Perform our hashtag count
  5. Conclusion & Next Steps

Once again, we’ll be heavily leveraging the initial Nifi setup work detailed in the first chapter of this series. Or just grab the nifi template from my GitHub and hit the ground running.

Create a CosmosDB Account

Let me save you some trouble from the outset. When I started down this path Azure was offering a free trial of CosmosDB. Head out to the coast, have a few laughs, learn a database for free…. and you absolutely can do that. But there is a critical aggregation feature disabled in the free version that makes it impossible to do our hashtag count. So I broke down and spent a buck fitty just so you, gentle reader, could see it all in action.

Choose Your API

From All Services, find Azure CosmosDB and click Add. This isn’t Azure 101 so it is left as an exercise to the reader to learn the finer points of how to spin up Azure resources. The only trick for CosmosDB is in choosing the API. You see, CosmosDB is a multi-model database service. What that means is that at build time, you get to pick which API interface you want to use to interact with CosmosDB. If that made any sense to you, just wait. Understand that I built out this entire demo, took notes, and then wrote this blog post and I still don’t think I understand it. But let’s pretend just so we can keep things moving. Your choices for API are:

  • Core (SQL)
  • Azure CosmosDB for MongoDB API
  • Cassandra
  • Azure Table
  • Gremlin (graph)

This is nothing short of crazy, right? How can one database potentially use all of those APIs? Well it can’t, or at least not simultaneously. Whichever you pick at build time is the one you’re stuck with. The amazing thing is that (based on my limited exploration of the other APIs) it doesn’t seem to change how it works under the covers. Since we already covered MongoDB in Part 1, let’s pick Azure CosmosDB for MongoDB API to see how things compare. There is a lot to unpack here, and since Cassandra is on the docket of databases to explore, I hope to revisit that API at a future date. (Also there isn’t a nifi processor for Core SQL [why don’t you build one and blog about it? -ed] and I don’t need to solve all the problems in this one post [oh. – ed].) Continue through the wizard and create that sucker.

Create Database & Container

Once your database is deployed, you’ll want to navigate to the Data Explorer. Create a New Database as shown below and give it a nice name. At this time you’ll also get to provision your throughput in units of RUs. An RU is essentially a rate-based currency and covers all the resources used such as CPU, IOPS, and memory. You can read more about it in the Azure CosmosDB documentation. The minimum is 400 RU/sec which will set you back 77 cents per day, but is probably going to frustrate you as you get deeper into this exercise. I bumped up to 800 and found it to be suitable for our purposes.
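That 77-cents figure falls straight out of the arithmetic. A quick sketch, assuming the list price at the time of writing (roughly $0.008 per 100 RU/s per hour; check current Azure pricing before trusting this):

```python
# Back-of-envelope daily RU cost. The rate below is an assumption based on
# Azure's list price circa 2020 (~$0.008 per 100 RU/s per hour).
PRICE_PER_100RU_HOUR = 0.008

def daily_cost(rus: int) -> float:
    return (rus / 100) * PRICE_PER_100RU_HOUR * 24

print(f"{daily_cost(400):.2f}")  # the 400 RU/s minimum
print(f"{daily_cost(800):.2f}")  # the 800 RU/s I bumped up to
```

At 400 RU/s that works out to about 77 cents a day; doubling to 800 roughly doubles the bill.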



Protip: It takes about 10 minutes to spin up a CosmosDB instance, so make sure you spell your own name right if you’re going to take a screen cap and paste into a blog, or decide if tearing it down to correct it is worth your time.

Once the database is created you’ll need to create a New Collection. Collections are just that: collections of documents, analogous to relational tables. Each document in the collection need not adhere to the same schema, but every document in the collection does need the same Shard Key. Put the collection in the database you just created, and give it a super clever name. Like “twitter,” for instance. The shard key is how CosmosDB will partition the data, and since much has already been written on the strategy of sharding, we won’t stop down here to discuss it further. The tweet language is present in just about every tweet and has a fairly low cardinality, so it is what we’ll use. Hit create.

Additional Nifi Setup

At this point we need to make a small course-correction from how we’ve been handling the data. Thus far, we’ve made no attempt to pare down the massive tweet payload. This is big data after all, aren’t we supposed to be able to handle all manner of massive structured/semi-structured/whatever data? Yes. Yes we are. And yes we can. But we also have to actually pay for it since we’re doing this on somebody else’s computer. And since throughput is one of the things we pay for, it seems prudent to trim the fat on our payload and only send what we actually care about, which in this case is the entities sub-document and maybe a few other fields.

JoltTransformJSON Processor

Nifi provides several ways to manipulate the JSON in a flow file, but the most direct way is via Jolt. So let’s Jolt our way to a new document containing

  • id
  • text (the actual tweet text)
  • created_at
  • entities
    • The entire sub-document, including user mentions, urls, etc.
  • lang
    • This is our shard key. Not including it will cause an error downstream.

So drop a JoltTransformJSON processor on the canvas and paste that Jolt spec into the Jolt specification.

[
  {
    "operation": "shift",
    "spec": {
      "id": "id",
      "text": "text",
      "created_at": "created_at",
      "entities": "entities",
      "lang": "lang"
    }
  }
]
A rare glimpse of a Jolt Spec in its native habitat

Protip: The Advanced button will take you to a Jolt transformation development window where you can test your Jolt against sample JSON.

Sample Jolt Output

{
  "id" : 1250132964148760580,
  "text" : "RT @RBW_MAMAMOO: [#솔라] \n\n1st Solo Album <SPIT IT OUT> \n'뱉어' Image Teaser #2\n\n🎼 2020.04.23 Thu 6PM(KST)\n\n#SOLAR #SPIT_IT_OUT #뱉어 https://t.c…",
  "created_at" : "Tue Apr 14 18:44:59 +0000 2020",
  "entities" : {
    "hashtags" : [ {
      "text" : "솔라",
      "indices" : [ 18, 21 ]
    }, {
      "text" : "SOLAR",
      "indices" : [ 110, 116 ]
    }, {
      "text" : "SPIT_IT_OUT",
      "indices" : [ 117, 129 ]
    }, {
      "text" : "뱉어",
      "indices" : [ 130, 133 ]
    } ],
    "urls" : [ ],
    "user_mentions" : [ {
      "screen_name" : "RBW_MAMAMOO",
      "name" : "마마무(MAMAMOO)",
      "id" : 2295631308,
      "id_str" : "2295631308",
      "indices" : [ 3, 15 ]
    } ],
    "symbols" : [ ]
  },
  "lang" : "en"
}
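In effect, this particular shift spec is just a top-level key whitelist. Sketched in plain Python terms (the sample tweet dict below is made up):

```python
# The shift spec amounts to: keep these top-level keys, drop everything else.
KEEP = ("id", "text", "created_at", "entities", "lang")

def trim_tweet(tweet: dict) -> dict:
    return {k: tweet[k] for k in KEEP if k in tweet}

# Hypothetical full payload with extra fields we don't want to pay to ship.
full = {"id": 1, "text": "hi", "created_at": "...", "entities": {"hashtags": []},
        "lang": "en", "retweet_count": 0, "user": {"name": "someone"}}
print(trim_tweet(full))
```

Jolt can do far fancier restructuring than this, but for simple fat-trimming the mental model is exactly a whitelist.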

PutMongo Processor (Yes, PutMongo)

You probably recall that we chose Azure CosmosDB for MongoDB API when we spun up our instance of CosmosDB. It turns out that nifi does not have any native processors for CosmosDB, but because CosmosDB has been configured to use the MongoDB APIs, we can instead use MongoDB processors to send data to CosmosDB. Let that sink in. We’re using a MongoDB nifi processor to send data to CosmosDB. I don’t think I saw that coming. So go ahead and drop a PutMongo processor on the canvas and configure it.

Configure Client Controller Service

Inside the PutMongo processor, you’ll have to create a Client Service using the MongoDB Controller Service, just as we did in Part 1. Make sure you name it intelligently so you can differentiate this one from other Mongo client services you might already have created.

The Mongo URI includes sensitive data in the connection string in plain text.
Mongo URI

Azure does you a real solid when it comes to the connection string. From the Azure console you can find all manner of relevant connection info under the Connection String pane. In general, the connection string is of the form:

mongodb://<username>:<primary password>@<host>:<port>/?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000&appName=@<username>@

Enable the Controller Service

Controller services must be enabled before they can be used, so click the lightning bolt to enable it. If you need to make changes later, you’ll need to first disable the service before you can make any changes. At this point you can continue configuring the actual processor.

Configure PutMongo Processor

The Mongo Database Name and Mongo Collection Name are from our CosmosDB setup we did earlier, and should look virtually identical to our processor from Part 1: MongoDB.

But what about the shard key?

You may also recall that we defined a field in the tweet payload to be the shard key and you may have likewise noticed that we did not need to configure that here. CosmosDB is able to inspect the payload on the way in and grab the designated field for sharding. This seems like an opportunity for improvement… imagine a scenario where you have multiple sources dumping into the same container but the shard key is called “lang” in one source but “language” in another. As constructed, this can’t work. Thankfully we don’t have any such contrived scenarios here, so we press on.
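If you did hit that naming mismatch, the fix would be a small normalization step upstream of the write. A hedged Python sketch, with a hypothetical normalize_shard_key helper (not anything nifi or CosmosDB provides):

```python
# Hypothetical scenario from above: a second feed names the shard key "language"
# while the container expects "lang".
def normalize_shard_key(doc: dict) -> dict:
    doc = dict(doc)  # don't mutate the caller's copy
    if "lang" not in doc and "language" in doc:
        doc["lang"] = doc.pop("language")
    return doc

print(normalize_shard_key({"id": 1, "language": "en"}))
```

In nifi terms you’d do the same rename with another Jolt shift before the PutMongo processor.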

Connect all the processors & RUN!

At long last we can wire up our full pipeline and start getting data from the Twitter gardenhose and write it into CosmosDB.

The speed of ingestion will depend on a few factors, including but by no means limited to your Request Unit (RU) provisioning. GetTwitter can yield data at a pretty good clip, but you will almost certainly be bottlenecked pushing data to CosmosDB. You may even see Azure error code 16500, which indicates you exceeded your throughput threshold. I hit this with my RU set to 400, but when I scaled up to 800 the problem went away. As usual, your mileage may vary.
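Nifi rides out throttling via its failure relationship, but if you roll your own client, the usual way to survive 16500s (short of buying more RUs) is retry with backoff. A minimal Python sketch, with a stand-in exception type rather than any real driver’s:

```python
import random
import time

class ThrottledError(Exception):
    """Stand-in for CosmosDB error 16500 (request rate too large)."""

def with_backoff(write, retries=5, sleep=time.sleep):
    # Jittered exponential backoff: wait a little longer after each
    # throttled attempt so the provisioned RU/s can catch its breath.
    for attempt in range(retries):
        try:
            return write()
        except ThrottledError:
            sleep((2 ** attempt) * 0.1 + random.random() * 0.05)
    raise RuntimeError("still throttled after %d attempts" % retries)
```

The sleep parameter is injectable purely so the pattern is easy to test without actually waiting.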

Explore the data in the Azure console

Running queries in CosmosDB will look different depending on your initial choice of API. Since we’re using the MongoDB API, this is going to feel very similar to how things worked when we were directly using MongoDB. From the console, navigate through Data Explorer and on into Documents underneath your database and container.

You’ll see a list of your documents & shard key, and clicking on one will show you the actual document.

Open a New Shell

This will drop you into what appears to be a MongoDB shell. Of course it is actually the Azure CosmosDB flavor of it, but for all intents and purposes, it is a MongoDB shell. Many of the commands we used in Part 1 are going to apply here:

Immediately you’re reminded that every operation you run consumes RUs. We’re just one user with 11k scaled down tweets, but it is easy to imagine a multi-user environment or scaled up web application that could blow through RUs in grand fashion. Getting your RU provisioning right is vital.

Run a hashtag frequency

As we’ve discussed, counting up hashtags is something of a challenge since they are nested inside an array that we need to somehow flatten out. Fortunately we’ve already got this all worked out in Part 1, as the MongoDB query syntax carries over to CosmosDB.

db.twitter.aggregate( [ 
    {$unwind: '$entities.hashtags'}, 
    { $group: { _id: '$entities.hashtags.text', 
               tagCount: {$sum: 1} }}, 
    { $sort: { tagCount: -1 }} 
]);

Actually running this in the shell gives us the result we’ve been looking for. Having done this exercise several times over the past few weeks, I finally didn’t expect to see COVID19 near the top. Accounting for variations on a theme, it isn’t too far off the lead. Brazil Big Brother seems to have lost steam, however.

I should also point out that this aggregation consumed 756.04 RUs, so if you plan to do this sort of work on CosmosDB you’ll be hit in the face with the cost. Honestly, this is probably a good thing. Having users be aware of the cost implications of their query activity at run time will likely pressure them into better behavior. It certainly made me be aware of my actions as I worked through this exercise.

Conclusions & Next Steps

CosmosDB is a strange bird. We only explored the MongoDB API, but the fact that it can act like MongoDB, Cassandra, SQL, and other flavors of NoSQL suggests we’ve only just scratched the surface. It is hard to not just do a direct comparison against MongoDB….so I won’t fight the urge. From a usability standpoint, you’ll have no trouble working with CosmosDB if you’re versed in MongoDB. The Azure console makes it easy to go from 0 to 60 without burning an afternoon reading a mountain of documentation. Granted it’s pretty easy to get rolling with MongoDB as well, but an administrative UI would be welcome and Azure has that knocked out of the park. Then of course there is the whole host/manage it yourself vs fully managed in the cloud debate, which I’m not even attempting to address. But if you like MongoDB and don’t want to host it and don’t want to manage it, well, I might look hard at CosmosDB. The query language is concise, if somewhat arcane, but I had no problem getting to the result I wanted. I hope to revisit CosmosDB in the future to explore some of the other API choices.

In Part 4 we will hook up with the cloud-based Snowflake and I can assure you that a good time will be had by all. Until then, drop a comment and let me know what you think about all this. Oh, and don’t forget to tear it down so you don’t blow your inheritance on Azure fees.

Nifi templates available on GitHub

You can get the nifi templates for this flow from my GitHub: https://github.com/supahcraig/nifi_NoSQL/blob/master/twitter_CosmosDBMongoDB_nifi_template.xml

Featured

The Time I Wanted to Send Twitter Data into a NoSQL Database Part 2: Couchbase

It is happening to everyone across the globe– Coronavirus got you down and you’re stuck at home with all this Twitter data and you don’t know where to put it. Couchbase to the rescue! They claim to be NoSQL with NoEQUAL, so let’s see what that’s all about as we continue our NoSQL exploration.

In Part 1 of this series, we went through a preliminary setup which we will build upon as we stick our toes into the warm waters of Couchbase. If you haven’t gone through that exercise, no worries; you can just grab the nifi template off my GitHub and import it into your nifi instance. If you don’t already have nifi running with Twitter data flowing in, I detail how to get up and running with a nifi Docker container in Part 1: The Time I Wanted to Send Twitter Data into a NoSQL Database Part 1: MongoDB. What we’re going to do next will assume you have that piece in place.

Send the Data to Couchbase

Couchbase is a NoSQL key-value document-oriented database and well-suited to handle enormous amounts of JSON data. Like many NoSQL databases, it is schema-less so it provides the flexibility of evolving data without downtime or other manual intervention to accommodate changes….Twitter is known to evolve their schema over time, so a database that can quietly handle that seems like a good fit, at least based on that criteria.

Spin up Couchbase

You can download Couchbase Server from their website, but I prefer the Docker approach, especially since Couchbase has put several official images out on Docker Hub. So let’s spin one up:

docker run -d --name couchbase -p 8091-8096:8091-8096 -p 11210-11211:11210-11211 couchbase

Once the container is up, navigate to localhost:8091, which is where the Couchbase web UI lives. There is a short setup process you will be guided through:

  • Setup New Cluster
    • Assign cluster name (I picked “testcluster”)
    • Create admin username/password
  • Accept Terms
  • Configure disk & memory
    • The defaults are fine for our purposes
Add a Bucket

Couchbase stores key-value pairs in things called buckets, which are somewhat analogous to tables in the relational world. Of course if there is no schema defined, then the concept of a table is really a lot more fluid than what you’re probably used to. Bucket is really a good name for it in the sense of when you go to Harbor Freight and fill up a bucket with a bunch of random stuff; it could be all tools but it could also be a bag of gummy bears. It is probably convenient if everything in the bucket is reasonably related, but it certainly doesn’t have to be. I mean, who doesn’t want to query about hammers and delicious candy in one fell swoop? Ultimately what you put in the bucket is up to you.

To create a bucket, use the left hand navigation bar and go to Buckets, and then click ADD BUCKET

You’ll need to choose a name for your bucket (I went with the obvious ‘twitter’). Protip: if you pick a name that has certain special characters, when it comes to writing queries you may need to wrap your bucket in back ticks. But you do you, so name it whatever you like and deal with the consequences. The remainder of the default settings should be fine.

PutCouchbaseKey Processor

Now that we have Couchbase up and running, it is time to configure nifi to send data to Couchbase. I should take a moment to shout out to Matthew Groves’ treatment on this topic on the Couchbase blog titled Getting Started with NiFi and Couchbase Server. When I first had this whole idea, his post was the one that got me rolling on Couchbase. That article has some other useful nifi related stuff worth checking out.

Back to the action…drop a PutCouchbaseKey processor on the canvas and double-click to configure it. You will need to create a new Couchbase Cluster Controller Service. Give it a good name since nifi does a terrible job of organizing your controller services (by which I mean it makes no attempt whatsoever).

Configure the Controller Service

Once you create the new service you will be taken to the list of your controller services. Find the one you just created and click the gear to configure it. This controller service assumes Couchbase is running on the default 11210 port. If your container is running on a different port, add :port to your connection string. If you want to go completely turbo with your multi-node cluster, I’ll just refer you to the Couchbase connection string documentation.

Click APPLY, which will take you back to the list of controller services.

Enable the Controller Service

Controller services must be enabled before they can be used, so click the lightning bolt to enable it. If you need to make changes later, you’ll need to first disable the service before you can make any changes. At this point you can continue configuring the actual processor.

Configure PutCouchbaseKey Processor

At long last we can configure the remainder of the processor. Tell it your bucket name, and since the Twitter data is JSON I’ll make the bold assumption that you can determine if the Document Type should be Json or Binary. Also, you can just look at the screen cap below and copy what I did.

For the Document Id, we need to make a call back to Part 1 of this series, where we identified a few elements (“id” & “lang”) in the Tweet payload and pushed them into flow file attributes. While it isn’t required by Couchbase, you might find it handy to supply a unique key for lookups later on. If you don’t have a key, Couchbase will assign one, but it probably won’t be useful for lookups. Since each Tweet does have a unique ID (“id”), we’ll use it as our key. Here we get to use nifi’s expression language to pull it from the flow file attribute and into this processor: ${id}, which will map into your bucket as the key. The actual flow file contents are mapped in as the value side of the key-value equation. It’s truly magic.

Connect all the processors & RUN!

At long last we can wire up our full pipeline and start getting data from the Twitter gardenhose and write it into our Couchbase bucket.

You should only have to let it run for a minute or so; I let it collect about 10k records. You can track that in the metrics displayed in the nifi processors, or by looking at the Buckets tab in the Couchbase console. You can see I loaded 10,089 tweets. Later on you’ll see that you could do it with a COUNT(*).

Counting the Hashtags

What first interested me in Couchbase was their N1QL (pronounced “nickel”) query language. It is advertised to be very SQLesque which makes for a much easier pivot for those of us coming from the relational world. Of course if you’re a SQL hater, you’ll probably think this is stupid and wonder why it even exists. Thankfully the NoSQL world has something to offer just about everyone.

Create Indexes

One thing you’ll learn about Couchbase is they love their indexes. In fact, you can’t even query a bucket without a primary index defined. Honestly, I’m not sure why it isn’t created along with the bucket. Secondary indexes are also a thing, and they can be rather complex, from composite indexes to function-based and beyond. To create one, go to the Query tab in the console and run this query:

create primary index idxl_twitter on twitter;

We may decide we want some additional indexes, but this will get us off the ground. The Indexes tab will show what indexes you have in place, etc. You can also drop indexes from that view.

Run Some Queries

The N1QL documentation is pretty hearty and worth some bedtime reading. If you know ANSI SQL, you are probably already off to the races. Queries can be run directly in the console, and the output can be viewed as JSON or in a more traditional table-style view, which can get a little hairy if your data has complex nested structures….like our Twitter data has. To refresh our memory, let’s examine the portion of the tweet that holds the hashtags, and in particular let’s find a tweet that actually has a hashtag in use.

select entities
from twitter
where entities.hashtags != [];
{
        "entities": {
            "hashtags": [
                {
                    "indices": [
                        21,
                        28
                    ],
                    "text": "प्रमोट"
                },
                {
                    "indices": [
                        57,
                        63
                    ],
                    "text": "ट्वीट"
                },
                {
                    "indices": [
                        67,
                        75
                    ],
                    "text": "रिट्वीट"
                },
                {
                    "indices": [
                        87,
                        95
                    ],
                    "text": "रिट्वीट"
                },
                {
                    "indices": [
                        116,
                        121
                    ],
                    "text": "फॉलो"
                }
            ],
            "symbols": [],
            "urls": [],
            "user_mentions": [
                {
                    "id": 1248082561105940480,
                    "id_str": "1248082561105940480",
                    "indices": [
                        3,
                        13
                    ],
                    "name": "Komal Panday (TPN)",
                    "screen_name": "KomalP666"
                }
            ]
        }
    }

This came back in 11 seconds and shows the entities sub-document in all its complex glory. But that filter looking for hashtag arrays which aren’t empty might benefit from an index, and the console has an advisor that will guide you on what indexes might benefit your query.

There are even buttons in that Advisor pane which go ahead and build the index for you, so I had it build both indexes which were recommended. Running the same N1QL again brought back the result in 3.3 seconds. Congratulations, you are now a Certified N1QL Tuning Practitioner.
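For reference, secondary index DDL in N1QL looks like this (the index name and field here are hypothetical, not the advisor’s literal output; the Advisor pane generates its own):

```sql
-- A secondary index on the nested hashtags array (name and field hypothetical).
CREATE INDEX idx_twitter_hashtags ON twitter(entities.hashtags);
```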

Run a Hashtag Frequency

Counting the hashtags is something of a challenge since the hashtags aren’t a simple scalar thing we can group by & count up. We need to first flatten out that array to make it something we can aggregate. Enter the UNNEST keyword, which works similarly to the $unwind stage we used in MongoDB. It is best demonstrated with an example where we unnest the hashtags and include the tweet ID, so we can see how a single tweet with multiple hashtags is transformed into a single row (that is, a single JSON document) per tweet per hashtag.
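That unnest, against our twitter bucket, looks roughly like this (a sketch; it is the inner query of the full aggregation below, run on its own):

```sql
SELECT twitter.id
     , htags.text AS hashtag_text
FROM twitter
UNNEST twitter.entities.hashtags AS htags;
```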

Switching to JSON view reminds us that we aren’t actually looking at traditional tables but rather we have JSON in & JSON back out:

Which brings us to our actual hashtag count N1QL, where we wrap the unnest query in an inline view to perform our aggregation. This is 101 stuff in the SQL world, but probably not what you expected in the NoSQL world. It certainly isn’t what I expected.

SELECT count(*) as htag_count, x.hashtag_text
FROM (
SELECT twitter.id
     , htags.text as hashtag_text
FROM twitter
UNNEST twitter.entities.hashtags as htags
WHERE 1 = 1
  AND twitter.entities.hashtags != []
) x
GROUP BY x.hashtag_text
HAVING count(*) > 1
ORDER BY htag_count DESC;

Which returned these top hashtags. Again, the most surprising thing (aside from the fact that we were able to do this against complex nested JSON with something that looks exactly like traditional SQL) is that on April 11 COVID19 is only the 3rd most popular hashtag, and by a 3:1 margin. #flattenthecurve

Conclusions & Next Steps

It’s probably too early to draw any conclusions, but I have to admit I really like Couchbase. MongoDB definitely has the corner on hype, but Couchbase seems to be extremely powerful, and the web UI (of which we only barely scratched the surface) comes with enough administrative tools to facilitate tuning for high performance without any additional setup. There are also other features beyond just Query, such as Search and Analytics, which expand the capabilities even further.

Of course we haven’t spent any time investigating the internals, what clustering looks like, cost/benefit of a shared nothing architecture, etc, so just ingesting data and running some queries is not a be-all-end-all test. But based on what I’ve seen thus far, I can’t imagine why it was voted among the Most Dreaded Databases on the 2019 Stack Overflow Developer Survey. There’s no accounting for taste, I suppose. I conclude that their claim of NoSQL with NoEQUAL remains plausible. I would love to hear your thoughts on Couchbase, so please drop me a comment and let’s discuss it.

In Part 3 we’ll venture into the cloud and look at Azure’s CosmosDB and see how big a bill we can run up in a short amount of time.

Nifi templates available on GitHub

You can get the nifi templates for this flow from my GitHub: https://github.com/supahcraig/nifi_NoSQL

Featured

The Time I Wanted to Send Twitter Data into a NoSQL Database Part 1: MongoDB

You can’t swing a dead cat without finding a thousand articles on the internet about how NoSQL databases are the greatest thing since whatever the previous greatest thing was, and how they’re great for unstructured & semi-structured data. The problem is that there is a lot of hype and buzzspeak out there, with so many different flavors of NoSQL databases available and very little solid guidance on when you should use one over another.

This series of articles isn’t going to make you a NoSQL expert, but over the next several installments we will work through how a particular use case might be implemented on a handful of NoSQL databases with an actual working example (gasp!). By spinning up working POC’s for these systems we will gain at least a cursory understanding of what working with that flavor of NoSQL might look like. Going into this, I had virtually no experience in that realm.

So what are we actually going to do?

We’re going to count up hashtags off a stream of Tweets. I like this one because the Twitter payload has the hashtags nested as an array within a sub-document of the full Tweet document, which is something that isn’t necessarily well-suited to a relational database.

The formatted JSON was over 600 lines, so let’s look at just the portion that has the hashtags so we can see what we’re up against (and begin to see why a relational solution might be a bad fit).

"entities" : {
      "hashtags" : [ {
        "text" : "ILSen",
        "indices" : [ 45, 51 ]
      }, {
        "text" : "ILPol",
        "indices" : [ 52, 58 ]
      } ],
      "urls" : [ {
        "url" : "https://t.co/UKUNUXNles",
        "expanded_url" : "https://twitter.com/i/web/status/1247187321247510534",
        "display_url" : "twitter.com/i/web/status/1…",
        "indices" : [ 109, 132 ]
      } ]
}

The entities key holds the hashtags, URLs, and user mentions and is at the root level of the tweet payload. The hashtags themselves are nested inside the entities as an array of objects which include the actual hashtag text and the positional indexes of the hashtag within the tweet text. Our goal is to count up all the text values inside that array across all the tweets we will capture.

NoSQL Choices

The plan is to work this up for the following popular NoSQL databases. Drop me a comment if I didn’t list your favorite, I’m happy to give it a shot.

  • MongoDB
  • Couchbase
  • Cassandra
  • CosmosDB
  • DynamoDB
  • PostgreSQL w/JSONB

Nifi Environment Setup

This will form the basis for each installment — a Docker container running Apache Nifi which consumes a Twitter API to give us a good data source with as much volume as we can handle. I pulled down the latest, which at the time was 1.10.0-RC3. We will be building out a small flow (this nifi template is available on my GitHub):

  • Get the tweets via GetTwitter
  • Isolate & split off the deletes via RouteOnContent
  • Add some elements of the tweet as flow file attributes via EvaluateJsonPath

This is what it should look like when we’re done:

Spin up the docker container

docker run -d --name nifi -p 8080:8080 apache/nifi

Verify it’s working by opening up a browser tab to localhost:8080/nifi

You’ll also need a developer account from Twitter in order to access the APIs. Setting that up is left as an exercise to the reader. Once you’ve completed that excursion, you will be given 4 access keys by Twitter: API key, API secret key, Access token, and Access token secret. These will be used to authenticate from within nifi.

GetTwitter processor

Drop a new processor on the canvas for GetTwitter and then double-click on it to configure it, using the keys provided by Twitter. You’re free to use the filter parameters as well; they might even work.

Remove the deletes

Turns out there are a lot of deleted tweets and they use a completely different JSON schema so it works out to our advantage to identify those and route them elsewhere (in this case, we’ll route them to /dev/null). The easiest way to do this is to use a RouteOnContent processor with a regular expression to isolate the deletes from the tweets:
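For intuition, here is roughly what that routing test is doing, sketched in Python. The exact regex and the assumption that delete notices carry a top-level "delete" key are mine; adapt them to whatever your stream actually delivers:

```python
import re

# Assumption: delete notices from the streaming API arrive as their own
# payload with a top-level "delete" key, e.g. {"delete": {"status": {...}}}.
DELETE_PATTERN = re.compile(r'^\s*\{\s*"delete"')

def is_delete(raw_payload):
    """True if the raw JSON string looks like a delete notice, not a tweet."""
    return bool(DELETE_PATTERN.match(raw_payload))

print(is_delete('{"delete": {"status": {"id": 123}}}'))  # True
print(is_delete('{"created_at": "...", "text": "hi"}'))  # False
```

RouteOnContent applies the same idea: payloads matching the expression go down one relationship (to be auto-terminated), everything else continues on to MongoDB.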

Add attributes to the flow files

Lastly, some NoSQL databases require a primary key and/or a partition or sharding key. Without spending a ton of time on partitioning strategy, I’ve opted to use the Tweet “id” as the primary key and the “lang” field as the sharding key if necessary, then push those elements into FlowFile attributes named id & lang, respectively:

Connect all those boxes and you’ll just about be ready to actually do something!

Send the Data to MongoDB

For my money, the easiest way to play with many database technologies is to simply spin up a Docker container, which is exactly how we’re going to set up MongoDB. MongoDB is a document database, and as you have probably heard, it is web scale. If you have a ton of JSON you need to ingest, it could be a really good option for you. Especially if you are one of those tortured souls who don’t like SQL.

docker run -d --name mongo -p 27017:27017 mongo

As shown above, there is no user authentication. If that sort of thing is important to you, be aware that setting it up after the fact requires recreating the container, which you might not want to bother with. If you specify these two environment variables when you start the container, MongoDB will run in auth mode, allowing you to actually create users & require authentication:

docker run -d --name mongo \
-e MONGO_INITDB_ROOT_USERNAME=admin \
-e MONGO_INITDB_ROOT_PASSWORD=password \
-p 27017:27017 mongo

PutMongo processor

We first need to set up our nifi processor. Put on your surprised face: we’ll be using the PutMongo processor.

Configure Client Controller Service

Create the Controller Service

The first step in configuring the processor is setting up a client controller service. In the drop down select Create New Service… and in the dialog pick MongoDBControllerService and then name it appropriately. (Spoiler alert: this will not be the only MongoDB controller service we’ll be creating on our journey, so pick a useful name). Click CREATE.

Configure the Controller Service

Once the controller service has been created, it will take you to the Controller Services list. Find the one you just created, and click the gear to configure it. The Mongo URI will be the connection string to your mongoDB instance. The mongoDB connection string is generally of the form

mongodb://username:primarypassword@host:port

Now the default configuration doesn’t have any user authentication set up, so you can simply remove the “username:primarypassword” bit and your connection string becomes mongodb://host:port

Click APPLY, which will take you back to the list of controller services.

Enable the Controller Service

Controller services must be enabled before they can be used, so click the lightning bolt to enable it. If you need to make changes later, you’ll need to first disable the service before you can make any changes. At this point you can continue configuring the actual processor.

Interestingly, the PutMongo processor has a parameter for Mongo URI here as well as in the controller service. If you set it here you can avoid the hassle of using a controller service, but it is probably a nifi best practice to connect through a controller service since the services can be used in other processors. In either case the credentials are stored in plain text, which is an open Jira (https://issues.apache.org/jira/browse/NIFI-7037)

Create Database & Collection via MongoDB Shell

Next we need to set the Mongo Database Name and Mongo Collection Name. Which of course we don’t have yet, so we’ll need to go to the MongoDB CLI. The easy way would be to drop into a bash shell in the container and simply run mongo, but this seems like a good time to walk you through installing the mongo client on your Mac. Open a terminal window and use Homebrew to install the mongo shell (without installing the full-on database):

Install the Mongo Shell on MacOS and Connect

$ brew tap mongodb/brew
$ brew install mongodb-community-shell


$ mongo localhost:27017

Or if you opted to use the authentication when you stood up the container:

$ mongo "mongodb://admin:password@localhost:27017"

This drops us into the MongoDB shell (aka CLI).

Create a Database & Collection

Turns out creating databases and collections is really easy, and the way I’m about to show you is somehow the hard way.

> use testdb
switched to db testdb
> db.createCollection("twitter")
{ "ok" : 1 }
> show collections
twitter

Of course it turns out that MongoDB will create the database & collections on the fly when you try to insert data, but where is the fun in that? Now that we have those 2 elements, we can complete our nifi configuration.

Configure PutMongo processor

Connect all the processors & RUN!

At long last we can wire up our full pipeline and start getting data from the Twitter gardenhose and write it into MongoDB.

You shouldn’t have to let it run for very long. A minute or so will give you several thousand records, and 10k will be plenty for our purposes. You can look at the In count on the PutMongo processor, or you can count them from the MongoDB shell.

Counting the Hashtags

I was not familiar with the MongoDB query language & aggregation framework when I started out on this adventure so I fully anticipated this challenge from the outset. MongoDB itself is well documented, but I opted to stand on the shoulders of giants in order to fast-track what I needed to know. So thanks to Jonathan Freeman’s InfoWorld article on Twitter Analysis the easy way with MongoDB for helping me get through that. SQL is my jam, so this was well out of my comfort zone. Here’s our hashtag count query:

db.twitter.aggregate( [ 
    {$unwind: '$entities.hashtags'}, 
    { $group: { _id: '$entities.hashtags.text', 
               tagCount: {$sum: 1} }}, 
    { $sort: { tagCount: -1 }} 
]);

At a very high level, here is how that query works:

  • unwind flattens the hashtag array into something we can aggregate
  • group will group up whatever is inside that section
  • _id is the field being grouped on
  • tagCount is the alias for the {$sum: 1} operation
  • $sum: 1 is the functional equivalent of COUNT(*) in the SQL world
  • $sort will sort by the contents, in this case by tagCount. -1 indicates sort descending
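If the pipeline syntax still feels foreign, the same unwind/group/sort logic can be sketched in a few lines of plain Python with collections.Counter (the sample tweets here are made up, standing in for the documents in the twitter collection):

```python
from collections import Counter

# Made-up stand-ins for the documents in the twitter collection.
tweets = [
    {"entities": {"hashtags": [{"text": "ILSen"}, {"text": "ILPol"}]}},
    {"entities": {"hashtags": [{"text": "ILSen"}]}},
    {"entities": {"hashtags": []}},
]

# $unwind: flatten every hashtag out of every tweet;
# $group with $sum: 1: count occurrences of each text value.
counts = Counter(
    h["text"]
    for t in tweets
    for h in t["entities"]["hashtags"]
)

# most_common() gives the descending sort, like {$sort: {tagCount: -1}}.
print(counts.most_common())  # [('ILSen', 2), ('ILPol', 1)]
```

The difference, of course, is that MongoDB runs this server-side against the whole collection instead of pulling every document back to the client.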

Execute the query in the shell

Back to the MongoDB shell to execute the query. The only surprising thing is that on April 6, 2020, #COVID19 is not the most popular hashtag in use. I’ll save you the effort: BBB20 is Big Brother Brasil, the TV show. What a time to be alive.

Conclusions & Next Steps

So there you have it, from zero to result without any real effort having been expended. My initial thought on this first step of my adventure into NoSQLand is that MongoDB solves a problem I couldn’t (easily) solve in Oracle; certainly not in a 5-line query. Ingesting the data was simple, although the lack of a web UI might turn some people off. The query language will be a stumbling block for the traditional relational database user, but unless we relational people plan on never dealing with semi-structured data, there is no avoiding it. Luckily MongoDB is not the only option.

In Part 2, we’ll do this same exercise on Couchbase and see if it poses any unique challenges and find out if their N1QL can make equally quick work of our hashtag count problem.

Until then, leave a comment and let me know what you think. Happy NoSQLing!

Nifi templates available on GitHub

You can get the nifi templates for this flow from my GitHub: https://github.com/supahcraig/nifi_NoSQL

The Time I Used Quantum Mechanics to solve Rock/Paper/Scissors

Last night I was surfing Reddit and came across a post in r/learningpython where the poster was asking for tips on making their Python program more efficient. The program itself was a Rock/Paper/Scissors implementation, and they used a ton of code to do it. The truth is it was probably a homework assignment and they managed to get Reddit to do the heavy lifting for them (slow clap). On the other hand, there are lots of ways to learn, and seeing other people solve problems is a valid strategy. I’m not here to judge.

Our friend, Levi-Civita

As I was reading the comments I got an idea out of the blue, something I hadn’t thought about since I took Classical Mechanics in grad school, having been first introduced to it in undergrad Quantum Mechanics. That something is called the Levi-Civita Tensor, and thankfully it is less complicated than it sounds. It is usually represented as “epsilon i j k” or εijk and is defined such that if i, j, and k are “in order” (i.e. i, j, k; j, k, i; etc., which is an even permutation) then the value is +1. If they are not in order (i.e. j, i, k) then that is an odd permutation and the value is -1. If any index is repeated (i.e. j, j, k) then the value is 0. Simple, right?
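If it helps, that definition can be captured in a few lines of plain Python. This is just a sketch of the three-index case, with indices drawn from {0, 1, 2}:

```python
def epsilon(i, j, k):
    """Levi-Civita symbol for three indices drawn from {0, 1, 2}."""
    if len({i, j, k}) < 3:
        return 0  # any repeated index gives 0
    # the three even permutations of (0, 1, 2) give +1; the rest give -1
    return 1 if (i, j, k) in {(0, 1, 2), (1, 2, 0), (2, 0, 1)} else -1

print(epsilon(0, 1, 2))  # 1  (in order)
print(epsilon(1, 0, 2))  # -1 (one swap, an odd permutation)
print(epsilon(0, 0, 1))  # 0  (repeated index)
```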

The TL;DR of Levi-Civita is that if you cycle forward you get +1, and if you cycle in reverse you get -1. And if you don’t cycle at all you get 0. And therein is the connection to Rock Paper Scissors: the 3 “weapons” have a cyclic nature as to what beats what. Let’s set i for rock, j for paper, and k for scissors. To play the game, both players choose their weapon and determine which direction to cycle to find the other person’s weapon. If you cycle clockwise, you win. Counter-clockwise, you lose. And if you pick the same weapon you draw and do it all again.

To put it in terms of Levi-Civita, we will need to supply a 3rd value to complete the cycle, since Levi-Civita requires 3 indices (well, the 3 dimensional version needs 3 indices, but that’s a rabbit hole for another day). Solving for the 3rd index initially had me confused but also turned out to have a very simple solution. The simple case is if both players pick different weapons, the 3rd index is just whatever weapon hasn’t been picked yet. The “more complicated” case is if both players pick the same weapon, but this turns out to be even more trivial since the 3rd index doesn’t even matter. If an index is repeated, Levi-Civita evaluates to 0 regardless of the cycling of the remaining index, and a repeated index is what you get if both players pick the same weapon.

Using what we have just deduced, we can now use Levi-Civita to determine the outcome of the game. Mathematically, the outcome is ε(p1, p2, u), where p1 and p2 are the two players’ weapon indices and u is the index of the weapon neither of them picked.

More practically, if player 1 chooses rock and player 2 chooses paper, it simplifies to εrock-paper-scissors, which in more mathematical terms is εijk. That is an even permutation (+1), so player 1 loses. But what if player 1 picks paper and player 2 picks rock? That simplifies to εpaper-rock-scissors, which in more mathematical terms is εjik. That is an odd permutation (-1), so player 1 wins.

Great, but what does this have to do with coding the game in Python efficiently? You’re probably thinking: isn’t it easier to just directly define the game rules in the Python program and spit out the answer? Well, if you had to write the Python to evaluate Levi-Civita from scratch, then this would literally be an academic exercise. But the real beauty of Python is that there’s a good chance somebody has already built what you need.

Introducing the sympy Python module

Because of course sympy has a built-in class for Levi-Civita that you can use right out of the box with nothing more than an import statement.

$ pip install sympy

from sympy import LeviCivita

Using it is unbelievably simple:

r = LeviCivita(i, j, k)

where i, j, and k are just integers, and r is the evaluation of Levi-Civita, so r will always be +1, -1, or 0: which is another way of saying player 2 wins, player 1 wins, or it’s a draw. Putting it all together with randomized selections we have a very simple way to implement Rock Paper Scissors, and I only had to take a quantum mechanics class to learn the technique.

from sympy import LeviCivita
import random
import time

choices = [0, 1, 2]
weapons = ['rock', 'paper', 'scissors']

while True:
    p1 = random.randint(0, 2)
    p2 = random.randint(0, 2)

    print(f'Player 1 has chosen {weapons[p1]}, Player 2 has chosen {weapons[p2]}....')

    # the weapon neither player picked; if both players picked the same
    # weapon, any leftover index works, since a repeated index makes
    # Levi-Civita evaluate to 0 regardless
    k_index = list(set(choices) - set([p1, p2]))[0]

    r = LeviCivita(p1, p2, k_index)
    if r == 0:
        print(f"It's a draw; both players picked {weapons[p1]}")
    elif r < 0:
        print(f'Player 1 wins; {weapons[p1]} defeats {weapons[p2]}')
    elif r > 0:
        print(f'Player 2 wins; {weapons[p2]} defeats {weapons[p1]}')

    print('-' * 10)

    time.sleep(3)

There is no magic there whatsoever, outside of LeviCivita itself. I often joke that my professors remain worried that one day I will use my physics degree in anger. Today Drs. Allen, Brandt, and Daniels can rest easy, as I have satisfied the urge to use my degree but have done so in a way that probably won’t endanger lives.

What have we learned today?

Did you find this interesting? I’m always fascinated when complex math topics show up in random places like kids game strategy, but this may be nearing the limit of topic complexity vs game simplicity. But the real lesson here doesn’t have anything to do with any of that. The real lesson is that it is very likely you can pip install just about anything you want.

pip install world-peace
