/*
The MIT License (MIT)
Copyright (c) 2018 Giovanni Paolo Vigano'
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
/// <summary>
/// Adaptation for Unity of the M2MQTT library (https://github.com/eclipse/paho.mqtt.m2mqtt),
/// modified to run on UWP (also tested on Microsoft HoloLens).
/// </summary>
namespace M2MqttUnity
{
/// <summary>
/// Generic MonoBehaviour wrapping an MQTT client, using a double buffer to postpone message processing to the main thread.
/// </summary>
public class M2MqttUnityClient : MonoBehaviour
{
// Connection settings shown in the Inspector (grouped by the Header attributes).
// Field names, types and defaults are unchanged; only the tooltip typo ("the the") is fixed.
[Header("MQTT broker configuration")]
[Tooltip("IP address or URL of the host running the broker")]
public string brokerAddress = "localhost";
[Tooltip("Port where the broker accepts connections")]
public int brokerPort = 1883;
[Tooltip("Use encrypted connection")]
public bool isEncrypted = false;
[Header("Connection parameters")]
[Tooltip("Connection to the broker is delayed by the given milliseconds")]
public int connectionDelay = 500;
[Tooltip("Connection timeout in milliseconds")]
public int timeoutOnConnection = MqttSettings.MQTT_CONNECT_TIMEOUT;
[Tooltip("Connect on startup")]
public bool autoConnect = false;
[Tooltip("UserName for the MQTT broker. Keep blank if no user name is required.")]
public string mqttUserName = null;
[Tooltip("Password for the MQTT broker. Keep blank if no password is required.")]
public string mqttPassword = null;
/// <summary>
/// Wrapped MQTT client
/// </summary>
protected MqttClient client;
// The two buffers backing the front/back message queues. They are never reassigned,
// only the front/back references below are switched between them.
private readonly List<MqttMsgPublishEventArgs> messageQueue1 = new List<MqttMsgPublishEventArgs>();
private readonly List<MqttMsgPublishEventArgs> messageQueue2 = new List<MqttMsgPublishEventArgs>();
// Queue currently filled by the message-received callback (assigned in Awake()).
private List<MqttMsgPublishEventArgs> frontMessageQueue = null;
// Queue currently drained by the main thread (assigned in Awake()).
private List<MqttMsgPublishEventArgs> backMessageQueue = null;
// Set by the connection-closed callback and consumed in ProcessMqttEvents().
// volatile: NOTE(review) the M2Mqtt callbacks are expected to run outside the Unity main thread.
private volatile bool mqttClientConnectionClosed = false;
// True between a successful connection and a controlled or unexpected disconnection.
private volatile bool mqttClientConnected = false;
/// <summary>
/// Event fired when a connection is successfully established.
/// Raised by the default OnConnected(), after SubscribeTopics() has been called.
/// </summary>
public event Action ConnectionSucceeded;
/// <summary>
/// Event fired when failing to connect.
/// Raised by the default OnConnectionFailed().
/// </summary>
public event Action ConnectionFailed;
/// <summary>
/// Connect to the broker using current settings.
/// Starts the connection coroutine unless a client exists and is already connected.
/// </summary>
public virtual void Connect()
{
    bool alreadyConnected = client != null && client.IsConnected;
    if (alreadyConnected)
    {
        return;
    }

    StartCoroutine(DoConnect());
}
/// <summary>
/// Disconnect from the broker.
/// Starts the disconnection coroutine whenever a client instance exists; does nothing otherwise.
/// </summary>
public virtual void Disconnect()
{
    if (client == null)
    {
        return;
    }

    StartCoroutine(DoDisconnect());
}
/// <summary>
/// Hook called by the connection coroutine right before the connection attempt.
/// Override it to take some actions before connection (e.g. display a message).
/// The default implementation logs the broker address and port.
/// </summary>
protected virtual void OnConnecting()
{
    string port = brokerPort.ToString();
    Debug.LogFormat("Connecting to broker on {0}:{1}...\n", brokerAddress, port);
}
/// <summary>
/// Hook called when the connection succeeded.
/// The default implementation logs the broker endpoint, calls SubscribeTopics()
/// and then raises the ConnectionSucceeded event.
/// </summary>
protected virtual void OnConnected()
{
    string port = brokerPort.ToString();
    Debug.LogFormat("Connected to {0}:{1}...\n", brokerAddress, port);

    SubscribeTopics();

    // Take a snapshot of the delegate before the null check and the invocation.
    Action handler = ConnectionSucceeded;
    if (handler != null)
    {
        handler();
    }
}
/// <summary>
/// Override this method to take some actions if the connection failed.
/// The default implementation logs a warning (including the failure reason, when one is given)
/// and raises the ConnectionFailed event.
/// </summary>
/// <param name="errorMessage">Description of the failure; may be null or empty.</param>
protected virtual void OnConnectionFailed(string errorMessage)
{
    // The reason used to be dropped; report it so the warning is actionable.
    if (string.IsNullOrEmpty(errorMessage))
    {
        Debug.LogWarning("Connection failed.");
    }
    else
    {
        Debug.LogWarningFormat("Connection failed: {0}", errorMessage);
    }

    // Take a snapshot of the delegate before the null check and the invocation.
    Action handler = ConnectionFailed;
    if (handler != null)
    {
        handler();
    }
}
/// <summary>
/// Override this method to subscribe to MQTT topics.
/// It is called by the default OnConnected() once the connection is established.
/// </summary>
protected virtual void SubscribeTopics()
{
    // Intentionally empty: the base class subscribes to nothing.
}
/// <summary>
/// Override this method to unsubscribe from MQTT topics
/// (they should be the same you subscribed to with SubscribeTopics()).
/// It is called while closing the connection, before the client disconnects.
/// </summary>
protected virtual void UnsubscribeTopics()
{
    // Intentionally empty: the base class has no subscriptions to release.
}
/// <summary>
/// Closes the broker connection before the application quits.
/// </summary>
protected virtual void OnApplicationQuit()
{
    // Release the client: unsubscribe, disconnect and detach the event handlers.
    CloseConnection();
}
/// <summary>
/// Initializes the MQTT message queues by assigning the two buffers their initial back/front roles.
/// Remember to call base.Awake() if you override this method.
/// </summary>
protected virtual void Awake()
{
    backMessageQueue = messageQueue2;
    frontMessageQueue = messageQueue1;
}
/// <summary>
/// Connects on startup when autoConnect is set to true; does nothing otherwise.
/// </summary>
protected virtual void Start()
{
    if (!autoConnect)
    {
        return;
    }

    Connect();
}
/// <summary>
/// Override this method for each received message you need to process.
/// It is called once per queued message while the message queues are processed.
/// The default implementation only logs the topic.
/// </summary>
/// <param name="topic">Topic the message was published on.</param>
/// <param name="message">Raw message payload.</param>
protected virtual void DecodeMessage(string topic, byte[] message)
{
    const string format = "Message received on topic: {0}";
    Debug.LogFormat(format, topic);
}
/// <summary>
/// Override this method to take some actions when disconnected.
/// It is called by the disconnection coroutine after the connection has been closed.
/// </summary>
protected virtual void OnDisconnected()
{
    const string text = "Disconnected.";
    Debug.Log(text);
}
/// <summary>
/// Override this method to take some actions when the connection is closed unexpectedly.
/// It is called from ProcessMqttEvents() when the connection-closed flag has been raised.
/// </summary>
protected virtual void OnConnectionLost()
{
    const string text = "CONNECTION LOST!";
    Debug.LogWarning(text);
}
/// <summary>
/// Processing of incoming messages and events is postponed here, in the main thread.
/// Remember to call ProcessMqttEvents() in the Update() method if you override it.
/// </summary>
protected virtual void Update()
{
    // Drain the queued messages and report a lost connection, if any.
    ProcessMqttEvents();
}
/// <summary>
/// Processes the queued messages and then reports an unexpected connection loss, if one was flagged.
/// </summary>
protected virtual void ProcessMqttEvents()
{
    // Two passes: the first processes the messages in the main queue,
    // the second processes the messages that arrived in the meanwhile.
    for (int pass = 0; pass < 2; pass++)
    {
        SwapMqttMessageQueues();
        ProcessMqttMessageBackgroundQueue();
    }

    if (!mqttClientConnectionClosed)
    {
        return;
    }

    // Reset the flag before notifying, so the loss is reported only once.
    mqttClientConnectionClosed = false;
    OnConnectionLost();
}
// Guards the front/back queue references and every append to the front queue.
// NOTE(review): the message-received callback is expected to run on an M2Mqtt thread,
// not on the Unity main thread; without this lock a message could be appended to a list
// that the main thread is enumerating or about to clear.
private readonly object messageQueueLock = new object();

/// <summary>
/// Decodes every message stored in the back queue, then empties it.
/// </summary>
private void ProcessMqttMessageBackgroundQueue()
{
    // No lock is needed here: once a swap has completed under the lock,
    // the receive callback only appends to the front queue.
    foreach (MqttMsgPublishEventArgs msg in backMessageQueue)
    {
        DecodeMessage(msg.Topic, msg.Message);
    }
    backMessageQueue.Clear();
}

/// <summary>
/// Swap the message queues to continue receiving messages while processing a queue.
/// </summary>
private void SwapMqttMessageQueues()
{
    lock (messageQueueLock)
    {
        // Derive both roles from the current front queue, so the two references
        // can never end up pointing to the same list.
        bool frontIsQueue1 = frontMessageQueue == messageQueue1;
        frontMessageQueue = frontIsQueue1 ? messageQueue2 : messageQueue1;
        backMessageQueue = frontIsQueue1 ? messageQueue1 : messageQueue2;
    }
}

/// <summary>
/// Handler of the client's MqttMsgPublishReceived event: stores the message in the front queue,
/// to be decoded later by ProcessMqttMessageBackgroundQueue().
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="msg">Received message.</param>
private void OnMqttMessageReceived(object sender, MqttMsgPublishEventArgs msg)
{
    // Read the front reference and append atomically with respect to the swap.
    lock (messageQueueLock)
    {
        frontMessageQueue.Add(msg);
    }
}
/// <summary>
/// Handler of the client's ConnectionClosed event.
/// Flags the closure as unexpected only if the client was marked as connected
/// (this avoids event handling in case of a controlled disconnection).
/// </summary>
/// <param name="sender">Event source (not used).</param>
/// <param name="e">Event data (not used).</param>
private void OnMqttConnectionClosed(object sender, EventArgs e)
{
    bool wasConnected = mqttClientConnected;
    mqttClientConnectionClosed = wasConnected;
    mqttClientConnected = false;
}
/// <summary>
/// Connects to the broker using the current settings.
/// Creates the client if needed, calls OnConnecting(), attempts the connection and then calls
/// OnConnected() or OnConnectionFailed() according to the outcome.
/// </summary>
/// <returns>The execution is done in a coroutine.</returns>
private IEnumerator DoConnect()
{
    // wait for the given delay
    yield return new WaitForSecondsRealtime(connectionDelay / 1000f);
    // leave some time to Unity to refresh the UI
    yield return new WaitForEndOfFrame();

    // create client instance
    if (client == null)
    {
        try
        {
            // Encrypted connections use TLS 1.2: SSLv3 is a prohibited protocol (RFC 7568).
            // The previously commented-out alternative passed a certificate and a remote
            // certificate validation callback to a longer MqttClient constructor overload.
#if (!UNITY_EDITOR && UNITY_WSA_10_0 && !ENABLE_IL2CPP)
            client = new MqttClient(brokerAddress, brokerPort, isEncrypted, isEncrypted ? MqttSslProtocols.TLSv1_2 : MqttSslProtocols.None);
#else
            client = new MqttClient(brokerAddress, brokerPort, isEncrypted, null, null, isEncrypted ? MqttSslProtocols.TLSv1_2 : MqttSslProtocols.None);
#endif
        }
        catch (Exception e)
        {
            client = null;
            Debug.LogErrorFormat("CONNECTION FAILED! {0}", e.ToString());
            OnConnectionFailed(e.Message);
            yield break;
        }
    }
    else if (client.IsConnected)
    {
        yield break;
    }

    OnConnecting();

    // leave some time to Unity to refresh the UI
    yield return new WaitForEndOfFrame();
    yield return new WaitForEndOfFrame();

    // While this coroutine was waiting, the client may have been released by CloseConnection()
    // or connected by another running DoConnect(): in both cases there is nothing left to do.
    if (client == null || client.IsConnected)
    {
        yield break;
    }

    client.Settings.TimeoutOnConnection = timeoutOnConnection;
    string clientId = Guid.NewGuid().ToString();
    try
    {
        client.Connect(clientId, mqttUserName, mqttPassword);
    }
    catch (Exception e)
    {
        client = null;
        Debug.LogErrorFormat("Failed to connect to {0}:{1}\n (check client parameters: encryption, address/port, username/password):\n{2}", brokerAddress, brokerPort, e.ToString());
        OnConnectionFailed(e.Message);
        yield break;
    }

    if (client.IsConnected)
    {
        // Detach before attaching: when an existing client is reconnected (e.g. after a lost
        // connection) the handlers are already registered, and registering them twice would
        // deliver every message and closure notification twice.
        client.ConnectionClosed -= OnMqttConnectionClosed;
        client.ConnectionClosed += OnMqttConnectionClosed;
        // register to message received
        client.MqttMsgPublishReceived -= OnMqttMessageReceived;
        client.MqttMsgPublishReceived += OnMqttMessageReceived;
        mqttClientConnected = true;
        OnConnected();
    }
    else
    {
        OnConnectionFailed("CONNECTION FAILED!");
    }
}
/// <summary>
/// Closes the connection at the end of the current frame and then calls OnDisconnected().
/// </summary>
/// <returns>The execution is done in a coroutine.</returns>
private IEnumerator DoDisconnect()
{
    // Defer the actual disconnection to the end of the frame.
    yield return new WaitForEndOfFrame();

    CloseConnection();
    OnDisconnected();
}
/// <summary>
/// Releases the wrapped client: unsubscribes and disconnects it if still connected,
/// detaches the event handlers and clears the reference.
/// </summary>
private void CloseConnection()
{
    // Cleared first, so that a closure notified during the disconnection below
    // is not flagged as an unexpected connection loss.
    mqttClientConnected = false;

    if (client == null)
    {
        return;
    }

    if (client.IsConnected)
    {
        UnsubscribeTopics();
        client.Disconnect();
    }

    client.MqttMsgPublishReceived -= OnMqttMessageReceived;
    client.ConnectionClosed -= OnMqttConnectionClosed;
    client = null;
}
#if ((!UNITY_EDITOR && UNITY_WSA_10_0))
/// <summary>
/// Reconnects when the application gains focus and closes the connection when it loses it.
/// </summary>
/// <param name="focus">True when the application gained focus, false when it lost it.</param>
private void OnApplicationFocus(bool focus)
{
    // On UWP 10 (HoloLens) we cannot tell whether the application actually got closed or just minimized.
    // (https://forum.unity.com/threads/onapplicationquit-and-ondestroy-are-not-called-on-uwp-10.462597/)
    if (!focus)
    {
        CloseConnection();
        return;
    }

    Connect();
}
#endif
}
}