AlgoTrader Documentation

Chapter 11. Esper Engine

11.1. Esper Introduction
11.1.1. Introduction to event streams and complex events using Esper
11.1.2. Event representations
11.1.3. Event Stream Analysis
11.1.4. Combining Pattern Matching with Event Stream Analysis
11.1.5. Named windows
11.1.6. Variables
11.2. Esper Quick Start Guide
11.2.1. Event Types
11.2.2. Creating a Statement
11.2.3. Adding a Subscriber
11.2.4. Adding a Listener
11.2.5. Sending events
11.2.6. Configuration
11.3. Esper Documentation
11.4. AlgoTrader specific Esper Artifacts
11.4.1. Engine & EngineManager
11.4.2. Modules
11.4.3. Tags
11.4.4. Subscribers
11.4.5. Listeners
11.4.6. Service method invocation in Esper scripts
11.4.7. Aggregation Functions
11.4.8. Callbacks
11.5. Esper Threading

AlgoTrader uses Esper as its CEP (Complex Event Processing) engine. AlgoTrader-based strategies can optionally use a dedicated Esper engine in addition to the Esper engine used by the AlgoTrader server.

[1] Esper is an Event Stream Processing (ESP) and event correlation engine (CEP, Complex Event Processing). Targeted at real-time Event Driven Architectures (EDA), Esper is capable of triggering custom actions written as Plain Old Java Objects (POJO) when event conditions occur among event streams. It is designed for high-volume event correlation, where millions of incoming events would make it impossible to store them all and query them later using a classical database architecture.

A tailored Event Processing Language (EPL) allows expressing rich event conditions, correlation, possibly spanning time windows, thus minimizing the development effort required to set up a system that can react to complex situations.

Esper is a lightweight kernel written in Java which is fully embeddable into any Java process. It enables rapid development of applications that process large volumes of incoming messages or events.

Information is critical to make wise decisions. This is true in real life but also in computing, and especially in the finance and trading area. Information flows in from different sources in the form of messages or events (e.g. market data events), giving a hint on the state at a given time such as stock price. That said, looking at those discrete events is most of the time meaningless. A trader needs to look at the stock trend over a period, possibly combined with other information to make the best deal at the right time.

While discrete events looked at one by one might be meaningless, event streams (i.e. an infinite set of events) considered over a sliding window and further correlated are highly meaningful, and reacting to them with minimal latency is critical for effective action and competitive advantage.

Relational databases and message-based systems such as JMS make it hard to deal with temporal data and real-time queries. Databases require explicit querying to return meaningful data and are not suited to pushing data as it changes. JMS systems are stateless and require the developer to implement the temporal and aggregation logic themselves. By contrast, the Esper engine provides a higher level of abstraction and can be thought of as a database turned upside-down: instead of storing the data and running queries against stored data, Esper allows applications to store queries and run the data through them. The Esper engine responds in real time when conditions occur that match user-defined queries. The execution model is thus continuous rather than triggered only when a query is submitted.

In Esper, a tailored EPL allows registering queries in the engine. A listener class, which is basically a POJO, will then be called by the engine when the EPL condition is matched as events flow in. The EPL can express complex matching conditions that include temporal windows, joins of different event streams, as well as filtering, aggregation, and sorting. Esper statements can also be combined with "followed by" conditions, thus deriving complex events from simpler events. Events can be represented as JavaBean classes, legacy Java classes, XML documents or java.util.Map, which promotes reuse of existing systems acting as message publishers.
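The "database turned upside-down" idea can be illustrated without any Esper dependency. The following is a minimal sketch in plain Java, not Esper's API: the class ToyEngine and its methods register and sendEvent are hypothetical names chosen for illustration. Queries are stored up front, and every incoming event is run through them, with a listener notified on each match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Illustrative only: a toy "engine" that stores queries and pushes matching
// events to listeners. None of these names are Esper or AlgoTrader API.
class ToyEngine<E> {

    // A stored query: a condition plus the listener to notify when it matches.
    private static final class Statement<T> {
        final Predicate<T> condition;
        final Consumer<T> listener;

        Statement(Predicate<T> condition, Consumer<T> listener) {
            this.condition = condition;
            this.listener = listener;
        }
    }

    private final List<Statement<E>> statements = new ArrayList<>();

    // Register the query once, before any data arrives.
    void register(Predicate<E> condition, Consumer<E> listener) {
        statements.add(new Statement<>(condition, listener));
    }

    // Each incoming event is run through every stored query.
    void sendEvent(E event) {
        for (Statement<E> statement : statements) {
            if (statement.condition.test(event)) {
                statement.listener.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        ToyEngine<Double> engine = new ToyEngine<>();
        List<Double> matched = new ArrayList<>();
        engine.register(price -> price < 100.0, matched::add);
        engine.sendEvent(101.5);
        engine.sendEvent(99.0);
        // Only the event below the floor reaches the listener.
        System.out.println(matched);
    }
}
```

A real engine adds what this sketch leaves out: windows, aggregation, joins and pattern matching expressed declaratively in EPL.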

A trivial yet meaningful example is as follows: assume a trader wants to buy Google stock as soon as the price goes below some floor value, not when looking at each tick but when the computation is done over a sliding time window, say of 30 seconds. Given a TickVO event bean with a last price field and a reference to a Security ID and the following EPL, a listener POJO would get notified as ticks come in to trigger the buy order:

select
    avg(last)
from
    TickVO.win:time(30 sec)
where
    securityId=12
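To make the window semantics concrete, here is a hand-rolled approximation in plain Java of what the statement above computes. It is a sketch under assumptions, not how Esper implements time windows: the class TimeWindowAverage is hypothetical, and it only expires old ticks when a new tick arrives, whereas an engine can also expire events as time passes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: average of the "last" price over a sliding time window.
class TimeWindowAverage {

    private static final class Tick {
        final long timestampMillis;
        final double last;

        Tick(long timestampMillis, double last) {
            this.timestampMillis = timestampMillis;
            this.last = last;
        }
    }

    private final long windowMillis;
    private final Deque<Tick> window = new ArrayDeque<>();
    private double sum;

    TimeWindowAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Add a tick, drop ticks that have left the window, return the current average.
    double onTick(long timestampMillis, double last) {
        window.addLast(new Tick(timestampMillis, last));
        sum += last;
        // The newest tick is never expired, so the window is never empty here.
        while (window.peekFirst().timestampMillis <= timestampMillis - windowMillis) {
            sum -= window.removeFirst().last;
        }
        return sum / window.size();
    }

    public static void main(String[] args) {
        TimeWindowAverage avg = new TimeWindowAverage(30_000);
        System.out.println(avg.onTick(0, 100.0));
        System.out.println(avg.onTick(10_000, 110.0));
        // The tick from t=0 is older than 30 seconds by now and drops out.
        System.out.println(avg.onTick(35_000, 90.0));
    }
}
```

The EPL version expresses the same logic in a few declarative lines and leaves the bookkeeping to the engine.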

[2] This quick start guide provides step-by-step instructions for using Esper inside AlgoTrader.

Esper provides in-depth documentation.

The following chapters of the Esper Documentation are relevant for developing trading strategies with AlgoTrader based on Esper:

In addition, Esper examples, tutorials, and case studies are available.

The system provides the following custom aggregation functions:

The GenericTALibFunction is a port of TA-Lib to AlgoTrader. It supports all TA-Lib operations.

Please consult TA-Lib for a list of all TA-Lib methods and their parameters.

If the TA-Lib Function returns just one value, the value is directly exposed by the AggregationFunction.

Example: The TA-Lib function movingAverage has just one double typed return value which can be accessed directly.

insert into MovingAverage
select talib("movingAverage", close.doubleValue(), 30, "Sma") as result
from BarVO;

select result
from MovingAverage;

If the TA-Lib function returns multiple values, a dynamic class is generated on the fly, which gives access to properly typed return values. All return value names are lower-case!

Example: The TA-Lib function stochF has return values: outFastK and outFastD. The returned dynamic class will have double typed properties by the name of: fastk and fastd (all lowercase).

insert into Stochastic
select talib("stochF", high.doubleValue(), low.doubleValue(), close.doubleValue(), 3, 2, "Sma") as result
from BarVO;

select result.fastk, result.fastd
from Stochastic;

Some functions are influenced by the entire range of the past data. These functions are sometimes called functions with memory. An example is the EMA (Exponential Moving Average). For these functions an optional unstable period parameter can be specified. The following statement will create a 30 period moving average with an unstable period of 10.

 insert into MovingAverage
 select talib("movingAverage", close.doubleValue(), 30, "Ema", 10) as result
 from BarVO;

For further details about the unstable period please see: SetUnstablePeriod
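Why such functions need an unstable period can be seen with a small experiment in plain Java. This is a sketch under assumptions: the class EmaMemory is hypothetical, and the EMA is simply seeded with the first value it sees, which may differ from TA-Lib's own seeding. The same final bar yields different EMA values depending on how much history precedes it, and the difference shrinks as more leading bars are consumed.

```java
// Illustrative only: shows why an EMA "remembers" where it started.
class EmaMemory {

    // Recursive EMA seeded with values[from]; returns the EMA at the last bar.
    static double ema(double[] values, int from, int period) {
        double k = 2.0 / (period + 1);
        double ema = values[from];
        for (int i = from + 1; i < values.length; i++) {
            ema = values[i] * k + ema * (1 - k);
        }
        return ema;
    }

    public static void main(String[] args) {
        // Synthetic closing prices, made up for this example.
        double[] closes = new double[200];
        for (int i = 0; i < closes.length; i++) {
            closes[i] = 100 + 10 * Math.sin(i / 5.0);
        }
        double fullHistory = ema(closes, 0, 30);
        double tenBars = ema(closes, 190, 30);
        double hundredBars = ema(closes, 100, 30);
        // The fewer leading bars, the further the result is from the full-history value.
        System.out.println("10 bars of history:  off by " + Math.abs(fullHistory - tenBars));
        System.out.println("100 bars of history: off by " + Math.abs(fullHistory - hundredBars));
    }
}
```

Discarding the outputs produced during the first bars, which is the role of the unstable period as we understand it, trades a longer warm-up for values that depend less on the starting point.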

For additional information please visit the corresponding JavaDoc.

Note

As an alternative to the TA-Lib based exponential moving average function, the Esper aggregation function Section 11.4.7.1, “ExponentialMovingAverage” can be used, which keeps the entire history and not just the unstable period.

A Consumer<List<OrderCompletionVO>> function can be registered with the Esper engine using the Engine#addTradePersistedCallback() method. Once all corresponding orders have been fully executed and all corresponding database transactions (e.g. OrderStatus, Transaction and Position) have completed, the consumer is invoked with a list of OrderCompletionVO objects as input.

This callback is particularly useful in situations where one needs a guarantee that all order-related database transactions have completed before continuing with the next steps, e.g. to retrieve the current position quantity after an order has been executed. If a regular trade callback is used for this, the Position Entity might not have been fully persisted by the time the consumer is executed. With the trade persisted callback, however, it is guaranteed that the Position Entity has been fully updated in the database.

In order to correctly associate the trade callback with a specific order an orderId has to be retrieved from the order service and set onto the order before attaching the trade callback.

A typical use case of a trade persisted callback looks like this:

String orderId = getOrderService().getNextOrderId(order.getClass(), accountId);
order.setIntId(orderId);
engine.addTradePersistedCallback(Collections.singleton(orderId), orderCompletions -> {
    ...
});
getOrderService().sendOrders(orders);
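The reason the order ID must be known before the callback is attached becomes clearer with a toy model of such a registry. The following plain Java sketch is not AlgoTrader's implementation: ToyCallbackRegistry, addCallback and onOrderPersisted are hypothetical names, and order completions are reduced to plain ID strings. The callback is keyed by order ID and fires once, after every registered order has been reported as persisted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative only: a toy registry, not AlgoTrader's implementation.
class ToyCallbackRegistry {

    // One registered callback together with the order IDs it is still waiting for.
    private static final class Pending {
        final Set<String> remaining;
        final List<String> completed = new ArrayList<>();
        final Consumer<List<String>> callback;

        Pending(Set<String> remaining, Consumer<List<String>> callback) {
            this.remaining = remaining;
            this.callback = callback;
        }
    }

    private final Map<String, Pending> byOrderId = new HashMap<>();

    // The IDs must be known here, which is why they are assigned before sending.
    void addCallback(Collection<String> orderIds, Consumer<List<String>> callback) {
        Pending pending = new Pending(new HashSet<>(orderIds), callback);
        for (String orderId : orderIds) {
            byOrderId.put(orderId, pending);
        }
    }

    // Hypothetical hook: called once an order is fully executed and persisted.
    void onOrderPersisted(String orderId) {
        Pending pending = byOrderId.remove(orderId);
        if (pending == null) {
            return; // no callback registered for this order
        }
        pending.remaining.remove(orderId);
        pending.completed.add(orderId);
        if (pending.remaining.isEmpty()) {
            pending.callback.accept(pending.completed);
        }
    }

    public static void main(String[] args) {
        ToyCallbackRegistry registry = new ToyCallbackRegistry();
        registry.addCallback(List.of("a1", "a2"),
                done -> System.out.println("all persisted: " + done));
        registry.onOrderPersisted("a1"); // nothing happens yet
        registry.onOrderPersisted("a2"); // callback fires now
    }
}
```

An order that is sent without a pre-assigned ID could not be matched to any entry in such a registry, so its callback would never fire.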

Note

The trade persisted callback is supported only in runtime mode, not in simulation mode. In simulation mode, it is recommended to use the Section 11.4.8.2, “Trade callback” instead. The trade persisted callback is executed only if there has been at least one (partial) execution; it is not executed if an order has been cancelled or rejected before any execution has taken place.

Esper offers several options for enabling a multi-threaded environment; see Esper API Threading.

In Live-Trading Mode AlgoTrader uses outbound threading with 3 threads by default. This means that all Subscriber / Listener tasks are handled by a thread pool of 3 threads. Subscribers / Listeners working in parallel are generally not a problem as long as they do not modify the same Entity at the same time. However, if two Subscribers / Listeners modify the same Entity (e.g. a Position) at the same time, this could lead to an inconsistent state in the database.

To guard against this, AlgoTrader relies on database isolation levels as well as Hibernate's optimistic and pessimistic locking.
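The idea behind optimistic locking can be shown in a few lines of plain Java. This is a conceptual sketch only, not Hibernate and not AlgoTrader code: OptimisticPositionDemo and its nested Position class are hypothetical, and an in-memory AtomicReference stands in for a versioned database row. Each writer reads the current version, prepares an update, and writes it back only if the version is still the one it read; otherwise it retries.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: version-checked updates from a pool of 3 threads.
class OptimisticPositionDemo {

    // Immutable snapshot of a position together with its version number.
    static final class Position {
        final long quantity;
        final long version;

        Position(long quantity, long version) {
            this.quantity = quantity;
            this.version = version;
        }
    }

    private final AtomicReference<Position> row = new AtomicReference<>(new Position(0, 0));

    // Write back only if nobody changed the row since it was read; retry otherwise.
    void add(long delta) {
        while (true) {
            Position current = row.get();
            Position updated = new Position(current.quantity + delta, current.version + 1);
            if (row.compareAndSet(current, updated)) {
                return;
            }
        }
    }

    // Runs 3000 concurrent updates on 3 threads and returns the final state.
    static Position runDemo() {
        OptimisticPositionDemo demo = new OptimisticPositionDemo();
        ExecutorService pool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3000; i++) {
            pool.submit(() -> demo.add(1));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("updates did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return demo.row.get();
    }

    public static void main(String[] args) {
        Position result = runDemo();
        System.out.println(result.quantity + " at version " + result.version);
    }
}
```

Without the version check, two threads could both read the same quantity and each write back their own result, silently losing one of the updates. A persistence framework typically reports such a conflict as an error rather than retrying automatically, so the retry loop here is a simplification.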

For debugging purposes, AlgoTrader logs the name of the thread using log4j; see Chapter 33, Logging.



[1] Most of this section has been reproduced from the Esper Tutorial

[2] Most of this section has been reproduced from the Esper Quick Start