StreamBase Engine Configuration

Overview

This article provides a reference for writing a StreamBase engine configuration file where the HOCON type is com.tibco.ep.streambase.configuration.sbengine.

The root objects in this configuration type are:

StreamBaseEngine

This root object contains data such as custom plug-in function definitions and module definitions. When the StreamBase engine starts, information in this configuration is combined with information from all the separate configuration types to create a consistent configuration for its StreamBase server instance to use.

EventFlowDeployment

This root object can contain modules, container connections, and operator parameters that can be set independently of the rest of a StreamBase engine's configuration.

Required Header Lines

Each configuration file must contain the following header lines, typically found at the beginning of each file:

name

Specifies an arbitrary, case-sensitive string to name this configuration, which must be unique among other files with the same type, if any. Configuration files can refer to each other by this name. Select a name that reminds you of this configuration's type and purpose. For example:

name = "mysbengine"
version

Specifies an arbitrary version number that you can use to keep track of file versions for this configuration type in your development project. The maintenance of version numbers is under user control; StreamBase does not compare versions when loading configuration files during the fragment launch process. The version number is a string value, and can contain any combination of characters and numbers. For example:

version = "1.0.0"
type

This essential setting specifies the unique HOCON configuration type described on this page.

type = "com.tibco.ep.streambase.configuration.sbengine"

The three header lines taken together constitute a unique signature for each HOCON file in a project's configurations folder. Each project's configurations folder can contain only one file with the same signature.

The top-level configuration object defines the configuration envelope the same way for all HOCON file types.

configuration

On a line below the header lines, enter the word configuration followed by an open brace. The configuration object is a sibling of the name, version, and type identifiers, and serves to define the configuration envelope around this type's objects as described on this page. The file must end with the matching close brace.

configuration = {
...
...
}
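
Taken together, the header lines and the configuration envelope give every file of this type the same outer shape. The following is a minimal sketch of that shape, assuming a StreamBaseEngine root object with a single streamBase setting; the name and dataAreaPath values are placeholders, not recommendations.

name = "mysbengine"
version = "1.0.0"
type = "com.tibco.ep.streambase.configuration.sbengine"
configuration = {
  StreamBaseEngine = {
    streamBase = {
      dataAreaPath = "engine-data-area"
    }
  }
}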

StreamBaseEngine Root Object

The following shows the configuration's HOCON properties, usage, and syntax examples, where applicable. StreamBaseEngine is used to configure the behavior of an individual engine within a StreamBase Runtime node, almost always at design time (for example, by an application developer).

StreamBaseEngine

This object describes a top-level StreamBase engine configuration, which includes Java engine configuration as well.

associatedWithEngines

To restrict this object to specific engines, list them here. Each entry can be a specific engine name or a regular expression that applies to more than one engine. This array is optional and has no default value. If not present, the configuration is associated with all engines.

For example:

associatedWithEngines = [ "javaengine", "otherengine[0-9]" ]
externalClassPath

The StreamBase engine configuration inherits from the base Runtime Java engine. The following array illustrates how you can configure the Java part of the StreamBase engine.

For example:

externalClassPath = [
  "/absolute/path/d.jar"
  "/absolute/path/g.jar"
  "/absolute/path/root"  // class file hierarchy
]
jvm

Used to specify properties to pass to the Java virtual machine (JVM) that runs StreamBase Server.

minimumDispatchThreads

Minimum number of dispatch threads. This property is optional and its default value is 10.

For example:

minimumDispatchThreads = 10
maximumDispatchThreads

Long. Specifies the maximum number of dispatch threads. This property is optional and its default value is 2000. For example:

maximumDispatchThreads = 2000
shutdownTimerSeconds

Long. Specifies the maximum number of seconds to wait for a JVM to shut down before aborting it. This property is optional and its default value is 60. For example:

shutdownTimerSeconds = 60
timerParallelism

Long. Specifies the number of timers that can be executing concurrently. This property is optional and its default value is 1. For example:

timerParallelism = 1
timerResolutionMilliseconds

Long. Specifies the maximum timer resolution. This is the interval at which timers are examined. Higher resolution timers have more impact on system performance. This property is optional and its default value is 1000. For example:

timerResolutionMilliseconds = 1000
schedulerPolicy

The scheduling policy for the JVM process. The valid values for the policy are SCHED_FIFO, SCHED_RR, and SCHED_OTHER. This property is optional. For example:

schedulerPolicy = "SCHED_FIFO"
schedulerPriority

Long. Specifies the scheduling priority for the JVM process. The valid range for priority depends on the policy; for Linux the valid values for SCHED_FIFO and SCHED_RR are 1 to 99. This property is optional and its default value is operating system-specific. For example:

schedulerPriority = 1
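
As a sketch of how these settings fit together, the following groups them in a single jvm object. This assumes that every property described above is a child of jvm, as minimumDispatchThreads is in the full example at the end of this article. The values shown are the documented defaults, except for the scheduler settings, which are illustrative only.

jvm = {
  minimumDispatchThreads = 10
  maximumDispatchThreads = 2000
  shutdownTimerSeconds = 60
  timerParallelism = 1
  timerResolutionMilliseconds = 1000
  schedulerPolicy = "SCHED_FIFO"
  schedulerPriority = 1
}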
systemProperties

A list of Java system properties that will be set in each engine instance on this operating system type. These values can be overridden and extended by individual application fragments. This object is optional and has no default value.

System properties are represented as a HOCON object, where each property name in the object is a system property name and each property value is a system property value. HOCON objects are delimited by braces.

For example:

systemProperties = {
  "name1" = "value1"
  "name.2" = "value2"
}

In the second property above, note the dot in the name. A dot is a path separator in HOCON, so a property name that contains one must be enclosed in quotes.

jvmArgs

An array of JVM arguments used in each engine instance on this operating system type. These values can be overridden by individual application fragments. This array is optional and has no default value.

For example:

jvmArgs = [
  "-Xmx3g"
  "-Xms512m"
  "-XX:+UseG1GC"
  "-XX:MaxGCPauseMillis=500"
  "-XX:ConcGCThreads=1"
  ]
streamBase

StreamBase EventFlow-specific configuration. The EventFlow-specific information is contained in its own sub-configuration so it does not collide with any LiveView information (where a LiveView engine configuration extends the EventFlow one).

dataAreaPath

String. A path to an engine-specific data directory, which StreamBase operators, adapters, and other services use to store data. The path can be absolute or relative; relative paths are relative to the value of the Status.ENGINE_DATA_AREA system property.

This path is created if it does not exist. If the path cannot be created or accessed, configuration audit will fail. The data area can be changed at runtime by replacing a configuration with a new path. This location is never removed even if the data area is changed by replacing configuration, or the configuration is deactivated and removed.

This property is optional and has a default value of engine-data-area.

For example:

dataAreaPath = "engine-data-area"
operatorParameters

String. An association of global parameters to use in expressions in any module or container run by this StreamBase engine. This object is optional and has no default value. Operator parameters are legacy global variables, superseded by module parameters.

An operator parameter is specified by a name and a value. The name can be a simple name such as ParamOne. This will:

1. Expose a parameter available to modules using ${param} syntax. (This syntax also resolves to module parameters defined directly in modules, when present.)

2. Set the given value as the parameter value for any Java operator or adapter with a parameter of the given name.

The name can also be a dotted path name to set a particular adapter or operator's parameter, such as ContainerName.OperatorName.ParamOne. If the operator is in a submodule you must include the submodule in the dotted pathname, such as ContainerName.SubModuleName.OperatorName.ParamOne. When a path is used the parameter value will only be set for that named operator. All other parameters with the same name on other operators remain unmodified. See String Values in Parameters in the Authoring Guide for the recommended policy on quoting string parameter values.

This optional property is encryptable as part of the node-level secrecy system described in Encrypting Sensitive Configuration Data.

For example:

operatorParameters = {
  MyName = "MyValue"
  "MyName.with.dots.quoted" = "#!v0hZ40tTIC1PKW8TrYdqyW81KLo2kHJG/GaYZ/K+1Z/UrhrlG1nfMFfuCJY+3Us4Vvt6jyAwiRcLk6aZN3vUXg=="
}
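
The difference between a simple name and a dotted path name can be sketched as follows. ContainerName, OperatorName, and ParamOne are the placeholder names used in the description above, and the values are hypothetical. The dotted name is quoted because a dot is otherwise a HOCON path separator.

operatorParameters = {
  // Applies to every operator or adapter that has a parameter named ParamOne
  ParamOne = "shared-value"
  // Applies only to the ParamOne parameter of OperatorName in ContainerName
  "ContainerName.OperatorName.ParamOne" = "specific-value"
}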
forceHighResolutionTimer

Bool. Set to true to force high resolution timers, at the expense of performance. This property is optional and its default value is false.

For example:

forceHighResolutionTimer = true
ignoreUnboundCaptures

Bool. StreamBase's default behavior is to fail typechecking when an unbound capture is detected. An unbound capture occurs when an input stream of a module is implemented with a capture field and that input stream is not connected to an input port of the calling Module Reference in the parent application. This situation results in an error because there is no schema to examine or fields to capture for the unbound capture instance. When you are certain your unbound captures are benign, you can optionally suppress the resulting typecheck errors. To do this, set this key to true. This property is optional and its default value is false.
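
For example, the following setting, taken from the full example at the end of this article, suppresses the typecheck errors:

ignoreUnboundCaptures = true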

schemaMaxSize

Long. The maximum size of a stream's schema; that is, the total number of bytes of all properties you expect in the tuple. While there is no theoretical upper limit, in practice there is a maximum based on the available memory on the machines that will host the processing of your application. Be cautious about creating very large schemas, because applications that move smaller amounts of data perform much better than applications that move tuples containing unnecessary fields. This property is optional and its default value is 1 megabyte.

For example:

schemaMaxSize = 1M
nowImplementation

Specifies which implementation to use for calls to now(). Must be either system or thread.

system uses a call to Java's System.currentTimeMillis(). thread uses a background thread that checks the time approximately every millisecond. This option results in decreased accuracy, but may be more efficient than system if you call now() more frequently than 1000 times per second. This property is optional and its default value is system.

For example:

nowImplementation = "thread"
operatorStateChangeTimeoutMilliseconds

Int. Each Java Operator changes state along with the engine process as a whole. The engine waits for each Operator to change state before it completes its state change. The value of this property is the amount of time the engine will wait before timing out the Operator. If an operator is timed out on a state change, the engine shuts the operator down and proceeds with the state change. This property is optional and its default value is 10000.

For example:

operatorStateChangeTimeoutMilliseconds = 10002
engineMonitor

Configures the engine's monitoring subsystem. This property is optional.

enabled

Bool. Set to true to use the engine monitor. This property is optional; if it is not present, the default value of true applies.

For example:

enabled = false
statsFrequencyMilliseconds

Int. Configures how frequently, in milliseconds, monitoring statistics are produced. This property is optional and its default value is 1000.

For example:

statsFrequencyMilliseconds = 2000
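
Both settings are children of the engineMonitor object. As a sketch, the following keeps monitoring enabled and produces statistics half as often as the default:

engineMonitor = {
  enabled = true
  statsFrequencyMilliseconds = 2000
}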
timeService

Configures settings related to the Time Service. This property is optional and has defaults as specified below.

type

Time service type, one of CONTROLLABLE or WALLCLOCK. WALLCLOCK uses the system clock internally. CONTROLLABLE can be initialized with a target time as specified below. This property is optional and its default is WALLCLOCK.

For example:

type = "WALLCLOCK"
targetTime

Specifies the initial time of the time service in milliseconds when type is CONTROLLABLE. If type is WALLCLOCK, this value is ignored. This property is optional and its default value is zero, which means the time service is initialized from the system clock.

For example:

targetTime = "2018-06-06 12:12:24.123+0000"
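
Combined, the two settings might look like the following sketch, which mirrors the timeService object in the full example at the end of this article. The quoted timestamp string is the form used there; the target time itself is an arbitrary placeholder.

timeService = {
  type = "CONTROLLABLE"
  targetTime = "2016-10-01 12:12:24.123+0000"
}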
pluginFunctions

Configure the pluginFunctions object for any of the following reasons:

  • Registering the type signatures of custom Java functions used in the server, which are contained in JAR files.

  • Assigning shorter aliases to Java functions so they can be called in the same way as built-in functions.

This object is optional and has no default value.

java

Configuration of Java plugin-based functions, indexed by function name. This object is optional and has no default value.

For example, the following registers a custom simple function named func1 that takes a string and int argument and returns a string:

func1 = {
  type = "simple"
  argumentTypes = [
    { type = "string" }
    { type = "int" }
  ]
  returnType = { type = "string" }
}

Adding an alias attribute tells the EventFlow engine to allow you to use a short function name for a given function. For example, this would allow you to call the same function in expressions in the form f(myString, myInt):

func1 = {
  type = "simple"
  alias = "f"
  argumentTypes = [
    { type = "string" }
    { type = "int" }
  ]
  returnType = { type = "string" }
}

You can also set the autoArguments key to true and leave out the argumentTypes and returnType keys to have the EventFlow engine auto-detect them. For example:

log10 = {
  className = "java.lang.Math"
  alias = "log_base_ten"
  autoArguments = true
}
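
In the full example at the end of this article, each function definition is a child of the java object, which is in turn a child of pluginFunctions inside the streamBase object. As a sketch of that nesting, using the log10 definition above:

streamBase = {
  pluginFunctions = {
    java = {
      log10 = {
        className = "java.lang.Math"
        alias = "log_base_ten"
        autoArguments = true
      }
    }
  }
}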
type

One of simple or aggregate. This property is optional and its default value is simple.

For example:

type = "simple"
alias

String. Function alias, allowing the function to be called by the alias as well as its name. This property is required.

For example:

alias = "analias"
className

String. The fully qualified class name where the named public static function is found. This property is required.

For example:

className = "com.tibco.ep.a.b.c"
isVarargs

For Java functions only, set this to true if the last argument is variable length and you want to declare the individual argument elements. In this case, do not set autoArguments = true. When you use autoArguments = true, the isVarargs attribute is ignored.

For example:

isVarargs = false
autoArguments

Bool. For Java functions only, set this property to true to have the StreamBase engine auto-detect the arguments and return type of the current function. This property is optional and its default value is false.

For example:

autoArguments = false
argumentTypes

Function arguments. This property is optional and has no default value.

type

Argument type, must be one of the valid StreamBase data types. This property is required.

For example:

type = "tuple"
schema

Defines the schema for a value when the type is tuple. This property is required when the type is tuple and not allowed otherwise.

name

The name of this complex type. This property is required.

For example:

name = "s1"
fields

Schema fields. At least one field definition is required.

f1

Schema field definition example name.

type

Argument type, must be one of the valid StreamBase data types. This property is required.

For example:

type = "string"
schema

Defines the schema for a value when type="tuple". If present, at least one field definition is required.

For example:

schema = {
  name = "s2"
  fields = {
    f2 = {
      type = "string"
    }
  }
}
elementType

Defines the data type for the elements of a list when an argument type is list. This object is required when the type is list and not allowed otherwise.
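
For example, the following sketch, based on the full example at the end of this article, declares a list argument whose elements are strings:

argumentTypes = [
  {
    type = "list"
    elementType = {
      type = "string"
    }
  }
]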

returnType

Function return type. This property is optional and has no default value.

type

Argument type, must be one of the valid StreamBase data types. This property is required.

For example:

type = "int"
errorHandler

Default handling for runtime errors is to discard the error-producing event and continue with processing.

evalError

This exception is caused by a variety of reasons, including errors that occur when evaluating expressions or when flushing disk-based query table logs, and incorrect timestamp formats. Continuing after such errors is not recommended.

For example:

evalError = "shutdown"
evalPluginError

Java plug-ins can get eval-plugin-error exceptions for a variety of reasons, including TupleExceptions, errors, and when setting parameters. Continuing after such errors is not recommended.

For example:

evalPluginError = "ignore"
orderingError

The Gather operator throws this error when an order-by field value is null or the current value is less than a previous value.

For example:

orderingError = "continue"
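
The three settings are children of the errorHandler object. The following sketch repeats the values used in the examples above, which illustrate syntax rather than recommended settings:

errorHandler = {
  evalError = "shutdown"
  evalPluginError = "ignore"
  orderingError = "continue"
}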
constantsOverwrite

String. Constant values defined in a module may be overwritten with a value at engine startup time using this object. The object's values are constants and are substituted at engine startup time.

This optional property is encryptable as part of the node-level secrecy system described in Encrypting Sensitive Configuration Data.

For example:

constantsOverwrite = {
  constant1 = "value1"
  constant2 = "value2"
}
artifactWaitTimeoutMilliseconds

If an operator in an EventFlow application specifies that an artifact must exist before the operator is started, then this property configures how long to wait for the artifact from a TIBCO Artifact Management Server (AMS), before aborting engine startup. This property is optional and its default value is 20000.

For example:

artifactWaitTimeoutMilliseconds = 20000

EventFlowDeployment Root Object

The following shows the configuration's HOCON properties, usage, and syntax examples, where applicable.

EventFlowDeployment

Use the EventFlowDeployment root object to control the binding of EventFlows to containers at deploy time. It is only meaningful in cases where there are multiple containers in a single engine, each with their own top-level “main” EventFlow.

associatedWithEngines

To restrict this object to specific engines, list them here. Each entry can be a specific engine name or a regular expression that applies to more than one engine. This array is optional and has no default value. If not present, the configuration uses default engine association based on its location in a fragment or application archive, or in a node deploy configuration.

For example:

associatedWithEngines = [ "javaengine", "otherengine[0-9]" ]
modules

Optional array of modules to load into named containers. If no modules are specified below, the server creates a single container named default with no module in it.

trace

Configures runtime tracing for a StreamBase module. This is an example of data that could also be specified at deploy time via the override engine configuration.

matchOperatorAndStreamRegexp

String. A regular expression pattern that limits tracing to the operators and streams whose names match the pattern. This property is optional and has no default value.

For example:

matchOperatorAndStreamRegexp = "stream[0-9]"
fileBase

String. Absolute base file name for trace files. This optional property has no default value. If absent, trace output is written to the console.

For example:

fileBase = "/absolute/path/trace-"
overwrite

Bool. If true, all trace files are overwritten on engine restart. If false, trace files are appended to on engine restart. This property is optional and its default value is false.

For example:

overwrite = true
compress

Bool. If true, the trace system compresses its files using gzip. If false, files remain uncompressed. This property is optional and its default value is false.

For example:

compress = true
buffered

Bool. If true, the trace system buffers trace output before writing to a file. If false, the output trace file is flushed on every trace line. This property is optional and its default value is true.

For example:

buffered = false
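
In the full example at the end of this article, trace is an array, and each entry configures tracing for one name pattern. A minimal sketch with a single entry, using a hypothetical pattern and path:

trace = [
  {
    matchOperatorAndStreamRegexp = "stream[0-9]"
    fileBase = "/absolute/path/trace-"
    overwrite = false
    compress = true
    buffered = true
  }
]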
moduleParameters

Specifies the value of one or more parameters to be passed to the top-level module in the current container. This optional property is encryptable as part of the node-level secrecy system described in Encrypting Sensitive Configuration Data.

For example:

moduleParameters = {
  "param1.with.dots.quoted" = "value1"
  param2 = "value2"
}
extensionPoints

An associative array of extension point target IDs and their contents. This property is optional and has no default value.

extensions

The ID of this Extension Point, which must match the ID specified in the Properties view for an Extension Point operator in a module that will use this deployment file.

For example:

foo = {
moduleName

The file system name of a module that implements this Extension Point's interface. The module must be in the fragment archive's module folder. Specify an EventFlow (.sbapp) file. This property is required.

For example:

moduleName = "module1"
moduleParameters

Specifies any module parameters for this module in this Extension Point. This optional property is encryptable as part of the node-level secrecy system described in Encrypting Sensitive Configuration Data.

For example:

moduleParameters = {
  param11 = "value11"
  param12 = "value12"
}
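
In the full example at the end of this article, these pieces nest as follows: each key of extensionPoints is an Extension Point ID, its extensions object holds one named entry per implementing module, and each entry carries a moduleName and optional moduleParameters. A sketch, where foo, ext1, and the module and parameter names are placeholders taken from that example:

extensionPoints = {
  foo = {
    extensions = {
      ext1 = {
        moduleName = "com.example.module1"
        moduleParameters = {
          param11 = "value11"
        }
      }
    }
  }
}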
moduleName

The name of an EventFlow or precompiled module file, which must exist in the fragment archive's module folder. This property is required.

For example:

moduleName = "mymodule.sbapp"
containerName

String. The name you assign to the container to hold the specified module. Container names must follow the StreamBase Identifier Naming Rules: names must begin with an alpha character or underscore, and can contain only alpha characters, numbers, and underscores. Names cannot contain hyphens, colons, or other non-alphabetic characters. For running or debugging in StreamBase Studio, the top-level module must be in a container named default. This property is required.

Note

If you use this property to define a new container, TIBCO recommends specifying a container name other than default. This is because Studio always runs EventFlow fragments in a container with that name, and any separate container named default conflicts and causes a runtime error.

For example:

containerName = "mycontainer"
dequeue

One of {ENABLED | DISABLED | DROP_TUPLES} to specify the startup state of dequeuing for this container. If you specify DISABLED, then dequeue attempts are actively refused and exceptions are thrown. If you specify DROP_TUPLES, then tuples are silently dropped. This property is optional and its default value is ENABLED.

For example:

dequeue = DISABLED
enqueue

One of {ENABLED | DISABLED | DROP_TUPLES} to specify the startup state of enqueuing for this container. If you specify DISABLED, then enqueue attempts are actively refused and exceptions are thrown. If you specify DROP_TUPLES, then tuples are silently dropped. This property is optional and its default value is ENABLED.

For example:

enqueue = DROP_TUPLES
suspend

Set this value to true to specify that the module in this container should start up suspended. Set it to false to have the module start running when the container is added. This property is optional and its default value is false.

For example:

suspend = true
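
A single entry in the modules array might therefore look like the following sketch. The module and container names are placeholders; enqueue, dequeue, and suspend are shown with their default values.

modules = [
  {
    moduleName = "mymodule.sbapp"
    containerName = "mycontainer"
    enqueue = ENABLED
    dequeue = ENABLED
    suspend = false
  }
]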
containerConnections

A list of container connections used between modules in this engine. This array is optional and has no default value. A container connection describes a means for connecting streams in different application modules running in different containers.

source

An address describing the entity in another named container (usually an output stream) that will send the data to the destination container. This property is required.

For example:

source = "zkMaintainModel.ModelUpdates"
destination

An address describing the entity (usually an input stream) that will consume the output of the connected source container. This property is required.

For example:

destination = "default.ProcessTransactionAndScore.ProcessTransaction.EvaluateModel.default.Control"
filter

A predicate expression filter to limit the tuples that cross to the destination container. The expression to enter must resolve to true or false, and should match against one or more properties in the schema of the connection's stream.

This property is optional and has no default value.

For example:

filter = "this is a filter"
synchronicity

By default, connections between containers are set up to run asynchronously, with a queue to manage the flow of tuples through the connection. StreamBase also supports synchronous container connections, which are low latency, direct connections between containers, and do not have queue management. This property is optional and its default value is ASYNCHRONOUS.

For example:

synchronicity = "ASYNCHRONOUS"
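
A complete connection entry might look like the following sketch. The container and stream names are hypothetical, and the filter is omitted so that all tuples cross the connection.

containerConnections = [
  {
    source = "container1.OutputStream1"
    destination = "container2.InputStream1"
    synchronicity = "ASYNCHRONOUS"
  }
]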

Full StreamBaseEngine Example

The following is an example of the com.tibco.ep.streambase.configuration.sbengine configuration type with the StreamBaseEngine root object. It shows every element in use at once, which is very unlikely in a real-world project.

name = "myengine"
version = "1.0.0"
type = "com.tibco.ep.streambase.configuration.sbengine"
configuration = {
  StreamBaseEngine = {
    associatedWithEngines = [ "javaengine", "otherengine[0-9]" ]
    externalClassPath = [
      "/absolute/path/d.jar"
      "/absolute/path/g.jar"
      "/absolute/path/root" 
    ]
    jvm = {
      minimumDispatchThreads = 10
    }
    systemProperties = {
      prop1 = "val1"
      prop2 = "val2"
      "prop.3" = "val3" 
    }
    jvmArgs = [
      "-Xmx3g"
      "-Xms512m"
      "-XX:+UseG1GC"
      "-XX:MaxGCPauseMillis=500"
      "-XX:ConcGCThreads=1"
      "-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"
    ]
    streamBase = {
      dataAreaPath = "my-data"
      operatorParameters = {
        MyName = "MyValue"
        "MyName.with.dots.quoted" = "#!v0hZ40tTIC1PKW8TrYdqyW81KLo2kHJG/GaYZ/K+1Z/UrhrlG1nfMFfuCJY+3Us4Vvt6jyAwiRcLk6aZN3vUXg=="
      }
      forceHighResolutionTimer = true
      ignoreUnboundCaptures = true
      schemaMaxSize = 1M
      nowImplementation = "thread"
      operatorStateChangeTimeoutMilliseconds = 10002
      engineMonitor = {
        enabled = false
        statsFrequencyMilliseconds = 2000
      }
      timeService = {
        type = "CONTROLLABLE"
        targetTime = "2016-10-01 12:12:24.123+0000"
      }
      pluginFunctions = {
        java = {
          func1 = {
            type = "simple"
            alias = "whatever"
            className = "com.tibco.ep.a.b.c"
            isVarargs = false
            autoArguments = false
            argumentTypes = [
              {
                type = "tuple"
                schema = {
                  name = "s1"
                  fields = {
                    f1 = {
                      type = "tuple"                   
                      schema = {
                        name = "s2"
                        fields = {
                          f2 = {
                            type = "string"
                          }
                        }
                      }
                    }
                  }
                }
              }
              {
                type = "list"
                elementType = {
                  type = "string"
                }
              }
            ]
            returnType = {
              type = "int"
            }
          }
        }
      }
      errorHandler = {
        evalError = "shutdown"
        evalPluginError = "ignore"
        orderingError = "continue"
      }
      constantsOverwrite = {
        constant1 = "value1"
        constant2 = "value2"
      }
      artifactWaitTimeoutMilliseconds = 20000
    }
  }
}

Full EventFlowDeployment Example

The following is an example of the com.tibco.ep.streambase.configuration.sbengine configuration type with the EventFlowDeployment root object. It shows every element in use at once, which is very unlikely in a real-world project.

name = "mydeployment"
version = "1.0.0"
type = "com.tibco.ep.streambase.configuration.sbengine"
configuration = {
  EventFlowDeployment = {
    associatedWithEngines = [ "sbengine1", "otherengine[0-9]" ]
    modules = [
      {
        trace = [
          {
            matchOperatorAndStreamRegexp = "stream[0-9]"
            fileBase = "/absolute/path/trace-"
            overwrite = true
            compress = true
            buffered = false
          }
          {
            matchOperatorAndStreamRegexp = "anotherstream.+"
            fileBase = "/absolute/path/trace-another-"
            overwrite = false
            compress = true
            buffered = false
          }
        ]
        moduleParameters = {
          "param1.with.dots.quoted" = "value1"
          param2 = "value2"
        }
        extensionPoints = {
          foo = {
            extensions = {
              ext1 = {
                moduleName = "com.example.module1"
                moduleParameters = {
                  param11 = "value11"
                  param12 = "value12"
                }
              }
              ext2 = {
                moduleName = "com.example.module2"
                moduleParameters = {
                  param21 = "value21"
                  param22 = "value22"
                }
              }
            }
          }
          "bar" = {
            extensions = {
              ext3 = {
                moduleName = "com.example.module3"
                moduleParameters = {
                  param31 = "value31"
                  param32 = "value32"
                }
              }
            }
          }
        }
        moduleName = "com.example.deptname.thismodule"
        containerName = "mycontainer"
        dequeue = DISABLED
        enqueue = DROP_TUPLES
        suspend = true
      }
    ]
    containerConnections = [
      {
        source = "default.modulename.outputstreamname"
        destination = "othercontainer.inputstreamname"
        filter = "where 'tradeID % 2 == 0'"
        synchronicity = "ASYNCHRONOUS" 
      }
      {
        source = "default.outputstreamname2"
        destination = "othercontainer2.modulename2.inputstreamname2"
      }
    ]
  }
}