Any StreamBase Managed Object can be a distributed object. A distributed object transparently provides remote method invocation and access to object fields across nodes. The full transactional guarantees that StreamBase provides for non-distributed objects also apply to distributed objects.
The same instance of an object cannot exist on multiple nodes. Copies of an object's state may be located on multiple nodes to improve performance or robustness, but the master copy is located on a single node — by default the node where the object was created.
All object methods transparently execute on the master node for an object. Any methods invoked on an object reference are sent to the master node and executed there.
Objects of the same type can be created on multiple nodes. This is done by installing the application class files, or implementation, on multiple nodes. This is a common application architecture to support object partitioning and caching or service availability mechanisms.
Figure 1, “Distributed method execution” shows an
Order class that has its implementation installed on two nodes —
Node One and
Node Two. Two instances of the
Order class are created, one on
Node One and one on
Node Two. When the
Order.cancel() method is invoked on
Node One, using the
order(Node Two) instance, the method executes on
Node Two. The opposite is true for the
order(Node One) instance.
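The following Java sketch illustrates the behavior in the figure. The Order class, its field, and its method are hypothetical; in a real application Order would be declared as a StreamBase Managed Object rather than a plain class.

    // Illustrative sketch only: Order is hypothetical and this plain class
    // stands in for a StreamBase Managed Object.
    public class Order {
        public String status = "OPEN";

        // When Order is a distributed Managed Object, invoking cancel() through
        // any reference executes this body on the object's master node, not on
        // the node that holds the reference.
        public void cancel() {
            status = "CANCELLED";
        }
    }

On Node One, calling cancel() through the order(Node Two) reference therefore executes the method body on Node Two, as part of the caller's distributed transaction.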
The distribution protocol uses TCP/IP, SSL, or InfiniBand connectivity between nodes with a platform-independent encoding. The platform-independent encoding allows heterogeneous hardware platforms to communicate with each other in a distributed transactional system. The optional automatic node discovery protocol uses UDP.
Location transparency is accomplished through the use of distributed references. All Managed Objects created in StreamBase have a distributed reference that contains the master node for the object. An object's identity, as defined by its distributed reference, does not change throughout the lifetime of the object.
Methods invoked on an object are always executed on the master node for an object.
Object field data is transparently read from and written to the master node when fields are accessed on a local node.
Whether a read operation is dispatched to the master node depends on whether the local node already has the field data cached. If the field data is not available on the local node, a distributed read is done when a field is accessed. The read completes before the get of the field returns to the caller. All reads are done on the master node in the same transaction in which the field access occurs.
When a field of a remote object is modified on a local node, by default the update is deferred until the local transaction enters the prepare state. This is called the deferred write protocol. See Deferred Write Protocol for details.
When an extent is accessed using a local query, only object references on the local node are returned — no read is dispatched to any remote nodes. A reference is in the local extent because the object was created on the local node, was returned in a method call, or was pushed to the local node as part of object replication. Distributed queries can be used to access the global extent of all objects.
A distributed reference identifies the master node using the following information:
Cluster-unique location code
Cluster-unique shared memory timestamp
The default node name is set to the local host name. The default node name can be changed during node installation. This allows multiple StreamBase nodes to run on the same machine.
The location code is automatically derived from the node name using a hashing algorithm.
The location code is a numeric identifier that determines the actual network location of the master node for an object. The location code is stored with each Managed Object. The initial value of the location code for an object is the location code of the node on which the object was created.
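As a sketch of the idea only (the actual hashing algorithm used by StreamBase is not shown here), a node name might map to a location code along these lines:

    public final class LocationCodes {
        // Stand-in hash: illustrates that a node name deterministically maps to
        // a numeric location code, which is stored in every distributed
        // reference to objects mastered on that node.
        static long locationCodeFor(String nodeName) {
            return Integer.toUnsignedLong(nodeName.hashCode());
        }
    }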
Highly available objects can migrate to other nodes as part of failover, or to support load balancing. When object migration occurs, the location code associated with all of the migrated objects is updated to use the location code of the node to which they were migrated. This update occurs on all nodes on which the objects exist. After the completion of an object migration, the new master node for the object is the new node, which may differ from the node on which the object was created.
The shared memory timestamp is assigned when the shared memory is first created for a node. This occurs the first time a node is started following an installation. The shared memory timestamp is a component of the opaque distributed reference. It ensures that the distributed reference is globally unique.
Location discovery is done in two ways:
Static discovery using configuration information.
Dynamic discovery using service discovery.
Configuration can be used to define the mapping between a node name and a network address. Configuring this mapping is allowed at any time, but it is only required if service discovery cannot be used for location discovery. An example of when this would be necessary is if a remote node is across a wide area network where service discovery is not allowed. This is called static discovery.
If configuration information is not provided for a location name, service discovery performs the location discovery. This has the advantage that no configuration for remote nodes is required on the local node — it is all discovered at runtime. This is called dynamic discovery.
When a network address is discovered with both static and dynamic discovery, the configured static discovery information is used.
StreamBase performs location discovery in the following cases:
An object is created in a partition with a remote active node.
A method is invoked, or a field is set, on a remote object.
When an operation is dispatched on a remote object, a location discovery request is made, keyed by location code, to find the network information associated with that location code.
Location code information is cached on the local node once the node is discovered.
Initialization and termination of the distribution services are tied to activation and deactivation of distribution configuration data. A node without active distribution configuration cannot provide distributed services to a cluster. When distribution configuration is activated, StreamBase takes the following steps to initialize distribution:
Mark the local node state as starting.
Start dynamic discovery service if enabled.
Start network listeners.
Start keep-alive server.
Mark the local node state as active.
After initialization completes, the node automatically becomes part of the cluster. The node can now provide access to distributed objects or provide high-availability services to other nodes in the cluster.
Remote nodes can have one of the states in Remote node states.
Remote node states
Application-installed node state change notifiers are called when a remote node transitions from active to unavailable and from unavailable to active. The
In Up Notifier and
In Down Notifier states defined in Remote node states are seen when a node notifier is called.
When a node state change notifier is installed, it is guaranteed to be called for all active remote nodes already discovered by the local node. Node notifier execution is serialized for a specific remote node: a call to a notifier must complete before another notifier is called for that node. For example, if a remote node becomes unavailable while an active notifier is being executed, the unavailable notifier is not called until the active notifier completes.
Node state change notifiers are called in a transaction.
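A minimal notifier sketch is shown below. The class shape and method names are illustrative; the actual StreamBase notifier API and its registration mechanism are not shown here.

    // Hypothetical sketch: only the behavior described above is illustrated.
    public class LoggingNodeNotifier {

        // Called when a remote node transitions from unavailable to active
        // (the In Up Notifier state). Runs in a transaction.
        public void remoteNodeActive(String remoteNodeName) {
            System.out.println(remoteNodeName + " is active");
        }

        // Called when a remote node transitions from active to unavailable
        // (the In Down Notifier state). Calls are serialized per remote node:
        // this method completes before any later notification for the same
        // remote node is delivered.
        public void remoteNodeUnavailable(String remoteNodeName) {
            System.out.println(remoteNodeName + " is unavailable");
        }
    }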
By default, all distributed object updates use a deferred write protocol. The deferred write protocol defers all network I/O until the commit phase of a transaction. This allows the batching of all of the object updates, and the prepare request, into a single network I/O for each node, improving network performance. The size of the network buffer used for the network I/O is controlled in the distribution configuration. See the StreamBase Administration Guide for details on distribution configuration.
The deferred write protocol is shown in Figure 2, “Deferred write protocol” for two nodes.
Notice that no transaction locks are taken on node
B as distributed objects are modified on node
A until the prepare step.
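The effect of the protocol can be sketched with a hypothetical Order Managed Object whose master is on node B. Every field update below is buffered on node A; with deferred writes enabled, all of them travel to node B in a single network I/O together with the prepare request.

    // Sketch only: Order is a hypothetical Managed Object with public fields.
    class Order { double price; int quantity; String status; }

    class Repricer {
        // All three updates are buffered locally; with the default deferred
        // write protocol they are sent to the master node in one network I/O,
        // together with the prepare request, when the transaction commits.
        void reprice(Order order) {
            order.price = 10.0;
            order.quantity = 5;
            order.status = "REPRICED";
        }
    }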
Distributed object creates and deletes perform network I/O immediately; they are not deferred until commit time. There is no prepare phase enabled for these transactions. See Figure 1, “Distributed transaction”.
The deferred write protocol is disabled if a method call is done on a distributed object. Any modifications to the distributed object on the local node are flushed to the remote node before the method executes on the remote node. This ensures that any updates made on the local node are available on the remote node when the method executes.
After the method executes on the remote node, any modifications on the remote node are copied back to the initiating node. This ensures that the data is again consistent on the local node on which the method was originally executed.
You can disable the deferred write protocol in the high availability configuration, though TIBCO recommends leaving it enabled. However, if an application only accesses object fields using accessors, instead of directly accessing fields, it can be more performant to disable the deferred write protocol since no modifications are ever done on the local node. See the StreamBase Administration Guide for details on high availability configuration.
StreamBase supports keep-alive messages between all nodes in a cluster. Keep-alive requests actively determine whether a remote
node is still reachable. Keep-alive messages are sent to remote nodes using the configurable
keepAliveSendIntervalSeconds time interval.
Figure 3, “Keep-alive protocol” shows how a node is detected as being down. Every time a keep-alive request is sent to a remote node, a timer starts with
a duration of
nonResponseTimeoutSeconds. This timer is reset when a keep-alive response is received from the remote node. If a keep-alive response is not received within the nonResponseTimeoutSeconds interval, a keep-alive request is sent on the next network interface configured for the node (if any). If there are no other network interfaces configured for the node, or the nonResponseTimeoutSeconds timer has expired on all configured interfaces, all connections to the remote node are dropped, and the remote node is marked Down.
Connection failures to remote nodes are also detected by the keep-alive protocol. When a connection failure is detected, as opposed to a keep-alive response not being received, the connection is reattempted to the remote node before trying the next configured network interface for the remote node (if any). This connection reattempt is done to transparently handle transient network connectivity failures without reporting a false node down event.
It is important to understand that the total time before a remote node is marked
Down is the number of configured interfaces multiplied by the
nonResponseTimeoutSeconds configuration value in the case of keep-alive responses not being received. In the case of connection failures, the total
time could be twice the
nonResponseTimeoutSeconds times the number of configured interfaces, if both connection attempts to the remote node (the initial one and the retry)
hang attempting to connect with the remote node.
For example, in the case of keep-alive responses not being received: if there are two network interfaces configured, and the nonResponseTimeoutSeconds value is four seconds, it will be eight seconds before the node is marked Down. In the case of connection establishment failures, where each connection attempt hangs, the total time would be sixteen seconds before the node is marked Down.
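The arithmetic behind these numbers is simply the configured values multiplied out, as this sketch shows; the values are configuration-dependent.

    public class NodeDownTiming {
        public static void main(String[] args) {
            int configuredInterfaces = 2;
            int nonResponseTimeoutSeconds = 4;

            // Keep-alive responses not received on any interface.
            int keepAliveSeconds =
                configuredInterfaces * nonResponseTimeoutSeconds;          // 8

            // Connection failures where the initial attempt and the retry both hang.
            int connectionFailureSeconds =
                2 * configuredInterfaces * nonResponseTimeoutSeconds;      // 16

            System.out.println(keepAliveSeconds + " and "
                + connectionFailureSeconds + " seconds");
        }
    }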
Distribution uses TCP as the underlying network protocol. In general, TCP provides reliable connectivity between machines on a network. However, it is possible that network errors can occur that cause a TCP connection to drop. When a TCP connection drops, requests and responses between nodes participating in a distributed transaction are not received. Network errors are detected by the keep-alive protocol described in Detecting Failed Nodes and handled by the distributed transaction protocol.
The following can cause network connectivity failures:
A non-response keep-alive timeout occurring.
TCP retry timers expiring.
Lost routes to remote machines.
These errors are usually caused by network cables being disconnected, router crashes, or machine interfaces being disabled.
As discussed in Local and Distributed Transactions, all distributed transactions have a transaction initiator that acts as the transaction coordinator. The transaction initiator can detect network failures when sending a request, or reading a response from a remote node. When the transaction initiator detects a network failure, the transaction is rolled back. Other nodes in a distributed transaction can also detect network failures. When this happens, rollback is returned to the transaction initiator, and again the transaction initiator rolls back the transaction. This is shown in Figure 4, “Connection failure handling”.
When the transaction initiator performs a rollback because of a connection failure — either detected by the initiator or another node in the distributed transaction — the rollback is sent to all known nodes. Known nodes are those that were located using location discovery (see Location Discovery). This must be done because the initiator does not know which nodes are participating in the distributed transaction. Notice that a rollback is sent to all known nodes in Figure 4, “Connection failure handling”. The rollback is retried until network connectivity is restored to all nodes.
Transaction rollback is synchronized to ensure that the transaction is safely aborted on all participating nodes, no matter the current node state.
Any communication failures to remote nodes detected during a global transaction before a commit sequence is started cause an exception that an application can handle (see the StreamBase Java Developer's Guide). This allows the application to explicitly decide whether to commit or rollback the current transaction. If the exception is not caught, the transaction is automatically rolled back.
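The pattern is sketched below. RuntimeException stands in for the actual StreamBase communication-failure exception, and updateRemoteInventory() is a hypothetical method that touches objects mastered on a remote node.

    public class InventoryUpdater {

        void update() {
            try {
                updateRemoteInventory();
            } catch (RuntimeException communicationFailure) {
                // Failure detected before the commit sequence started: the
                // application can decide here whether to commit or roll back.
                // If the exception is not caught, the transaction is rolled
                // back automatically.
                throw communicationFailure; // let the rollback proceed
            }
        }

        void updateRemoteInventory() {
            // hypothetical work against remote distributed objects
        }
    }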
Undetected communication failures to remote nodes do not affect the commit of the transaction. This failure scenario is shown
in Figure 5, “Undetected communication failure”. In this case,
Node 2 failed and restarted after all locks were taken on
Node 2, but before the commit sequence was started by the transaction initiator —
Node 1. Once the commit sequence starts it continues to completion. The request to commit is ignored on
Node 2 because the transaction state was lost when
Node 2 restarted.
Transaction outcome voting handles two transaction initiator failure cases:
Transaction initiator fails before the commit sequence starts.
Transaction initiator fails during the commit sequence.
When a node participating in a distributed transaction detects the failure of a transaction initiator, it queries all other nodes for the outcome of the transaction. If the transaction was committed on any other participating nodes, the transaction is committed on the node that detected the node failure. If the transaction aborted on any other participating nodes, the transaction aborts on the node that detected the failure. If the transaction is still in progress on the other participating nodes, the transaction aborts on the node that detected the failure.
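The decision rule can be summarized in a short sketch; the enum values and method are illustrative, not an actual API.

    import java.util.List;

    // Sketch of the transaction outcome voting rule applied by a node that
    // detects the failure of the transaction initiator.
    public class OutcomeVoting {

        enum Outcome { COMMITTED, ABORTED, IN_PROGRESS }

        static Outcome resolve(List<Outcome> otherParticipants) {
            if (otherParticipants.contains(Outcome.COMMITTED)) {
                return Outcome.COMMITTED; // committed anywhere: commit locally
            }
            if (otherParticipants.contains(Outcome.ABORTED)) {
                return Outcome.ABORTED;   // aborted anywhere: abort locally
            }
            return Outcome.ABORTED;       // still in progress everywhere: abort locally
        }
    }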
Transaction outcome voting before the commit sequence is shown in Figure 6, “Transaction initiator fails prior to initiating commit sequence”. In Figure 6, “Transaction initiator fails prior to initiating commit sequence” the initiating node,
Node 1, fails before initiating the commit sequence. When
Node 2 detects the failure, it performs the transaction outcome voting algorithm by querying other nodes in the cluster to see if
they are participating in this transaction. Since there are no other nodes in this cluster, the Transaction Status request is a no-op and the transaction is immediately aborted on
Node 2, releasing all locks held by the distributed transaction.
Transaction outcome voting during a commit sequence is shown in Figure 7, “Transaction initiator fails during commit sequence”. In Figure 7, “Transaction initiator fails during commit sequence” the initiating node, Node 1, fails during the commit sequence after committing the transaction on Node 2, but before it is committed on Node 3. When Node 3 detects the failure, it performs the transaction outcome voting algorithm by querying Node 2 for the resolution of the global transaction. Since the transaction was committed on Node 2, it is committed on Node 3.
To support transaction outcome voting, each node maintains a history of all committed and aborted transactions for each remote node participating in a global transaction. The number of historical transactions to maintain is configurable and should be based on the duration of the longest-running distributed transaction. For example, if 1000 transactions per second are being processed from a remote node, and the longest transaction on average is ten times longer than the mean, configure the transaction history buffer for 10,000 transactions.
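The sizing rule amounts to covering the longest-running transaction at the observed transaction rate. The sketch below reproduces the numbers in the example; the one-second mean duration is an assumption used only to make the arithmetic concrete.

    public class HistorySizing {
        public static void main(String[] args) {
            int transactionsPerSecond = 1000;
            double meanTransactionSeconds = 1.0;      // assumed mean duration
            double longestTransactionSeconds = 10.0 * meanTransactionSeconds;

            // The history must span the longest-running transaction at the
            // observed rate from the remote node.
            int historySize =
                (int) (transactionsPerSecond * longestTransactionSeconds); // 10,000
            System.out.println(historySize);
        }
    }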
For each transaction from each remote node, the following is captured:
Global transaction identifier
Node login timestamp