Po pierwsze czy wyłączyłeś tryb debug? Powoduje on przydzielanie losowych wielkości buforów, co na wydajność i na śmieciarza raczej dobrze nie wpływa.
Jeżeli masz zablokowaną pamięć, to znaczy, że coś na wyjściu nie jest czyszczone i zostają referencje do obiektów. Karuzela ma wszystkie dane tymczasowe czyszczone, a jedyne agregaty danych, to zbiór listenerów, którym zarządza Twój kod oraz kolejka transferowa, z której danymi zarządza również Twój kod. Z danych wejściowych dostarczasz obiekt implementujący PackHeadFinder - dopóki istnieje obiekt karuzeli to on też nie zostanie zniszczony (dla każdej z nich). Żadne inne pola nie są kolekcjami i w żadnym z nich nie pozostaje referencja do poprzednich danych (no chyba, że znajdziesz :) Tak więc musisz poszukać co się dzieje z tymi danymi.
Proponowałbym zmniejszyć do minimum opcję Xmx (w JVM) tak aby zrównać ją z najmniejszą Xms (eksperymentalnie, dzieląc za każdym ostatnią wartość przez 2) przy jakiej choćby kilka pierwszych paczek może zostać odebrane. Wtedy odpal sobie profilera i sprawdź które dane nie schodzą do śmieciarza, tylko zaczynają się kolekcjonować. Karuzela działa tak, że wszelkie tymczasowe bufory potrzebne dzo ściągania danych alokowane są raz i używane powtórnie, o ile jest to możliwe, albo przypisuje nowe dane w miejsce starych referencji (stare więc idą na śmietnik).
Ale odbierane dane, których na godzinę jest 900 000 paczek oznaczają, że jeżeli średnio paczka zajmuje 4 KB (a pewnie więcej), to tych danych będzie ok. 3,4 GB/godzinę. Oznacza to, że jeżeli reszta kodu nie przetwarza i nie niszczy tych danych odpowiednio szybko, to masz problem bo dla Javy takie obciążenie pamięci stanie się brzegowe. Raczej więc obstawiałbym, że odbierane paczki są gdzieś niepotrzebnie kolekcjonowane lub nie dość szybko niszczone.
ps. Znalazłem u siebie nowszą wersję tej klasy, w której poprawiłem kiedyś jakieś drobiazgi. Jeżeli chcesz, to możesz ją sobie porównać jakimś diffem i ewentualnie użyć:
(niestety jakaś durnota na tym forum znowu dostawia kolejne LF do kodu - sprawdzałem z LF i CR/LF, musisz więc sobie sam usunąć nadmiarowe)
package com.olamagato.streams;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.TransferQueue;
/**
* Przykład odczytu prostych paczek z karuzeli.
* @author Olamagato
* @version 1.04
*/
public class FreeCarousel
{
private static boolean DEBUG = true;
/** Rozmiar nagłówka paczki. Tu składa się tylko z rozmiaru paczki */
private static final int HEAD_SIZE = 4;
/** rozmiar bufora nie może być mniejszy niż rozmiar nagłówka */
private static final int BUFFER_SIZE = 1024; //bufor 1KB
/** Definiuje kryterium odszukania i użycia nagłówka paczki */
public interface PackHead
{
/**
* Zwraca index nagłówka paczki znalezionego w buforze lub
* liczba ujemna jeżeli takiego nagłówka nie można znaleźć.
* Bufor nextBuffer jest przeszukiwany opcjonalnie do ustalenia
* położenia nagłówka na granicy buforów, ale tak aby indeks
* jego pierwszego bajtu znalazł się w zakresie
* buffer[0..buffer.length - 1]. Argument nextBuffer nie uczestniczy
* w rozpoznawaniu istnienia nagłówka poza tym zakresem - jeżeli
* początek nagłówka nie uda znaleźć w zmiennej buffer, to wynikiem
* musi być liczba ujemna bez względu na to czy udałoby się go znaleźć
* w nextBuffer.
* Metoda nie może w jakikolwiek sposób modyfikować danych w
* argumentach buffer i nextBuffer.
* @param buffer przeszukiwany bufor, całkowicie wypełniony danymi
* @param nextBuffer następny bufor pozwalający rozpoznać nagłówek
* podzielony między dwa bufory, całkowicie wypełniony danymi
* @return indeks pierwszego bajtu w buforze (0..buffer.length - 1)
* lub liczba ujemna jeżeli nagłówka w buforze nie można znaleźć
*/
int findHeadInBuffer(final byte[] buffer, final byte[] nextBuffer);
/**
* Zwraca liczbę prostych pól nagłówka możliwych do odczytania.
* @return to co widać
*/
int fieldCount();
/**
* Zwraca numer pola rozmiaru paczki w nagłówku [0..fieldCount()-1].
* Pole rozmiaru paczki nie może być ujemne i może mieć
* od 8 do 32-bitów (1 do 4 bajtów).
* @return nr bajtu od którego zaczyna się w nagłówku pole rozmiaru.
*/
int getNumberSizeField();
/**
* Zwraca rozmiar wybranego pola nagłówka w bitach. Rozmiar
* pola powinien być wielokrotnością 8 bitów i nie może być
* większy niż 64 bity. Odpowiada to wielkości dowolnej
* prostej zmiennej.
* @param nrPola numer pola od 0 do fieldCount()-1
* @return rozmiar pola w bitach
*/
int getFieldSize(final int nrPola);
/**
* Ustawia pole o podanym indeksie przez wartość headField z
* uwzględnieniem kolejności ułożenia bajtów.
* @param nrPola numer pola od 0 do fieldCount()-1
* @param headField opakowana wartość long pola typu prostego
*/
void setField(final int nrPola, long headField);
/**
* Zwraca reprezentację binarną wybranego pola.
* @param nrPola numer pola do odczytania
* @return wartość long pola typu prostego
*/
long getField(final int nrPola);
} //interface PackHead
/**
* Reprezentuje całą paczkę danych odczytaną z karuzeli.
* Jest listą tablic bajtowych o maksymalnej wielkości BUFFER_SIZE każda.
*/
public interface Pack extends List<byte[]>
{
/**
* Zwraca nagłówek paczki danych.
* @return nagłówek paczki
*/
PackHead getHeadData();
} //interface Pack extends List<byte[]>
/** Obserwator dodawania nowej paczki */
public interface PackReadListener
{
/**
* Wywoływana po skompletowaniu odczytu każdej nowej paczki.
* @param completedPack zakończona paczka danych
*/
void packReadCompleted(final Pack completedPack);
} //interface PackReadListener
/**
* Tworzy obiekt reprezentujący karuzelę danych
* @param bigEndian
* @param input strumień wejściowy
* @param output kolejka wyjściowa zbierająca kolejne paczki danych
* @param packHead pozwala ustalić kryterium dla znalezienia nagłówka
* paczki
*/
public FreeCarousel(final boolean bigEndian, final InputStream input,
final TransferQueue<Pack> output, final PackHead packHead)
{
if(input == null || packHead == null)
throw new NullPointerException("Null pointer argument.");
if(BUFFER_SIZE < HEAD_SIZE)
throw new IllegalStateException("BUFFER_SIZE (" + BUFFER_SIZE
+ "is lesser than pack HEAD_SIZE = " + HEAD_SIZE);
this.bigEndian = bigEndian;
this.input = input;
this.output = output;
pack = new PackReader(packHead);
listeners = Collections.synchronizedSet(
new HashSet<PackReadListener>()); //obserwatorzy paczek
lastPack = null;
}
/**
* Dodaje nasłuchiwacza informującego o zakończeniu wczytywania paczki.
* @param listener obiekt nasłuchujący
* @return this dla wygodniejszego użycia
*/
public FreeCarousel addPackReadListener(final PackReadListener listener)
{
listeners.add(listener);
return this;
}
/**
* Usuwa nasłuchiwacza informującego o zakończeniu wczytywania paczki.
* @param listener obiekt nasłuchujący
* @return this dla wygodniejszego użycia
*/
public FreeCarousel removePackReadListener(final PackReadListener listener)
{
listeners.remove(listener);
return this;
}
/**
* Odczytuje kolejne paczki aż do końca strumienia.
* Odczytane dane w postaci całych bezbłędnie odczytanych paczek
* umieszcza w kolejce wyjściowej.
* Pierwsza i ostatnia paczka zostaną odrzucone, jeżeli nie były
* całe w strumieniu.
*
* @return this dla wygodniejszego użycia
* @throws IOException w przypadku błędu odczytu ze strumienia
*/
public FreeCarousel packsRead() throws IOException
{
if(this.buffer == null)
this.buffer = new InputDataBuffer(input, BUFFER_SIZE);
lastPack = new PackInternal();
bytesRead = pack.readData(); //odczyt pierwszego bufora
while(bytesRead >= 0 && pack.setBufferDataStart())
{ //ustawiony początek danych w buforze
pack.setBufferDataEnd(); //ustawienie końca danych w buforze
if(pack.getDataStartIdx() < pack.getDataEndIdx())
{ //są jakieś dane do dołączenia
if(DEBUG)
debugCheckDataIdx();
//dołączenie danych kolejnego bufora do paczki
lastPack.add(Arrays.copyOfRange(buffer.get(),
pack.getDataStartIdx(), pack.getDataEndIdx()));
if(pack.isTheEnd()) //zakończenie paczki, wzięcie nowej?
finalizeAndChangePack();
}
if(pack.getDataEndIdx() == bytesRead) //odczytano cały bufor
bytesRead = pack.readData(); //reset zakresu danych
}
if(DEBUG)
debugShowPacks();
return this;
}
/**
* Kończy paczkę, informuje obserwatorów o jej zakończeniu
* oraz rozpoczyna wczytywanie następnej.
*/
private void finalizeAndChangePack()
{
lastPack.finalize(pack.head);
output.add(lastPack); //dodanie kolejnej całej paczki
for(PackReadListener packReadListener: listeners)
packReadListener.packReadCompleted(lastPack);
pack.locateNextHead(); //ustala miejsce nowego nagłówka danych
lastPack = new PackInternal(); //tworzy nową pustą paczkę danych
}
/**
* Reprezentuje nagłówek paczki danych. Przeprowadza operacje na
* danych z nagłówka (tutaj tylko size)
*/
private class PackReader
{
/**
* Ustawia warunki dla pobrania najbliższej paczki.
* @param packHead ustala kryterium dla znalezienia nagłówka paczki
*/
private PackReader(final PackHead packHead)
{
this.head = packHead;
reset(dataStartIdx = dataEndIdx = -1);
}
/**
* Resetuje dane dla nowej paczki danych.
* Ustawia indeks rozmiaru paczki na podany newSizeIdx
* @param newSizeIdx nowy indeks rozmiaru paczki w buforze lub -1
*/
private void reset(final int newSizeIdx)
{
read = 0;
size = -1;
headFieldIdx = newSizeIdx;
}
/**
* Indeks początku danych w buforze.
* @return indeks bajtu początkowego
*/
private int getDataStartIdx() { return dataStartIdx; }
/**
* Indeks za końcem danych w buforze.
* @return indeks bajtu poza buforem (exclusive)
*/
private int getDataEndIdx() { return dataEndIdx; }
/**
* Zwraca odczytany rozmiar paczki.
* @return rozmiar bieżącej paczki
*/
private int getPackSize() { return size; }
/**
* Ustala położenie nagłówka kolejnej paczki za końcem poprzedniej.
*/
private void locateNextHead() { reset(dataEndIdx); }
/**
* Ustala początek danych do odczytu w buforze.
* Ustawia pole bufferDataStartIdx na właściwy indeks.
* @return false jeżeli nastąpił koniec strumienia,
* true jeżeli udało się ustalić początek danych.
* @throws IOException błąd wczytywania danych
*/
private boolean setBufferDataStart() throws IOException
{
if(size >= 0) //w trakcie wczytywania paczki
dataStartIdx = 0; //początek danych na początku bufora
else if(headLocated() && headLoaded()) //nowa paczka
{
dataStartIdx = headFieldIdx; //pocz. danych za rozm. paczki
headFieldIdx = -1;
}
else
return false; //nastąpił koniec strumienia
return true;
}
/**
* Ustala koniec danych do odczytu w buforze.
* Ustawia pole bufferDataEndIdx na właściwy indeks.
* Ustawia zmienna isTheEnd na true jeżeli wykryto koniec paczki.
* @return true jeżeli wystąpił koniec odczytywanej paczki
*/
private boolean setBufferDataEnd()
{ //pozostałe w buforze dane do wczytania
final int bufferData = bytesRead - dataStartIdx; //może być 0
//ilość danych potrzebna do zamknięcia paczki
//zał. read < size, zawsze dodatnia
final int brakująceDoPaczki = size - read;
//zawsze false jeżeli bufferData == 0
this.isTheEnd = brakująceDoPaczki <= bufferData;
if(isTheEnd) //czy koniec paczki w tym buforze?
{ //doczytujemy tylko dane brakujące do paczki
read += brakująceDoPaczki;
//koniec danych równy z końcem paczki
dataEndIdx = dataStartIdx + brakująceDoPaczki;
}
else //paczka nie skończy się w tym buforze
{ //doczytanie do paczki całego pozostałego bufora
read += bufferData; //może dodać 0
//koniec danych równy z końcem bufora
dataEndIdx = bytesRead; //dataStartIdx + bufferData;
}
return this.isTheEnd;
}
/**
* Przeszukuje kolejne bufory w poczukiwaniu początku paczki.
* Zmienną sizeIdx ustawia za wczytany rozmiar = początek danych.
* @return true jeżeli indeks rozmiaru jest znaleziony,
* false jeżeli nastąpił koniec strumienia
* @throws IOException błąd odczytu
*/
private boolean headLocated() throws IOException
{
while(headFieldIdx < 0)
if(!headIdxFound())
{
bytesRead = buffer.read();
if(bytesRead == -1)
return false; //koniec strumienia, koniec pobierania
}
return true;
}
/**
* Zwraca true w przypadku znalezienia początku nagłówka w buforze.
* Ustawia zmienną sizeIdx na indeks początku nagłówka.
* @return true jeżeli początek nagłówka jest znaleziony
*/
private boolean headIdxFound()
{ //podstawia dla findHeadInBuffer bufory dopasowane do danych
headFieldIdx = head.findHeadInBuffer(
getData(buffer.get(), buffer.getReadBytes()),
getData(buffer.get(true), buffer.getReadBytes(true)));
return headFieldIdx >= 0;
}
/**
* Odczyt kolejnych pól paczki poprzez kolejne odczytywane bufory.
* Ustawia pole sizeIdx na pierwszy bajt za odczytanym rozmiarem.
* @return true jeżeli wczytano rozmiar paczki, false jeżeli
* nastąpił koniec strumienia.
* @throws IOException błąd odczytu
*/
private boolean headLoaded() throws IOException
{
for(int nrPola = 0; nrPola < head.fieldCount(); ++nrPola)
if(!isFieldLoaded(nrPola))
return false;
return true;
}
/**
* Informuje czy udało się ustawić pole o podanym numerze.
* Jeżeli jest to pole rozmiaru, to odczytuje go do 32-bitowej
* zmiennej size bez względu na to jaki jest faktyczny rozmiar
* tego pola.
* @param nrPola nr pola do załadowania ze strumienia
* @return true jeżeli pole zostało wypełnione, false = koniec danych
* @throws IOException błąd odczytu
*/
private boolean isFieldLoaded(final int nrPola)
throws IOException
{
Long field = readField(head.getFieldSize(nrPola));
if(field == null)
return false;
if(nrPola == head.getNumberSizeField())
{
size = field.intValue() & Integer.MAX_VALUE;
if(DEBUG) debugPackSize();
}
head.setField(nrPola, field);
return true;
}
/**
* Ładuje ze strumienia pole o podanym rozmiarze i zwraca go
* jako obiektową wartość Long. Bajty są odwracane jeżeli wartości
* są intepretowane jako big endian (to samo dotyczy pól innych
* niż numeryczne).
* @param fieldSize rozmiar pola w bitach, nie większy niż Long.SIZE
* @return odczytana wartość lub null po natrafieniu na koniec danych
* @throws IOException błąd odczytu
*/
private Long readField(final int fieldSize) throws IOException
{
long headField = 0; //przygotowanie do wczytywania
for(int sizeSh = 0; sizeSh < fieldSize; sizeSh += Byte.SIZE)
//kolejny bajt do doczytania w buforze?
if(headFieldIdx < bytesRead)
headField |= buffer.get()[headFieldIdx++] << sizeSh;
else
{ //nie pozwala pominąć bajtu rozmiaru przez anulowanie
sizeSh -= Byte.SIZE; //inkrementacji licznika bajtów
//początek następnego bufora zawiera resztę rozmiaru
headFieldIdx = 0;
bytesRead = buffer.read();
if(bytesRead == -1)
return null;//koniec strumienia i pobierania
}
if(bigEndian) headField = Long.reverseBytes(headField)
>>> (Long.SIZE - fieldSize);
return headField;
}
/**
* Wczytuje kolejną porcję danych do bufora.
* Resetuje położenie danych w buforze na domyślne (wszystkie dane
* bufora należą do bieżącej paczki).
* Resetuje położenie nagłówka jeżeli miałby się znaleźć
* dokładnie na końcu danych poprzedniego bufora.
* @return ilość odczytanych do bufora bajtów danych
* @throws IOException
*/
private int readData() throws IOException
{
dataStartIdx = 0; //początek nowego bufora
if(headFieldIdx == dataEndIdx && isTheEnd)
//przenosi lokację nagłówka na początek kolejnej paczki
headFieldIdx = dataStartIdx;
//odczyt kolejnego bufora
return dataEndIdx = buffer.read();
}
/**
* Informuje o końcu paczki.
* @return true jeżeli w aktualnym buforze jest ostatnia część paczki.
*/
private boolean isTheEnd() { return isTheEnd; }
private final PackHead head;
private int dataStartIdx; //indeks początku danych paczki w buforze
private int dataEndIdx; //indeks za końcem danych paczki w buforze
private int headFieldIdx; //indeks bajtu rozmiaru w bieżącym buforze
private boolean isTheEnd; //informuje czy wykryto ostatnią część w buf.
private int size; //rozmiar bieżącej paczki
private int read; //ilość dotychczas odczytanych danych z paczki
} //class PackReader
/**
* Reprezentuje implementację całej paczki danych odczytanej z karuzeli.
* Jest listą tablic bajtowych o maksymalnej wielkości BUFFER_SIZE każda.
*/
private static class PackInternal extends LinkedList<byte[]>
implements Pack
{
/** Tworzy nowa pustą paczkę nie zawierającą żadnego fragmentu */
private PackInternal() {}
@SuppressWarnings("UseOfSystemOutOrSystemErr")
@Override public boolean add(byte[] e)
{
if(DEBUG) System.out.printf("Część %03d: %03d B (%03X)%n",
this.size(), e.length, e.length);
return super.add(e);
}
/**
* Zwraca odczytany nagłówek paczki danych.
* @return nagłówek paczki
*/
@Override public PackHead getHeadData() { return packHead; }
/**
* Zapisuje w obiekcie łączny rozmiar paczki.
* @param packHead
*/
private void finalize(final PackHead packHead)
{ this.packHead = packHead; }
private PackHead packHead;
private static final long serialVersionUID = 197103271110071103L;
} //class PackInternal extends LinkedList<byte[]>
/**
* Reprezentuje wirtualny bufor danych wejściowych posługujący się dwoma
* buforami o podanej długości, do których wczytuje dane ze strumienia.
* Większość operacji bazuje na symulowaniu wczytywania bufora odczytanego
* wcześniej. Dzięki temu jest możliwe udostępnienie "przyszłych danych".
*/
private static class InputDataBuffer
{
/**
* Reprezentuje podwójny bufor danych, który jest odczytywany
* z wyprzedzeniem jednego bufora.
* Utworzenie bufora powoduje od razu wczytanie pierwszego bufora,
* aby każda następna operacja wczytania odczytywała wyprzedzająco
* jeden dodatkowy bufor danych.
* @param input strumień wejściowy
* @param bufferSize maksymalny rozmiar buforów roboczych
* @throws IOException błąd odczytu ze strumienia
*/
public InputDataBuffer(final InputStream input,
final int bufferSize) throws IOException
{
this.input = input;
buffer1 = new byte[bufferSize];
buffer2 = new byte[bufferSize];
internalRead();
}
/**
* Zwraca zawsze najstarszy wypełniony bufor.
* @return najstarszy bufor.
*/
public final byte[] get() { return get(false); }
/**
* Zwraca bufor, który jest odczytywany później lub wcześniej zależnie
* od prawdziwości parametru lastBuffer.
* @param lastBuffer najnowszy wczytany bufor (wyprzedzający)
* @return wybrany bufor
*/
public final byte[] get(final boolean lastBuffer)
{
return lastBuffer == firstBuffer ? buffer1 : buffer2;
}
/**
* Zwraca zawsze rozmiar najstarszego odczytanego bufora.
* @return liczba bajtów bufora.
*/
public final int getReadBytes() { return getReadBytes(false); }
/**
* Zwraca wielkość bufora odczytywanego później lub wcześniej zależnie
* od prawdziwości parametru lastBuffer.
* @param lastBuffer najnowszy wczytany bufor (wyprzedzający)
* @return liczba bajtów wybranego bufora
*/
public final int getReadBytes(final boolean lastBuffer)
{
return lastBuffer == firstBuffer ? bytesRead1 : bytesRead2;
}
/**
* Odczytuje dane strumienia do bufora bieżącego i zwraca ilość
* danych wcześniej odczytanego bufora zapasowego.
* @return liczba bajtów najstarszego wypełnionego bufora
* @throws IOException blablabla
*/
public final int read() throws IOException { return internalRead(); }
/**
* Odczytuje dane strumienia do bufora bieżącego i zwraca ilość
* danych wcześniej odczytanego bufora zapasowego.
* Wersja dla inicjacji danych konstruktora.
* @return liczba bajtów najstarszego wypełnionego bufora
* @throws IOException blablabla
*/
private int internalRead() throws IOException
{ //wypełnia kolejny, inny bufor niż poprzednio wypełniony
//DEBUG. Odczytywanie losowych ilości danych
final int debugRozmBufora = DEBUG ?
los.nextInt(BUFFER_SIZE) : BUFFER_SIZE;
final int bytesRead = input.read(nextBuffer(), 0, debugRozmBufora);
//ilość odczytanych bajtów przypisuje odczytanemu buforowi
if(firstBuffer)
bytesRead1 = bytesRead;
else
bytesRead2 = bytesRead;
return getReadBytes(); //rozmiar wcześniejszego odczytu
}
/**
* Zmienia bufor podstawowy na zapasowy i odwrotnie oraz
* zwraca ten bufor jako rezultat.
* Efektem ubocznym jest ustawienie zmiennej firstBuffer
* zgodnie z właśnie zwróconym buforem.
* @return bufor zgodny z ustawioną wartością firstBuffer
*/
private byte[] nextBuffer()
{ //uwaga, przypisanie w warunku
return (firstBuffer = !firstBuffer) ? buffer1 : buffer2;
}
private final InputStream input; //strumień danych wejściowych
private final byte[] buffer1, buffer2; //dwa bufory danych
private static final Random los = new Random(); //debug
//zmienne stanu
private boolean firstBuffer; //ostatnio wypełniony bufor to buffer1
private int bytesRead1, bytesRead2; //wielkości odczytanych danych
} //static class InputDataBuffer
/**
* Zwraca tablicę będącą argumentem jeżeli ilość podanych danych
* jest nie mniejsza od jej rzeczywistej długości, albo lub krótszą
* kopię tej tablicy jeżeli podana ilość danych jest mniejsza niż
* jej długość.
* Procedura unika kopiowania danych w typowej sytuacji gdy bufor
* jest całkowicie wypełniony danymi.
* @param dataBuffer bufor z danymi bajtowymi
* @param dataLength rzeczywista ilość danych
* @return to co widać
*/
private static byte[] getData(final byte[] dataBuffer,
final int dataLength)
{
return dataLength >= dataBuffer.length ?
dataBuffer : Arrays.copyOf(dataBuffer, dataLength);
}
//debug
@SuppressWarnings("UseOfSystemOutOrSystemErr")
private void debugCheckDataIdx()
{
final int start = pack.getDataStartIdx();
final int end = pack.getDataEndIdx();
if(start <= end && start >= 0 && start <= bytesRead
&& end >= 0 && end <= bytesRead)
return;
else
throw new IllegalStateException(String.format(
"%nbytesRead=%d (%x)%n"
+ "size=%d (%x)%n"
+ "sizeIdx=%d (%x)%n"
+ "start=%d (%x)%n"
+ "end=%d (%x)",
bytesRead, bytesRead,
pack.size, pack.size,
pack.headFieldIdx, pack.headFieldIdx,
start, start,
end, end));
}
@SuppressWarnings("UseOfSystemOutOrSystemErr")
private void debugShowPacks()
{
System.out.printf("Wczytano %d paczek.%n", output.size());
if(pack.getPackSize() >= 0)
System.out.println(
"Projektowany Rozmiar ostatniej, niedokończonej paczki = "
+ pack.getPackSize());
}
@SuppressWarnings("UseOfSystemOutOrSystemErr")
private void debugPackSize()
{
//gdy rozmiar paczki w strumieniu obejmuje rozmiar nagłówka paczki
if(pack.size < 0)
throw new IllegalStateException("Bad input data: "
+ "Negative pack size = " + pack.size
+ String.format(
"%nbytesRead=%d (%x)%n"
+ "size=%d (%x)%n"
+ "sizeIdx=%d (%x)%n"
+ "start=%d (%x)%n"
+ "end=%d (%x)",
bytesRead, bytesRead,
pack.size, pack.size,
pack.headFieldIdx, pack.headFieldIdx,
pack.dataStartIdx, pack.dataStartIdx,
pack.dataEndIdx, pack.dataEndIdx));
System.out.printf("Paczka %03d: %d B (%03X)%n",
output.size(), pack.size, pack.size);
}
//debug end
/** Sposób odczytu liczb wielobajtowych w strumieniu. */
private final boolean bigEndian; //kodowanie liczb
private final InputStream input; //strumień wejściowy
private final TransferQueue<Pack> output; //interfejs wyjściowy
private final PackReader pack; //mechanizm obsługujący paczki
private final Set<PackReadListener> listeners; //obserwatorzy paczek
//zmienne stanu
private PackInternal lastPack; //bieżąca zbierana paczka danych
private InputDataBuffer buffer; //bufor danych wejściowych
private int bytesRead; //ilość pobranych danych w buforze
} //class FreeCarousel