[PHP/JS/Linux] MQTT probleem oplosen met Laravel of anders?

Pagina: 1
Acties:

Vraag


Acties:
  • 0 Henk 'm!

  • Harrie_
  • Registratie: Juli 2003
  • Niet online

Harrie_

⠀                  🔴 🔴 🔴 🔴 🔴

Topicstarter
Ik ben al een aantal weken bezig om een probleem opgelost te krijgen, maar ik zie inmiddels door de bomen het bos niet meer. Momenteel heb ik e.e.a. draaien in Laravel maar ik twijfel zelf of dit de beste oplossing is voor deze usecase. Ik wil daarom iedereen die misschien geen/weinig verstand van PHP of Laravel heeft maar des te meer weet over MQTT graag uitnodigen om vooral ook even door te lezen onder de lap code die ik hieronder post, want ik sta open voor andere (non-Laravel/PHP) oplossingen

Casus
Ik heb vier verschillende webapplicaties in Laravel draaien die allemaal verschillende IoT-dingen doen. De webapplicaties draaien op een shared hosting omgeving en de MQTT-broker (Mosquitto) draait op een VPS. De verschillende applicaties maken allemaal gebruik van de Laravel Job Queue en de library php-mqtt

Naarmate de applicaties beginnen te schalen (er komen steeds meer IoT-apparaten bij) merk ik steeds meer problemen. Een van de meest acute problemen waar ik nu mee zit is dat ik in een specifieke applicatie berichten lijk te missen. Het gaat hier om pak hem beet 100 apparaten die gelijktijdig meerdere berichten sturen, d.w.z. op een specifieke tijd sturen alle 100 apparaten 8 berichten naar de MQTT broker, in totaal dus 800 berichten in een tijdsbestek van minder dan een seconde. De apparaten hebben allemaal een eigen topic, dus in deze specifieke situatie volgen er 10 berichten zeer snel achter elkaar op één topic.

Zoals in de vorige alinea gesteld; ik mis berichten. Ik heb een desktop-applicatie draaien waarmee ik kan zien dat ook daadwerkelijk alle 800 berichten op de broker zijn verschenen. Ik mis ongeveer de helft in de database en daarom ben ik vanuit Laravel de afhandeling van de berichten gaan loggen, in de logging zie ik ook maar ongeveer de helft terug.

Hieronder wat stukken code die hopelijk iets meer verduidelijken wat er gebeurt:

Laravel Kernel.php
PHP:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Kernel extends ConsoleKernel {

    protected function schedule(Schedule $schedule){
        $schedule->call(function(){
            $this->checkDevicePowerReadout();
        })->everyMinute();

        $schedule->call(function(){
            $this->removeOldMqttHeartbeats();
        })->dailyAt('12:00');

        $schedule->job(new AutoSetChecker(), 'high', 'database')->everyFiveMinutes();

        $schedule->job(new CheckCommunication(), 'low', 'database')->hourly();

        $schedule->job(new MqttSubscriber(), 'high', 'sync')->everyFiveMinutes();                 
        
        $schedule->command('queue:work --stop-when-empty')->everyMinute();
    }

    protected function commands()
    {
        $this->load(__DIR__.'/Commands');

        require base_path('routes/console.php');
    }

    protected function checkDevicePowerReadout(){
        // some code
    }

    protected function removeOldMqttHeartbeats(){
        // some code
    }
}


MqttSubscriber
PHP:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class MqttSubscriber implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $response_topics;

    public function __construct(){
        $this->response_topics= Device::pluck('response_topic')->toArray();
    }

    public function handle(){
        try {                               
            $mqtt_client_id = MqttController::guidv4();                        
            
            $mqtt_client = new MqttClient(env('MQTT_HOST', ''),  env('MQTT_PORT', ''), $mqtt_client_id, 
              MqttClient::MQTT_3_1, null);
            
            $connectionSettings = (new ConnectionSettings())
            ->setUsername(env('MQTT_USERNAME', ''))
            ->setPassword(env('MQTT_PASSWORD', ''));
            
            $mqtt_client->connect($connectionSettings, true);        
            $mqtt_client->subscribe('application1/#', function (string $topic, string $message, bool $retained) {                       
                if(in_array($topic, $this->response_topics)){
                    DeviceMessageReceiver::dispatch($topic, $message)->onConnection('sync');                
                }
            }, MqttClient::QOS_AT_LEAST_ONCE);
            $mqtt_client->registerLoopEventHandler(function ($mqtt_client, float $elapsedTime) {
                if ($elapsedTime >= 5 * 60) {
                    $mqtt_client->interrupt();
                }
            });
            $mqtt_client->loop(true); $mqtt_client->disconnect();
        } catch (ContractsMqttClient $e) {        
            Log::debug('Subscribing to a topic using QoS 0 failed. An exception occurred.');
        } catch (Exception $E) {
            Log::debug('Subscribing to a topic using QoS 0 failed. An exception occurred.');
        }
    }
}


DeviceMessageReceiver
PHP:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class DeviceMessageReceiver implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $topic;
    protected $payload;

    public function __construct($topic, $payload){
        $this->topic = $topic;
        $this->payload = $payload;
    }

    public function handle(){
        Log::debug('Msg start ' . $topic);

        // Lange if-else constructie
        // Verschillende type berichten worden verschillende afgehandeld        

        Log::debug('Msg complete ' . $topic);
    }
}


En natuurlijk het aanroepen van de Queue zelf door een cronjob iedere minuut:
code:
1
ls /home/username/domains/domain.tld/public_html >> /dev/null 2>&1 && cd /home/username/domains/domain.tld/public_html && php artisan schedule:run


De grote vraag is natuurlijk; wat gaat hier dan mis en waarom mis ik een gedeelte van mijn berichten? Door aan het begin en aan het eind van de functie die e.e.a. afhandelt te loggen kan ik zien dat bij de berichten die ik mis regel 1 (logging) van de functie niet eens loopt.

Ik heb hierboven al genoemd dat er veel berichten nagenoeg tegelijkertijd op de broker komen; ergens heb ik dus het vermoeden dat er zoiets gebeurt:
• Message #1 wordt gepost op MQTT broker
• Job ziet dit bericht en DeviceMessageReceiver gaat deze afhandelen
• DeviceMessageReceiver is bezig met afhandelen
• Message #2 wordt gepost op MQTT broker
• DeviceMessageReceiver is nog bezig met message #1
• Message #3 wordt gepost op MQTT broker
• DeviceMessageReceiver is klaar met message #1 en gaat nu door met message #3
• Message #2 is niet gezien door de Job omdat message #3 'eroverheen' is gepost

Neemt niet weg dat meerdere instanties van DeviceMessageReceiver volgens mij consecutief naast elkaar zouden moeten kunnen draaien :?

En dan rijst de vraag, los ik het überhaupt wel op de juiste manier op zo? Ik heb overwogen om een daemon te maken in JS (Note-RED) en deze te draaien op de VPS naast de MQTT-broker. Ieder bericht wat door de daemon afgevangen wordt zou dan (afhankelijk van het topic) naar de vier verschillende pakketten gestuurd kunnen worden door een API-endpoint aan te roepen. Voordeel van deze methode is dan gelijk dat ik een soort 'handshake' kan uitvoeren. De API van de applicatie stuurt een success-response terug naar de daemon waardoor deze weet dat het bericht succesvol is aangekomen; is dit niet het geval zou de daemon (na een timeout?) het bericht opnieuw kunnen aanbieden. Lijkt me een goed plan in theorie maar ik zit eigenlijk niet voldoende in deze materie om hier zelf een weloverwogen besluit in te maken.

Iemand ideeën of pointers?

Hoeder van het Noord-Meierijse dialect

Beste antwoord (via Harrie_ op 22-02-2024 12:00)


  • Barryvdh
  • Registratie: Juni 2003
  • Laatst online: 00:22
Voutloos schreef op dinsdag 20 februari 2024 @ 21:55:
[...]
Alles in schedule() gebeurt in een enkel draadje. Er gaan 2 jobs naar de database queue, dat is zo weggeschreven dus niet heel spannend. Maar de rest is allemaal serieel en blocking. Als je dan langlopende zaken gaat doen, en de cron gaat inmiddels weer af, garandeer dan maar eens dat het aantal processen dat met een bepaalde stap bezig is gelijk is wat jij bedoeld had.

Dus ik zou beginnen met de 2 langslopende dingen in eigen regeltje van je cron te zetten. De queue:work kan misschien met een —max-time=55 en dan elke minuut?
(Uiteraard is supervisor beter en ook aanbevolen in de docs, maar je moet wat)

offtopic:
Btw, ook uit de docs: Enkel env() in je config files en anders config(), zodat je config:cache kan doen. Scheelt een flink stuk bij http, als http van toepassing is. ;)
Je kan ook withoutOverlapping en runInBackground toevoegen volgens mij, dan zijn het losse processen en gaan ze niet 2x lopen.

Wij gebruiken emqx als broker, daar kan je in de broker zelf instellen dat je een webhook met het bericht stuurt in plaats van met PHP luisteren. Dat kan opzich wel, maar dan zou je een langlopend proces moeten draaien, ipv een job met timeout van 5 minuten.

Alle reacties


Acties:
  • 0 Henk 'm!

  • RobIII
  • Registratie: December 2001
  • Laatst online: 22:50

RobIII

Admin Devschuur®

^ Romeinse Ⅲ ja!

Is het niet gewoon de max_inflight_messages waar je nu tegenaan loopt?
Edit: Nee, misschien niet die. Maar er staat me wel iets bij van max-messages-something. max_queued_messages maybe?

[ Voor 45% gewijzigd door RobIII op 20-02-2024 16:22 ]

There are only two hard problems in distributed systems: 2. Exactly-once delivery 1. Guaranteed order of messages 2. Exactly-once delivery.

Je eigen tweaker.me redirect

Over mij


Acties:
  • 0 Henk 'm!

  • Harrie_
  • Registratie: Juli 2003
  • Niet online

Harrie_

⠀                  🔴 🔴 🔴 🔴 🔴

Topicstarter
RobIII schreef op dinsdag 20 februari 2024 @ 16:19:
Is het niet gewoon de max_inflight_messages waar je nu tegenaan loopt?
Edit: Nee, misschien niet die. Maar er staat me wel iets bij van max-messages-something. max_queued_messages maybe?
Nee hier ligt het probleem niet, max_queued_messages heeft betrekking op het aantal QoS-1 en -2 berichten die vastgehouden worden maar alle berichten worden verzonden met QoS-0. Dat betekent natuurlijk wel dat de applicatie er als de sodemieter bij moet zijn om het bericht af te vangen want op het moment dat er een nieuw bericht komt op hetzelfde topic is de oude natuurlijk weg.

Ik ben zelf in de veronderstelling dat in mijn applicatie (door het gebruik van DeviceMessageReceiver::dispatch();) alle berichten asynchroon naar mijn handler worden gedispatcht, maarja then again ik sluit ook niet uit dat PHP gewoon te traag is om in luttele miliseconden al die berichten te lezen en door te dispatchen.

Hoeder van het Noord-Meierijse dialect


Acties:
  • 0 Henk 'm!

  • Voutloos
  • Registratie: Januari 2002
  • Niet online
De queue:work doe ik zelf nooit via de schedule(), want dan moet je gaan redeneren over meerdere workers als een worker niet binnen een minuut (eigenlijk minder ivm synchrone code erbij) gestopt is.

{signature}


Acties:
  • 0 Henk 'm!

  • Voutloos
  • Registratie: Januari 2002
  • Niet online
Als je de subscriber elke 5 minuten in sync doet, doe je elke 5e minuut de queue:work niet of veel te laat?

{signature}


Acties:
  • 0 Henk 'm!

  • Demonitzu
  • Registratie: Augustus 2012
  • Niet online

Demonitzu

Incidentele gebruiker

PHP:
1
$schedule->command('queue:work --stop-when-empty')->everyMinute();

Ik denk dat dit de boosdoener is, omdat het op deze manier niet helemaal lijkt te werken zoals je denkt dat het zou moeten werken. Beter gebruik je bv. Supervisor hiervoor.

TekkenZone - Dutch Tekken Community


Acties:
  • 0 Henk 'm!

  • Harrie_
  • Registratie: Juli 2003
  • Niet online

Harrie_

⠀                  🔴 🔴 🔴 🔴 🔴

Topicstarter
Voutloos schreef op dinsdag 20 februari 2024 @ 16:44:
De queue:work doe ik zelf nooit via de schedule(), want dan moet je gaan redeneren over meerdere workers als een worker niet binnen een minuut (eigenlijk minder ivm synchrone code erbij) gestopt is.
Voutloos schreef op dinsdag 20 februari 2024 @ 16:55:
Als je de subscriber elke 5 minuten in sync doet, doe je elke 5e minuut de queue:work niet of veel te laat?
Kun je dat eens uitleggen want dit kan ik niet helemaal volgen? Je bedoelt dat ik dit commando aanroep vanuit de cronjob? Maar wat is dan het verschil met het commando aanroepen vanuit de code?

Hoeder van het Noord-Meierijse dialect


Acties:
  • 0 Henk 'm!

  • Harrie_
  • Registratie: Juli 2003
  • Niet online

Harrie_

⠀                  🔴 🔴 🔴 🔴 🔴

Topicstarter
Demonitzu schreef op dinsdag 20 februari 2024 @ 17:42:
[...]
Beter gebruik je bv. Supervisor hiervoor.
Helaas geen optie op een shared hosting omgeving...

Hoeder van het Noord-Meierijse dialect


Acties:
  • 0 Henk 'm!

  • Voutloos
  • Registratie: Januari 2002
  • Niet online
Harrie_ schreef op dinsdag 20 februari 2024 @ 17:54:

Kun je dat eens uitleggen want dit kan ik niet helemaal volgen? Je bedoelt dat ik dit commando aanroep vanuit de cronjob? Maar wat is dan het verschil met het commando aanroepen vanuit de code?
Alles in schedule() gebeurt in een enkel draadje. Er gaan 2 jobs naar de database queue, dat is zo weggeschreven dus niet heel spannend. Maar de rest is allemaal serieel en blocking. Als je dan langlopende zaken gaat doen, en de cron gaat inmiddels weer af, garandeer dan maar eens dat het aantal processen dat met een bepaalde stap bezig is gelijk is wat jij bedoeld had.

Dus ik zou beginnen met de 2 langslopende dingen in eigen regeltje van je cron te zetten. De queue:work kan misschien met een —max-time=55 en dan elke minuut?
(Uiteraard is supervisor beter en ook aanbevolen in de docs, maar je moet wat)

offtopic:
Btw, ook uit de docs: Enkel env() in je config files en anders config(), zodat je config:cache kan doen. Scheelt een flink stuk bij http, als http van toepassing is. ;)

{signature}


Acties:
  • Beste antwoord
  • 0 Henk 'm!

  • Barryvdh
  • Registratie: Juni 2003
  • Laatst online: 00:22
Voutloos schreef op dinsdag 20 februari 2024 @ 21:55:
[...]
Alles in schedule() gebeurt in een enkel draadje. Er gaan 2 jobs naar de database queue, dat is zo weggeschreven dus niet heel spannend. Maar de rest is allemaal serieel en blocking. Als je dan langlopende zaken gaat doen, en de cron gaat inmiddels weer af, garandeer dan maar eens dat het aantal processen dat met een bepaalde stap bezig is gelijk is wat jij bedoeld had.

Dus ik zou beginnen met de 2 langslopende dingen in eigen regeltje van je cron te zetten. De queue:work kan misschien met een —max-time=55 en dan elke minuut?
(Uiteraard is supervisor beter en ook aanbevolen in de docs, maar je moet wat)

offtopic:
Btw, ook uit de docs: Enkel env() in je config files en anders config(), zodat je config:cache kan doen. Scheelt een flink stuk bij http, als http van toepassing is. ;)
Je kan ook withoutOverlapping en runInBackground toevoegen volgens mij, dan zijn het losse processen en gaan ze niet 2x lopen.

Wij gebruiken emqx als broker, daar kan je in de broker zelf instellen dat je een webhook met het bericht stuurt in plaats van met PHP luisteren. Dat kan opzich wel, maar dan zou je een langlopend proces moeten draaien, ipv een job met timeout van 5 minuten.

  • Harrie_
  • Registratie: Juli 2003
  • Niet online

Harrie_

⠀                  🔴 🔴 🔴 🔴 🔴

Topicstarter
Barryvdh schreef op woensdag 21 februari 2024 @ 12:18:
[...]
Wij gebruiken emqx als broker, daar kan je in de broker zelf instellen dat je een webhook met het bericht stuurt in plaats van met PHP luisteren. Dat kan opzich wel, maar dan zou je een langlopend proces moeten draaien, ipv een job met timeout van 5 minuten.
Zoals ik in de OP ook aangaf; ik speel met de gedachte om direct vanaf de broker berichten naar een endpoint van iedere applicatie te pushen. Ik dacht dit zelf op te moeten lossen met weer een extra applicatie naast Mosquitto, maar nu ik zie dat EMQX dit out of the box ondersteunt ga ik toch maar eens een test opzetten met deze broker. Bedankt voor deze tip als ik er niet uit kom, kom ik wel naar Rooi gereden :X

Hoeder van het Noord-Meierijse dialect


Acties:
  • +1 Henk 'm!

  • mbe81
  • Registratie: Juni 2008
  • Laatst online: 21:40
Harrie_ schreef op donderdag 22 februari 2024 @ 12:02:
[...]


Zoals ik in de OP ook aangaf; ik speel met de gedachte om direct vanaf de broker berichten naar een endpoint van iedere applicatie te pushen. Ik dacht dit zelf op te moeten lossen met weer een extra applicatie naast Mosquitto, maar nu ik zie dat EMQX dit out of the box ondersteunt ga ik toch maar eens een test opzetten met deze broker. Bedankt voor deze tip als ik er niet uit kom, kom ik wel naar Rooi gereden :X
Ter info: EMQX heeft ook sinds afgelopen week een EU-deployment van hun serverless omgeving.

Acties:
  • 0 Henk 'm!

  • Barryvdh
  • Registratie: Juni 2003
  • Laatst online: 00:22
mbe81 schreef op dinsdag 27 februari 2024 @ 17:59:
[...]


Ter info: EMQX heeft ook sinds afgelopen week een EU-deployment van hun serverless omgeving.
Ahja, wij hebben hem zelf gewoon draaien met docker script, werkt ook goed :)
Pagina: 1