Aleri, the complex event processing platform from Sybase, was reviewed at a high level in my last post.
This week, let's review Aleri Studio, the user interface to the Aleri platform, and the pub/sub API, one of several ways to interface with the platform. The studio is an integral part of the platform and comes packaged with the free evaluation copy. If you haven't already done so, please download a copy from here. The installation process is fairly easy and gets you up and running in a few minutes.
Aleri Studio is an authoring environment for building the model that defines the interactions and sequencing between various data streams. It can also merge multiple streams into one or more new streams. With this Eclipse-based studio, you can test the models you build by feeding them test data and monitor the activity inside the streams in real time. Let's look at the various types of streams you can define in Aleri and their functionality.
Source Stream - This is the only type of stream that can accept incoming data. Incoming data can perform four operations: insert, update, delete and upsert. Upsert, as the name suggests, updates a row if its key is already present in the stream; otherwise, it inserts a new row.
Aggregate Stream - This stream creates a summary record for each group defined by a specific attribute. It provides functionality equivalent to GROUP BY in ANSI SQL.
Copy Stream - This stream is created by copying another stream, but with a different retention rule.
Compute Stream - This stream lets you apply a function to each row of a data stream to compute a new element for that row.
Extend Stream - This stream is derived from another stream by adding column expressions.
Filter Stream - You can define a filter condition for this stream. Like the extend and compute streams, it derives a new stream from other streams, in this case by applying the filter condition to them.
Flex Stream - This stream achieves significant flexibility in handling streaming data through custom-coded methods. It is the only stream type that lets you write your own methods to meet special needs.
Join Stream - This stream creates a new stream by joining two or more streams on some condition. Both inner and outer joins are supported.
Pattern Stream - This stream applies pattern-matching rules.
Union Stream - As the name suggests, this stream combines two or more streams that have the same row structure. Unlike the join stream, it includes all the data from all participating streams.
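To make the source stream operations and the aggregate stream a little more concrete, here is a minimal in-memory sketch in Java. KeyedStream and countBy are hypothetical names made up for illustration, not part of the Aleri API, and the sketch only mimics the semantics described above. It assumes that insert rejects a key that already exists and update rejects a missing key, which is how upsert becomes "update if present, else insert". countBy produces one summary row per group, the way SELECT text, COUNT(*) ... GROUP BY text would.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// HYPOTHETICAL in-memory stand-in for a keyed Aleri stream; not the Aleri API.
class KeyedStream {
    // Rows keyed by their primary key (e.g. a tweet id), in arrival order.
    private final Map<Long, String> rows = new LinkedHashMap<>();

    // Assumption: insert rejects a key that is already present.
    void insert(long key, String text) {
        if (rows.containsKey(key)) {
            throw new IllegalStateException("duplicate key " + key);
        }
        rows.put(key, text);
    }

    // Assumption: update rejects a key that is not present.
    void update(long key, String text) {
        if (!rows.containsKey(key)) {
            throw new IllegalStateException("missing key " + key);
        }
        rows.put(key, text);
    }

    // Removes the row with this key, if any.
    void delete(long key) {
        rows.remove(key);
    }

    // Upsert: update when the key is present, otherwise insert.
    void upsert(long key, String text) {
        if (rows.containsKey(key)) {
            update(key, text);
        } else {
            insert(key, text);
        }
    }

    Map<Long, String> rows() {
        return rows;
    }

    // Aggregate analogue: one summary row (text -> row count) per group,
    // like SELECT text, COUNT(*) FROM tweets GROUP BY text.
    static Map<String, Long> countBy(KeyedStream stream) {
        return stream.rows.values().stream()
                .collect(Collectors.groupingBy(text -> text, TreeMap::new, Collectors.counting()));
    }
}
```

For example, calling upsert(1, ...) twice leaves a single row with the second value, while calling insert(1, ...) twice fails in this sketch.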
Using some of these streams and the publication API of Aleri, I will demonstrate how to segregate the live Twitter feed into two different streams. The live feed is consumed by a listener from the Twitter4j library. If you just want to try the Twitter4j library first, please follow my earlier post 'Tracking user sentiments on Twitter'. The data received by the Twitter4j listener is fed to a source stream in our model using Aleri's publication API. Building on the example from my previous post, we will divide the incoming stream into two streams based on tweet content: one stream gets any tweet that contains 'lol', and the other gets tweets with a smiley face ":)" in the text. First, let's list the tasks we need to perform to make this a working example.
- Create a model with three streams
- Validate that the model is error-free
- Create a static data file
- Start the Aleri server and feed the static data file to the stream manually to confirm that the model works correctly.
- Write Java code to consume the Twitter feed, and use the publication API to publish the tweets to the Aleri platform.
- Run the demo and see the live data as it flows through various streams.
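The publishing step in the list above can be sketched as a small bridge between the tweet callback and the publication API. The real Aleri class and method names are not shown in this post, so TweetSink, TweetForwarder and RecordingSink are hypothetical stand-ins. The forwarder passes along only the id and text fields that the "tweets" source stream defines. Skipping empty text is an assumption of this sketch. The recording sink lets you check the wiring without a running Aleri server.

```java
import java.util.ArrayList;
import java.util.List;

// HYPOTHETICAL: stands in for the Aleri publication API; the real class and
// method names are not shown in this post.
interface TweetSink {
    void publish(String stream, long id, String text);
}

// Bridges tweet callbacks (e.g. from a Twitter4j listener) to the sink.
class TweetForwarder {
    private final TweetSink sink;
    private final String streamName;

    TweetForwarder(TweetSink sink, String streamName) {
        this.sink = sink;
        this.streamName = streamName;
    }

    // Called once per incoming tweet; only id and text are forwarded,
    // matching the two fields of the "tweets" source stream.
    void onTweet(long id, String text) {
        if (text == null || text.isEmpty()) {
            return; // assumption: nothing useful for the filters to match
        }
        sink.publish(streamName, id, text);
    }
}

// Test double that records what would have been published.
class RecordingSink implements TweetSink {
    final List<String> published = new ArrayList<>();

    @Override
    public void publish(String stream, long id, String text) {
        published.add(stream + ":" + id + ":" + text);
    }
}
```

In a Twitter4j StatusListener, onStatus(Status status) could call forwarder.onTweet(status.getId(), status.getText()). A real TweetSink implementation would wrap whatever publishing call the Aleri pub/sub API actually provides.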
Image 1 - Aleri Studio - the authoring view
This image is a snapshot of Aleri Studio with the three streams: the one on the left, named "tweets", is a source stream, and the two on the right, named "lolFilter" and "smileyFilter", are filter streams. The source stream accepts incoming data, while the filter streams receive the filtered data. Here is how I defined the filter condition:
like(tweets.text, '%lol%')
Here, tweets is the name of the stream and text is the field we are interested in. The pattern %lol% selects any tweet that contains the string 'lol' anywhere in its text. Each stream has only two fields, id and text, which map to the id and the text message of each tweet sent by Twitter. Once you define the model, you can check it for errors by clicking the check mark in the ribbon at the top. Errors, if any, show up in the panel at the bottom right of the image. Once your model is error-free, it's time to test it.
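The like() condition follows the SQL LIKE convention. The sketch below approximates that matching in Java by translating the pattern into a regular expression. It is not Aleri's implementation and makes three assumptions: '%' matches any run of characters, '_' matches exactly one character, and matching is case-sensitive. Like is a hypothetical helper name. Quoting the literal parts with Pattern.quote matters for the smiley filter, because ")" is a regex metacharacter. The '%:)%' pattern used below for smileyFilter is likewise an assumption, since only the lolFilter condition is shown above.

```java
import java.util.regex.Pattern;

// Minimal sketch of SQL-style LIKE matching (hypothetical helper, not Aleri's
// like()): '%' matches any run of characters, '_' matches exactly one,
// everything else is literal and case-sensitive.
final class Like {
    private Like() {}

    static boolean like(String text, String pattern) {
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            if (c == '%' || c == '_') {
                // Flush pending literal characters, quoted so that ':)' stays literal.
                if (literal.length() > 0) {
                    regex.append(Pattern.quote(literal.toString()));
                    literal.setLength(0);
                }
                regex.append(c == '%' ? ".*" : ".");
            } else {
                literal.append(c);
            }
        }
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
        }
        // DOTALL lets '%' span line breaks inside a tweet; matches() requires a full match.
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(text).matches();
    }
}
```

Because each filter stream evaluates its condition independently, a tweet containing both 'lol' and ':)' would satisfy both conditions and appear in both lolFilter and smileyFilter.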
Image 2 - Aleri Studio - real time monitoring view
The image below shows the format of the data file used to test the model.
The next image shows the information flow.
Image 3 - Aleri Publishing - Information flow
The source code for this exercise is at the bottom.
Remember that you need the Twitter4j library on the build path and the Aleri server running before you run the program. Because I have not added a timer to the execution thread, the only way to stop the program is to abort it. For brevity, and to keep the code short, I have removed all exception handling and logging. The code uses only the publishing side of Aleri's pub/sub API; I will demonstrate the subscription side in my next blog post.
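If you would rather not abort the program, one option is to schedule the shutdown. This sketch uses the JDK's ScheduledExecutorService to run a stop action after a fixed duration. TimedRun and runFor are hypothetical names. What start and stop actually do is left to the caller, for example starting and shutting down the Twitter4j stream and releasing the publisher. The sketch also assumes that start returns quickly and the feed runs on its own threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: stop the demo after a fixed duration instead of aborting the JVM.
// TimedRun/runFor are hypothetical names; start and stop come from the caller.
final class TimedRun {
    private TimedRun() {}

    static void runFor(long duration, TimeUnit unit, Runnable start, Runnable stop) {
        CountDownLatch done = new CountDownLatch(1);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Start the feed; assumed to return quickly (work runs on other threads).
            start.run();
            // After the delay, run the cleanup and release the waiting thread.
            timer.schedule(() -> {
                try {
                    stop.run();
                } finally {
                    done.countDown();
                }
            }, duration, unit);
            // Block the calling thread until the stop action has finished.
            done.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        } finally {
            // Release the timer thread so the JVM can exit normally.
            timer.shutdown();
        }
    }
}
```

Compared with aborting, this gives the program a chance to close its connections cleanly. The duration is a parameter you would pick for the length of your demo.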
This blog aims to provide something simple but useful to the developer community. Feel free to leave a comment and share this article if you like it.