Pulse is a Kotlin library that simplifies creating and managing MQTT connections, built on top of Eclipse Paho.
This document explains how Pulse works and how you can integrate it into any Android app.
In your Application class, initialize the kit using:
val pulseMqttKit = PulseMqttKit()
override fun onCreate() {
    super.onCreate()
    // Inside an Application subclass, applicationContext is the app-wide Context.
    pulseMqttKit.initialize(applicationContext, MyMqttBridgeImpl())
}
class MyMqttBridgeImpl : PulseMqttKitBridge {
    override fun getLogger() = AndroidLogger()
    override fun getAppGson() = Gson()
    override fun getCustomCoroutineScope() = MainScope()
    override fun getHealthMonitoringConfig() = HealthMonitoringConfig(
        monitoringFreqSeconds = 30,
        type = HealthMonitoringType.WORK_MANAGER
    )
    override fun getNetworkConfig() = NetworkMonitoringConfig(enabled = true)
}
Here, MyMqttBridgeImpl is your implementation of the required PulseMqttKitBridge interface.
This allows you to provide app-specific implementations such as logging,
event tracking, or custom behavior.
Pulse works on a command submission model.
For every MQTT action, you submit a command to the kit:
- Connect: ConnectCommand(connectionOptions, commandTimeout, retryPolicy)
- Publish: PublishCommand(topic, payload, qos)
- Subscribe: SubscribeCommand(topics, qos)
- Unsubscribe: UnsubscribeCommand(topics)
- Disconnect: DisconnectCommand()
Submit a command using:
pulseMqttKit.submitCommand(command)
Each command supports a list of dependencies (type List<MqttCommand>).
This is useful for scenarios where you need to ensure certain commands
succeed before executing the main one.
For example, when submitting a SubscribeCommand, you may want to
ensure you are connected first:
val connectCommand = ConnectCommand(connectionOptions, retryPolicy = RetryPolicy.Exponential())
val subscribeCommand = SubscribeCommand(
    topics = listOf("my/topic"),
    qos = QOSLevel.QOS_1,
    dependencies = listOf(connectCommand)
)
pulseMqttKit.submitCommand(subscribeCommand)
In this case, Pulse first executes the ConnectCommand. Only if it returns a success result does Pulse proceed to execute the SubscribeCommand.
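The dependency rule can be pictured with a small standalone model. This is only a sketch under assumptions: SketchCommand, Outcome, and execute are hypothetical names invented for illustration and are not part of the Pulse API, and Pulse's real scheduler may differ (for example, it may run asynchronously).

```kotlin
// Hypothetical stand-ins for illustration; these are NOT the Pulse types.
sealed class Outcome {
    object Success : Outcome()
    data class Failure(val reason: String) : Outcome()
}

class SketchCommand(
    val name: String,
    val dependencies: List<SketchCommand> = emptyList(),
    val action: () -> Outcome
)

// Runs every dependency (depth-first) before the command itself.
// Returns at the first failure, so the main action never runs unless
// all of its dependencies succeeded. Each executed command is logged.
fun execute(command: SketchCommand, log: MutableList<String>): Outcome {
    for (dependency in command.dependencies) {
        val result = execute(dependency, log)
        if (result is Outcome.Failure) return result
    }
    log.add(command.name)
    return command.action()
}
```

In this model, executing a "subscribe" command that depends on a "connect" command logs "connect" first, and a failing "connect" prevents "subscribe" from running at all.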
For any command, you can specify a retry policy to control retries:
- Sequential: Fixed retry intervals.
- Exponential: Exponential backoff.
- Jitter: Adds random jitter to avoid thundering herd.
- None: No retry.
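To make the difference between these policies concrete, here is a minimal sketch of how the delay before each retry could be computed. The type SketchRetryPolicy, its parameters (intervalMs, baseMs, maxRetries, maxJitterMs), and the doubling factor are assumptions chosen for illustration; the source does not specify how Pulse's RetryPolicy calculates delays.

```kotlin
import kotlin.random.Random

// Hypothetical model of the four policies; names and parameters are
// assumptions for illustration, not the Pulse RetryPolicy API.
sealed class SketchRetryPolicy {
    object None : SketchRetryPolicy()
    data class Sequential(val intervalMs: Long, val maxRetries: Int) : SketchRetryPolicy()
    data class Exponential(val baseMs: Long, val maxRetries: Int) : SketchRetryPolicy()
    data class Jitter(val baseMs: Long, val maxRetries: Int, val maxJitterMs: Long) : SketchRetryPolicy()
}

// Returns the delay in milliseconds before retry number `attempt` (1-based),
// or null when the policy allows no further retries.
fun delayBeforeRetry(
    policy: SketchRetryPolicy,
    attempt: Int,
    random: Random = Random.Default
): Long? {
    require(attempt >= 1) { "attempt is 1-based" }
    return when (policy) {
        is SketchRetryPolicy.None -> null
        is SketchRetryPolicy.Sequential ->
            if (attempt <= policy.maxRetries) policy.intervalMs else null
        is SketchRetryPolicy.Exponential ->
            // Delay doubles on each retry: base, 2 * base, 4 * base, ...
            if (attempt <= policy.maxRetries) policy.baseMs shl (attempt - 1) else null
        is SketchRetryPolicy.Jitter ->
            if (attempt <= policy.maxRetries) {
                // Exponential backoff plus a random offset in [0, maxJitterMs].
                (policy.baseMs shl (attempt - 1)) + random.nextLong(policy.maxJitterMs + 1)
            } else null
    }
}
```

The random offset in the Jitter case spreads out clients that failed at the same moment, so they do not all retry against the broker at once.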
You can also exclude a set of MQTT exception codes for which retries should be skipped.
Example: In a ConnectCommand, you may want to skip retries if the
exception is NOT_AUTHORIZED or CONNECT_ALREADY_IN_PROGRESS:
val retryPolicy = RetryPolicy.Exponential(
    excludedErrorCodes = setOf(
        MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED,
        MqttExceptionCode.CONNECT_ALREADY_IN_PROGRESS
    )
)
In this example, Pulse retries on every failure except the excluded ones.
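The exclusion rule amounts to one extra condition in the retry decision. The sketch below assumes a hypothetical SketchErrorCode enum and shouldRetry helper; neither is a Pulse type, and the attempt limit is an illustrative parameter.

```kotlin
// Hypothetical error codes and helper, named for illustration only.
enum class SketchErrorCode { NOT_AUTHORIZED, CONNECT_ALREADY_IN_PROGRESS, CLIENT_TIMEOUT }

// Retry only while attempts remain AND the error is not in the excluded set.
fun shouldRetry(
    error: SketchErrorCode,
    attemptsMade: Int,
    maxAttempts: Int,
    excluded: Set<SketchErrorCode>
): Boolean = attemptsMade < maxAttempts && error !in excluded
```

Skipping retries for an authorization failure makes sense because repeating the same request with the same credentials is unlikely to produce a different result.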
Inside ConnectionOptions, there is an optional parameter
autoSubscriptionConfig.
val options = ConnectionOptions(
    serverUri = "tcp://broker.hivemq.com:1883",
    clientId = "my-client",
    autoSubscriptionConfig = AutoSubscriptionConfig(
        enabled = true,
        subscriptionStore = hashMapOf(
            "driver/notification" to TopicTypeConfig(
                messageType = PushNotificationResponse::class.java,
                qosLevel = QOSLevel.QOS_2
            ),
            "driver/alerts" to TopicTypeConfig(
                messageType = GlobalAlertMessage::class.java,
                qosLevel = QOSLevel.QOS_1
            )
        )
    )
)
Each entry in subscriptionStore maps:
- Key: Topic string
- Value: TopicTypeConfig (contains messageType + qosLevel)
If enabled = true, Pulse will automatically re-subscribe to the
topics listed in subscriptionStore after a reconnection.
You do not need to manually submit a SubscribeCommand in that
case.
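Conceptually, automatic re-subscription means walking the stored topics whenever a connection is established. The following is a rough sketch, not Pulse's implementation: SketchTopicConfig and SketchAutoSubscriber are hypothetical types, and the subscribe callback stands in for whatever the underlying client exposes. With Eclipse Paho, a hook such as MqttCallbackExtended.connectComplete is one place where onConnected could be called, though the source does not say whether Pulse uses it.

```kotlin
// Hypothetical stand-in types for illustration; not the Pulse classes.
data class SketchTopicConfig(val qos: Int)

class SketchAutoSubscriber(
    private val enabled: Boolean,
    private val store: Map<String, SketchTopicConfig>,
    private val subscribe: (topic: String, qos: Int) -> Unit
) {
    // Intended to be called each time a connection is (re)established.
    // Does nothing when auto-subscription is disabled.
    fun onConnected() {
        if (!enabled) return
        for ((topic, config) in store) {
            subscribe(topic, config.qos)
        }
    }
}
```

Keeping the topic list in one store means the application declares its subscriptions once, instead of repeating SubscribeCommand submissions after every reconnect.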
You can register listeners to receive updates about command execution:
pulseMqttKit.addListener(myListener)
Each command result will contain:
- Number of Attempts -- how many times this command was retried before success/failure.
- Total Execution Time -- time taken to complete this command.
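As an illustration of what these two fields measure, the sketch below counts attempts and times the whole run for a blocking action. SketchResult and runWithAttempts are hypothetical names, not the Pulse result type or executor, and the loop omits any delay between attempts for brevity.

```kotlin
// Hypothetical result type mirroring the two fields described above.
data class SketchResult(val success: Boolean, val attempts: Int, val totalTimeMs: Long)

// Runs `action` until it returns true or `maxAttempts` is reached,
// recording the attempt count and the total elapsed time.
fun runWithAttempts(maxAttempts: Int, action: () -> Boolean): SketchResult {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    val start = System.nanoTime()
    var attempts = 0
    var success = false
    while (attempts < maxAttempts && !success) {
        attempts++
        success = action()
    }
    val elapsedMs = (System.nanoTime() - start) / 1_000_000
    return SketchResult(success, attempts, elapsedMs)
}
```

A listener could log such a result or surface it in the UI, for example to flag commands that needed many attempts.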
Remember to unregister listeners when not needed:
pulseMqttKit.removeListener(myListener)
Pulse also supports automatic health monitoring and reconnection.
Use HealthMonitoringConfig to configure monitoring:
val config = HealthMonitoringConfig(
    monitoringFreqSeconds = 30,
    type = HealthMonitoringType.WORK_MANAGER,
    healthCheck = { /* your custom health check logic */ }
)
pulseMqttKit.startHealthMonitoring()
Currently supported monitoring types:
- AlarmManager -- uses system alarms to trigger health checks.
- WorkManager -- uses Jetpack WorkManager for background checks.
This ensures that if the connection is lost, Pulse will automatically attempt to reconnect.
If you need to manually shut down the connection and clear all listeners, call:
pulseMqttKit.shutDown()
This will internally disconnect and unsubscribe from all topics.
You can still manually submit a DisconnectCommand or ConnectCommand
afterwards if you need explicit control.
- Initialize PulseMqttKit in Application.onCreate() or in an Activity, depending on your use case, with your bridge implementation.
- Submit Commands for all MQTT actions (connect, publish, subscribe, etc.).
- Configure Command Dependencies if one command should only run after another succeeds.
- Use Retry Policies to control connection behavior (with optional exception exclusions).
- Optionally configure AutoSubscriptionConfig for automatic topic re-subscription.
- Register Listeners to observe command results and update UI or logs.
- Enable Health Monitoring for automatic reconnection.
- Call pulseMqttKit.shutDown() to disconnect and clear listeners when cleaning up resources.
By following these steps, you can reliably manage MQTT connections and operations with minimal boilerplate.
class MyApp : Application() {
    val pulseMqttKit = PulseMqttKit()
    override fun onCreate() {
        super.onCreate()
        // Initialize once with the app-wide Context and your bridge implementation.
        pulseMqttKit.initialize(applicationContext, MyMqttBridgeImpl())
        // Connect with auto re-subscription, a 5-second timeout, and exponential retries.
        val connectCommand = ConnectCommand(
            connectionOptions = myOptions.copy(
                autoSubscriptionConfig = AutoSubscriptionConfig(
                    enabled = true,
                    subscriptionStore = SubscriptionStoreData(
                        topics = mutableListOf("topic/1"),
                        qos = QOSLevel.QOS_1
                    )
                )
            ),
            commandTimeout = 5000L,
            retryPolicy = RetryPolicy.Exponential(
                // Do not retry when the broker rejects the credentials.
                excludedErrorCodes = setOf(MqttExceptionCode.REASON_CODE_NOT_AUTHORIZED)
            )
        )
        pulseMqttKit.submitCommand(connectCommand)
    }
}