Leap Event Subscriptions

What is a Leap Event Subscription?

Subscriptions are a type of Event Consumer. When registering a Leap Event Subscription, the framework will consume data from the specified event source and route the received events for further processing.

In addition to event routing, a Leap Event Subscription also allows you to specify a retry mechanism for routing. The following sections of this document describe in detail, how you can tune a subscription to meet the needs of a specific solution.

Event Subscriptions can be added to an existing Leap Feature by using the Leap CLI -add command – see Leap CLI Commands for more details.

Types of Event Subscriptions

  • Kafka (topic)
  • JMS/Jakarta (queue)

Specific implementations of Topics and Queues can be added upon request

Topic (Kafka) Subscriptions

Before defining the EventSubscription for consuming events from Kafka, you must update the globalAppDeploymentConfig.properties file located in the /config directory of the Leap project. These settings are required in order to know which Kafka instance to communicate with.

For most use cases, the default values can be maintained, but, the following properties are exposed in order to allow for flexible configuration of subscribers. It’s important to note that all of the subscribers will use these same settings unless overridden within the EventSubscription.

#=================================================================================
# Configure Kafka Consumer (used by each Subscriber) 
# <projectDir>/attunedlabs/config/<profile>/globalAppDeploymentConfig.properties
#=================================================================================

brokerHostPort=localhost:9092
groupId=testGroup
clientId=C1
consumersCount=1
autoCommitEnable=false
autoCommitIntervalMs=3000
autoOffsetReset=earliest
autoCommitOnStop=sync
breakOnFirstError=true
maxPollRecords=1
sessionTimeoutMs=80000
consumerRequestTimeoutMs=600000
fetchWaitMaxMs=60000
parallelProcessingConsumers=1

Examples

Once the globalAppDeploymentConfig.properties file for your desired build profile is updated, we will turn our attention to the eventing file for the specific feature. After adding eventing via Leap CLI, navigate to the /src/main/resources directory of the feature and look for the event configuration file.

Leap CLI will generate an eventing configuration file using the naming convention:
<featureName>-<implementationName>-eventing-config.xml.

<?xml version="1.0" encoding="UTF-8"?>
<EventFramework
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:noNamespaceSchemaLocation="eventframework.xsd"
	xmlns:dis="http://attunedlabs.com/internalevents/Dispatcher">
    <!-- Container for all Event Subscriptions -->
	<EventSubscription>
        <!-- Kafka Topic Subscription Definition -->
		<SubscribeEvent subscriptionId="kafka_Subscriber"
			isEnabled="true" parallelProcessing="false">
			<SubscribeTo>test_topic_all_all</SubscribeTo>
            <!-- Container for Routing Rules (MVEL) -->
			<EventRoutingRules>
                <!-- Empty Rule, always send to this Route -->
				<EventRoutingRule>
					<InvokeCamelRoute featureGroup="testing"
						featureName="welcome" serviceType="hello-world" />
				</EventRoutingRule>
			</EventRoutingRules>
            <!-- Configure Behavior on Event Routing Failure -->
			<FailureHandlingStrategy>
				<FailureStrategyName>LeapDefaultRetryStrategy</FailureStrategyName>
				<FailureStrategyConfig>
					{
					"retryCount": "4",
					"retryInterval": "20"
					"retryIntervalMultiplier":"1",
					"maximumRetryInterval": "60",
					"timeIntervalUnit": "SECOND",
					"retryTopRecords":50
					}
				</FailureStrategyConfig>
			</FailureHandlingStrategy>
		</SubscribeEvent>
	</EventSubscription>
</EventFramework>
<?xml version="1.0" encoding="UTF-8"?>
<EventFramework
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:noNamespaceSchemaLocation="eventframework.xsd"
	xmlns:dis="http://attunedlabs.com/internalevents/Dispatcher">
    <!-- Container for all Event Subscriptions -->
	<EventSubscription>
        <!-- Kafka Topic Subscription Definition -->
		<SubscribeEvent subscriptionId="kafka_Subscriber"
			isEnabled="true" parallelProcessing="false">
			<SubscribeTo>test_topic_all_all</SubscribeTo>
            <!-- Container for Routing Rules (MVEL) -->
			<EventRoutingRules>
                <!-- Route Event to this Service based on EventParam Data -->
				<EventRoutingRule>
                    <Rule>eventParam.data.destination == 'welcome'</Rule>
					<InvokeCamelRoute featureGroup="testing"
						featureName="welcome" serviceType="hello-world" />
				</EventRoutingRule>
                <EventRoutingRule>
                    <Rule>eventParam.data.destination == 'goodbye'</Rule>
					<InvokeCamelRoute featureGroup="testing"
						featureName="welcome" serviceType="goodbye-world" />
				</EventRoutingRule>
                <EventRoutingRule>
			    <Rule>eventParam.data.printerId == "ZPL01234"</Rule>
					<HttpPostRequest featurGroup="testing" 
                            featureName="printFeature" serviceType="printlabel" 
                            hostName="localhost" port="9060">
						<header-params>
							<header-param>
								<param-name>CODE</param-name>
								<param-value>X'02'</param-value>
							</header-param>
							<header-param>
								<param-name>Zipper</param-name>
								<param-value>B'0'</param-value>
							</header-param>
						</header-params>
					</HttpPostRequest>
				</EventRoutingRule>
			</EventRoutingRules>
            <!-- Configure Behavior on Event Routing Failure -->
			<FailureHandlingStrategy>
				<FailureStrategyName>LeapDefaultRetryStrategy</FailureStrategyName>
				<FailureStrategyConfig>
					{
					"retryCount": "4",
					"retryInterval": "20"
					"retryIntervalMultiplier":"1",
					"maximumRetryInterval": "60",
					"timeIntervalUnit": "SECOND",
					"retryTopRecords":50
					}
				</FailureStrategyConfig>
			</FailureHandlingStrategy>
		</SubscribeEvent>
	</EventSubscription>
</EventFramework>
<?xml version="1.0" encoding="UTF-8"?>
<EventFramework
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:noNamespaceSchemaLocation="eventframework.xsd"
	xmlns:dis="http://attunedlabs.com/internalevents/Dispatcher">
    <!-- Container for all Event Subscriptions -->
	<EventSubscription>
        <!-- Kafka Topic Subscription Definition -->
		<SubscribeEvent subscriptionId="kafka_Subscriber"
			isEnabled="true" parallelProcessing="false">
			<SubscribeTo>test_topic_all_all</SubscribeTo>
            <!-- Container for Routing Rules (MVEL) -->
			<EventRoutingRules>
                <!-- Empty Rule, always send to this Route -->
				<EventRoutingRule>
					<pipeline>
                        <IntegrationPipeName>notifyUserEmail</IntegrationPipeName>
                    </pipeline>
				</EventRoutingRule>
			</EventRoutingRules>
            <!-- Configure Behavior on Event Routing Failure -->
			<FailureHandlingStrategy>
				<FailureStrategyName>LeapNoRetryStrategy</FailureStrategyName>
				<FailureStrategyConfig>
					{
					"MSGLoggingEnabled": "true",
                    "parallelProcessing": "true"
					}
				</FailureStrategyConfig>
			</FailureHandlingStrategy>
		</SubscribeEvent>
	</EventSubscription>
</EventFramework>

Configuration Details

In this section, we will go over all of the configuration options for various Elements contained within the parent EventSubscription element.

<SubscribeEvent>

This defines the start of a new Kafka Subscriber. The following are the possible configurations to customize the behavior of this element.

NameRequiredDefaultDescription
subscriptionIdYN/A(String) Defines this unique Subscription among the other possible definitions. Used to store the configuration in the database
isEnabledNtrue(Boolean) Determines if this subscriber should be running or not. Useful for testing or temporary subscriptions
loadBalanceNauto(String) Auto | Manual load-balancing strategy
parallelProcessingNfalse(Boolean) If true, the events will be processed async using the value provided by the globalAppDeploymentConfig parallelProcessingConsumers
<SubscribeTo>YN/A(String) The Topic name to subscribe to. For Multi-tenant Kafka Configurations, us the naming convention topicName_tenant_site
e.g. <subscribeTo>test_all_all</subscribeTo>
<EventSubscriptionCriteria>NN/A(String) MVEL evaluation to allow/omit events from entering the routing configuration
<EventRoutingRules>YN/A(Element) Container for defining how events consumed are routed. See EventRoutingRule.
<FailureHandlingStrategy>NNone(Element) Definition of retry strategy in the case routing fails.
<SubscribeEvent> configurations

<EventRoutingRule>

Specific rule that is used to guide the flow of events to various destinations. Currently, you can route events to Pipeline, Feature Services (camel route), or send the Event via HTTP.

<Rule>

(Optional) MVEL evaluation is used to determine where to send the events that match the condition.

<InvokeCamelRoute>

(Optional) Specify an existing Leap Feature Service to send the event. The event will be sent using in-memory routing via SEDA/Direct Camel components. Specify the following fields to customize behavior.

NameRequiredDefaultDescription
featureGroupYN/A(String) Feature Group of the services to be called for the event
featureNameYN/A(String) Feature Name of the service to be called for the event
serviceTypeYN/A(String) Service Name to be called. See the FeatureMetaInfo.xml for a given feature for all of these details
<InvokeCamelRoute> Options

<Pipeline>

(Optional) Specify an existing pipeline to send the event. A pipeline is referenced by its name. The framework will look up the required pipeline by name and apply it to the event.

NameRequiredDefaultDescription
<IntegrationPipeName>YN/A(String) This value must match the name attribute of an existing Pipeline Activity definition.
<Pipeline> Options

<HttpPostRequest>

(Optional) Sends the event to a specified HTTP Endpoint via HTTP [POST]. At this time, the endpoint must belong to another Leap Framework Service. In the future, we may open this up to send HTTP communication to external services. See the configuration options below.

NameRequiredDefaultDescription
featureGroupYN/A(String) Feature Group of the services to be called for the event
featureNameYN/A(String) Feature Name of the service to be called for the event
serviceTypeYN/A(String) Service Name to be called. See the FeatureMetaInfo.xml for a given feature for all of these details
hostNameYN/A(String) Base URL/IP Address for the host of the HTTP Service.
portYN/A(String) Specific port to attempt HTTP communication
<header-params>NN/A(Element) Contains one or more <header-param> definitons
<header-param>NN/A(Element) Child element of <header-params>. Allows for specifying a name-value pair to add to the HTTP request as headers
<param-name>NN/A(String) Child element of <header-param>. Specify the HTTP Header name to use
<param-value>NN/A(String) Child element of <header-param>. Specify the value associated with the corresponding <param-name>
<HttpPostRequest> options

<FailureHandlingStrategy>

(Optional) Defines the behavior of the subscription when the events encounter an error while routing. This could occur if the intended destination is busy, timed out, or is unavailable temporarily.

NameRequiredDefaultDescription
<FailureStrategyName>YN/A(String) Unique ID for the type of retry strategy to register in the DB. Predefined strategies include LeapDefaultRetryStrategy and LeapNoRetryStrategy
handlerQualifiedClassN(String) Attribute on the <FailureStrategyName> element. Specify the fully qualified name of the class being used for configuration details of the named strategy
<FailureStrategyConfig>N(String | JSON object) Allows for a set of optional retry options based on the strategy chosen.
<FailureHandlingStrategy> Options

LeapDefaultRetryStrategy

NameRequiredDefaultDescription
retryCountN(String)
-1, retry continuously
0, no retries on failure
X, retry X times
retryIntervalN(String) Time units between retry attempts
retryIntervalMultiplierN(String) Multiply by this amount between retries for non-static retry intervals
maximumRetryIntervalN(String) The maximum time interval between attempts. When using retryIntervalMultiplier it can be easy to let the retry time reach a high value
timeIntervalUnitN(String) Seconds, Minutes, or Hours
retryTopRecordsN(String) When there are multiple records in the DB waiting to retry, this will put a cap on the number attempted per time interval. Helps to improve performance when retrying with unreliable services
LeapDefaultRetryStrategy Options

LeapNoRetryStrategy

NameRequiredDefaultDescription
MSGLoggingEnabledNtrue(Boolean) When true, all events will be logged ni the SubscriberEventTracker table. When false, no logs will be stored in the table
ParallelProcessingNfalse(Boolean) Enables or disables parallel processing of events for this subscriber. Will send via SEDA or Direct based on this value
LeapNoRetryStrategy Options

Queue (RabbitMQ JMS) Consumers – coming soon

Updated on September 9, 2021

Was this article helpful?

Related Articles