Krzysztof Olszewski

Krzysztof Olszewski

Dyrektor Technologii i Architektury Oprogramowania

Krzysztof Olszewski

Krzysztof Olszewski

Dyrektor Technologii i Architektury Oprogramowania

Prawie każdy szanujący się programista, otrzymując zadanie rozwiązania problemu przyjmowania i rozgłaszania asynchronicznych zdarzeń, pomyśli o wzorcu kolejki jako najprostszym i najoczywistszym rozwiązaniu. Zanim przejdziemy do dalszych rozważań w tym temacie, słowem wstępu, zastanówmy się co to są zdarzenia i co to jest asynchroniczność.

Zdarzenie to fakt który wystąpił i może być dla kogoś interesujący

To oczywiście bardzo ogólna definicja. Z informatycznego punktu widzenia, zdarzenie to pewien zbiór danych, które świadczą o tym, że coś się stało. Taki zbiór informacji można/trzeba przekazać dalej, dalej to znaczy, że ktoś może/musi zostać o tym poinformowany i może/musi na to jakoś zareagować. Oczywiście zbiór taki zazwyczaj będzie zawierał informację o czasie i miejscu wystąpienia zdarzenia (kiedy i gdzie), jego rodzaju (co), oraz całą resztę mniej lub bardziej szczegółowych danych, zależnych od specyfiki zdarzenia. W praktyce programisty, zdarzeniem może być to, że ktoś kliknął w link na stronie w przeglądarce, lub że ktoś zatwierdził zamówienie w sklepie internetowym.

Pojęcie zdarzenia jest łatwe do zrozumienia i zgodne z intuicją, natomiast z asynchronicznością jest trochę trudniej. Na początku wydaje się nam, że ją rozumiemy, ale zagłębiając się w szczegóły, pojawia się coraz więcej dziwnych i nieoczywistych konsekwencji jej używania. Asynchroniczność w zakresie zdarzeń oznacza, że fakt powstania zdarzenia i fakt dowiedzenia się o nim, są co prawda ze sobą powiązane z zachowaniem zasady przyczynowo skutkowej (najpierw zdarzenie a potem reakcja, nigdy odwrotnie) ale

Nadawca zdarzenia nie czeka aż Odbiorca je odbierze

Ta prosta zasada ma ogromne konsekwencje:

  1. Nadawca nie wie kiedy i czy zdarzenie dotarło do Odbiorcy.
  2. Nadawca nie ma szansy dostać w odpowiedzi żadnych informacji zwrotnych od Odbiorcy, wie tylko jakie informacje wysłał.
  3. Wysłane zdarzenia od chwili wysłania do chwili dostarczenia „żyją” gdzieś pomiędzy Nadawcą i Odbiorcą, to miejsce (bufor) musi być dostępne, inaczej nie da się zrealizować asynchroniczności.
  4. Przy większej produktywności (ilości tworzonych zdarzeń w czasie) Nadawcy niż konsumpcji (ilości przyjmowanych zdarzeń w czasie) Odbiorcy, bufor będzie rósł i ze 100% pewnością, w czasie mniejszym od nieskończonego się przepełni.
  5. Przy większej konsumpcji Odbiorcy niż produktywności Nadawcy, bufor przez znaczną ilość czasu będzie pusty, a Odbiorca będzie bezczynny.

 

Jak widać z powyższych punktów, asynchroniczność, poza innymi konsekwencjami, powoduje konieczność istnienia medium, które zapewni bufor na zdarzenia/komunikaty. To tyle słowem wstępu. Idąc dalej zauważmy, że wspomniana na początku kolejka może pełnić rolę właśnie takiego bufora.

Dlaczego kolejka? Kolejka posiada interfejs wejściowy pozwalający dodawać do niej elementy, posiada także bufor wewnętrzny do przechowywania elementów do czasu ich pobrania, oraz interfejs wyjściowy od odbierania elementów z kolejki. Idealna implementacja kolejki powinna posiadać cztery cechy:

  • Nieskończoną wewnętrzną pojemność
  • Nieskończenie wydajne wejście
  • Nieskończenie wydajne wyjście
  • Zerowy koszt pracy w środowisku wielowątkowym

Oczywiście taki ideał nie istnieje. Skoro nie mamy ideału to przyjrzyjmy się co realnego mają do dyspozycji programiści Javy w zakresie kolejek gotowych do pracy w środowisku wielu wątków.

Podstawową implementacją kolejki jest ArrayBlockingQueue. Niestety nie spełnia ona ani jednego z powyższych wymagań. Ma stałą, niezmienną pojemność bufora. Operacja dodania elementu do kolejki jest podobnie jak operacja pobrania, synchronizowana na wspólnej blokadzie, jest to sposób który przyjęto aby zapewnić bezpieczeństwo pracy z wieloma wątkami. Takie rozwiązanie działa poprawnie, jednak implikuje znaczące ograniczenia w przepustowości.

Kolejną implementacją kolejki jest LinkedBlockingQueue. Oferuje ona opcję „nielimitowanej” pojemności, jednak jest wewnętrznie ograniczona do maksymalnie Integer.MAX_VALUE elementów. Operacje dodania i pobrania posiadają swoje własne blokady, co wciąż nie jest zbyt dobrą wiadomością (istnienie blokad) choć w stosunku do poprzedniej implementacji (gdzie blokada jest współdzielona) uzyskujemy poprawę przepustowości. Jest jeszcze jeden problem z tą implementacją. Skoro jest ona taka „lepsza” to dlaczego nie używać jej zawsze, pomijając stosowanie ArrayBlockingQueue. Zachowanie się aplikacji używającej ArrayBlockingQueue jest bardziej przewidywalne niż LinkedBlockingQueue. Ta druga aby dodawać kolejne elementy musi je tworzyć a więc przydziela dynamicznie pamięć (new), którą po zwolnieniu (po pobraniu elementu z kolejki) oczyścić musi proces GC (Garbage Colector ), a to przy dużym obciążeniu generuje duży ruch na GC i może znacząco (negatywnie) wpłynąć na wydajność całej aplikacji. W przeciwieństwie do tego w ArrayBlockingQueue bufor na elementy alokowany jest raz na początku i się nigdy nie zmienia. Wybór pomiędzy tymi implementacjami nie jest więc taki oczywisty i zależy od wymagań.

Obie opisywane implementacje są blokujące, to znaczy, że w przypadku pustej kolejki konsument jest blokowany do chwili pojawienia się nowych elementów a dla przepełnionej producent czeka aż pojawi się miejsce na nowy element.

Kolejna implementacja ConcurrentLinkedQueuenie posiada tego zachowania (nie jest blokująca), jednak posiada inne bardzo ciekawe cechy. Jej pojemność limituje tylko ilość dostępnej pamięci. Ma nieblokowane wejście i wyjście oparte o „sprytny” algorytm CAS (Compare-And-Swap). Oczywiście jest w pełni bezpieczna w pracy wielowątkowej. W testach przepustowości często wypada lepiej od poprzedniczek. Czyżby ideał? Niekoniecznie. W tej implementacji wciąż wewnętrznie przydzielana jest dynamicznie pamięć na nowe elementy, co skutkuje opisanymi wcześniej przyLinkedBlockingQueue ograniczeniami.

Te trzy przykłady nie wyczerpują dosyć bogatego wachlarza dostępnych w standardowej dystrybucji JDK implementacji. Zestaw ten możemy powiększyć korzystając z innych popularnych źródeł, np. z biblioteki Google Guava. Nie można nie ulec wrażeniu, że jest tego naprawdę dużo i wydaje się, że wśród takiej różnorodności bez problemu znajdziemy dobre rozwiązanie dla każdego zestawu wymagań. A jednak inżynierowie z firmy LMAX Group uznali, że żadna z dostępnych implementacji nie spełnia ich wymagań i podjęli decyzję o stworzeniu czegoś nowego. Ok roku 2010 pojawiła się pierwsza wersja LMAX Disruptor. Jest to, jak piszą autorzy

High Performance Inter-Thread Messaging Library

Nie jest to tylko kolejna implementacja kolejki, ale rozbudowana biblioteka zawierająca zestaw komponentów pozwalający na wydajne przesyłanie komunikatów pomiędzy wątkami. Czym może się pochwalić rozwiązanie LMAX? Jest tego całkiem sporo:

  • wysoka przepustowość – w teście przesyłania wiadomości z jednego wątku do drugiego, LMAX osiągnął wynik 24 miliony komunikatów na sekundę, w porównaniu do 4 milionów uzyskanych przez ArrayBlockingQueue

  • niski czas opóźnienia – w podobnym teście uzyskano średni czas opóźnienia dla LMAX równy 52 ns, oraz 32’757 ns dla ArrayBlockingQueue,

  • przewidywalny czas opóźnienia – 99% komunikatów przesyłanych przez LMAX zmieściło się w maksymalnym czasie 128 ns, ten sam wynik dla ArrayBlockingQueue to
    ok 2 miliony ns (uff!)

Tak doskonałe wyniki, dające nie jak zazwyczaj 10% czy może 30 % oszczędności, alepoprawę wręcz o kilka rzędów, nie pozwalają nie przyjrzeć się jak to zostało zrobione. LMAX Disruptor opiera się o kilka założeń. Sercem Disruptor’a jest stałych rozmiarów „bufor pierścieniowy”, brzmi to groźnie a na schematach prezentowanych w dokumentacji posiada zastanawiający kolisty kształt. Zaglądając jednak w kod, widzimy że jest to

private final Object[] entries;

Jest to zwykła tablica, a jej „kolistość” uzyskuje się przesuwając kursor po jej zawartości wracając do początku gdy osiąga się koniec. Warto zaznaczyć, że już na starcie zapełniamy cały bufor fabrykując instancje kontenerów na dane które będą potem do niego trafiały. Kontenery te są instancjonowane tylko raz z założeniem, że są klasami zmiennymi (mutable), gotowymi na przyjmowanie „w kółko” danych coraz to nowych zdarzeń. Disruptor na prośbę producenta udostępnia wolny „slot” do wstawienia nowego komunikatu. Oczywistym jest pytanie, a co gdy bufor jest zapełniony? Skoro bufor ma stały rozmiar, i jak wiadomo konsumenci mogą działać wolniej niż producenci, to do takiej sytuacji dojdzie bardzo szybko. Producent albo wtedy czeka do chwili aż taki slot będzie dostępny, albo jest informowany, że takiego slotu nie ma. Ciekawym fragmentem kodu Disruptor’a jest kod odpowiedzialny za oczekiwanie producenta na wolny slot

while (wrapPoint > (minSequence =

Util.getMinimumSequence(this.gatingSequences, nextValue))) {

LockSupport.parkNanos(1L);

}

no cóż … czekamy w pętli. Czy można by się spodziewać czegoś innego? Ja przyznam, że oczekiwałem i jestem odrobinę zawiedziony :). Nie bez znaczenia jest także implementacja metody LockSupport.parkNanos(long nanos), która odwołuje się do metody

public native void park(boolean isAbsolute, long time);

z klasy

jdk.internal.misc.Unsafe

Podążając za dokumentacją klasy Unsafe dowiadujemy się, że używanie jej metod nie jest bezpieczne (zgodnie z nazwą), a efekty ich działania mogą być nieprzewidywalne. Daje to pewien pogląd na niskopoziomowy sposób implementacji całego rozwiązania, nastawiony na wyciśnięcie maksymalnej wydajności, przy zgodzie na odrobinę ryzyka. Warto dodać, że podobne wycieczki w delikatnie mówiąc „niezalecane” czy „niebezpieczne” obszary Java’y są dosyć częste w kodzie Disruptor’a. Patrząc jednak na wysoki poziom inżynierski całości, jestem pewien, że Twórcy w/w robią wszystko z pełną świadomością i my jako użytkownicy nie mamy powodu do obaw. Od strony konsumentów domyślnym sposobem jest „zarejestrowanie się” jako odbiorca komunikatów, pozostawiając szczegóły kiedy i jak dostarczyć komunikat samemu buforowi. Cała praca związana z obsługą wielu wątków na wejściu, na wyjściu, ustalanie kto kiedy co może wykonać jest delegowana do Disruptor’a i nie musimy się tym zajmować, poza w miarę prostymi działaniami konfiguracyjnymi na samym starcie. Kolejną wartą zauważenia cechą implementacji Disruptor’a jest dbałość o bycie w zgodzie z wewnętrznymi mechanizmami nowoczesnych procesorów. Praca wielu rdzeni i wielu wątków, przesyłanie danych pomiędzy pamięcią operacyjną a kolejnymi poziomami cache (L…) procesora, ma niebanalny wpływ na wydajność czy przepustowość algorytmów. Architekci biblioteki od samego początku uwzględniali te fakty.

Kto używa Disruptor’a? Oprócz oczywistego użycia w grupie firm LMAX a w szczególności LMAX Exchange, jednym z oficjalnych zastosowań jest obsługa asynchronicznego logowania operacji w popularnej bibliotece Log4J. Disruptor jest udostępniany na licencji Apache-2.0, więc można założyć, że ma duże szanse na stanie się znaną i popularną biblioteką (może już jest), choć trzeba mieć świadomość ograniczonego zakresu zastosowań, tam gdzie domyślne i prostsze struktury nie są wystarczające.

Czy to koniec historii? Zdecydowanie nie. Zastanówmy się co ta super maszyna zrobi jak wyłączymy jej prąd, albo choćby zatrzymamy JVM? Co z komunikatami w buforze cyklicznym? Niestety, nie będzie po nich śladu. Jak widać może pojawić się nam kolejne wymaganie, persystencja komunikatów, tak aby przy „awarii” nie było ryzyka utraty choćby jednego z nich. To jedno, wydawało by się małe wymaganie powoduje, że na pewno nie jest to koniec historii, a jej kontynuacja wymagała od inżynierów stworzenia takich bytów jak JMS, ActiveMQ, RabbitMQ, ZeroMQ, Artemis czy Kawka. Ale to już całkiem inna historia.