Real Time Streaming Twitter Data Pipeline With AWS Services

Uday Veeramaneni
8 min read · Jan 19, 2021

Most companies are now modernizing their data infrastructure, shifting their business intelligence to meet the real-time demands of the business. The ability to react in real time to issues and trends can be a significant advantage, helping to identify issues the moment they occur or, in some cases, before they occur. Think about this in financial and health-care terms and the advantages are enormous. These timely decisions can only be achieved when the data is collected, processed, analyzed, and visualized as it is generated. But there are challenges, including how to get the data and where to store it. Streaming data can flood in from many different sources, for example the web, social networks, sensors, and mobile apps, and the list continues. Moreover, the data can be delivered in a wide variety of formats with no fixed structure. Cloud services have made real-time streaming easy by providing tools that can ingest and analyze millions of events per second from data generated around the globe, and by making it straightforward to build cohesive data pipelines.

I have to say that I am in love with cloud computing, and I believe there are many opportunities to build great features and apps with the help of cloud technology and services that would otherwise have been difficult and time-consuming. My name is Uday Veeramaneni, and I have a passion for analytics and data engineering. In this blog I will describe, at a high level, the workflow and implementation steps for near real-time streaming of Twitter data with the AWS cloud. I researched many sources to gather the information to implement this, and out of respect for that work I am not monetizing this post; the whole purpose is to share the knowledge I gained. Remember, sharing is caring.

The streaming Twitter data pipeline extracts tweets containing specific keywords using the Twitter API (from a script deployed on AWS EC2) and puts these tweets into a Kinesis Data Firehose delivery stream, which then delivers the data into Amazon S3. A Python Lambda function is triggered when a new object is created in S3; it transforms the data into the required structure and loads it into an Amazon Elasticsearch Service cluster. I tried different hashtags (Covid, Brexit, Trump), and the results are interesting.

The data transformed by the Lambda function can also be loaded into DynamoDB. There are many alternatives depending on the specific requirements or business needs; for example, you can load data from Amazon S3 into Amazon Redshift using AWS Glue ETL (Extract, Transform, and Load) jobs and connect Tableau to Amazon Redshift for data visualization.

The picture below shows the data pipeline architecture.

The data pipeline includes the following high-level steps:

Step 1: Register for a Twitter App

a) Visit the Twitter developer site at https://developer.twitter.com/en/apply-for-access and apply for a developer account

b) Once the account is approved, go to https://developer.twitter.com and sign in with your account

c) Navigate to “Projects and Apps” and create an App

d) Next, create the access tokens. Make a note of the OAuth settings (Consumer Key, Consumer Secret, Access Token, Access Token Secret)

Step 2: Create an Amazon Elasticsearch Service Cluster

Amazon Elasticsearch Service is ideal for querying, searching, and analyzing large amounts of data from services like Kinesis. It also provides support for the open-source Elasticsearch APIs and integration with Kibana.

a. Log in to AWS ElasticSearch Service Console

b. Click “Create a New Domain”

c. Select “Deployment Type”. For this example, I have selected “Development and Testing”

d. Keep any other default selections and click Next

e. Provide the name for the Domain

f. Select “Instance Type”. I have selected t2.small.elasticsearch as part of the free tier

g. Keep the default values and click “next”

h. For the role, I have selected “Custom access policy” and provided the IAM ARN (I used an IAM role created earlier with the AWSLambdaElasticsearchExecutionRole policy attached) with the action set to allow. Add IPv4 address details as appropriate. For “Network configuration” I have selected public access.

i. Click “Create” (please note that it could take around 5 to 10 minutes for the domain to become active).
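If you prefer to create the domain with a script rather than the console, the boto3 sketch below mirrors the choices above. It is a minimal sketch only: the domain name, account ID, IP range, and Elasticsearch version are placeholders and assumptions, so adjust them to your own setup.

import json
import boto3

es = boto3.client('es', region_name='us-east-1')

# IP-restricted access policy mirroring the console's custom access policy (placeholder account and IP)
access_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"AWS": "*"},
        "Action": "es:*",
        "Resource": "arn:aws:es:us-east-1:123456789012:domain/twitter-domain/*",
        "Condition": {"IpAddress": {"aws:SourceIp": ["203.0.113.0/24"]}}
    }]
}

es.create_elasticsearch_domain(
    DomainName='twitter-domain',        # hypothetical domain name
    ElasticsearchVersion='7.9',         # assumption: pick the version you need
    ElasticsearchClusterConfig={'InstanceType': 't2.small.elasticsearch', 'InstanceCount': 1},
    EBSOptions={'EBSEnabled': True, 'VolumeType': 'gp2', 'VolumeSize': 10},
    AccessPolicies=json.dumps(access_policy),
)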

Step 3: Create the Firehose Delivery Stream

Amazon Kinesis is a service that helps you gather, analyze, and process real-time video and data streams. It allows you to ingest real-time data from various sources, such as application logs or website clickstreams, and format it for your data stores. You can use this to fuel real-time applications.

a. Log in to the AWS console, select the Kinesis service, and then choose Firehose

b. Choose “Create Delivery Stream”

c. Select the appropriate details on the Destination page (delivery stream name, Destination: Amazon S3, S3 bucket). Click “Next”

d. On the Configuration page, add the appropriate permissions. Click “Create new or choose” and either create a new IAM role or use an existing one

e. Leave the other default settings, choose “Next”, and then choose “Create Delivery Stream”
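As an alternative to the console, here is a minimal boto3 sketch of the same delivery stream. The stream name, bucket ARN, and IAM role ARN (which must grant Firehose write access to the bucket) are placeholders I have assumed for illustration.

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

firehose.create_delivery_stream(
    DeliveryStreamName='twitter-stream',   # hypothetical name, reused by the EC2 script later
    DeliveryStreamType='DirectPut',        # records are pushed directly by our producer
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',  # placeholder role ARN
        'BucketARN': 'arn:aws:s3:::my-tweets-bucket',                        # placeholder bucket
        # Buffer up to 5 MB or 300 seconds before writing an object to S3
        'BufferingHints': {'SizeInMBs': 5, 'IntervalInSeconds': 300},
    },
)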

Step 4: Create the Amazon EC2 Instance in AWS.

Amazon Elastic Compute Cloud (Amazon EC2) is a web service that provides secure, resizable compute capacity in the cloud

a. Log in to the AWS EC2 console and choose “Launch Instance”

b. Choose an “AMI”. For this example, I have selected Amazon Linux 2 AMI (HVM), SSD Volume Type

c. Choose “Instance Type” (For example t2.micro)

d. Configure Instance details (Used default VPC and default public subnet)

e. Add storage (selected the default)

f. Configure “security group”. (used the following settings. Type: SSH, Protocol TCP, Port 22, Source 0.0.0.0/0)

g. Launch the instance (Please note that you need to create the key pair)
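For completeness, the same launch can be scripted with boto3, as in the minimal sketch below; the AMI ID, key pair name, and security group ID are placeholders that you would replace with your own values.

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# Launch a single t2.micro instance in the default VPC/subnet
ec2.run_instances(
    ImageId='ami-0abcdef1234567890',            # placeholder: Amazon Linux 2 AMI ID for your region
    InstanceType='t2.micro',
    KeyName='my-key-pair',                      # placeholder: the key pair created at launch
    SecurityGroupIds=['sg-0123456789abcdef0'],  # placeholder: group allowing SSH on port 22
    MinCount=1,
    MaxCount=1,
)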

Step 5: Connect to the EC2 Instance and Run the Python Script

Connect to the EC2 instance that you launched earlier and create a Python script to capture near real-time tweets using the Twitter API. We will use the Tweepy library to connect to the API and extract the streaming data; you can find the documentation for Tweepy here. Import the Python libraries json, boto3, time, and tweepy, and set up the OAuth tokens. Do not share the auth credentials with anyone, because these values are specific to your app. Write the code to parse the tweets and export them as JSON. You need to create a class inheriting from StreamListener, instantiate an object from this class, and use it to connect to the API. You also need to include code that provides the delivery stream details and connects to Firehose using boto3:

# Create the Firehose client (fill in your AWS access details)
import boto3
kinesis_client = boto3.client('firehose', region_name='', aws_access_key_id='', aws_secret_access_key='')
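Putting the pieces together, a minimal sketch of the producer script might look like the following. It assumes the Tweepy 3.x StreamListener API, a hypothetical delivery stream named twitter-stream (from Step 3), placeholder credentials, and an example keyword; it keeps only a few fields per tweet and appends each record to Firehose as a line of JSON.

import json
import boto3
import tweepy

# Twitter OAuth settings from Step 1 (placeholders -- do not share real values)
consumer_key = ''
consumer_secret = ''
access_token = ''
access_token_secret = ''

# Firehose client; on EC2 you can rely on an instance role instead of explicit keys
kinesis_client = boto3.client('firehose', region_name='us-east-1')

DELIVERY_STREAM = 'twitter-stream'  # hypothetical name from Step 3

class TweetListener(tweepy.StreamListener):
    # Called once for every raw tweet delivered by the streaming API
    def on_data(self, data):
        tweet = json.loads(data)
        record = json.dumps({
            'id': tweet.get('id_str'),
            'created_at': tweet.get('created_at'),
            'text': tweet.get('text'),
        }) + '\n'  # newline-delimited JSON so the S3 objects are easy to parse later
        kinesis_client.put_record(
            DeliveryStreamName=DELIVERY_STREAM,
            Record={'Data': record.encode('utf-8')},
        )
        return True

    def on_error(self, status_code):
        print('Stream error:', status_code)
        return True  # keep the stream alive

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream = tweepy.Stream(auth=auth, listener=TweetListener())
stream.filter(track=['Covid'])  # keywords or hashtags to follow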

Step 6: Create the Lambda Function and Upload the Code

When new data arrives in the target S3 bucket, an event notification triggers Lambda, which then runs the code to transform the data and load it into the Elasticsearch cluster. You can also add code to analyze the Twitter data based on emoticons and other text and translate it into sentiments (alternatively, you can use a separate sentiment-analysis service, for example Amazon Comprehend, to evaluate the sentiment of tweets made with a specific Twitter hashtag). AWS Lambda plays a key role in this pipeline.

a) Log in to the AWS Lambda console with your credentials and select Functions -> Create Function

b) Provide the name and select the runtime. For my example, I have used Python 3.7

c) You can either create a new role or choose an existing one. Make sure the AWSLambdaBasicExecutionRole and AmazonS3ReadOnlyAccess permission policies are attached to the role.

d) After creating the Lambda function, you need to add the trigger, i.e. the S3 bucket details (this is the bucket where our tweets are loaded, the one selected during the Firehose setup). Provide the event type, for example “Put”, and other appropriate details.

e) Upload the deployment package (a zip file of the code). Please note that the handler name must match the main function in the code. Also, if the zip file is larger than 3 MB, you cannot view the code in the console editor. A sketch of such a handler follows below.
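This is a minimal sketch of what the deployment package’s handler might look like, not the exact code used here. It assumes the requests and requests-aws4auth libraries are bundled in the zip, a placeholder Elasticsearch endpoint, and a hypothetical index named tweets; the function name lambda_handler must match the Handler setting from step e).

import json
import boto3
import requests
from requests_aws4auth import AWS4Auth

region = 'us-east-1'                                                 # assumption: adjust to your region
es_endpoint = 'https://search-mydomain.us-east-1.es.amazonaws.com'   # placeholder domain endpoint
index_url = es_endpoint + '/tweets/_doc'                             # hypothetical index name

# Sign requests with the Lambda execution role's credentials
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

s3 = boto3.client('s3')
headers = {'Content-Type': 'application/json'}

def lambda_handler(event, context):
    # The S3 "Put" event tells us which object Firehose just delivered
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        body = s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')
        # Each line written by the producer script is one JSON tweet
        for line in body.strip().split('\n'):
            if not line:
                continue
            tweet = json.loads(line)
            requests.post(index_url, auth=awsauth, json=tweet, headers=headers)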

Once we start running the Twitter program on EC2, the tweets are loaded into the chosen S3 bucket (the timing depends on the buffer interval and size specified during the Firehose delivery stream creation). As soon as a file is uploaded to S3, the Lambda function is triggered and performs the parsing and indexing. You can then open Kibana in your browser using the URL provided in the Elasticsearch domain overview. To start discovering the data in the Elasticsearch service, you need to create an index pattern.
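If you want a quick programmatic check before setting up Kibana, a small signed request against the domain can confirm that documents are arriving. This is a sketch only, reusing the placeholder endpoint and hypothetical tweets index from the Lambda example and assuming your local AWS credentials are allowed by the domain’s access policy.

import boto3
import requests
from requests_aws4auth import AWS4Auth

region = 'us-east-1'                                                 # assumption
es_endpoint = 'https://search-mydomain.us-east-1.es.amazonaws.com'   # placeholder endpoint

credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   region, 'es', session_token=credentials.token)

# How many tweets have been indexed so far?
resp = requests.get(es_endpoint + '/tweets/_count', auth=awsauth)
print(resp.json())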

As shown in the data pipeline diagram earlier, you can also have the pipeline load data into DynamoDB. The Firehose delivery stream writes the source data in JSON format to the S3 bucket; when the objects are saved, this triggers the Lambda function, which reads the S3 object and parses the information into the format required by the DynamoDB table (a sketch of that write follows the steps below). To create the DynamoDB table, follow the steps below. For any additional details on DynamoDB tables, please refer to the AWS documentation.

a) Log in to AWS console and select DynamoDB from the services Menu

b) Click Create Table

c) Enter the table name. This is the name of the table to create

d) Enter the Primary Key

e) Select appropriate table settings. I have used the default settings

f) Select all other details as appropriate and click create.
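For the DynamoDB branch, the Lambda function’s write step might look like the minimal sketch below, assuming a hypothetical table named tweets with a string primary key id; the helper would be called for each parsed tweet inside the handler.

import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-east-1')  # assumption: adjust the region
table = dynamodb.Table('tweets')   # hypothetical table name with primary key 'id'

def save_tweet(tweet):
    # Store one parsed tweet as an item in the DynamoDB table
    table.put_item(Item={
        'id': tweet['id_str'],
        'created_at': tweet.get('created_at', ''),
        'text': tweet.get('text', ''),
    })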

I have also provided an alternative pipeline in the diagram that shows the implementation of a streaming solution using AWS Glue (ETL), Amazon Redshift, and Tableau. You can use AWS Glue to set up continuous ingestion pipelines that stream data from Kinesis. You can also view the Twitter streaming data in Tableau or any other visualization tool via Amazon Redshift. I will provide full details of this pipeline (along with the full code) in part 2 of this series. At a high level, the following steps are involved:

1) Create S3 bucket for Processed data to be stored

2) Create Amazon Redshift Cluster

3) Configure AWS Glue.

4) Connect Tableau to the data source, i.e. Amazon Redshift, for visualizing the data.

Conclusion

The ability to extract, process, and analyze data in real time provides many advantages, for example product recommendations for marketing teams, sentiment analysis, fraud detection, and more accurate machine learning thanks to data being available in real time. Cloud services like AWS simplify building the data pipeline, make receiving streaming data easy, and scale to millions of records per second. Moreover, services like AWS Lambda let you run code without provisioning or managing servers. By analyzing data as and when it is created, we can gain real-time insights into what our customers want at any moment.
