Rabbitmq pentru începători, ajaxblog

RabbitMQ server de coadă

Uneori, în aplicații web devine necesară pentru a efectua sarcini complexe CPU-intensive, care nu pot fi umeschayas într-un interval de timp scurt a cererii HTTP. În acest caz, cozile vin la salvare. Ideea principală a cozilor este de a evita executarea sarcinilor intensive de resurse imediat după trimiterea cererii. În schimb, sarcina este în așteptare pentru executarea ulterioară în modul asincron. Ie la primirea unei cereri de la un client, am încapsulează sarcina ca mesaj si trimite-l la coada, iar coada este deja handler primește mesaje în ordine și procesele lor în mod corespunzător. Privind în perspectivă, voi spune că poate exista un mod de coadă, atunci când, în prezența mai multor copii ale handler, următoarea sarcină va fi de a acționa pe procesor liber. Astfel, se realizează paralelizarea sarcinilor.







Această secțiune discută lucrul cu cozi care utilizează serverul de mesaje RabbitMQ. Serverul RabbitMQ este în esență un manager de coadă, care are următoarele avantaje:

În tutoriale, se vor da exemple pentru toate opțiunile de mai sus. Bazat pe tutorialele site-ului oficial, completate și implementate în PHP pentru RabbitMQ.

RabbitMQ utilizează protocolul AMQP. Pentru a utiliza RabbitMQ, trebuie să puneți părțile clientului și serverului.

Instalarea serverului

Pentru a instala extensia AMQP pentru PHP, trebuie mai întâi să instalați serverul RabbitMQ

Adăugați următoarea linie în fișierul /etc/apt/sources.list

Instalarea clientului

Concepte de bază

Următoarea notație este folosită în RabbitMQ. Producătorul este un program care trimite mesaje. O vom marca prin

Brokerul (coada) este de fapt doar un tampon în memorie, fără restricții asupra numărului de mesaje stocate. Mai mulți producători pot trimite mesaje la aceeași coadă de așteptare, iar mai mulți observatori pot încerca să primească mesaje din aceeași coadă. Coada de așteptare va fi indicată după cum urmează (numele coadajului este afișat în partea de sus)

Consumatorul (destinatar) este un program care primește mesaje din coadă. O vom marca prin

Aici este important să rețineți că producătorul, consumatorul și brokerul pot fi localizate pe diferite mașini, în plus, în cele mai multe cazuri, acesta este cazul.

Primul script de lucru cu coada de așteptare, un fel de "Hello world", va trimite un mesaj text de la client, îl va primi pe server și îl va afișa pe ecran.

Ie Schema de lucru este după cum urmează: Primul lucru pe care trebuie să-l faceți este să vă conectați la serverul RabbitMQ. Conexiunea este stabilită prin comenzi

Utilizând conexiunea, puteți obține un obiect pentru canal

Pe baza canalului recepționat vom crea un schimbător

și, de fapt, coada de așteptare

Când schimbătorul și coada sunt pregătite, ele pot fi legate de cheie

După expedierea mesajului, conexiunea poate fi întreruptă.

De asemenea, destinatarul trebuie să urmeze aceeași succesiune - să se conecteze la serverul de mesaje; - a crea un canal; - declararea schimbătorului; - declararea coadajului; - conectați coada la schimbătorul de chei Ultimele două acțiuni, așa cum au fost menționate mai sus, nu sunt necesare. Acum puteți începe să ascultați coada

Aici, metoda get primește constanta ARMQ_AUTOACK în valoarea parametrului, care notifică serverului de mesaje că mesajul a fost primit. Acesta este cel mai simplu mod de a șterge un mesaj din coadă. Cu toate acestea, în acest caz, dacă mesajul este procesat fără succes, este imposibil să îl returnați din nou în coadă.

Astfel, primim două scripturi

Cozi de distribuție

Rabbitmq pentru începători, ajaxblog

Notificare (confirmare)

Unele sarcini pot fi efectuate mult timp. Și cine știe ce s-ar putea întâmpla cu serverul de la acest punct: serverul poate reporni, sau problema se poate bloca sau a termina o greșeală fatală. În prima notificare tutorial a fost deconectat de parametrul de transmisie AMQP_AUTOACK get) metoda (. În acest caz, mesajele sunt șterse din memorie imediat după metoda get, în cazul unei erori apărute în timpul procesării, nu va reveni la coada. Pentru a evita acest lucru, nu vom trece constanta AMQP_AUTOACK la metoda get. In schimb, la finalizarea procesării metodei de apel ACK (), care notifică brokerul că mesajul a fost procesat cu succes, și o puteți șterge din memorie. În caz contrar, RabbitMQ înțelege că mesajul nu este procesat și îl redirecționează către un alt consumator gratuit. Cu toate acestea, aici merită remarcat un punct important. Mesajele Transmis nu sunt procesate până când până când nu otkonnektitsya și consumatorilor Again prikonnektitsya la broker. Dacă doriți să proceseze mesajul în același mesaj al unei conexiuni la server, este necesar pentru a apela un NACK metoda (), cu pavilion AMQP_REQUEUE care au pus tratate fără succes problema înapoi în coada de așteptare și notifică brokerul că această sarcină ar trebui să fie re-prelucrate.







Eroare distribuită - când notificarea este activată, nu confirmați sarcini (mesaje) prelucrate corect. În acest caz, pentru fiecare conexiune nouă, toate sarcinile deja procesate vor fi reprocesate. Procesul va arăta ca o repetare neplăcută a mesajelor, ceea ce va duce în cele din urmă la depășirea memoriei. Urmăriți această situație utilizând instrumentul rabbitmqctl pentru serverul de mesaje native

Viabilitatea mesajelor (durabilitate)

În paragraful anterior, am examinat cum să nu pierdem un mesaj în coadă, trimițându-l înapoi la coadă. Cu toate acestea, mesajul poate fi pierdut dacă serverul de mesaje a fost brusc oprit. Pentru a evita acest lucru, coada trebuie să fie creată cu pavilionul AMQP_DURABLE.

În cazul în care locul „bună“ a fost deja declarat, atunci acest cod va eșua, deoarece nu se poate declara încă o dată declarată toți ceilalți parametri. Din această situație are două opțiuni, fie pentru a reseta toate cozile de cum se spune aici, sau pentru a crea o nouă coadă la numele neutilizat. Vedeți lista de cozi, conform modului menționat în paragraful anterior. Instalarea de pavilion AMQP_DURABLE nu garantează sute de mesaje de siguranță la sută din coadă. În ciuda faptului că o astfel de spoposbom vom specifica RabbitMQ salvați mesajele de pe disc, există o zonă moartă după ce a primit sooscheniya când este deja în memorie, dar nu au fost încă salvate pe disc. În acest moment, în cazul unei situații neprevăzute, aceasta poate fi pierdută din memorie. De exemplul nostru simplu de astfel de garanții este suficient, dar, în cazul în care garanția de mare de a primi mesaje trebuie să ajungă, este necesar să se utilizeze tranzacții.

Toți împreună

De exemplu, distribuirea mesajelor între cozi, avem nevoie de o funcție care să simuleze volumul de lucru al sistemului. Pentru aceasta folosim un cronometru normal

Pentru a efectua schimbător shalbonu de comunicare trebuie să aibă un tip de subiect, care este definit constant AMQP_EX_TYPE_TOPIC. Cheile routingKey formate din cuvinte, ca urmare, prin punctul, de exemplu, „logs.devices.kernel.notice“, „logs.devices.cron“. Lungimea maximă a unei astfel de taste poate fi de 255 de caractere. Logica de livrare a mesajelor-cheie similare cu logica pentru schimbătoarele cu tipul de directe - mesaj cu o cheie specifică va fi livrată în conformitate cu cheia corespunzătoare. Dar există o mare diferență. Tastele utilizate pentru comunicarea șablonului pot conține două caractere speciale:

  • *. corespunde strict unui singur cuvânt;
  • #. corespunde oricărui număr de cuvinte, inclusiv absența cuvintelor;

De exemplu, avem următoarele relații
* .orange. *
*. * iepure
leneș #

Rabbitmq pentru începători, ajaxblog

Primul cuvânt descrie viteza, a doua - culoarea și a treia - tipul de animal, adică [viteză] [culoare] [specie]. Am creat trei linkuri: coada Q1 este legată de cheia ".orange. "Și coada Q2 - cu taste". .rabit "și" leneș. # ". Astfel, putem spune că coada Q1 ia în considerare toate animalele portocalii și transformă Q2 - toți iepurii și toate animalele lente.

Să luăm în considerare câteva exemple:

  • "Quick.orange.rabbit" - în ambele cozi
  • "Lazy.orange.elephant" - în ambele cozi
  • "Quick.orange.fox" - numai în prima
  • "Lazy.brown.fox" - numai în cel de-al doilea
  • "Quick.brown.fox" - va fi aruncat
  • "Quick.orange.male.fox" - vor fi aruncate
  • "Lazy.orange.male.fox" - numai în cel de-al doilea

Un schimbător cu tipul de subiect poate repeta comportamentul schimbătorului cu tipul fanout, dacă asociați coada cu tasta "#" cu ea. Dacă cheia nu utilizează caractere speciale, atunci un astfel de schimbător se va potrivi schimbătorului cu tipul direct.

Trimiterea mesajelor

Pentru a trimite mesaje prin șablon, schimbătorul trebuie să fie creat cu un tip de subiect care corespunde cu constantul AMQP_EX_TYPE_TOPIC.

Implementarea șablonului RPC

În a doua lecție, a fost implementată o coadă care distribuia încărcătura între toate produsele de consum disponibile. Dar, dacă avem nevoie pentru a obține rezultatul de la operatorul de coadă de așteptare. Această abordare este cunoscută sub numele de procedura de apel la distanță (RPC) sau procedura de apel la distanță (RPC). În această lecție, veți implementa modelul RPC utilizând coada de mesaje RabbitMQ. Desigur, această abordare presupune că procesarea nu ar trebui să dureze mult timp. Pentru a implementa exemplul, managerul nostru de funcții va modifica mesajul "mesaj înainte de" la "mesaj după".

În general, implementarea RPC prin intermediul programului RabbitMQ este destul de simplă. Clientul trimite un mesaj, iar serverul răspunde. Pentru a procesa răspunsul la server, trebuie să creați o coadă de apel invers. Pentru a afla ce coadă de așteptare așteaptă un răspuns, trebuie să trimitem numele acesteia în cerere. Pentru a face acest lucru, este creată o coadă anonimă pentru producător și numele său este adăugat la parametrii de interogare

Rețineți că coada de apel inversă este creată cu pavilionul AMQP_EXCLUSIVE, ceea ce înseamnă că numai un singur consumator poate asculta această coadă.

ID-ul de identificare

În metoda prezentată mai sus, intenționăm să creăm o coadă de apel invers pentru fiecare cerere RPC. Deoarece nu puteți identifica în mod unic interogarea prin care aparține răspunsul, se adaugă și un parametru correlationId la interogare, care are o valoare unică pentru fiecare solicitare. Mai târziu, când obținem răspunsul, putem compara corelația cu valoarea transmisă împreună cu interogarea. Și dacă nu coincid, aruncați răspunsul.

Planul final de acțiune

  • clientul creează o coadă de apel invers anonimă exclusivă
  • clientul trimite cererea cu doi parametri: replyTo - numele de apel invers al corralation-ului de coadă1d - o valoare unică pentru fiecare solicitare
  • cererea este trimisă la o coadă denumită, de exemplu, cu numele rpc_queue
  • Un RPC muncitor (server RPC) așteaptă o solicitare din coadă și atunci când apare solicitarea, procesează și trimite înapoi un răspuns la client, folosind numele cozii de apel invers ca o cheie router
  • clientul ascultă coada de așteptare și când apare mesajul, verificați corelațiaId. Dacă valoarea acestei proprietăți din mesajul primit corespunde valorii generate anterior, răspunsul este procesat de către aplicație.

Toți împreună

Funcția de procesare a mesajelor de la server este după cum urmează







Articole similare

Trimiteți-le prietenilor: