Hadoop for a Distributed Complex Event Processing Network


Had a discussion with a friend about real-time Map/Reduce for internet-scale event processing. It so happened that another friend of mine (who, incidentally, left for a mobile company) returned the book “The Power of Events” by David Luckham, which he had borrowed from me a long time ago.

An interesting coincidence, which led me to seek synergy between the world of CEP and the realms of the Internet of events.

An initial outline from my notes follows. I will follow up with architecture and running code (a good candidate for a NY’11 resolution!):

  • Use case
    • The use case is very simple – correlate events happening across the Internet, to solve a class of problems that span recommendation engines, inputs to pricing strategies, and contextual semantics for consumer analytics/inferences.
    • [Update 1/31/11: Came across the SAP High-Performance Analytic Appliance; interestingly, it is very close to what I was describing, but for enterprises]
    • And I think an infrastructure that employs a combination of Hadoop, real-time analytics, machine learning, and HBase with coprocessors will do the trick
      • The answer could be a grid of hadoop-lets – a network of Map/Reduce tasks that perform incremental processing [in a (Google) Dremel-esque fashion], achieving dynamic scale via elasticity
  • Background
    • Fundamental events happen at the protocol level (usually HTTP exchanges, in the case of the web). Formal, precise annotations map these elemental transactions to an abstract model in a domain.
      • For example, when two people buy the same two products, we generate a rule event which says there is a causal relationship between those two products, which in turn can be used by a recommendation engine (a minimal Map/Reduce sketch of this follows the outline below).
      • In the Internet space, this correlation will not be just two transactions; in the case of Netflix, for example, it will be a stream of related movies and an ensemble-based algorithmic set that determines the correlation for recommendation
    • Usually the events have a temporal relationship, i.e., A happened before B, based on timestamps or a sequential ordering.
  • Definitions:
    • POSet: We end up with a POSet (Partially Ordered Set of events), which can be represented by a DAG (p. 100 of [1])
    • Causality: If the POSet points to an event, it can be called a Causal Vector, i.e., the set of events that are the cause of another event. The causal vector, in effect, augments time with causality, which is good.
    • Aggregation: Events can and should be aggregated to generate a higher-level, coarser-grained event.
    • Filters: Naturally, filters are needed before aggregation to reduce dimensionality and increase relevance. There is also the concept of contextual filters, which is interesting.
    • Event Maps: Usually, filtering and aggregation are done by specifying event maps.
  • Infrastructure: An EPN infrastructure has three essential layers, viz.
    • A way of specifying the events of interest, the behaviors, the event timing, the aggregation rules, and so forth. We can model and capture the event-driven behaviors and constraints in an EPL (Event Processing Language) or as routines in a program.
    • A way of capturing the POSets and providing interfaces to consume, view, and drill down into them. The instrumentation would definitely include connectors and EPAs (Event Processing Agents) for the basic elemental event transactions
    • Layers for EPN analytics using causal models and maps. Static causality is relatively easy, but causality can be dynamic!
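
To make the co-purchase example above concrete, here is a minimal Hadoop Map/Reduce sketch. The input format (one tab-separated line per user basket) and all class names are assumptions of mine, not from any product: the map task emits every product pair in a basket, and the reduce task counts co-occurrences, yielding candidate causal edges for a recommendation engine.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** Counts co-purchases; each input line is "userId<TAB>prodA,prodB,...". */
public class CoPurchaseJob {

  public static class PairMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text line, Context ctx)
        throws IOException, InterruptedException {
      String[] cols = line.toString().split("\t");
      if (cols.length < 2) return;            // skip malformed lines
      String[] products = cols[1].split(",");
      // Emit every unordered product pair in this user's basket.
      for (int i = 0; i < products.length; i++) {
        for (int j = i + 1; j < products.length; j++) {
          String a = products[i], b = products[j];
          String pair = a.compareTo(b) < 0 ? a + "|" + b : b + "|" + a;
          ctx.write(new Text(pair), ONE);
        }
      }
    }
  }

  public static class CountReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text pair, Iterable<IntWritable> counts, Context ctx)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable c : counts) sum += c.get();
      // A frequently co-bought pair is a candidate edge for the recommender.
      ctx.write(pair, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "co-purchase");
    job.setJarByClass(CoPurchaseJob.class);
    job.setMapperClass(PairMapper.class);
    job.setCombinerClass(CountReducer.class);   // safe: counting is associative
    job.setReducerClass(CountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

A batch job like this is the trivial case; the DiCEPN question is how to run the same shape of computation incrementally.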

One of the guarantees that a generic EPN should provide (p. 96 of [1]) is the Cause-Time Axiom: “If event A caused event B in system S, then no clock in S gives B an earlier timestamp than it gives A”
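
As a tiny illustration of the definitions above (class and field names are mine, not Luckham’s), an event can carry its causal vector explicitly, and the Cause-Time Axiom becomes a one-line check over the resulting DAG:

```java
import java.util.ArrayList;
import java.util.List;

/** An event in the POSet; its causal vector lists the events that caused it. */
class Event {
  final String id;
  final long timestamp;                 // local clock reading

  // Edges cause -> this; the POSet is the DAG formed by these edges.
  final List<Event> causalVector = new ArrayList<Event>();

  Event(String id, long timestamp) {
    this.id = id;
    this.timestamp = timestamp;
  }

  void causedBy(Event cause) {
    causalVector.add(cause);
  }

  /** Cause-Time Axiom: no cause may carry a later timestamp than its effect. */
  boolean satisfiesCauseTimeAxiom() {
    for (Event cause : causalVector) {
      if (cause.timestamp > this.timestamp) return false;
    }
    return true;
  }
}
```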

Note: I have done some work on Time Synchronization (IEEE 1588v2 et al.) and this is not that easy.

First of all, there is a difference between time distribution (like NTP) and synchronization. And synchronization is a Gaussian statistical process rather than a tolerance.

Second, time sync at Internet scale is a lot harder; maybe vector clocks are a better alternative
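
A minimal sketch of the vector-clock alternative (entirely illustrative, not tied to any library): each node keeps one counter per node, and event A happened-before event B exactly when A’s clock is componentwise less-than-or-equal to B’s and strictly smaller in at least one slot; incomparable clocks mean the events are concurrent and no causal order can be claimed.

```java
import java.util.Arrays;

/** Minimal vector clock: one counter slot per node in the system. */
class VectorClock {
  final int[] slots;

  VectorClock(int numNodes) {
    slots = new int[numNodes];
  }

  /** A local event on nodeId advances that node's slot. */
  void tick(int nodeId) {
    slots[nodeId]++;
  }

  /** On receiving a message, take the componentwise max, then tick. */
  void merge(VectorClock other, int nodeId) {
    for (int i = 0; i < slots.length; i++) {
      slots[i] = Math.max(slots[i], other.slots[i]);
    }
    tick(nodeId);
  }

  /** True iff this clock happened-before other: <= everywhere, < somewhere. */
  boolean happenedBefore(VectorClock other) {
    boolean strictlyLess = false;
    for (int i = 0; i < slots.length; i++) {
      if (slots[i] > other.slots[i]) return false;
      if (slots[i] < other.slots[i]) strictlyLess = true;
    }
    return strictlyLess;
  }

  @Override
  public String toString() {
    return Arrays.toString(slots);
  }
}
```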

In any case, a discussion for another day …

  • Pragmatics for today’s Internet
    • As we have seen earlier, in the Internet space, the causal relationships and inferences are based on much higher-order relationships than just a temporal ordering.
    • And, we all know that Correlation != Causality.
    • One interesting development is the application of probabilistic and learning networks that infer the causality! This, I think, is one of the major differences between normal enterprise EPNs and internet-scale DiCEPNs – the other, of course, being scale and the high degree of distributed-ness. David Luckham [1] talks about induced causality between two sets of events, which is interesting and worth diving deep into.
  • Map/Reduce & DiCEPNs
    • Back to the main feature, real-time Map/Reduce-based EPNs: the capture of an event based on event patterns, and then the processing of event maps (filtering, aggregation), are good candidates for Map/Reduce jobs – event capture as map tasks and event-map processing as reduce tasks (a sketch follows this list). The event abstraction layers and hierarchies lend themselves very well to a Map/Reduce infrastructure – but the trick is to perform them in real time, in a (Google) Dremel-esque fashion.
  • Industry momentum
    • In 2009, Aleri acquired Coral8, and the combined company was in turn acquired by Sybase in 2010. I had high hopes for Coral8, sigh ;o(
    • The remaining companies are StreamBase, EventZero and Truviso. In the database domain, Oracle has CQN (Continuous Query Notification), which sends a notification when the data underlying a query changes.
    • I am not sure if any of them have transcended the domain of enterprise CEP to reach Internet-scale distributed CEP.
    • I also think that NOSQL stores can be programmed to handle DiCEPNs – coprocessors in HBase?
    • Let us see what 2011 brings …
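
Before signing off, a hedged sketch of the map/reduce split promised above. The tab-separated log format, the /checkout pattern, the one-minute window, and the class names are all assumptions of mine: event capture is a map task that filters elemental HTTP events by pattern, and event-map processing is a reduce task that aggregates them into one higher-level event per session and time window.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** Map = event capture (pattern filter); Reduce = event map (aggregation). */
public class EventMapJob {

  private static final long WINDOW_MS = 60 * 1000L;   // 1-minute windows

  public static class CaptureMapper
      extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable off, Text line, Context ctx)
        throws IOException, InterruptedException {
      // Assumed log format: timestampMs<TAB>sessionId<TAB>url
      String[] f = line.toString().split("\t");
      if (f.length < 3) return;               // skip malformed lines
      long ts = Long.parseLong(f[0]);
      // Capture only the elemental events matching the pattern of interest.
      if (!f[2].contains("/checkout")) return;
      long window = ts / WINDOW_MS;
      ctx.write(new Text(f[1] + ":" + window), new Text(f[2]));
    }
  }

  public static class AggregateReducer
      extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text sessionWindow, Iterable<Text> events, Context ctx)
        throws IOException, InterruptedException {
      int n = 0;
      for (Text e : events) n++;
      // Aggregate the elemental events into one higher-level event per window.
      ctx.write(sessionWindow, new Text("CheckoutBurst(count=" + n + ")"));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "event-map");
    job.setJarByClass(EventMapJob.class);
    job.setMapperClass(CaptureMapper.class);
    job.setReducerClass(AggregateReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

Run as a periodic micro-batch over the newest log slice, this is already a crude incremental pipeline; making it genuinely real-time is exactly the open problem above.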

Cheers & Happy Holidays

<k/>

[1] David Luckham, The Power of Events: An Introduction to Complex Event Processing in Distributed Enterprise Systems

6 thoughts on “Hadoop for a Distributed Complex Event Processing Network”

  1. Hi,

    Interesting ideas. You should check out my blog a bit – you’re right, the offerings from StreamBase, Aleri, and Apama aren’t internet scale. It’s not that web companies don’t use CEP because they don’t get CEP; they don’t use it because most of the current offerings simply don’t scale to the required extent.

    Except for DarkStar, which is our event-driven, distributed complex event processing engine, à la Map/Reduce (or Hadoop).

    2011 is already here.

    🙂

    • Thanks. Saw your blog, good thoughts – I will go through it in detail. NOSQL a $2,000,000,000 market, eh? ;o)

      Cheers & Merry Christmas (It is 8:29 PM on the 24th, Santa should be on his way …)

  2. Hi Krishna,
    This is an interesting thought. I have done some programming with Hadoop and map/reduce. I had thought that map/reduce is best suited for batch processing. Is there any product/white paper describing the use of these technologies for real-time internet applications?
    The use case you have mentioned is very similar to how Amazon.com/eBay give recommendations to users for buying products. I had thought that those recommendations were given based on statistics collected on visited products. I might be completely wrong 🙂

    –Vivek
