Processing Data by Creating Topologies in Storm

In part 1 of this tutorial, the key concepts used in Storm were discussed. There it was explained that Storm topologies are expressed as directed acyclic graphs (DAGs) whose nodes are either spouts or bolts. Spouts represent the sources of data streams; for example, a Twitter spout is used to acquire a stream of tweets. Bolts specify the logic used to process data: data emitted by the spouts is processed by the bolts. The main objective of this tutorial is to demonstrate how to code topologies and submit them to Storm.

This tutorial requires a good understanding of Python or Java. It will not focus on explaining Java or Python concepts but on how to use the two languages to develop Storm topologies. Storm topologies can be developed in any programming language; the only requirement is that the created components understand Storm's Thrift definition, which is available here https://github.com/apache/storm/blob/master/storm-core/src/storm.thrift. Storm runs as a process in the Java Virtual Machine (JVM), so components created in non-JVM languages run as subprocesses.
To develop Python-based Storm topologies you need a text editor or an IDE such as Eclipse, at least Python 2.7, at least Java 1.7, and Maven. To make working with Storm easier, a Python module has been developed, which is available here https://github.com/apache/storm/blob/master/storm-multilang/python/src/main/resources/resources/storm.py. The storm.py module is used to develop spouts and bolts, but the topology definition that wires the components together is written in Java or Clojure. Due to the distributed nature of Storm, making all the dependencies required by your Python program available on every node can be tricky. To overcome this challenge you can either bundle all the dependencies into the jar file or set them up manually on every node where they are needed. You can also use frameworks such as Pyleus and streamparse to handle dependency deployment.
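Under the hood, storm.py talks to the JVM over the subprocess's stdin and stdout using the multilang protocol: each message is a JSON object followed by a line containing only "end". A minimal sketch of that framing (the `frame_message` helper name is illustrative, not part of storm.py):

```python
import json

def frame_message(payload):
    # Storm's multilang protocol sends one JSON object per message,
    # terminated by a line containing only "end".
    return json.dumps(payload) + "\nend\n"

# Roughly what an emit from a Python component looks like on stdout:
msg = frame_message({"command": "emit", "tuple": ["hello world"]})
```

The "end" delimiter lets either side read a complete JSON document without needing to know its length in advance.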
We will not reinvent the wheel by developing a project from scratch; instead, download the Python project that is available here https://github.com/Azure-Samples/hdinsight-python-storm-wordcount
[Image: Python project download]

Extract the download to reveal the project directory structure. The Python project is found in the JavaTopology directory; open it to view its directory structure.
[Image: project directory structure]

There are two directories, namely multilang and src. The multilang directory has a resources subdirectory where non-Java implementations are stored. This is where you place storm.py and any other Python code that implements topology components, which enables the Python code to be referenced directly from Java.

The pom.xml file contains all the settings that will be referenced by Maven when building the project. Here we need to add Storm as a dependency and specify the directory where the Python code is stored. The resources property in this file specifies that all Python code is found in the multilang directory of the project's base directory. Other options specify, for example, the target Java version and whether storm-core will be provided by the target deployment system or should be bundled into the jar.
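The relevant pom.xml entries look roughly like the fragment below. This is a sketch, not the sample's exact file: the version number is illustrative, and the `provided` scope assumes the cluster supplies storm-core at runtime.

```xml
<!-- ship everything under multilang/ inside the built jar -->
<build>
  <resources>
    <resource>
      <directory>${basedir}/multilang</directory>
    </resource>
  </resources>
</build>

<!-- Storm as a dependency; "provided" means the cluster supplies it -->
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.1.0</version>
  <scope>provided</scope>
</dependency>
```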
[Image: Storm dependency in pom.xml]

The logic that implements the spouts and bolts is stored in the /multilang/resources directory, so open it.
[Image: contents of multilang/resources]

The sentencesspout.py file contains the logic that emits sentences, which become our data source. Open the file to inspect its contents. Here you just need to import the storm package, subclass its spout class, and instantiate it. This code will then be invoked from Java.
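A sketch of that pattern is shown below; the sentence list and function names are illustrative, not the sample's exact code. The selection logic is kept as a plain function so it can be tested outside Storm, and the storm.py wrapper is shown commented out because it requires storm.py on the same path and a running Storm worker.

```python
import random

# Illustrative sample data -- the actual file ships its own sentences.
SENTENCES = [
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
]

def next_sentence(rng=random):
    """Pure selection logic, easy to test outside of Storm."""
    return rng.choice(SENTENCES)

# The storm.py wrapper would look roughly like this:
#
# import storm
#
# class SentenceSpout(storm.Spout):
#     def nextTuple(self):
#         # emit one sentence per call as a single-field tuple
#         storm.emit([next_sentence()])
#
# SentenceSpout().run()
```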

The countbolt.py file contains the logic that implements the bolt that does the counting. Open it to see its contents. You import the storm package and the Counter class, then instantiate the bolt class.
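The counting idea can be sketched as follows; again the names are illustrative and the storm.py wrapper is commented out because it needs storm.py and a running worker.

```python
from collections import Counter

# Running word counts, keyed by word.
counts = Counter()

def count_word(word):
    """Increment and return the running count for a word."""
    counts[word] += 1
    return counts[word]

# storm.py wrapper, roughly:
#
# import storm
#
# class CountBolt(storm.BasicBolt):
#     def process(self, tup):
#         word = tup.values[0]
#         storm.emit([word, count_word(word)])
#
# CountBolt().run()
```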

The splitbolt.py file is another bolt implementation; it is used to split sentences into words.
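Splitting is a one-liner; a sketch with illustrative names, with the storm.py wrapper commented out as before:

```python
def split_sentence(sentence):
    """Split a sentence into words, dropping empty tokens."""
    return [w for w in sentence.strip().split(" ") if w]

# storm.py wrapper, roughly:
#
# import storm
#
# class SplitBolt(storm.BasicBolt):
#     def process(self, tup):
#         for word in split_sentence(tup.values[0]):
#             storm.emit([word])
#
# SplitBolt().run()
```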

In summary, you create the Python code implementing the spouts and bolts and place it in the multilang directory. Each bolt or spout corresponding to a node in the DAG goes in its own file.
The src directory contains the Java code used to invoke the bolts and spouts created in Python.
[Image: Java source files]

CountBolt.java is used to invoke the Python-based counting bolt. Open it to inspect the code. There is nothing complex here: we are just doing imports and invoking the Python code. SentencesSpout.java and SplitBolt.java accomplish the similar task of invoking their corresponding Python code.
[Image: Java bolt code]

WordCount.java is the glue that binds all the components together and submits the topology to the Storm cluster. Open it to observe the structure of the code. This code connects the sentence spout to the split bolt, and the split bolt to the count bolt.
[Image: final topology code]

Once you have created your Python components and used Java to define how they interact, you can run the topology on your local machine or package it into a jar file to be submitted to a cluster.
To run the topology locally, invoke Maven as shown below from a terminal while in the project directory.
mvn compile exec:java -Dstorm.topology=com.microsoft.example.WordCount
To deploy the topology to a remote cluster, you package it into a jar file and then use a file transfer utility such as scp. To package your topology as a jar, invoke Maven as shown below while in the project's main directory.
mvn package 
After the project build is complete, a WordCount-1.0-SNAPSHOT.jar file will be placed in the target directory of the project. Use scp or your favourite file transfer tool to move the file to the remote cluster. When using scp you need to specify the path of the file to be transferred, the DNS name of the target server, and a .pem key file if you are deploying on Amazon instances.

