Artykuły

A A A
Drukuj Ekportuj do PDF
Opublikowane: 2005.12.27 21:26 | CamlanEx | Aktualizacja: 2010.01.21 2:04

Wprowadzenie do kolejek komunikatów (Message Queues) na platformie .NET 1.1.

Artykuł prezentuje zagadnie, jakim jest programowanie z wykorzystaniem kolejek komunikatów. Znajdują się w nim podstawowe informacje odnośnie tej technologii oraz przykłady w C#, przedstawiające potencjalne możliwości tej techniki.

W tym artykule chciałbym przybliżyć narzędzia programistyczne .NET 1.1 w zakresie pracy z kolejkami wiadomości. Na samym początku pojawi się trochę teorii, aby przybliżyć zasady działania tej technologii, a także usystematyzować pojęcia, jakimi będę się posługiwał w artykule. Następnie, już na konkretnych przykładach przedstawię :

 

podstawy pracy z kolejkami

transakcje

asynchroniczny odbiór wiadomości

 

 

Teoria :

 

         Wprowadzenie

 

         Komunikat definiuje się jako jednostkę danych przesyłaną w systemach rozproszonych (np. między dwoma komputerami). Same wiadomości są przesyłane do kolejek czyli kontenerów, które przechowują wiadomości podczas przesyłania. Zadaniem kolejek jest zagwarantować dostarczenie wiadomości do odbiorcy. Message Queuing to technologia firmy Microsoft, tworząca infrastrukturę przesyłania komunikatów pomiędzy aplikacjami uruchomionymi na teoretycznie dowolnym układzie komputerów obsługiwanych przez system Windows. Komputery tworzące taką sieć odgrywają różne role (routing, informacje o sieci, „bierny” odbiór), dodatkowo podczas konfigurowania administrator ustawia koszt przesłania komunikatów pomiędzy poszczególnymi komputerami lub ich grupami.

 

          Bezpieczeństwo

 

Obecnie Message Queuing korzysta z licznych cech bezpieczeństwa wbudowanych w Windows Sever 2003, między innymi są to : kontrola dostępu, uwierzytelnianie, szyfrowanie, audyt bezpieczeństwa. Kontrola dostępu jest zintegrowana z kontrolą dostępu do obiektów w usłudze katalogowej Active Directory, jest to wykonane przypisując opisy bezpieczeństwa do obiektów kolejek. Pisząc o obiektach kolejek mam na myśli nie tylko wiadomości, ale także poszczególne usługi, ustawienia itd., Opis bezpieczeństwa zawiera listę użytkowników i grup, które posiadają dostęp do danego obiektu i przydzielone im przyzwolenia.  Ponieważ w tym miejscu kolejki uzyskujemy dostęp za pomocą Active Directory model bezpieczeństwa oparty jest o protokół Kerberos V5. Uwierzytelnianie opiera się o certyfikaty. Uwierzytelnianie wiadomości odbywa się asynchronicznie bez wzajemnej komunikacji między wysyłającym i odbierającym. Szyfrowanie może się odbywać za pośrednictwem algorytmów asymetrycznych, jak i symetrycznych. Szyfrowanie służy do zabezpieczania wiadomości przesyłanych między komputerami. Audyt pozwala na rejestrowanie użytkowników usiłujących uzyskać dostęp do obiektów w kolejkach komunikatów.

 

 

         Rodzaje kolejek

 

prywatne (Private queues)

 

Kolejki prywatne nie są publikowane za pomocą Active Directory. Dostęp do nich mają tylko aplikacje znające pełną ścieżkę dostępu do takiej kolejki. Za sprawą tego, że nie korzystają z usługi katalogowej tworzy się je szybciej, nie ma opóźnień w dostępie do nich. W grupach roboczych tylko one są dostępne. Wykorzystuje się je głownie w celach testowych i podczas tworzenia aplikacji przed wdrożeniem.

 

publiczne (Public queues)

 

Kolejki publiczne są publikowane w domenach za pośrednictwem usługi katalogowej. Każdy komputer zarejestrowany w usłudze katalogowej i posiadający odpowiedni zestaw pozwoleń posiada dostęp do publicznych kolejek. Właśnie możliwość dostępu do tych kolejek za pośrednictwem usługi katalogowej sprawia, że znajdują zastosowanie w „poważniejszych” przedsięwzięciach.

 

odpowiedzi (Response queues)

 

Kolejki tego typu są tworzone przez, w celu przechowywania odpowiedzi tworzonych przez aplikację odczytującą wiadomości z kolejki. Aplikacja, która wysyła komunikaty i żąda na nie odpowiedzi, musi określić, jakie kolejki będą wykorzystane. Mechanizm kolejek komunikatów nie ma kontroli nad treścią tych wiadomości.

 

raportów (Report queues)

 

Za pomocą tej kolejki można śledzić drogę, jaką przebywa wiadomość. Na jednym komputerze może funkcjonować tylko jedna taka kolejka. Najczęściej są tworzone przez administratorów Active Directory, jednak taką możliwość posiadają także aplikacje.

 

administracyjne (Administration queues)

 

Służą do przechowania generowanych systemowo wiadomości informujących o tym, czy wiadomość dotarła (lub nie) na miejsce. Każda taka wiadomość zawiera informacje opisujące co wywołało potwierdzenie oraz to, od której wiadomości ona się wywodzi.

 

transakcyjne (Transactional queues)

 

Nikogo chyba nie zaskoczę, że ten typ kolejek przechowuje tylko i wyłącznie komunikaty przesłane w transakcji. Istotne jest to, że takie wiadomości mogą być przesłane do dowolnej (transakcyjnej) kolejki, jednak odbieranie wiadomości w transakcji, jest możliwe tylko z lokalnej kolejki.

 

Wymienione powyżej kolejki są określone jako aplikacyjne, ponieważ mogą być tworzone zarówno przez aplikacje, jak i administratora. Poniżej w paru słowach wspomnę o kolejkach systemowych. Są one oczywiście tworzone przez mechanizm Message Queuing i aplikacje nie mogą bezpośrednio umieszczać w nich wiadomości.

 

dziennika (Journal queues)

 

Te kolejki zawierają kopie wiadomości, które zostały wysłane lub odebrane z kolejki. (np. kopia wiadomości może się pojawić w dziennikach komputerów, jakie „odwiedziła” w czasie przesyłania. Można wyróżnić 2 typy rejestrowania wiadomości opierający się na źródle wiadomości i celu wiadomości. Przykładowo wysyłający wiadomość decyduje się przechować kopie tej wiadomości lokalnie (zatem rejestrowanie w dzienniku odbywa się na poziomie wiadomości). W przypadku opierania się na celu wybrać można rejestrowanie w dzienniku na poziomie kolejki (rejestrowane będą wszystkie wiadomości, jakie do niej trafią). Warto chyba wspomnieć także o tym, że Message Queuing nie zapewnia żadnych mechanizmów usuwania wiadomości z dziennika, muszą o to zadbać aplikacje.

 

łączące (Connector queues)

 

Opowiadają za przekazywanie komunikatów przesyłanych obcymi komputerami. Działają tak, jak proxy dla komputerów nie należących do danej sieci. Nie są publikowane w Active Directory, a co za tymi idzie są niewidzialne dla komputerów przesyłających komunikaty.

 

wychodzące (Outgoing queues)

 

To są lokalne kolejki służące do przechowywania wiadomości, które zostaną wysłane do zdalnych kolejek. Takie wiadomości mogą być przechowane dość długo zważywszy, że komputer może pracować przez pewien czas offline i w tym czasie wysłać komunikaty. Kiedy połączenie się pojawi, wiadomości zostaną niezwłocznie wysłane.

 

ślepe (Dead-Letter queues)

 

Do tego rodzaju kolejek trafiają wiadomości, których nie udało się dostarczyć. Wiadomości, które zostały wysłane w transakcji, trafiają do transakcyjnych ślepych kolejek, a nie-transakcyjne wiadomości, analogicznie, do nie-transakcyjnych. Miejscem przechowania jest komputer, na którym dana wiadomość wygasła lub nie można było jej przesłać dalej. Oto przykłady, kiedy występuje sytuacja, że wiadomości nie można dalej przesłać :

docelowa kolejka jest pełna, nie można jej odnaleźć lub jest nieznana wysyłającemu

wiadomość nie-transakcyjna jest wysłana do transakcyjnej kolejki (i na odwrót)

przekroczona zostanie maksymalna liczba hopów

nie powiodła się transakcja, w której wiadomości są wysyłane. Co ciekawe takie wiadomości nie trafiają do transakcyjnych ślepych kolejek.

 

Wiadomość wygasa kiedy :

 

czas życia w sieci (TTRQ, time-to-reach-queue) się skończy. Można go ustawić programowo.

czas odbioru (TTBR time-to-be-received) zostanie przekroczony. Ta właściwość jest zawsze ustawiana przez aplikację.

 

 

Na tym powoli kończy teoria, jaką chciałem zaprezentować odnośnie Message Queuing. Niektóre z rzeczy, jakie przedstawiłem, powrócą niebawem w części praktycznej. Zatem, nie przedłużając już więcej ...

 

2. Praktyka

 

         Zanim rozpoczniemy pracę z kolejkami, konieczne może się okazać zainstalowanie tej usługi w systemie. Można to zrobić np. korzystając z Panelu Sterowania-> Dodaj/Usuń programy-> Dodaj/Usuń składniki Windows. Następnie, już pracując z Visual Studio, trzeba dołączyć do projektu referencję do System.Messaging , potem już tylko dołączamy przestrzeń nazw: using System.Messaging; . Po tych czynnościach jesteśmy już gotowi do działania.

 

Przykład 1 :

 

Pierwsza aplikacja pobiera tekst wpisany przez użytkownika na ekran, a następnie go wypisuje. Jednak „trochę” ją skomplikowałem. Wykorzystuję do tego 2 wątki i 2 prywatne kolejki. Pierwszy (Główny) wątek odbiera tekst wpisany przez użytkownika, a następnie wysyła go do pierwszej kolejki, drugi odbiera wiadomość z pierwszej kolejki i przesyła go do drugiej. Główny następnie odbiera wiadomość z drugiej kolejki oraz ostatecznie wysyła ją na ekran.

 

Aby rozpocząć pracę najpierw trzeba utworzyć kolejki. Robimy to za pomocą następujących instrukcji :

 

queue1 = @".\Private$\Q1";

 

if (MessageQueue.Exists(this.queue1))

{

msq1 = new MessageQueue(this.queue1);

}

else

{

      msq1 = MessageQueue.Create(this.queue1);

}

 

 

Najpierw należy zdefiniować scieżkę dla naszej kolejki. Robimy to za pomocą  odpowiedniego stringa, jeżeli chcemy korzystać z prywatnych kolejek podajemy „przedrostek” Private$ . Kropka na początku oznacza, że wiadomość jest tworzona na lokalnej maszynie, jeśli wolelibyśmy korzystać ze zdalnej kolejki, podajemy nazwę zdalnego komputera.

 

Najpierw należy sprawdzić czy dana kolejka istnieje, jeśli tak wywołujemy konstruktor, który ją przyłącza do aplikacji. Jeśli jej nie ma, tworzymy ją za pomocą polecenia Create .

 

Kolejnym krokiem jest podanie Formatera, z którego będziemy korzystać.

 

msq1.Formatter = new BinaryMessageFormatter();

 

W moim przypadku jest binarny Formater (najszybszy) możemy jeszcze korzystać z : XmlMessageFormatter() lub ActiveXMessageFormatter(). Pierwszy pozwala na niezależne wersjonowanie serializowanych obiektów na kliencie i serwerze, drugi zapewnia kompatybilność z COM. Nie będzie chyba zaskoczeniem, że w wypadku, kiedy korzystamy z 2 kolejek Formatery powinny być takie same.

 

W moim przypadku przesyłany komunikat będzie obiektem klasy MessageClass. Aby móc z niego z korzystać, trzeba dodać atrybut serializowany, czyli :

 

[Serializable]

public class MessageClass

 

Tak przygotowani możemy już przystąpić do napisania głównej części naszej aplikacji.

 

while (true)

{

      t = rd.Next(55);

      mess = Console.ReadLine();

      if (mess == "e")

            break;

 

      mtc = new MessageClass(t,mess);

      invc++;

c.MQ1.Send(mtc,"Wsylana wiadomosc z glownego nr : " +

invc.ToString());

 

      ms = c.MQ2.Receive();

 

      if (ms.Body is MessageClass)

      {

            mtc = (MessageClass)ms.Body;

            Console.WriteLine(ms.Label);

            Console.WriteLine(mtc);

      }

}

 

W wątku głównym najpierw pobieramy tekst z konsoli, tworzymy nowy obiekt wiadomości i wysyłamy ją do kolejki nr 1, następnie odbieramy wiadomość z kolejki nr 2 i wypisujemy ją na ekran.

 

int invc = 0;

Message ms;

MessageClass mtc;

 

while (true)

{

      ms = msq1.Receive();

      if (ms.Body is MessageClass)

      {

            mtc = (MessageClass)ms.Body;

            invc++;

            msq2.Send(ms,"Wiadomosc odbita nr : " + invc.ToString());

      }

}

 

W drugim wątku już tylko odbieramy wiadomość i przesyłamy ją do drugiej kolejki. Warto w tym miejscu zwrócić uwagę, że żadna synchronizacja nie jest potrzebna, każdy wątek oczekuje, aż będzie mógł odebrać wiadomość. Można to zachowanie zmienić, podając czas, jaki wątek ma czekać na pojawienie się wiadomości :

 

ms = msq1.Receive(new TimeSpan(0,0,5));

 

Tutaj bedziemy czekać 5 minut, jeśli wiadomość nie pojawi w tym czasie, ruszymy dalej.

 

Przykład 2 :

 

Tutaj skupię się na przetwarzaniu wiadomości w transakcji. Ponownie musimy rozpocząć od utworzenia kolejki :

 

if (MessageQueue.Exists(queue_name))

      return new MessageQueue(queue_name);

else

      return MessageQueue.Create(queue_name,true);

 

W tym miejscu doszedł tylko 1 element : Create(queue_name,true). Podając true informujemy, że kolejka będzie transakcyjna.

Teraz pozostaje tylko wysłać wiadomości. Przykładowo :

 

MessageQueueTransaction transaction = new MessageQueueTransaction();

try

{    

      transaction.Begin();

 

      msq.Send("Wiadomosc nr 1","Etykieta nr 1",transaction);

      msq.Send("Wiadomosc nr 2","Etykieta nr 2",transaction);

      throw new Exception();

                       

      transaction.Commit();

}

catch

{

                       

      transaction.Abort();

}

 

W powyższej sytuacji wiadomości nie zostaną umieszczone w kolejce.

 

MessageQueueTransaction transaction = new MessageQueueTransaction();

transaction.Begin();

 

msq.Send("Wiadomosc nr 1","Etykieta nr 1",transaction);

msq.Send("Wiadomosc nr 2","Etykieta nr 2",transaction);

 

transaction.Commit();

 

Jak łatwo się domyślić, dopiero tym razem uda nam się umieścić wiadomości w kolejce.

 

Odbieranie wiadomości w transakcji przebiega analogicznie.

 

Tutaj wszystkie wiadomości pozostaną w kolejce :

 

MessageQueueTransaction transaction = new MessageQueueTransaction();

Message ms;

try

{    

      transaction.Begin();

      ms = msq.Receive(transaction);

      throw new Exception();

      ms = msq.Receive(transaction);

      transaction.Commit();

}

catch

{

      transaction.Abort();

}

 

Poniższy kod pobierze 2 wiadomości w transakcji.

 

MessageQueueTransaction transaction = new MessageQueueTransaction();

Message ms;

transaction.Begin();

ms = msq.Receive(transaction);

ms = msq.Receive(transaction);

transaction.Commit();

 

Warto wspomnieć, że o ile do transakcyjnej kolejki wstawiać wiadomości można tylko w transakcji, to już na odbiór nie są nałożone takie ograniczenia.

 

Przykład 3 :

 

W tym przykładzie przedstawię asynchroniczny odbiór wiadomości. Wykorzystam do tego 2 osobne aplikacje. Pierwsza z nich wstawi wiadomości do kolejki, a druga będzie je odbierać asynchronicznie.

 

Wstawienie wiadomości nie przynosi nic nowego:

 

msq.Formatter = new BinaryMessageFormatter();

msq.Send("Wiadomosc nr 1 z Aplikacji 3","Etykieta nr 1");

msq.Send("Wiadomosc nr 2 z Aplikacji 3","Etykieta nr 2");

msq.Send("Wiadomosc nr 3 z Aplikacji 3","Etykieta nr 3");

 

Ale podłączenie kolejki przynosi pewną nowość, mianowicie :

 

Aplikacja wstawiająca :

 

if (MessageQueue.Exists(queue_name))

      return new MessageQueue(queue_name,true);

else

      return MessageQueue.Create(queue_name);

 

Aplikacja odbierająca :

 

msq = new MessageQueue(@".\Private$\AsyncQueue",false);

msq.Formatter = new BinaryMessageFormatter();

 

Przy wywołaniu z true tylko aplikacja, która jako pierwsza uzyska dostęp do kolejki będzie mogła czytać z niej wiadomości. Jeżeli sobie tego nie życzymy możemy podać false , lecz ono jest w tym momencie nadmiarowe. Gdyby w aplikacji odbierającej w tym układzie podać true , zostanie rzucony wyjątek. To wywołanie ma znaczenie głównie w sytuacji, kiedy kolejka jest utworzona i aplikacje konkurują o dostęp do niej.

 

Aplikacja odbierająca będzie po naciśnięciu przycisku wyświetlać w etykiecie zawartość wiadomości.

 

Do odbioru tych wiadomości wykorzystamy wywołanie zwrotne. Najpierw musimy zadeklarować prostego delegata :

 

public delegate void MessageDelegate (string s);

 

Potem piszemy metodę, którą za pomocą niego wywołamy :

 

private void UpdateUI (string s)

{

      this.label1.Text = s;

}

 

Jak widać ta metoda zmienia zawartość etykiety.

 

Kolejny krok to stworzenie metody, która będzie wywołana w osobnym wątku :

 

private void ReceiveAsyncMessage(IAsyncResult result)

{

      System.Messaging.Message m = msq.EndReceive(result);

      MessageDelegate md =  new MessageDelegate(this.UpdateUI);

      this.Invoke(md,newobject [] {m.Body.ToString()});

}

 

Ponieważ jesteśmy w osobnym wątku spokojnie wywołujemy metodę msq.EndReceive(result) , która wstrzyma przebieg wątku i zakończy odbiór wiadomości. Następnie, pozostaje już tylko zmienić zawartość etykiety, musimy to zrobić w wątku interfejsu. Tworzymy obiekt wcześniej zadeklarowanego delegata i podajemy go z odpowiednim parametrem do metody Invoke, która wykona metodę wskazywaną przez delegat w wątku interfejsu.

 

Ostatecznie przygotowujemy kod wywołania przycisku :

 

private void button1_Click(object sender, System.EventArgs e)

{

      msq.BeginReceive(new TimeSpan(0,0,0,5),null,

      new AsyncCallback(this.ReceiveAsyncMessage));

}

 

Wywołujemy metodę BeginReceive . Ta metoda wróci, kiedy wiadomość będzie dostępna lub minie 5ms. Za jej pomocą rozpoczynamy wywołanie zwrotne tworząc delegata AsyncCallback, do  którego przekazujemy metodę, w której chcemy odebrać wiadomość.

 

Przykład 4 :

 

Ostatni, możliwie prosty przykład, jaki przygotowałem przedstawia ustawienie dodatkowych opcji dla wybranej wiadomości.

 

Message m = new Message("Test opcji");

m.Label = "Wiadomosc specjalna";

m.Priority = System.Messaging.MessagePriority.VeryHigh;

m.TimeToBeReceived = TimeSpan.FromHours(2);

msq.Send(m);

 

 

m = msq.Receive();

m.Formatter = new XmlMessageFormatter(newstring[] {"System.String"} );

 

Możemy ustawić wiadomości priorytet (decyduje on  między innymi o tym, czy wiadomość będzie promowana w czasie routingu , mamy do wyboru 8 poziomów), czas odbioru. Po odebraniu można ustawić odpowiedni Formater.

 

Jeżeli mamy dostęp do Active Directory, możemy dodatkowo ustawiać szyfrowanie i wybrany algorytm.

 

Kończąc chciałbym jeszcze wspomnieć, że klasa MessageQueue obok zestawu metod Receive(),BeginReceive(...) ma metody Peek(). Działają analogicznie, ale nie usuwają wiadomości z kolejki. Wszystkie przykłady, jakie zaprezentowałem były testowane na komputerze należącym do grupy roboczej, nie dysponując dostępem do Active Directory, stąd pewne ograniczenia związane z opcjami, z jakich mogłem skorzystać w programach.

W załączniku znajdują się przykłady wykorzystane w artykule.

 

3. Podsumowanie

 

         Na tym chciałbym zakończyć omawianie tego tematu. Wiem, że mając do dyspozycji tak potężne narzędzia, jak Web Service, można poddawać w wątpliwość sens korzystania z mechanizmu, jakim są kolejki komunikatów. Zastosowania warto szukać w systemach, które przez pewien czas mogą pracować offline. Można także tworzyć aplikacje, gdzie powiadamianie o zajściu pewnego zdarzenia następuje za pomocą mechanizmu Message Queuing.

 

 

Bibliografia

 

  1. MacDonald M., Microsoft .NET Distributed Applications: Integrating XML Web Services and Remonting
  2. MSDN Library ( http://msdn.microsoft.com )
  3. MessageQueuing Concepts ( http://www.microsoft.com/technet/prodtechnol/windowsserver2003/library/ServerHelp/7dbacab4-3812-44ee-ba7e-77dd8c94c6fb.mspx )

Załączniki:


Podobne artykuły

Komentarze 0 Masz uwagi do tej strony? Napisz

Dodaj komentarz

avatar

Zaloguj się lub Zarejestruj się aby wykonać tę czynność.

Autor CamlanEx
avatar
 

Załóż konto
CodeGuru to miejsce dla każdego programisty. Przez lata portal rozwijany był siłami społeczności i to właśnie społeczność programistów jest tutaj najważniejsza. CG od wielu lat gromadzi wokół siebie coraz większą grupę pasjonatów. Warto być jej częścią!

Dowiedz się więcej o CodeGuru

Geek Club - Windows Phone

 

MetroOne

Idź na górę strony