Ik ben al een aantal weken bezig om een probleem opgelost te krijgen, maar ik zie inmiddels door de bomen het bos niet meer. Momenteel heb ik e.e.a. draaien in Laravel maar ik twijfel zelf of dit de beste oplossing is voor deze usecase. Ik wil daarom iedereen die misschien geen/weinig verstand van PHP of Laravel heeft maar des te meer weet over MQTT graag uitnodigen om vooral ook even door te lezen onder de lap code die ik hieronder post, want ik sta open voor andere (non-Laravel/PHP) oplossingen
Casus
Ik heb vier verschillende webapplicaties in Laravel draaien die allemaal verschillende IoT-dingen doen. De webapplicaties draaien op een shared hosting omgeving en de MQTT-broker (Mosquitto) draait op een VPS. De verschillende applicaties maken allemaal gebruik van de Laravel Job Queue en de library php-mqtt
Naarmate de applicaties beginnen te schalen (er komen steeds meer IoT-apparaten bij) merk ik steeds meer problemen. Een van de meest acute problemen waar ik nu mee zit is dat ik in een specifieke applicatie berichten lijk te missen. Het gaat hier om pak hem beet 100 apparaten die gelijktijdig meerdere berichten sturen, d.w.z. op een specifieke tijd sturen alle 100 apparaten 8 berichten naar de MQTT broker, in totaal dus 800 berichten in een tijdsbestek van minder dan een seconde. De apparaten hebben allemaal een eigen topic, dus in deze specifieke situatie volgen er 10 berichten zeer snel achter elkaar op één topic.
Zoals in de vorige alinea gesteld; ik mis berichten. Ik heb een desktop-applicatie draaien waarmee ik kan zien dat ook daadwerkelijk alle 800 berichten op de broker zijn verschenen. Ik mis ongeveer de helft in de database en daarom ben ik vanuit Laravel de afhandeling van de berichten gaan loggen, in de logging zie ik ook maar ongeveer de helft terug.
Hieronder wat stukken code die hopelijk iets meer verduidelijken wat er gebeurt:
Laravel Kernel.php
MqttSubscriber
DeviceMessageReceiver
En natuurlijk het aanroepen van de Queue zelf door een cronjob iedere minuut:
De grote vraag is natuurlijk; wat gaat hier dan mis en waarom mis ik een gedeelte van mijn berichten? Door aan het begin en aan het eind van de functie die e.e.a. afhandelt te loggen kan ik zien dat bij de berichten die ik mis regel 1 (logging) van de functie niet eens loopt.
Ik heb hierboven al genoemd dat er veel berichten nagenoeg tegelijkertijd op de broker komen; ergens heb ik dus het vermoeden dat er zoiets gebeurt:
• Message #1 wordt gepost op MQTT broker
• Job ziet dit bericht en DeviceMessageReceiver gaat deze afhandelen
• DeviceMessageReceiver is bezig met afhandelen
• Message #2 wordt gepost op MQTT broker
• DeviceMessageReceiver is nog bezig met message #1
• Message #3 wordt gepost op MQTT broker
• DeviceMessageReceiver is klaar met message #1 en gaat nu door met message #3
• Message #2 is niet gezien door de Job omdat message #3 'eroverheen' is gepost
Neemt niet weg dat meerdere instanties van DeviceMessageReceiver volgens mij consecutief naast elkaar zouden moeten kunnen draaien
En dan rijst de vraag, los ik het überhaupt wel op de juiste manier op zo? Ik heb overwogen om een daemon te maken in JS (Note-RED) en deze te draaien op de VPS naast de MQTT-broker. Ieder bericht wat door de daemon afgevangen wordt zou dan (afhankelijk van het topic) naar de vier verschillende pakketten gestuurd kunnen worden door een API-endpoint aan te roepen. Voordeel van deze methode is dan gelijk dat ik een soort 'handshake' kan uitvoeren. De API van de applicatie stuurt een success-response terug naar de daemon waardoor deze weet dat het bericht succesvol is aangekomen; is dit niet het geval zou de daemon (na een timeout?) het bericht opnieuw kunnen aanbieden. Lijkt me een goed plan in theorie maar ik zit eigenlijk niet voldoende in deze materie om hier zelf een weloverwogen besluit in te maken.
Iemand ideeën of pointers?
Casus
Ik heb vier verschillende webapplicaties in Laravel draaien die allemaal verschillende IoT-dingen doen. De webapplicaties draaien op een shared hosting omgeving en de MQTT-broker (Mosquitto) draait op een VPS. De verschillende applicaties maken allemaal gebruik van de Laravel Job Queue en de library php-mqtt
Naarmate de applicaties beginnen te schalen (er komen steeds meer IoT-apparaten bij) merk ik steeds meer problemen. Een van de meest acute problemen waar ik nu mee zit is dat ik in een specifieke applicatie berichten lijk te missen. Het gaat hier om pak hem beet 100 apparaten die gelijktijdig meerdere berichten sturen, d.w.z. op een specifieke tijd sturen alle 100 apparaten 8 berichten naar de MQTT broker, in totaal dus 800 berichten in een tijdsbestek van minder dan een seconde. De apparaten hebben allemaal een eigen topic, dus in deze specifieke situatie volgen er 10 berichten zeer snel achter elkaar op één topic.
Zoals in de vorige alinea gesteld; ik mis berichten. Ik heb een desktop-applicatie draaien waarmee ik kan zien dat ook daadwerkelijk alle 800 berichten op de broker zijn verschenen. Ik mis ongeveer de helft in de database en daarom ben ik vanuit Laravel de afhandeling van de berichten gaan loggen, in de logging zie ik ook maar ongeveer de helft terug.
Hieronder wat stukken code die hopelijk iets meer verduidelijken wat er gebeurt:
Laravel Kernel.php
PHP:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| class Kernel extends ConsoleKernel { protected function schedule(Schedule $schedule){ $schedule->call(function(){ $this->checkDevicePowerReadout(); })->everyMinute(); $schedule->call(function(){ $this->removeOldMqttHeartbeats(); })->dailyAt('12:00'); $schedule->job(new AutoSetChecker(), 'high', 'database')->everyFiveMinutes(); $schedule->job(new CheckCommunication(), 'low', 'database')->hourly(); $schedule->job(new MqttSubscriber(), 'high', 'sync')->everyFiveMinutes(); $schedule->command('queue:work --stop-when-empty')->everyMinute(); } protected function commands() { $this->load(__DIR__.'/Commands'); require base_path('routes/console.php'); } protected function checkDevicePowerReadout(){ // some code } protected function removeOldMqttHeartbeats(){ // some code } } |
MqttSubscriber
PHP:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| class MqttSubscriber implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $response_topics; public function __construct(){ $this->response_topics= Device::pluck('response_topic')->toArray(); } public function handle(){ try { $mqtt_client_id = MqttController::guidv4(); $mqtt_client = new MqttClient(env('MQTT_HOST', ''), env('MQTT_PORT', ''), $mqtt_client_id, MqttClient::MQTT_3_1, null); $connectionSettings = (new ConnectionSettings()) ->setUsername(env('MQTT_USERNAME', '')) ->setPassword(env('MQTT_PASSWORD', '')); $mqtt_client->connect($connectionSettings, true); $mqtt_client->subscribe('application1/#', function (string $topic, string $message, bool $retained) { if(in_array($topic, $this->response_topics)){ DeviceMessageReceiver::dispatch($topic, $message)->onConnection('sync'); } }, MqttClient::QOS_AT_LEAST_ONCE); $mqtt_client->registerLoopEventHandler(function ($mqtt_client, float $elapsedTime) { if ($elapsedTime >= 5 * 60) { $mqtt_client->interrupt(); } }); $mqtt_client->loop(true); $mqtt_client->disconnect(); } catch (ContractsMqttClient $e) { Log::debug('Subscribing to a topic using QoS 0 failed. An exception occurred.'); } catch (Exception $E) { Log::debug('Subscribing to a topic using QoS 0 failed. An exception occurred.'); } } } |
DeviceMessageReceiver
PHP:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| class DeviceMessageReceiver implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $topic; protected $payload; public function __construct($topic, $payload){ $this->topic = $topic; $this->payload = $payload; } public function handle(){ Log::debug('Msg start ' . $topic); // Lange if-else constructie // Verschillende type berichten worden verschillende afgehandeld Log::debug('Msg complete ' . $topic); } } |
En natuurlijk het aanroepen van de Queue zelf door een cronjob iedere minuut:
code:
1
| ls /home/username/domains/domain.tld/public_html >> /dev/null 2>&1 && cd /home/username/domains/domain.tld/public_html && php artisan schedule:run |
De grote vraag is natuurlijk; wat gaat hier dan mis en waarom mis ik een gedeelte van mijn berichten? Door aan het begin en aan het eind van de functie die e.e.a. afhandelt te loggen kan ik zien dat bij de berichten die ik mis regel 1 (logging) van de functie niet eens loopt.
Ik heb hierboven al genoemd dat er veel berichten nagenoeg tegelijkertijd op de broker komen; ergens heb ik dus het vermoeden dat er zoiets gebeurt:
• Message #1 wordt gepost op MQTT broker
• Job ziet dit bericht en DeviceMessageReceiver gaat deze afhandelen
• DeviceMessageReceiver is bezig met afhandelen
• Message #2 wordt gepost op MQTT broker
• DeviceMessageReceiver is nog bezig met message #1
• Message #3 wordt gepost op MQTT broker
• DeviceMessageReceiver is klaar met message #1 en gaat nu door met message #3
• Message #2 is niet gezien door de Job omdat message #3 'eroverheen' is gepost
Neemt niet weg dat meerdere instanties van DeviceMessageReceiver volgens mij consecutief naast elkaar zouden moeten kunnen draaien
En dan rijst de vraag, los ik het überhaupt wel op de juiste manier op zo? Ik heb overwogen om een daemon te maken in JS (Note-RED) en deze te draaien op de VPS naast de MQTT-broker. Ieder bericht wat door de daemon afgevangen wordt zou dan (afhankelijk van het topic) naar de vier verschillende pakketten gestuurd kunnen worden door een API-endpoint aan te roepen. Voordeel van deze methode is dan gelijk dat ik een soort 'handshake' kan uitvoeren. De API van de applicatie stuurt een success-response terug naar de daemon waardoor deze weet dat het bericht succesvol is aangekomen; is dit niet het geval zou de daemon (na een timeout?) het bericht opnieuw kunnen aanbieden. Lijkt me een goed plan in theorie maar ik zit eigenlijk niet voldoende in deze materie om hier zelf een weloverwogen besluit in te maken.
Iemand ideeën of pointers?
Hoeder van het Noord-Meierijse dialect