======================================================================
LiveView Peer-Based Recovery Sample for MQTT Message Bus
======================================================================

Overview:

This sample demonstrates the LiveView delayed peer-based recovery
capability when using the MQTT message bus. The project is set up to
support elastic scaling and uses peer recovery to keep table contents
eventually consistent.

Message buses such as Kafka support replaying older messages based on
an offset, or sequence number. LiveView recovery with such buses is
fairly simple and does not need the delayed recovery capability. See
the Kafka recovery sample for more information. Buses such as MQTT,
JMS, and similar, even when configured with persistence, cannot
replay older messages; at most they redeliver messages that were
previously received but not acknowledged. For buses of this kind,
LiveView must use delayed peer-based recovery, as demonstrated in
this sample.

To avoid missing data from MQTT while recovery is in progress, the
publisher must first subscribe to the MQTT topic. Only after the
subscription is successful does the publisher start recovering data
from peer nodes, storing any incoming MQTT data that arrives while
peer recovery is ongoing.

During startup, a LiveView node looks for peer nodes in the same
cluster and starts the delayed recovery process if at least one peer
node is available. If no peer exists in the cluster, the node starts
with the current real-time data from MQTT. If a peer exists, the
starting node recovers (copies) all the data from the peer table.
After peer recovery is complete, the data retained from the MQTT
subscription is sent to the LiveView table and publishing transitions
to the real-time data coming from the bus.

There is a complication with buses like MQTT that do not provide
globally unique identifiers for messages. LiveView recovery relies on
globally unique message ids provided by the message bus. Buses such
as Kafka, Pulsar, FTL, etc. provide these ids in some form. When the
message bus does not provide a globally unique identifier, the
LiveView servers in a cluster work in concert to maintain one. This
maintenance is said to be 'perfect' if recovery can be completed such
that the table data in all nodes is eventually identical. Idempotent
LiveView tables can always achieve perfect recovery. Depending on a
number of factors discussed below, non-idempotent tables may or may
not be able to achieve perfect recovery.

Non-idempotent LiveView Table Recovery:

During peer recovery it is possible that a small number of messages
are both recovered from the peer and delivered by the MQTT
subscription. If the LiveView table is idempotent, this does not
matter; the MQTT message is harmlessly republished to the table.
However, if the LiveView table is non-idempotent, republishing the
same message may cause issues. Examples of non-idempotent tables are
ones that have auto-incremented primary keys or a generated timestamp
field. The number of messages in question is a function of how fast
the publish rate is and how long peer recovery takes.

For non-idempotent tables where the primary key fields are all
present in the data, and rows are known to never be updated,
"perfect" recovery is achieved by dropping messages from the retained
message bus data that have already been recovered from the peer.
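The sketch below is a purely illustrative Java outline of this
ordering (subscribe first, retain bus data during peer recovery, then
flush while dropping rows already recovered from the peer). It is not
code from the sample - the real publisher logic lives in the
MQTTPublisher.sbapp EventFlow module - and the topic, class, and
helper names are hypothetical. Ordering races between the flush and
the live callback are ignored for brevity.

    import java.util.Queue;
    import java.util.Set;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

    public class DelayedRecoverySketch {
        private final Queue<String> retainedWhileRecovering = new ConcurrentLinkedQueue<>();
        private volatile boolean peerRecoveryComplete = false;

        public void run() throws Exception {
            MqttClient client = new MqttClient(
                    "tcp://test.mosquitto.org:1883", "lv-recovery-sketch", new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            client.connect(options);

            // 1. Subscribe BEFORE peer recovery starts so no bus data is missed.
            client.subscribe("welldata/sample", 1, (topic, message) -> {
                String payload = new String(message.getPayload());
                if (peerRecoveryComplete) {
                    publishToLiveViewTable(payload);       // normal real-time path
                } else {
                    retainedWhileRecovering.add(payload);  // retain until recovery finishes
                }
            });

            // 2. Copy all rows from a peer node's table (details omitted).
            Set<String> recoveredKeys = recoverFromPeer();
            peerRecoveryComplete = true;

            // 3. Flush the retained bus data, dropping rows already recovered from
            //    the peer -- the "perfect" recovery case for tables whose primary
            //    key fields are all in the data and whose rows are never updated.
            String payload;
            while ((payload = retainedWhileRecovering.poll()) != null) {
                if (!recoveredKeys.contains(primaryKeyOf(payload))) {
                    publishToLiveViewTable(payload);
                }
            }
            // From here on, the subscription callback publishes live data directly.
        }

        // Placeholders for work the sample performs in EventFlow.
        private Set<String> recoverFromPeer() { return java.util.Collections.emptySet(); }
        private String primaryKeyOf(String payload) { return payload; }
        private void publishToLiveViewTable(String payload) { /* send tuple to the table */ }
    }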
For non-idempotent tables where the primary key fields are not all
present in the data, or row updates are allowed, you must choose a
strategy that minimizes the impact of an "imperfect" recovery for
your particular use case. While there are a number of special cases
described below that can avoid recovery defects, ultimately you must
choose whether the publisher should republish a row or drop a
possibly duplicate message.

Note that this sample has a non-idempotent LiveView table called
WellData where all primary key fields are in the data and rows are
never updated. Thus, this sample demonstrates perfect recovery by
default.

Configuration Options:

This sample contains options to guide you through the recovery
process, so that you can make recovery as accurate as possible - or
at least minimize recovery defects according to your application
design goals. To see the options, click the Parameters tab of
MQTTPublisher.sbapp, which is located in src/main/eventflow under the
com.tibco.ldm.sample.lv_sample_mqtt_delayed_recovery package. The
options and guidance are described below; a purely illustrative
sketch of the resulting publish-or-drop decision appears later in
this README.

If primary key fields are available, set the PRIMARY_KEY_AVAILABLE
parameter to true (the default).

If the data stream contains primary key fields:

1. If rows are ever updated:
   a) If peer recovery never delivered a row with a primary key that
      was also delivered by the message bus, the message bus row is
      published and recovery has no defects.
   b) If both peer recovery and the message bus delivered a row with
      the same primary key, you can specify whether recovery should
      update the LiveView table with the last message bus row, or
      alternatively drop such rows, using the PUBLISH_IF_DUP
      parameter. Setting the parameter to true means all incoming
      rows are published; a published row may be a duplicate, and any
      non-idempotent characteristics of the data may be adversely
      affected. With PUBLISH_IF_DUP set to false, updates to existing
      rows may be lost.
2. If rows are never updated: any duplicate message bus rows are
   dropped and recovery contains no defects.

If there are no fields in the data stream that can be used as a
primary key:

1. Specify whether rows that arrive on the message bus during the
   recovery process should be dropped or published using the
   PUBLISH_DATA_ARRIVE_ON_RECOVERY parameter. True means data
   arriving during the recovery process is published. If rows are
   published, duplicate rows are possible; otherwise, rows may be
   lost during recovery.

Sample Requirements:

To run this sample, an available MQTT broker service is required. The
sample uses a public test server made available by mosquitto.org:

    tcp://test.mosquitto.org:1883

Running the Sample:

You can run the sample either from Studio or as deployed
applications. There are deploy projects for each of the client
publisher, the Service Layer (SL), and the Data Layer (DL).

There are some subtleties with running the DLs, especially from
Studio. The data layer is configured to do peer recovery. By default,
if a node is the very first data layer in the cluster, it does not
look for a peer but rather transitions immediately to real-time data
from the bus. If a data layer is not the first in the cluster, it
must find a peer in the cluster; by default, if it does not find such
a peer within a timeout period, startup fails. You can change this
behavior by setting:

    liveview.recovery.ignore.eden=true

DLs that do not find a peer will then complete startup and transition
to real-time data.
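As referenced in the Configuration Options section above, the
following is a purely illustrative Java summary of how the publisher
parameters combine into a publish-or-drop decision for data that
arrives on the bus during recovery. It is not code from the sample
(that logic lives in MQTTPublisher.sbapp); only the parameter names
are real, and the class and method names are hypothetical.

    import java.util.Set;

    public class RecoveryPublishDecision {

        // Values taken from the MQTTPublisher.sbapp Parameters tab.
        private final boolean primaryKeyAvailable;          // PRIMARY_KEY_AVAILABLE
        private final boolean publishIfDup;                 // PUBLISH_IF_DUP
        private final boolean publishDataArriveOnRecovery;  // PUBLISH_DATA_ARRIVE_ON_RECOVERY

        public RecoveryPublishDecision(boolean primaryKeyAvailable,
                                       boolean publishIfDup,
                                       boolean publishDataArriveOnRecovery) {
            this.primaryKeyAvailable = primaryKeyAvailable;
            this.publishIfDup = publishIfDup;
            this.publishDataArriveOnRecovery = publishDataArriveOnRecovery;
        }

        /**
         * Decide whether a message that arrived on the bus during recovery should
         * be published to the LiveView table.
         *
         * @param messageKey    primary key of the incoming message, or null if the
         *                      data has no usable key fields
         * @param recoveredKeys primary keys already copied from the peer table
         * @param rowsUpdated   whether rows in this table are ever updated
         */
        public boolean shouldPublish(String messageKey,
                                     Set<String> recoveredKeys,
                                     boolean rowsUpdated) {
            if (!primaryKeyAvailable || messageKey == null) {
                // No key to compare on: publish (risking duplicates) or drop
                // (risking lost rows), per PUBLISH_DATA_ARRIVE_ON_RECOVERY.
                return publishDataArriveOnRecovery;
            }
            if (!recoveredKeys.contains(messageKey)) {
                // The peer never delivered this key; publishing introduces no defect.
                return true;
            }
            if (!rowsUpdated) {
                // Rows are never updated, so the duplicate is simply dropped.
                return false;
            }
            // Both the peer and the bus delivered this key and updates are allowed:
            // PUBLISH_IF_DUP chooses between a possible duplicate publish (true)
            // and a possibly lost update (false).
            return publishIfDup;
        }
    }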
A further subtlety is that Studio uses a special deployment technique
that reuses an installed node. The first time you start a DL, it
knows it is the first DL in the cluster and does not require a peer.
However, the next time you start the DL, the node's memory has
retained the information that a DL has been present. This time the
starting DL will wait for a peer, which may not currently exist, and
ultimately fail to start. To avoid this situation, before running a
single DL from Studio check the Clusters tab and, if there is a node
running the DL project, stop it and remove it. A new node is
automatically created by Studio when you launch the project.

Running the Sample from Studio:

The first step is to publish messages to the MQTT broker: in the
mqtt_message_publisher project, right-click MQTTClientPub.sbapp and
run it as an EventFlow Fragment. Only one publisher instance is
needed.

You can then right-click the SL project and select Run As -> LiveView
Fragment. The SL should start on port 10080; you can connect to it
with a browser. There will be no WellData table available yet,
because no DL is running.

You can then right-click the DL project and select Run As -> LiveView
Fragment. You should see a WellData table with data being published
to it.

You can start another DL by again right-clicking the DL project and
selecting Run As -> LiveView Fragment. This second DL subscribes to
the MQTT topic, recovers data from the already running DL, and drops
any rows that were delivered twice. You now have a cluster with a
highly available WellData table.

Sample Script Details:

The script lv-test-consistency.sh compares data between two running
DL servers; the two DLs should have exactly the same data. The script
lv-test-no-duplicates.sh checks whether any row was published more
than once on a recovering server. The WellData table contains a field
called updateCount that increments by 1 every time the row is
published; the script validates that no row has an updateCount
greater than 1. Note that the updateCount behavior is what makes the
WellData table non-idempotent. (A sketch of the checks these scripts
perform appears at the end of this README.)

Doing a failover:

On the SL browser, go to the Data tab. You will see the most recently
published rows in the WellData table. Scroll down and you will see
the LVSessionsQueries table. In LVSessionsQueries, look at the
RemoteServer column to discover which DL is currently servicing the
WellData queries for your browser.

Go back to Studio and stop the DL that is currently servicing the
WellData queries. The query automatically fails over to the other DL;
you will see this in the LVSessionsQueries RemoteServer column, and
the browser query continues to see data flowing uninterrupted.

Now start another DL by right-clicking the DL project and selecting
Run As -> LiveView Fragment. This time you do not need to use the
Clusters tab to stop and remove any nodes, because there is currently
a peer to recover from. If you go to the System Info tab in the SL
browser, you will see the newly started DL join the cluster and then
advertise participation in the WellDataGroup. You can run the test
scripts again to validate that the tables have exactly the same data
with no duplicate publishes.

Running the servers as installed applications:

There are deploy projects included in the sample to make it easier to
create deployable applications. For details on building, installing,
and starting applications, see the StreamBase documentation.
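The following is a hypothetical Java sketch of the checks that
lv-test-consistency.sh and lv-test-no-duplicates.sh perform, as
mentioned in the Sample Script Details section. The shipped scripts
query the running DL servers directly; this sketch instead assumes
the WellData rows from each DL have been exported to CSV files whose
last column is updateCount. The file names and column layout are
illustrative only.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RecoveryChecks {

        public static void main(String[] args) throws IOException {
            Set<String> dl1Rows = loadRows(Path.of("welldata-dl1.csv"));
            Set<String> dl2Rows = loadRows(Path.of("welldata-dl2.csv"));

            // Consistency check: both DLs should hold exactly the same rows.
            System.out.println("Tables identical: " + dl1Rows.equals(dl2Rows));

            // No-duplicates check: updateCount must never exceed 1 on the
            // recovering server.
            boolean duplicates = dl2Rows.stream()
                    .map(row -> row.substring(row.lastIndexOf(',') + 1).trim())
                    .anyMatch(count -> Integer.parseInt(count) > 1);
            System.out.println("Duplicate publishes detected: " + duplicates);
        }

        private static Set<String> loadRows(Path csv) throws IOException {
            List<String> lines = Files.readAllLines(csv);
            return new HashSet<>(lines.subList(1, lines.size())); // skip the header row
        }
    }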