Storing all that good data (Part 2)

Marcus Isherwood 20th March 2019

Welcome back! In part 1 of this blog series, we set up our Contact Flow in Amazon Connect to capture the response from our customer. That response is written to CloudWatch Logs, ready for us to collect and store in Elasticsearch. In this second part, we need to transport that data over to its mid-term resting place. Kibana works really well for data that is being viewed from minutes old through to about 6-12 months; after that, most organisations tend to move to more robust 'big data', analytics and reporting solutions.

First things first, you need an Elasticsearch cluster. For the purposes of this blog and gaining a technical understanding of the architecture, we're going to avoid subjects such as right-sizing, scaling or reliability; all of those are covered in depth by the Elasticsearch documentation. We're going to start by configuring an endpoint to send our data to, so that we can visualise it in a later step.

Configure Elasticsearch

Spin up an Elasticsearch cluster using as many defaults as possible. As stated earlier, we aren't trying to show how to configure production-grade services, just how data can be shifted about and used to gain insights into what's happening.
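
If you would rather script the domain creation than click through the console steps below, a rough boto3 equivalent might look like the sketch here. The domain name, version and instance sizes are placeholders, so substitute whatever you pick in the console:

Create Domain (optional sketch)
import boto3

es = boto3.client('es', region_name='us-east-1')

# Create a small, public, single-node domain - fine for a demo, not for production.
# The access policy is omitted here; it can be set in the console (see Step 3 below).
response = es.create_elasticsearch_domain(
    DomainName='connect-demo',                     # placeholder name
    ElasticsearchVersion='6.4',                    # pick whichever version the console offers
    ElasticsearchClusterConfig={
        'InstanceType': 't2.small.elasticsearch',  # demo-sized instance
        'InstanceCount': 1
    },
    EBSOptions={
        'EBSEnabled': True,
        'VolumeType': 'gp2',
        'VolumeSize': 10                           # GiB
    }
)
print(response['DomainStatus']['ARN'])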

Step 1:

Navigate to the Elasticsearch service in the dashboard and create a new domain using the following settings (in the demo we configured everything in us-east-1):

[Screenshot: Elasticsearch domain settings]

Press Next on the screen.

Step 2:

Create the cluster with the defaults as follows:

[Screenshot: Elasticsearch cluster configuration defaults]

Press Next.

Step 3:

Take note of the choices being made here. We are going to use a public cluster, which is not recommended if you are building a production system.

[Screenshot: Elasticsearch access configuration (public access)]

Step 4:

Review the configuration to make sure it’s set as above. Click Next and then wait for the cluster to show as active, as shown below:

[Screenshot: Elasticsearch domain showing as active]

NOTE: as said before, we're not focussing on best practice or security; the purpose of this blog is to help you understand how to get the most out of the data you are collecting from customers in your contact centre. The access policy will, however, ensure that you at least need a set of valid credentials from your AWS account to post data to the Elasticsearch cluster.

Once your Elasticsearch domain is set up, you actually get a Kibana instance too, which means you don't really have to do anything yourself with the dashboarding tools or with Logstash; the configuration and provisioning is handled by Amazon.

Make a note of the hostname.
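
If you prefer to look the hostname up programmatically (useful for the automation we mention later), a boto3 sketch along these lines should return it once the domain is active. The domain name here is a placeholder:

Lookup Endpoint (optional sketch)
import boto3

es = boto3.client('es', region_name='us-east-1')

# For a public (non-VPC) domain the hostname comes back as 'Endpoint'
domain = es.describe_elasticsearch_domain(DomainName='connect-demo')
print(domain['DomainStatus']['Endpoint'])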

Submitting Data to Elasticsearch

Great, now we need to deliver the data. Elasticsearch accepts JSON-formatted documents, but CloudWatch Logs does not send them to the endpoint by default. We have to add a subscription to the log group (manifested as a Lambda function) that will be triggered each time a new log arrives. The Lambda will need to read the log data package, extract any log events and play them into Elasticsearch as 'docs'.

We have chosen to solve this with Python; however, most of the examples on the internet are NodeJS based. To that end, we need to point you in the direction of how to package Python Lambdas. It may seem like overkill to begin with, but as you work with Python we think you'll see the benefits. If you want to follow along, you will need to package the Lambda into a zip file and upload it. Here are the instructions from AWS:
https://docs.aws.amazon.com/lambda/latest/dg/lambda-python-how-to-create-deployment-package.html

Luckily, though, we have provided the raw Lambda script and a build script that uses Python virtualenv to create the zip file for you. This can be uploaded as-is, but it's worth understanding what it does. In this blog, we'll go over parts of the Python script so you understand what it is doing, but we will not explain the build process.

You can find the code here and use the build.sh script to create the zip file needed for your lambda:

https://github.com/ecs-cloud/cwl-logs-to-es

There are a few other parts of the Lambda that you will need to configure. We use environment variables in all of our Lambdas so they work with our automation and Infrastructure-as-Code constructs. The first two are the region and endpoint for Elasticsearch:

Parameters
deployment_region = os.environ['AWS_ES_REGION']
elasticsearchEndpoint = os.environ['AWS_ES_ENDPOINT']

On the configuration screen of your Lambda, you can configure this as follows:

[Screenshot: Lambda environment variables configuration]
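
If, like us, you prefer automation over the console, the same environment variables can also be set from code. A minimal boto3 sketch (the function name and values are placeholders) might look like this:

Set Environment Variables (optional sketch)
import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

# Overwrites the function's environment with the two variables the script expects
lambda_client.update_function_configuration(
    FunctionName='cwl-logs-to-es',   # placeholder - use your Lambda's name
    Environment={
        'Variables': {
            'AWS_ES_REGION': 'us-east-1',
            'AWS_ES_ENDPOINT': 'search-connect-demo-xxxxxxxx.us-east-1.es.amazonaws.com'  # placeholder endpoint
        }
    }
)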

As mentioned above, we also need to set up the credentials to be able to post data to the Elasticsearch endpoint. This is done as follows and uses the `deployment_region` variable we just set:

Credentials
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, deployment_region, 'es', session_token=credentials.token)

A full explanation of this is beyond the scope of this blog; you can read about how to sign requests at the following link: https://docs.aws.amazon.com/general/latest/gr/signature-version-4.html

As we were explaining, we use a lot of automation, and our tooling passes in the Elasticsearch endpoint as a bare hostname rather than a full URL, meaning we have to construct the URL ourselves after reading the environment variable, like so:

ES URL
elasticsearchUrl = str('https://' + elasticsearchEndpoint + '/_bulk')

Note that we are going to use the `/_bulk` upload facility in Elasticsearch, as it makes pushing data in much easier.
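
To make that concrete, `/_bulk` expects newline-delimited JSON: an action line saying where the document should go, followed immediately by the document itself, each terminated with a newline. Roughly, each pair we build later in the script will look like this (the values in angle brackets are placeholders):

Bulk Payload Shape (illustrative)
{"index": {"_index": "<indexName>", "_type": "<logGroupName>", "_id": "<messageId>"}}
{"@log_group": "<logGroupName>", "@timestamp": 1553077800000, "ContactId": "<contact id>"}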

All Lambdas must have a handler, though, which is where the event that triggered the invocation is passed in. You can see in the script where we access and begin to manipulate the data.

Event Extraction
outEvent = str(event['awslogs']['data'])
outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64', 'strict'))).read()
cleanEvent = json.loads(outEvent)

These three lines extract our data from the event, base64-decode it and un-gzip it (CloudWatch Logs sends the payload gzip-compressed and base64-encoded). This is a necessary step to access the JSON events that will be pushed into Elasticsearch. You can see we set `cleanEvent`, which holds the parsed data.
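
As an aside, the extraction above is written for the Python 2.7 runtime (`StringIO` plus the `'base64'` string codec). If you deploy on a Python 3 runtime instead, a roughly equivalent version would be:

Event Extraction (Python 3 sketch)
import base64
import gzip
import json

# Decode the base64 payload, decompress the gzip data, then parse the JSON
outEvent = base64.b64decode(event['awslogs']['data'])
cleanEvent = json.loads(gzip.decompress(outEvent))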

Set Values
logStreamName = cleanEvent['logStream']
messageType = cleanEvent['messageType'] # Are we meant to use this in _type?
awsAccountNumber = cleanEvent['owner']
subscriptionFilters = cleanEvent['subscriptionFilters'] # Maybe surplus?
logGroupName = cleanEvent['logGroup']
We now pull the 'meta' values for our logs. These are the parts of the data we need in order to tell Elasticsearch what we are sending it; they sit outside the actual log data and help ensure the correct index is used in ES. Also note that the payload can contain more than one log event, which is why the next segment loops through the data to extract the meaningful events.

We loop through as follows:

Main Loop
for log in cleanEvent['logEvents']:

That means that for each and every item we find in the payload, we can perform a set of actions. First, we access just the message and turn it into an object we can work with by doing the following:

Message Object
message = json.loads(log['message'])

We now need some really important items from our message, the kind that allow ES to index and search, such as the timestamp:

Per message values
ContactId = message['ContactId']
ContactFlowModuleType = message['ContactFlowModuleType']
Timestamp = message['Timestamp']
messageId = log['id']

The data that we insert into Elasticsearch actually comes in two parts, which have to be created in such a way that they can be used with the `/_bulk` upload feature. This means that for each message in a log event, we will post data to Elasticsearch. The index chunk of the message looks as follows and uses the values we have been extracting up until now:

Index Object
index = {
    'index': {
        '_index': indexName,
        '_type': logGroupName,
        '_id': messageId
    }
}

The second part of the data we will send to ES looks as follows and contains the part we will be most interested in when looking at Kibana later on.

Doc Object
doc = {
    '@log_group': logGroupName,
    '@log_stream': logStreamName,
    '@message': json.dumps(message).replace('"', '\"'),
    '@owner': awsAccountNumber,
    '@timestamp': log['timestamp']
}

After we get to this point, we need to loop through the message and correct the format so we can post a document that ES will understand and index. We use the following code to do this:

Loop and format
logMessage = ast.literal_eval(log['message'])

for item in logMessage:
    doc[item] = logMessage[item]

stringToSend = str(json.dumps(index)) + "\n" + str(json.dumps(doc)) + "\n"
stringToSend = stringToSend.encode('utf-8')

In Python, as in many languages, we can use a function to repeat tasks. In this case, posting a message to ES uses such a function:

Post to ES Function
def sendToEs(data):
    # post the data
    print("Sending to elasticsearch...")
    response = requests.post(elasticsearchUrl, auth=awsauth, data=data, headers={'Content-Type': 'application/x-ndjson'})

Here we can see that the function is called `sendToEs()` and it accepts `data`. Inside the function we use a module called requests to build a POST request that uses the elasticsearchUrl, the awsauth object, the data and fixed headers to post data to Elasticsearch. This call won't work if you have not correctly configured the credentials and the IAM role that allows the post action on Elasticsearch.
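
One thing worth adding, which the snippet above doesn't do, is checking the response: the `/_bulk` API can return HTTP 200 overall while individual documents fail to index, and it reports this in an `errors` flag in the body. A hedged sketch of how you might extend the function:

Post to ES Function (with basic error checking - sketch)
def sendToEs(data):
    # post the data
    print("Sending to elasticsearch...")
    response = requests.post(elasticsearchUrl, auth=awsauth, data=data,
                             headers={'Content-Type': 'application/x-ndjson'})

    # The bulk API can succeed overall while individual documents fail,
    # so inspect both the status code and the 'errors' flag in the body
    if response.status_code != 200 or response.json().get('errors'):
        print("Problem sending to elasticsearch: " + response.text)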

Going back to the main loop, we can now see that the following line triggers this function and submits the data to ES:

Function Trigger
sendToEs(stringToSend)
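
That covers the handler end to end. Before wiring up the subscription, it helps to have a synthetic test event to hand. The sample payload isn't reproduced here, but the sketch below shows one way you could generate your own: it gzips and base64-encodes a CloudWatch-Logs-style payload and wraps it in the `awslogs` envelope the handler expects. The field names inside `message` assume the Contact Flow logging from part 1, and the values are purely illustrative:

Generate Test Event (sketch)
import base64
import gzip
import json

payload = {
    'messageType': 'DATA_MESSAGE',
    'owner': '111122223333',                      # placeholder account number
    'logGroup': '/aws/connect/demo',              # placeholder log group
    'logStream': 'example-stream',
    'subscriptionFilters': ['cwl-logs-to-es'],
    'logEvents': [{
        'id': '1',
        'timestamp': 1553077800000,
        'message': json.dumps({
            'ContactId': 'test-contact-id',
            'ContactFlowModuleType': 'GetUserInput',
            'Timestamp': '2019-03-20T10:30:00.000Z',
            'CustomerUtterance': 'I want to buy some Cheese'   # hypothetical extra field
        })
    }]
}

# gzip then base64 encode, the same way CloudWatch Logs delivers subscription data
data = base64.b64encode(gzip.compress(json.dumps(payload).encode('utf-8'))).decode('utf-8')
test_event = {'awslogs': {'data': data}}
print(json.dumps(test_event))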

Good, now you can test the Lambda with a test event; however, you will have to add the JSON to the test event correctly — a log payload, base64 encoded (the sketch above shows one way to generate one), so that it exercises all of the features of your code. Once you have it digesting your test log without throwing errors, you need to create the subscription in CloudWatch Logs. You can do this from the Lambda console as follows:

[Screenshot: adding a CloudWatch Logs trigger from the Lambda console]

You need to check that the configuration has been set correctly; press Add and then save the configuration. It should look as follows:

[Screenshot: CloudWatch Logs subscription configuration]
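
As with everything else here, the subscription can also be created from code rather than the console, in keeping with the Infrastructure-as-Code theme. A rough boto3 sketch is below — the names, region, account number and ARNs are placeholders. CloudWatch Logs first needs permission to invoke the function, then the subscription filter points the log group at it:

Create Subscription (optional sketch)
import boto3

region = 'us-east-1'
account_id = '111122223333'        # placeholder account number
log_group = '/aws/connect/demo'    # placeholder log group
function_name = 'cwl-logs-to-es'   # placeholder Lambda name

lambda_client = boto3.client('lambda', region_name=region)
logs_client = boto3.client('logs', region_name=region)

# Allow CloudWatch Logs (regional service principal) to invoke the Lambda
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId='cwl-invoke',
    Action='lambda:InvokeFunction',
    Principal='logs.{}.amazonaws.com'.format(region),
    SourceArn='arn:aws:logs:{}:{}:log-group:{}:*'.format(region, account_id, log_group)
)

# Subscribe the log group to the Lambda; an empty filter pattern forwards every event
logs_client.put_subscription_filter(
    logGroupName=log_group,
    filterName='cwl-logs-to-es',
    filterPattern='',
    destinationArn='arn:aws:lambda:{}:{}:function:{}'.format(region, account_id, function_name)
)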

Once you have completed the Lambda function and the subscription, you should be able to call your contact flow as we did in the first blog and ask it what you will, such as “I want to buy some Cheese” or whatever is relevant to the project you are working on.

Here you should see, under the 'Indices' section in Elasticsearch, that you are getting data:

[Screenshot: CXExperience index receiving data]

Great, so you got this far. It’s been quite a long blog but we thought it would be worth stepping through so that the logic is understood. Technically with a few modifications you can use this for any other CloudWatch Log that you need to stream to Elasticsearch.

The next blog will explain visualisation using Kibana, which you already have access to through the Elasticsearch console if you can’t wait.
