Java odbieranie danych InputStream

0

Mam taki problem

Odbieram do bufora dane binarne z socketu.


InputStream in = new BufferedInputStream(conn.getInputStream());

                byte[] fileStream = new byte[1024]; //buffor 4KB


               while ((bytesRead = in.read(fileStream)) !=-1){

               System.out.println("Bytes:"+bytesRead);

               } 

Danych jest bardzo dużo "lecą bez przerwy"

Chciałbym wyodrebnić z tych danych paczki, jedyna informację o podziale tych paczek jest to ze pierwsze 4 bajty zawierają długość paczki. Powiedzmy to jest do opanowania bo można iterować poszczegolne paczki, ale w buforze nie zawsze będą pełne paczki danych, bedzie tak że na końcu bufora bedzie początek kolejnej paczki, a w nastepnym buforze koniec paczki. Czy macie jakieś sugestie. Moze sa jakies gotowe mechanizmy do obsługi tego typu danych?
Proszę o pomoc

Wojtek

0

Tu nie ma co kombinować. Najpierw musisz jakoś znaleźć początek jakiejkolwiek paczki. Zwykle są to jakieś charakterystyczne i unikalne ciągi danych, których raczej nie spotka się w paczce (zakładanie, że na początku pierwszego odczytanego bufora będzie początek paczki jest ryzykowne, ale może to być założeniem projektowym). Kiedy z któregoś odczytanego bufora namierzysz ten początek, to odczytujesz długość paczki, a następnie wczytujesz kolejne bufory i z pobranej długości odejmujesz bytesRead każdego kolejnego bufora (oczywiście w pierwszym buforze odejmujesz też ten zbędny kawałek bufora przed początkiem pierwszej namierzonej paczki.
Jeżeli kolejne odejmowanie zakończy się liczbą ujemną, to już wiesz, że w tym buforze masz koniec paczki oraz początek następnej. Na dodatek ten początek od razu namierzasz bez parsowania bufora bo będzie on w ostatnim odczytanym buforze pod indeksem o takiej wartości jak ujemna liczba w ostatnim odejmowaniu odczytanych danych do bufora.
W ten sposób namierzysz początek kolejnej paczki i cały proces się powtarza.

0

[Nie mogę zmodyfikować swojego posta powyżej.]

Ponieważ od dawna miałem zrobić sobie uniwersalną karuzelę do odczytu takich danych jak w temacie, więc po okrojeniu do powyższych potrzeb, przykładowy kod odczytujący paczki danych wyglądałby tak jak poniżej. Komentarze są tu po polsku, więc nie powinno być problemów z rozczytaniem co jest do czego. Ponieważ jest to dość uniwersalny przykład do rozbudowy, to z prostego kodu zrobił się lekko przydługawy.

[przetestowana wersja aktualna]

package com.olamagato.junk;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TransferQueue;

/**
 * Przykład odczytu prostych paczek z karuzeli.
 * @author Olamagato
 * @version 1.03
 */
public class FreeCarousel
{
	private static boolean DEBUG = true;
	/** Rozmiar nagłówka paczki. Tu składa się tylko z rozmiaru paczki */
	private static final int HEAD_SIZE = 4;
	/** rozmiar bufora nie może być mniejszy niż rozmiar nagłówka */
	private static final int BUFFER_SIZE = 512; //bufor 4KB

	/** Definiuje kryterium odszukania nagłówka paczki */
	public interface PackHeadFinder
	{
		/**
		 * Zwraca index nagłówka paczki znalezionego w buforze lub
		 * liczba ujemna jeżeli takiego nagłówka nie można znaleźć.
		 * Bufor nextBuffer jest przeszukiwany opcjonalnie do ustalenia
		 * położenia nagłówka na graniczy buforów, ale tak aby indeks
		 * jego pierwszego bajtu znalazł się w zakresie
		 * buffer[0..buffer.length - 1]. Argument nextBuffer nie uczestniczy
		 * w rozpoznawaniu istnienia nagłówka poza tym zakresem - jeżeli
		 * początek nagłówka nie uda znaleźć w zmiennej buffer, to wynikiem
		 * musi być liczba ujemna bez względu na to czy udałoby się go znaleźć
		 * w nextBuffer.
		 * Metoda nie może w jakikolwiek sposób modyfikować danych w
		 * argumentach buffer i nextBuffer.
		 * @param buffer przeszukiwany bufor, całkowicie wypełniony danymi
		 * @param nextBuffer następny bufor pozwalający rozpoznać nagłówek
		 * podzielony między dwa bufory, całkowicie wypełniony danymi
		 * @return indeks pierwszego bajtu w buforze (0..buffer.length - 1)
		 * lub liczba ujemna jeżeli nagłówka w buforze nie można znaleźć
		 */
		int findHeadInBuffer(final byte[] buffer, final byte[] nextBuffer);
	} //interface PackHeadFinder

	/**
	 * Reprezentuje całą paczkę danych odczytaną z karuzeli.
	 * Jest listą tablic bajtowych o maksymalnej wielkości BUFFER_SIZE każda.
	 */
	public interface Pack extends List<byte[]>
	{
		/**
		 * Rozmiar paczki w bajtach.
		 * @return suma rozmiaru wszystkich fragmentów
		 */
		int length();
	} //interface Pack extends List<byte[]>

	/** Obserwator dodawania nowej paczki */
	public interface PackReadListener
	{
		/**
		 * Wywoływana po skompletowaniu odczytu każdej nowej paczki.
		 * @param completedPack zakończona paczka danych
		 */
		void packReadCompleted(final Pack completedPack);
	} //interface PackReadListener

	/**
	 * Tworzy obiekt reprezentujący karuzelę danych
	 * @param bigEndian
	 * @param input strumień wejściowy
	 * @param output kolejka wyjściowa zbierająca kolejne paczki danych
	 * @param finder pozwala ustalić kryterium dla znalezienia nagłówka paczki
	 */
	public FreeCarousel(final boolean bigEndian, final InputStream input,
		final TransferQueue<Pack> output, final PackHeadFinder finder)
	{
		if(input == null || finder == null)
			throw new NullPointerException("Null pointer argument.");
		if(BUFFER_SIZE < HEAD_SIZE)
			throw new IllegalStateException("BUFFER_SIZE (" + BUFFER_SIZE
				+ "is lesser than pack HEAD_SIZE = " + HEAD_SIZE);
		this.bigEndian = bigEndian;
		this.input = input;
		this.output = output;
		pack = new PackHead(finder);
		listeners = Collections.synchronizedSet(
			new HashSet<PackReadListener>()); //obserwatorzy paczek
		lastPack = null;
	}

	/**
	 * Dodaje nasłuchiwacza informującego o zakończeniu wczytywania paczki.
	 * @param listener obiekt nasłuchujący
	 * @return this dla wygodniejszego użycia
	 */
	public FreeCarousel addPackReadListener(final PackReadListener listener)
	{
		listeners.add(listener);
		return this;
	}

	/**
	 * Usuwa nasłuchiwacza informującego o zakończeniu wczytywania paczki.
	 * @param listener obiekt nasłuchujący
	 * @return this dla wygodniejszego użycia
	 */
	public FreeCarousel removePackReadListener(final PackReadListener listener)
	{
		listeners.remove(listener);
		return this;
	}

	/**
	 * Odczytuje kolejne paczki aż do końca strumienia.
	 * Odczytane dane w postaci całych bezbłędnie odczytanych paczek
	 * umieszcza w kolejce wyjściowej.
	 * Pierwsza i ostatnia paczka zostaną odrzucone, jeżeli nie były
	 * całe w strumieniu.
	 *
	 * @return this dla wygodniejszego użycia
	 * @throws IOException w przypadku błędu odczytu ze strumienia
	 */
	public FreeCarousel packsRead() throws IOException
	{
		if(this.buffer == null)
			this.buffer = new InputDataBuffer(input, BUFFER_SIZE);
		lastPack = new PackInternal();
		bytesRead = pack.readData(); //odczyt pierwszego bufora
		while(bytesRead >= 0 && pack.setBufferDataStart())
		{	//ustawiony początek danych w buforze
			pack.setBufferDataEnd(); //ustawienie końca danych w buforze
			if(pack.getDataStartIdx() < pack.getDataEndIdx())
			{	//są jakieś dane do dołączenia
				if(DEBUG)
					debugCheckDataIdx();
				//dołączenie danych kolejnego bufora do paczki
				lastPack.add(Arrays.copyOfRange(buffer.get(),
					pack.getDataStartIdx(), pack.getDataEndIdx()));
				if(pack.isTheEnd()) //zakończenie paczki, wzięcie nowej?
					finalizeAndChangePack();
			}
			if(pack.getDataEndIdx() == bytesRead) //odczytano cały bufor
				bytesRead = pack.readData(); //reset zakresu danych
		}
		if(DEBUG)
			debugShowPacks();
		return this;
	}

	/**
	 * Kończy paczkę, informuje obserwatorów o jej zakończeniu
	 * oraz rozpoczyna wczytywanie następnej.
	 */
	private void finalizeAndChangePack()
	{
		lastPack.finalize(pack.getPackSize());
		output.add(lastPack); //dodanie kolejnej całej paczki
		for(PackReadListener packReadListener: listeners)
			packReadListener.packReadCompleted(lastPack);
		pack.locateNextHead(); //ustala miejsce nowego nagłówka danych
		lastPack = new PackInternal(); //tworzy nową pustą paczkę danych
	}

	/**
	 * Reprezentuje nagłówek paczki danych. Przeprowadza operacje na
	 * danych z nagłówka (tutaj tylko size)
	 */
	private class PackHead
	{
		/**
		 * Ustawia warunki dla pobrania najbliższej paczki.
		 * @param finder ustala kryterium dla znalezienia nagłówka paczki
		 */
		private PackHead(final PackHeadFinder finder)
		{
			this.finder = finder;
			reset(dataStartIdx = dataEndIdx = -1);
		}

		/**
		 * Resetuje dane dla nowej paczki danych.
		 * Ustawia indeks rozmiaru paczki na podany newSizeIdx
		 * @param newSizeIdx nowy indeks rozmiaru paczki w buforze lub -1
		 */
		private void reset(final int newSizeIdx)
		{
			read = 0;
			size = -1;
			sizeIdx = newSizeIdx;
		}

		/**
		 * Indeks początku danych w buforze.
		 * @return indeks bajtu początkowego
		 */
		private int getDataStartIdx() { return dataStartIdx; }

		/**
		 * Indeks za końcem danych w buforze.
		 * @return indeks bajtu poza buforem (exclusive)
		 */
		private int getDataEndIdx() { return dataEndIdx; }

		private int getPackSize() { return size; }

		/**
		 * Ustala położenie nagłówka kolejnej paczki za końcem poprzedniej.
		 */
		private void locateNextHead() { reset(dataEndIdx); }

		/**
		 * Ustala początek danych do odczytu w buforze.
		 * Ustawia pole bufferDataStartIdx na właściwy indeks.
		 * @return false jeżeli nastąpił koniec strumienia,
		 * true jeżeli udało się ustalić początek danych.
		 * @throws IOException błąd wczytywania danych
		 */
		private boolean setBufferDataStart() throws IOException
		{
			if(size >= 0) //w trakcie wczytywania paczki
				dataStartIdx = 0; //początek danych na początku bufora
			else if(headLocated() && headLoaded()) //nowa paczka
			{
				dataStartIdx = sizeIdx; //pocz. danych za rozm. paczki
				sizeIdx = -1;
			}
			else
				return false; //nastąpił koniec strumienia
			return true;
		}

		/**
		 * Ustala koniec danych do odczytu w buforze.
		 * Ustawia pole bufferDataEndIdx na właściwy indeks.
		 * Ustawia zmienna isTheEnd na true jeżeli wykryto koniec paczki.
		 * @return true jeżeli wystąpił koniec odczytywanej paczki
		 */
		private boolean setBufferDataEnd()
		{	//pozostałe w buforze dane do wczytania
			final int bufferData = bytesRead - dataStartIdx; //może być 0
			//ilość danych potrzebna do zamknięcia paczki
			//zał. read < size, zawsze dodatnia
			final int brakująceDoPaczki = size - read;
			//zawsze false jeżeli bufferData == 0
			this.isTheEnd = brakująceDoPaczki <= bufferData;
			if(isTheEnd) //czy koniec paczki w tym buforze?
			{	//doczytujemy tylko dane brakujące do paczki
				read += brakująceDoPaczki;
				//koniec danych równy z końcem paczki
				dataEndIdx = dataStartIdx + brakująceDoPaczki;
			}
			else //paczka nie skończy się w tym buforze
			{	//doczytanie do paczki całego pozostałego bufora
				read += bufferData; //może dodać 0
				//koniec danych równy z końcem bufora
				dataEndIdx = bytesRead; //dataStartIdx + bufferData;
			}
			return this.isTheEnd;
		}

		/**
		 * Przeszukuje kolejne bufory w poczukiwaniu początku paczki.
		 * Zmienną sizeIdx ustawia za wczytany rozmiar = początek danych.
		 * @return true jeżeli indeks rozmiaru jest znaleziony,
		 * false jeżeli nastąpił koniec strumienia
		 * @throws IOException błąd odczytu
		 */
		private boolean headLocated() throws IOException
		{
			while(sizeIdx < 0)
				if(!sizeIdxFound())
				{
					bytesRead = buffer.read();
					if(bytesRead == -1)
						return false; //koniec strumienia, koniec pobierania
				}
			return true;
		}

		/**
		 * Zwraca true w przypadku znalezienia początku nagłówka w buforze.
		 * Ustawia zmienną sizeIdx na indeks początku nagłówka.
		 * @return true jeżeli początek nagłówka jest znaleziony
		 */
		private boolean sizeIdxFound()
		{	//podstawia dla findHeadInBuffer bufory dopasowane do danych
			sizeIdx = finder.findHeadInBuffer(
				getData(buffer.get(), buffer.getReadBytes()),
				getData(buffer.get(true), buffer.getReadBytes(true)));
			return sizeIdx >= 0;
		}

		/**
		 * Odczyt rozmiaru paczki poprzez kolejne odczytywane bufory.
		 * Ustawia pole sizeIdx na pierwszy bajt za odczytanym rozmiarem.
		 * @return true jeżeli wczytano rozmiar paczki, false jeżeli
		 * nastąpił koniec strumienia.
		 * @throws IOException błąd odczytu
		 */
		private boolean headLoaded() throws IOException
		{
			size = 0; //przygotowanie do wczytywania
			for(int sizeSh = 0; sizeSh < Integer.SIZE; sizeSh += Byte.SIZE)
				if(sizeIdx < bytesRead) //kolejny bajt do doczytania w buforze?
					size |= buffer.get()[sizeIdx++] << sizeSh;
				else
				{	//nie pozwala pominąć bajtu rozmiaru przez anulowanie
					sizeSh -= Byte.SIZE; //inkrementacji licznika bajtów
					sizeIdx = 0; //pocz. nast. bufora zawiera resztę rozmiaru
					bytesRead = buffer.read();
					if(bytesRead == -1)
						return false;//koniec strumienia i pobierania
				}
			if(bigEndian) //odwrócenie bajtów rozmiaru
				size = Integer.reverseBytes(size);
			if(DEBUG)
				debugPackSize();
			return true;
		}

		/**
		 * Wczytuje kolejną porcję danych do bufora.
		 * Resetuje położenie danych w buforze na domyślne (wszystkie dane
		 * bufora należą do bieżącej paczki).
		 * Resetuje położenie nagłówka jeżeli miałby się znaleźć
		 * dokładnie na końcu danych poprzedniego bufora.
		 * @return ilość odczytanych do bufora bajtów danych
		 * @throws IOException
		 */
		private int readData() throws IOException
		{
			dataStartIdx = 0; //początek nowego bufora
			if(sizeIdx == dataEndIdx && isTheEnd)
				sizeIdx = dataStartIdx; //przenosi lokację nagłówka na począt.
			//odczyt kolejnego bufora
			return dataEndIdx = buffer.read();
		}

		/**
		 * Informuje o końcu paczki.
		 * @return true jeżeli w aktualnym buforze jest ostatnia część paczki.
		 */
		private boolean isTheEnd() { return isTheEnd; }
		private final PackHeadFinder finder;
		private int dataStartIdx; //indeks początku danych paczki w buforze
		private int dataEndIdx; //indeks za końcem danych paczki w buforze
		private int sizeIdx; //indeks bajtu rozmiaru w bieżącym buforze
		private boolean isTheEnd; //informuje czy wykryto ostatnią część w buf.
		private int size; //rozmiar bieżącej paczki
		private int read; //ilość dotychczas odczytanych danych z paczki
	} //class PackHead

	/**
	 * Reprezentuje implementację całej paczki danych odczytanej z karuzeli.
	 * Jest listą tablic bajtowych o maksymalnej wielkości BUFFER_SIZE każda.
	 */
	private static class PackInternal extends LinkedList<byte[]>
		implements Pack
	{
		/** Tworzy nowa pustą paczkę nie zawierającą żadnego fragmentu */
		private PackInternal() { this.packSize = 0; }

		@SuppressWarnings("UseOfSystemOutOrSystemErr")
		@Override public boolean add(byte[] e)
		{
			if(DEBUG) System.out.printf("Część %03d: %03d B (%03X)%n",
				this.size(), e.length, e.length);
			return super.add(e);
		}

		/**
		 * Rozmiar paczki w bajtach.
		 * @return suma rozmiaru wszystkich fragmentów
		 */
		@Override public int length() { return packSize; }

		/**
		 * Zapisuje w obiekcie łączny rozmiar paczki.
		 * @param packSize rozmiar paczki
		 */
		private void finalize(final int packSize) { this.packSize = packSize; }
		private int packSize; //rozmiar w bajtach
		private static final long serialVersionUID = 197103271110071103L;
	} //class PackCls extends LinkedList<byte[]>

	/**
	 * Reprezentuje wirtualny bufor danych wejściowych posługujący się dwoma
	 * buforami o podanej długości, do których wczytuje dane ze strumienia.
	 * Większość operacji bazuje na symulowaniu wczytywania bufora odczytanego
	 * wcześniej. Dzięki temu jest możliwe udostępnienie "przyszłych danych".
	 */
	private static class InputDataBuffer
	{
		/**
		 * Reprezentuje podwójny bufor danych, który jest odczytywany
		 * z wyprzedzeniem jednego bufora.
		 * Utworzenie bufora powoduje od razu wczytanie pierwszego bufora,
		 * aby każda następna operacja wczytania odczytywała wyprzedzająco
		 * jeden dodatkowy bufor danych.
		 * @param input strumień wejściowy
		 * @param bufferSize maksymalny rozmiar buforów roboczych
		 * @throws IOException błąd odczytu ze strumienia
		 */
		public InputDataBuffer(final InputStream input,
			final int bufferSize) throws IOException
		{
			this.input = input;
			buffer1 = new byte[bufferSize];
			buffer2 = new byte[bufferSize];
			internalRead();
		}

		/**
		 * Zwraca zawsze najstarszy wypełniony bufor.
		 * @return najstarszy bufor.
		 */
		public final byte[] get() { return get(false); }

		/**
		 * Zwraca bufor, który jest odczytywany później lub wcześniej zależnie
		 * od prawdziwości parametru lastBuffer.
		 * @param lastBuffer najnowszy wczytany bufor (wyprzedzający)
		 * @return wybrany bufor
		 */
		public final byte[] get(final boolean lastBuffer)
		{
			return lastBuffer == firstBuffer ? buffer1 : buffer2;
		}

		/**
		 * Zwraca zawsze rozmiar najstarszego odczytanego bufora.
		 * @return liczba bajtów bufora.
		 */
		public final int getReadBytes() { return getReadBytes(false); }

		/**
		 * Zwraca wielkość bufora odczytywanego później lub wcześniej zależnie
		 * od prawdziwości parametru lastBuffer.
		 * @param lastBuffer najnowszy wczytany bufor (wyprzedzający)
		 * @return liczba bajtów wybranego bufora
		 */
		public final int getReadBytes(final boolean lastBuffer)
		{
			return lastBuffer == firstBuffer ? bytesRead1 : bytesRead2;
		}

		/**
		 * Odczytuje dane strumienia do bufora bieżącego i zwraca ilość
		 * danych wcześniej odczytanego bufora zapasowego.
		 * @return liczba bajtów najstarszego wypełnionego bufora
		 * @throws IOException blablabla
		 */
		public final int read() throws IOException { return internalRead(); }

		/**
		 * Odczytuje dane strumienia do bufora bieżącego i zwraca ilość
		 * danych wcześniej odczytanego bufora zapasowego.
		 * Wersja dla inicjacji danych konstruktora.
		 * @return liczba bajtów najstarszego wypełnionego bufora
		 * @throws IOException blablabla
		 */
		private int internalRead() throws IOException
		{	//wypełnia kolejny, inny bufor niż poprzednio wypełniony
			//DEBUG. Odczytywanie losowych ilości danych
			final int debugRozmBufora = DEBUG ?
				los.nextInt(BUFFER_SIZE) : BUFFER_SIZE;
			final int bytesRead = input.read(nextBuffer(), 0, debugRozmBufora);
			//ilość odczytanych bajtów przypisuje odczytanemu buforowi
			if(firstBuffer)
				bytesRead1 = bytesRead;
			else
				bytesRead2 = bytesRead;
			return getReadBytes(); //rozmiar wcześniejszego odczytu
		}

		/**
		 * Zmienia bufor podstawowy na zapasowy i odwrotnie oraz
		 * zwraca ten bufor jako rezultat.
		 * Efektem ubocznym jest ustawienie zmiennej firstBuffer
		 * zgodnie z właśnie zwróconym buforem.
		 * @return bufor zgodny z ustawioną wartością firstBuffer
		 */
		private byte[] nextBuffer()
		{	//uwaga, przypisanie w warunku
			return (firstBuffer = !firstBuffer) ? buffer1 : buffer2;
		}
		private final InputStream input; //strumień danych wejściowych
		private final byte[] buffer1, buffer2; //dwa bufory danych
		//zmienne stanu
		private boolean firstBuffer; //ostatnio wypełniony bufor to buffer1
		private int bytesRead1, bytesRead2; //wielkości odczytanych danych
	} //static class InputDataBuffer

	/**
	 * Zwraca tablicę będącą argumentem jeżeli ilość podanych danych
	 * jest nie mniejsza od jej rzeczywistej długości, albo lub krótszą
	 * kopię tej tablicy jeżeli podana ilość danych jest mniejsza niż
	 * jej długość.
	 * Procedura unika kopiowania danych w typowej sytuacji gdy bufor
	 * jest całkowicie wypełniony danymi.
	 * @param dataBuffer bufor z danymi bajtowymi
	 * @param dataLength rzeczywista ilość danych
	 * @return
	 */
	private static byte[] getData(final byte[] dataBuffer,
		final int dataLength)
	{
		return dataLength >= dataBuffer.length ?
			dataBuffer : Arrays.copyOf(dataBuffer, dataLength);
	}

	//debug
	@SuppressWarnings("UseOfSystemOutOrSystemErr")
	private void debugCheckDataIdx()
	{
		final int start = pack.getDataStartIdx();
		final int end = pack.getDataEndIdx();
		if(start <= end && start >= 0 && start <= bytesRead
			&& end >= 0 && end <= bytesRead)
			return;
		else
			throw new IllegalStateException(String.format(
				"%nbytesRead=%d (%x)%n"
				+ "size=%d (%x)%n"
				+ "sizeIdx=%d (%x)%n"
				+ "start=%d (%x)%n"
				+ "end=%d (%x)",
				bytesRead, bytesRead,
				pack.size, pack.size,
				pack.sizeIdx, pack.sizeIdx,
				start, start,
				end, end));
	}

	@SuppressWarnings("UseOfSystemOutOrSystemErr")
	private void debugShowPacks()
	{
		System.out.printf("Wczytano %d paczek.%n", output.size());
		if(pack.getPackSize() >= 0)
			System.out.println(
				"Projektowany Rozmiar ostatniej, niedokończonej paczki = "
				+ pack.getPackSize());
	}

	@SuppressWarnings("UseOfSystemOutOrSystemErr")
	private void debugPackSize()
	{
		//gdy rozmiar paczki w strumieniu obejmuje rozmiar nagłówka paczki
		if(pack.size < 0)
			throw new IllegalStateException("Bad input data: "
				+ "Negative pack size = " + pack.size
				+ String.format(
				"%nbytesRead=%d (%x)%n"
				+ "size=%d (%x)%n"
				+ "sizeIdx=%d (%x)%n"
				+ "start=%d (%x)%n"
				+ "end=%d (%x)",
				bytesRead, bytesRead,
				pack.size, pack.size,
				pack.sizeIdx, pack.sizeIdx,
				pack.dataStartIdx, pack.dataStartIdx,
				pack.dataEndIdx, pack.dataEndIdx));
		System.out.printf("Paczka %03d: %d B (%03X)%n",
			output.size(), pack.size, pack.size);
	}

	private static final Random los = new Random();
	//debug end

	/** Sposób odczytu liczb wielobajtowych w strumieniu. */
	private final boolean bigEndian; //kodowanie liczb
	private final InputStream input; //strumień wejściowy
	private final TransferQueue<Pack> output; //interfejs wyjściowy
	private final PackHead pack; //mechanizm obsługujący paczki
	private final Set<PackReadListener> listeners; //obserwatorzy paczek
	//zmienne stanu
	private PackInternal lastPack; //bieżąca zbierana paczka danych
	private InputDataBuffer buffer; //bufor danych wejściowych
	private int bytesRead; //ilość pobranych danych w buforze
} //class FreeCarousel
0

Witam

Dzieki, wielkie

Zrobilem to tak mniej wiecej jak pisałeś wczesniej,

            InputStream in = new BufferedInputStream(new FileInputStream(f));

            int bytesRead = 0;
            int znak = 0;
            int countReads = 0;
            int reszta = 0;

            int globalLen = 0;

            byte[] fst = new byte[1024];
            byte[] fst2 = new byte[1024];
            
            byte[] tempBuffor = new byte[1024];

            List<byte[]> byteList = new ArrayList<byte[]>();

                while ((bytesRead = in.read(fst2)) != -1) {

                    globalLen = globalLen + bytesRead;
                    
                    System.out.println("Global"+globalLen);
                    
                    fst = fst2;

                    countReads++;

                    //System.out.println("ID read: "+countReads+" BytesRead: " + bytesRead);

                    int startByte = 0;
                    int stopByte = 0;
                    int offset = 4;
                    int len = 0;
                    int lenCount = 0;
                    int count = 0;

                    if (znak > 1) {

                        //System.out.print("temp:"+tempBuffor.length+" ");
                        System.out.println(Arrays.toString(tempBuffor));
                        fst = concat(tempBuffor, fst2);
                        
                        //FileOutputStream fi = new FileOutputStream(new File("e:/xdr/Pack_"+System.currentTimeMillis()+"_"+countReads+".log"));
                        //fi.write(fst);


                    }
                    System.out.println("ID read: "+countReads+" BytesUpdated: " + fst.length);

                   while (fst.length >= lenCount ) {

                        count++;

                        startByte = stopByte;

                        len = ((fst[startByte] & 0xff) << 24) + ((fst[startByte + 1] & 0xff) << 16) + ((fst[startByte + 2] & 0xff) << 8) + (fst[startByte + 3] & 0xff);

                        lenCount = lenCount + len;

                        reszta = fst.length - lenCount;

                        if (reszta < len | reszta < 4) {

                            
                             System.out.print("ID: " + count + " Len: " + len + " LenCount: " + lenCount + " Reszta: "+reszta+" StartByte: " + startByte + " StopByte: " +fst.length);
                            System.out.println();
                             tempBuffor = Arrays.copyOfRange(fst, startByte, fst.length);
                            znak = fst.length - startByte;

                            System.out.print(count);

                        } else {

                            stopByte = len + stopByte + offset;

                             System.out.print("ID: " + count + " Len: " + len + " LenCount: " + lenCount + " Reszta: "+reszta+" StartByte: " + startByte + " StopByte: " + stopByte);
                            //System.out.print(Arrays.toString(Arrays.copyOf(Arrays.copyOfRange(fst, startByte, stopByte), 5)));
                            System.out.println();

                            byteList.add(Arrays.copyOfRange(fst, startByte, stopByte));

                            znak = 0;
                        }

                    }

                }

Tak odczytuję długość paczki, stream zawsze zaczyna sie od poczatku.

len = ((fst[startByte] & 0xff) << 24) + ((fst[startByte + 1] & 0xff) << 16) + ((fst[startByte + 2] & 0xff) << 8) + (fst[startByte + 3] & 0xff);

Poczatek streamu wyglada tak

[0, 0, 3, 60, -126, 73, 0, 32.....]

Jezeli robię rozpaczkowywanie, z pliku ze zdumpowanego streamu z socketa jest ok, a jak podepne to pod stream, lubi sie wywalić.

Mozesz podać przykład dla klasy FreeCarousel, nie bardzo sie moge połapać jak jej uzyc?

0

Przede wszystkim to mi wygląda na błąd: "reszta < len | reszta < 4". Tu powinno być "||" zamiast operatora bitowego. Jednak być może nie wpływa to na wynik.
Nie wydaje mi się z kodu, żebyś uwzględniał iż rozmiar paczki może wyjść na granicy bufora, więc za jednym zamachem całej długości nie wczytasz, albo dostaniesz wyjątek wyjścia poza zakres tablicy.
Co do sposobu wykorzystania kodu wrzuconego przeze mnie, to szkielet przykładu:

		//...
		//Trzeba ustawić w klasie FreeCarousel.BIG_ENDIAN = true
		//najlepiej przez zrobienie tego parametrem konstruktora
		//użycie
		InputStream in = new BufferedInputStream(new FileInputStream(args[1]));
		TransferQueue<FreeCarousel.Pack> out = new LinkedTransferQueue<>();
		FreeCarousel paczki = new FreeCarousel(in, out,
			new FreeCarousel.PackHeadFinder()
			{
				@Override public int findHeadInBuffer(byte[] buffer,
					byte[] nextBuffer) { return 0; } //zawsze początek bufora
			});
		//opcjonalnie
		paczki.addPackReadListener(new FreeCarousel.PackReadListener()
		{
			@Override public void packReadCompleted(Pack completedPack)
			{	//Przykład. Notyfikacja o odczytanej paczce powinna być w
				//osobnym wątku a nie tutaj.
				System.out.println("Odczytano paczkę o długości "
					+ completedPack.length());
				//...
			}
		});
		//odczytywanie paczek, synchroniczne
		paczki.packsRead(); //czyta aż nie skończy się strumień
		//wszystkie paczki odczytane
		//...
		//robisz coś z odczytanymi paczkami w kolejce out tutaj, albo
		//zanim zakończy się packsRead() w osobnym wątku.
		//w tym drugim wypadku można skorzystać z właściwości kolejki do
		//sprawdzania kolejnych paczek lub czekania na nie, ewentualnie
		//skorzystać z dorzuconego opcjonalnie listenera
0

Dostaje wyjatek, BIG_EDIAN = true;


 File f = new File("E:/xdr/1317798719031.log");
        //ByteBuffer buff = ByteBuffer.allocate(1140960);

        // Thread t = new BytesBuff(buff);
        //t.start();

        try {

            InputStream in = new BufferedInputStream(new FileInputStream(f));

                //Trzeba ustawić w klasie FreeCarousel.BIG_ENDIAN = true
                //najlepiej przez zrobienie tego parametrem konstruktora
                //użycie



                TransferQueue<FreeCarousel.Pack> out = new LinkedTransferQueue<>();

                FreeCarousel paczki = new FreeCarousel(in, out,
                        new FreeCarousel.PackHeadFinder()
                        {
                                @Override public int findHeadInBuffer(byte[] buffer,
                                        byte[] nextBuffer) { return 0; } //zawsze początek bufora
                        });
                //odczytywanie paczek, synchroniczne

                 paczki.packsRead();
...

Exception in thread "Thread-0" java.lang.ArrayIndexOutOfBoundsException
        at java.lang.System.arraycopy(Native Method)
        at java.util.Arrays.copyOfRange(Arrays.java:2551)
        at javaapplication3.FreeCarousel.packsRead(FreeCarousel.java:136)
        at javaapplication3.Main.run(Main.java:78)

 
0

Wyslalem Ci teraz wiadomosc z haslem

0

W trzecim poście od góry powyżej wrzuciłem poprawiony i znacznie bardziej przejrzysty kod karuzeli wczytującej takie proste paczki jakie potrzebujesz. Dołożyłem na koniec kilka metod do debugowania, ale można je szybko wyciąć lub zgasić zmienną DEBUG.
Jedyne co się zmieni w wywołaniu konstruktora, to dodatkowy pierwszy parametr dla big endian:

FreeCarousel paczki = new FreeCarousel(true, in, out,
	new FreeCarousel.PackHeadFinder()
	{
		@Override public int findHeadInBuffer(byte[] buffer,
			byte[] nextBuffer) { return 0; } //zawsze początek bufora
	});

Wczytywane są wszystkie 634 paczki, a ponieważ ich rozmiar rzadko kiedy przekracza pół kilo, więc takiej wielkości bufor jest wystarczający. Możesz oczywiście ustawić sobie bufor dowolnej wielkości - każdy zadziała poprawnie. Byle był większy od 4 bajtów. Nie jest wykluczone, że zadziałałby też bufor 1-2 bajtowy po zdjęciu blokady minimum rozmiaru nagłówka (tu 4 bajty). Z takim szalonym rozmiarem ten kod też powinien sobie poradzić.

Przekaż pozdrowienia dla pracodawcy. ;)

0

Działa !!!! Super dzieki wielkie, pracodawca pozdrowiony ;)

0

Hej

Wracam do tematu, wszytko pięknie działa, ale jak dostaje 15000 paczek na minute to po kilku godzinach brakuje javie pamieci. Trzeba wtedy restartować, czy nie powinno być coś czyszcone w karuzeli.

Pozdrawiam

PS dlatego nie zamykalem tematu

0

Po pierwsze czy wyłączyłeś tryb debug? Powoduje on przydzielanie losowych wielkości buforów, co na wydajność i na śmieciarza raczej dobrze nie wpływa.

Jeżeli masz zablokowaną pamięć, to znaczy, że coś na wyjściu nie jest czyszczone i zostają referencje do obiektów. Karuzela ma wszystkie dane tymczasowe czyszczone, a jedyne agregaty danych, to zbiór listenerów, którym zarządza Twój kod oraz kolejka transferowa, z której danymi zarządza również Twój kod. Z danych wejściowych dostarczasz obiekt implementujący PackHeadFinder - dopóki istnieje obiekt karuzeli to on też nie zostanie zniszczony (dla każdej z nich). Żadne inne pola nie są kolekcjami i w żadnym z nich nie pozostaje referencja do poprzednich danych (no chyba, że znajdziesz :) Tak więc musisz poszukać co się dzieje z tymi danymi.
Proponowałbym zmniejszyć do minimum opcję Xmx (w JVM) tak aby zrównać ją z najmniejszą Xms (eksperymentalnie, dzieląc za każdym ostatnią wartość przez 2) przy jakiej choćby kilka pierwszych paczek może zostać odebrane. Wtedy odpal sobie profilera i sprawdź które dane nie schodzą do śmieciarza, tylko zaczynają się kolekcjonować. Karuzela działa tak, że wszelkie tymczasowe bufory potrzebne dzo ściągania danych alokowane są raz i używane powtórnie, o ile jest to możliwe, albo przypisuje nowe dane w miejsce starych referencji (stare więc idą na śmietnik).
Ale odbierane dane, których na godzinę jest 900 000 paczek oznaczają, że jeżeli średnio paczka zajmuje 4 KB (a pewnie więcej), to tych danych będzie ok. 3,4 GB/godzinę. Oznacza to, że jeżeli reszta kodu nie przetwarza i nie niszczy tych danych odpowiednio szybko, to masz problem bo dla Javy takie obciążenie pamięci stanie się brzegowe. Raczej więc obstawiałbym, że odbierane paczki są gdzieś niepotrzebnie kolekcjonowane lub nie dość szybko niszczone.

ps. Znalazłem u siebie nowszą wersję tej klasy, w której poprawiłem kiedyś jakieś drobiazgi. Jeżeli chcesz, to możesz ją sobie porównać jakimś diffem i ewentualnie użyć:
(niestety jakaś durnota na tym forum znowu dostawia kolejne LF do kodu - sprawdzałem z LF i CR/LF, musisz więc sobie sam usunąć nadmiarowe)

package com.olamagato.streams;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.TransferQueue;

/**
 * Przykład odczytu prostych paczek z karuzeli.
 * @author Olamagato
 * @version 1.04
 */
public class FreeCarousel
{
	private static boolean DEBUG = true;
	/** Rozmiar nagłówka paczki. Tu składa się tylko z rozmiaru paczki */
	private static final int HEAD_SIZE = 4;
	/** rozmiar bufora nie może być mniejszy niż rozmiar nagłówka */
	private static final int BUFFER_SIZE = 1024; //bufor 1KB

	/** Definiuje kryterium odszukania i użycia nagłówka paczki */
	public interface PackHead
	{
		/**
		 * Zwraca index nagłówka paczki znalezionego w buforze lub
		 * liczba ujemna jeżeli takiego nagłówka nie można znaleźć.
		 * Bufor nextBuffer jest przeszukiwany opcjonalnie do ustalenia
		 * położenia nagłówka na granicy buforów, ale tak aby indeks
		 * jego pierwszego bajtu znalazł się w zakresie
		 * buffer[0..buffer.length - 1]. Argument nextBuffer nie uczestniczy
		 * w rozpoznawaniu istnienia nagłówka poza tym zakresem - jeżeli
		 * początek nagłówka nie uda znaleźć w zmiennej buffer, to wynikiem
		 * musi być liczba ujemna bez względu na to czy udałoby się go znaleźć
		 * w nextBuffer.
		 * Metoda nie może w jakikolwiek sposób modyfikować danych w
		 * argumentach buffer i nextBuffer.
		 * @param buffer przeszukiwany bufor, całkowicie wypełniony danymi
		 * @param nextBuffer następny bufor pozwalający rozpoznać nagłówek
		 * podzielony między dwa bufory, całkowicie wypełniony danymi
		 * @return indeks pierwszego bajtu w buforze (0..buffer.length - 1)
		 * lub liczba ujemna jeżeli nagłówka w buforze nie można znaleźć
		 */
		int findHeadInBuffer(final byte[] buffer, final byte[] nextBuffer);

		/**
		 * Zwraca liczbę prostych pól nagłówka możliwych do odczytania.
		 * @return to co widać
		 */
		int fieldCount();

		/**
		 * Zwraca numer pola rozmiaru paczki w nagłówku [0..fieldCount()-1].
		 * Pole rozmiaru paczki nie może być ujemne i może mieć
		 * od 8 do 32-bitów (1 do 4 bajtów).
		 * @return nr bajtu od którego zaczyna się w nagłówku pole rozmiaru.
		 */
		int getNumberSizeField();

		/**
		 * Zwraca rozmiar wybranego pola nagłówka w bitach. Rozmiar
		 * pola powinien być wielokrotnością 8 bitów i nie może być
		 * większy niż 64 bity. Odpowiada to wielkości dowolnej
		 * prostej zmiennej.
		 * @param nrPola numer pola od 0 do fieldCount()-1
		 * @return rozmiar pola w bitach
		 */
		int getFieldSize(final int nrPola);

		/**
		 * Ustawia pole o podanym indeksie przez wartość headField z
		 * uwzględnieniem kolejności ułożenia bajtów.
		 * @param nrPola numer pola od 0 do fieldCount()-1
		 * @param headField opakowana wartość long pola typu prostego
		 */
		void setField(final int nrPola, long headField);

		/**
		 * Zwraca reprezentację binarną wybranego pola.
		 * @param nrPola numer pola do odczytania
		 * @return wartość long pola typu prostego
		 */
		long getField(final int nrPola);
	} //interface PackHead

	/**
	 * Reprezentuje całą paczkę danych odczytaną z karuzeli.
	 * Jest listą tablic bajtowych o maksymalnej wielkości BUFFER_SIZE każda.
	 */
	public interface Pack extends List<byte[]>
	{
		/**
		 * Zwraca nagłówek paczki danych.
		 * @return nagłówek paczki
		 */
		PackHead getHeadData();
	} //interface Pack extends List<byte[]>

	/** Obserwator dodawania nowej paczki */
	public interface PackReadListener
	{
		/**
		 * Wywoływana po skompletowaniu odczytu każdej nowej paczki.
		 * @param completedPack zakończona paczka danych
		 */
		void packReadCompleted(final Pack completedPack);
	} //interface PackReadListener

	/**
	 * Tworzy obiekt reprezentujący karuzelę danych
	 * @param bigEndian
	 * @param input strumień wejściowy
	 * @param output kolejka wyjściowa zbierająca kolejne paczki danych
	 * @param packHead pozwala ustalić kryterium dla znalezienia nagłówka
	 * paczki
	 */
	public FreeCarousel(final boolean bigEndian, final InputStream input,
		final TransferQueue<Pack> output, final PackHead packHead)
	{
		if(input == null || packHead == null)
			throw new NullPointerException("Null pointer argument.");
		if(BUFFER_SIZE < HEAD_SIZE)
			throw new IllegalStateException("BUFFER_SIZE (" + BUFFER_SIZE
				+ "is lesser than pack HEAD_SIZE = " + HEAD_SIZE);
		this.bigEndian = bigEndian;
		this.input = input;
		this.output = output;
		pack = new PackReader(packHead);
		listeners = Collections.synchronizedSet(
			new HashSet<PackReadListener>()); //obserwatorzy paczek
		lastPack = null;
	}

	/**
	 * Dodaje nasłuchiwacza informującego o zakończeniu wczytywania paczki.
	 * @param listener obiekt nasłuchujący
	 * @return this dla wygodniejszego użycia
	 */
	public FreeCarousel addPackReadListener(final PackReadListener listener)
	{
		listeners.add(listener);
		return this;
	}

	/**
	 * Usuwa nasłuchiwacza informującego o zakończeniu wczytywania paczki.
	 * @param listener obiekt nasłuchujący
	 * @return this dla wygodniejszego użycia
	 */
	public FreeCarousel removePackReadListener(final PackReadListener listener)
	{
		listeners.remove(listener);
		return this;
	}

	/**
	 * Odczytuje kolejne paczki aż do końca strumienia.
	 * Odczytane dane w postaci całych bezbłędnie odczytanych paczek
	 * umieszcza w kolejce wyjściowej.
	 * Pierwsza i ostatnia paczka zostaną odrzucone, jeżeli nie były
	 * całe w strumieniu.
	 *
	 * @return this dla wygodniejszego użycia
	 * @throws IOException w przypadku błędu odczytu ze strumienia
	 */
	public FreeCarousel packsRead() throws IOException
	{
		if(this.buffer == null)
			this.buffer = new InputDataBuffer(input, BUFFER_SIZE);
		lastPack = new PackInternal();
		bytesRead = pack.readData(); //odczyt pierwszego bufora
		while(bytesRead >= 0 && pack.setBufferDataStart())
		{	//ustawiony początek danych w buforze
			pack.setBufferDataEnd(); //ustawienie końca danych w buforze
			if(pack.getDataStartIdx() < pack.getDataEndIdx())
			{	//są jakieś dane do dołączenia
				if(DEBUG)
					debugCheckDataIdx();
				//dołączenie danych kolejnego bufora do paczki
				lastPack.add(Arrays.copyOfRange(buffer.get(),
					pack.getDataStartIdx(), pack.getDataEndIdx()));
				if(pack.isTheEnd()) //zakończenie paczki, wzięcie nowej?
					finalizeAndChangePack();
			}
			if(pack.getDataEndIdx() == bytesRead) //odczytano cały bufor
				bytesRead = pack.readData(); //reset zakresu danych
		}
		if(DEBUG)
			debugShowPacks();
		return this;
	}

	/**
	 * Kończy paczkę, informuje obserwatorów o jej zakończeniu
	 * oraz rozpoczyna wczytywanie następnej.
	 */
	private void finalizeAndChangePack()
	{
		lastPack.finalize(pack.head);
		output.add(lastPack); //dodanie kolejnej całej paczki
		for(PackReadListener packReadListener: listeners)
			packReadListener.packReadCompleted(lastPack);
		pack.locateNextHead(); //ustala miejsce nowego nagłówka danych
		lastPack = new PackInternal(); //tworzy nową pustą paczkę danych
	}

	/**
	 * Reprezentuje nagłówek paczki danych. Przeprowadza operacje na
	 * danych z nagłówka (tutaj tylko size)
	 */
	private class PackReader
	{
		/**
		 * Ustawia warunki dla pobrania najbliższej paczki.
		 * @param packHead ustala kryterium dla znalezienia nagłówka paczki
		 */
		private PackReader(final PackHead packHead)
		{
			this.head = packHead;
			reset(dataStartIdx = dataEndIdx = -1);
		}

		/**
		 * Resetuje dane dla nowej paczki danych.
		 * Ustawia indeks rozmiaru paczki na podany newSizeIdx
		 * @param newSizeIdx nowy indeks rozmiaru paczki w buforze lub -1
		 */
		private void reset(final int newSizeIdx)
		{
			read = 0;
			size = -1;
			headFieldIdx = newSizeIdx;
		}

		/**
		 * Indeks początku danych w buforze.
		 * @return indeks bajtu początkowego
		 */
		private int getDataStartIdx() { return dataStartIdx; }

		/**
		 * Indeks za końcem danych w buforze.
		 * @return indeks bajtu poza buforem (exclusive)
		 */
		private int getDataEndIdx() { return dataEndIdx; }

		/**
		 * Zwraca odczytany rozmiar paczki.
		 * @return rozmiar bieżącej paczki
		 */
		private int getPackSize() { return size; }

		/**
		 * Ustala położenie nagłówka kolejnej paczki za końcem poprzedniej.
		 */
		private void locateNextHead() { reset(dataEndIdx); }

		/**
		 * Ustala początek danych do odczytu w buforze.
		 * Ustawia pole bufferDataStartIdx na właściwy indeks.
		 * @return false jeżeli nastąpił koniec strumienia,
		 * true jeżeli udało się ustalić początek danych.
		 * @throws IOException błąd wczytywania danych
		 */
		private boolean setBufferDataStart() throws IOException
		{
			if(size >= 0) //w trakcie wczytywania paczki
				dataStartIdx = 0; //początek danych na początku bufora
			else if(headLocated() && headLoaded()) //nowa paczka
			{
				dataStartIdx = headFieldIdx; //pocz. danych za rozm. paczki
				headFieldIdx = -1;
			}
			else
				return false; //nastąpił koniec strumienia
			return true;
		}

		/**
		 * Ustala koniec danych do odczytu w buforze.
		 * Ustawia pole bufferDataEndIdx na właściwy indeks.
		 * Ustawia zmienna isTheEnd na true jeżeli wykryto koniec paczki.
		 * @return true jeżeli wystąpił koniec odczytywanej paczki
		 */
		private boolean setBufferDataEnd()
		{	//pozostałe w buforze dane do wczytania
			final int bufferData = bytesRead - dataStartIdx; //może być 0
			//ilość danych potrzebna do zamknięcia paczki
			//zał. read < size, zawsze dodatnia
			final int brakująceDoPaczki = size - read;
			//zawsze false jeżeli bufferData == 0
			this.isTheEnd = brakująceDoPaczki <= bufferData;
			if(isTheEnd) //czy koniec paczki w tym buforze?
			{	//doczytujemy tylko dane brakujące do paczki
				read += brakująceDoPaczki;
				//koniec danych równy z końcem paczki
				dataEndIdx = dataStartIdx + brakująceDoPaczki;
			}
			else //paczka nie skończy się w tym buforze
			{	//doczytanie do paczki całego pozostałego bufora
				read += bufferData; //może dodać 0
				//koniec danych równy z końcem bufora
				dataEndIdx = bytesRead; //dataStartIdx + bufferData;
			}
			return this.isTheEnd;
		}

		/**
		 * Przeszukuje kolejne bufory w poczukiwaniu początku paczki.
		 * Zmienną sizeIdx ustawia za wczytany rozmiar = początek danych.
		 * @return true jeżeli indeks rozmiaru jest znaleziony,
		 * false jeżeli nastąpił koniec strumienia
		 * @throws IOException błąd odczytu
		 */
		private boolean headLocated() throws IOException
		{
			while(headFieldIdx < 0)
				if(!headIdxFound())
				{
					bytesRead = buffer.read();
					if(bytesRead == -1)
						return false; //koniec strumienia, koniec pobierania
				}
			return true;
		}

		/**
		 * Zwraca true w przypadku znalezienia początku nagłówka w buforze.
		 * Ustawia zmienną sizeIdx na indeks początku nagłówka.
		 * @return true jeżeli początek nagłówka jest znaleziony
		 */
		private boolean headIdxFound()
		{	//podstawia dla findHeadInBuffer bufory dopasowane do danych
			headFieldIdx = head.findHeadInBuffer(
				getData(buffer.get(), buffer.getReadBytes()),
				getData(buffer.get(true), buffer.getReadBytes(true)));
			return headFieldIdx >= 0;
		}

		/**
		 * Odczyt kolejnych pól paczki poprzez kolejne odczytywane bufory.
		 * Ustawia pole sizeIdx na pierwszy bajt za odczytanym rozmiarem.
		 * @return true jeżeli wczytano rozmiar paczki, false jeżeli
		 * nastąpił koniec strumienia.
		 * @throws IOException błąd odczytu
		 */
		private boolean headLoaded() throws IOException
		{
			for(int nrPola = 0; nrPola < head.fieldCount(); ++nrPola)
				if(!isFieldLoaded(nrPola))
					return false;
			return true;
		}

		/**
		 * Informuje czy udało się ustawić pole o podanym numerze.
		 * Jeżeli jest to pole rozmiaru, to odczytuje go do 32-bitowej
		 * zmiennej size bez względu na to jaki jest faktyczny rozmiar
		 * tego pola.
		 * @param nrPola nr pola do załadowania ze strumienia
		 * @return true jeżeli pole zostało wypełnione, false = koniec danych
		 * @throws IOException błąd odczytu
		 */
		private boolean isFieldLoaded(final int nrPola)
			throws IOException
		{
			Long field = readField(head.getFieldSize(nrPola));
			if(field == null)
				return false;
			if(nrPola == head.getNumberSizeField())
			{
				size = field.intValue() & Integer.MAX_VALUE;
				if(DEBUG) debugPackSize();
			}
			head.setField(nrPola, field);
			return true;
		}

		/**
		 * Ładuje ze strumienia pole o podanym rozmiarze i zwraca go
		 * jako obiektową wartość Long. Bajty są odwracane jeżeli wartości
		 * są intepretowane jako big endian (to samo dotyczy pól innych
		 * niż numeryczne).
		 * @param fieldSize rozmiar pola w bitach, nie większy niż Long.SIZE
		 * @return odczytana wartość lub null po natrafieniu na koniec danych
		 * @throws IOException błąd odczytu
		 */
		private Long readField(final int fieldSize) throws IOException
		{
			long headField = 0; //przygotowanie do wczytywania
			for(int sizeSh = 0; sizeSh < fieldSize; sizeSh += Byte.SIZE)
				//kolejny bajt do doczytania w buforze?
				if(headFieldIdx < bytesRead)
					headField |= buffer.get()[headFieldIdx++] << sizeSh;
				else
				{	//nie pozwala pominąć bajtu rozmiaru przez anulowanie
					sizeSh -= Byte.SIZE; //inkrementacji licznika bajtów
					//początek następnego bufora zawiera resztę rozmiaru
					headFieldIdx = 0;
					bytesRead = buffer.read();
					if(bytesRead == -1)
						return null;//koniec strumienia i pobierania
				}
			if(bigEndian) headField = Long.reverseBytes(headField)
				>>> (Long.SIZE - fieldSize);
			return headField;
		}

		/**
		 * Wczytuje kolejną porcję danych do bufora.
		 * Resetuje położenie danych w buforze na domyślne (wszystkie dane
		 * bufora należą do bieżącej paczki).
		 * Resetuje położenie nagłówka jeżeli miałby się znaleźć
		 * dokładnie na końcu danych poprzedniego bufora.
		 * @return ilość odczytanych do bufora bajtów danych
		 * @throws IOException
		 */
		private int readData() throws IOException
		{
			dataStartIdx = 0; //początek nowego bufora
			if(headFieldIdx == dataEndIdx && isTheEnd)
				//przenosi lokację nagłówka na początek kolejnej paczki
				headFieldIdx = dataStartIdx;
			//odczyt kolejnego bufora
			return dataEndIdx = buffer.read();
		}

		/**
		 * Informuje o końcu paczki.
		 * @return true jeżeli w aktualnym buforze jest ostatnia część paczki.
		 */
		private boolean isTheEnd() { return isTheEnd; }
		private final PackHead head;
		private int dataStartIdx; //indeks początku danych paczki w buforze
		private int dataEndIdx; //indeks za końcem danych paczki w buforze
		private int headFieldIdx; //indeks bajtu rozmiaru w bieżącym buforze
		private boolean isTheEnd; //informuje czy wykryto ostatnią część w buf.
		private int size; //rozmiar bieżącej paczki
		private int read; //ilość dotychczas odczytanych danych z paczki
	} //class PackReader

	/**
	 * Reprezentuje implementację całej paczki danych odczytanej z karuzeli.
	 * Jest listą tablic bajtowych o maksymalnej wielkości BUFFER_SIZE każda.
	 */
	private static class PackInternal extends LinkedList<byte[]>
		implements Pack
	{
		/** Tworzy nowa pustą paczkę nie zawierającą żadnego fragmentu */
		private PackInternal() {}

		@SuppressWarnings("UseOfSystemOutOrSystemErr")
		@Override public boolean add(byte[] e)
		{
			if(DEBUG) System.out.printf("Część %03d: %03d B (%03X)%n",
				this.size(), e.length, e.length);
			return super.add(e);
		}

		/**
		 * Zwraca odczytany nagłówek paczki danych.
		 * @return nagłówek paczki
		 */
		@Override public PackHead getHeadData() { return packHead; }

		/**
		 * Zapisuje w obiekcie łączny rozmiar paczki.
		 * @param packHead
		 */
		private void finalize(final PackHead packHead)
			{ this.packHead = packHead; }

		private PackHead packHead;
		private static final long serialVersionUID = 197103271110071103L;
	} //class PackInternal extends LinkedList<byte[]>

	/**
	 * Reprezentuje wirtualny bufor danych wejściowych posługujący się dwoma
	 * buforami o podanej długości, do których wczytuje dane ze strumienia.
	 * Większość operacji bazuje na symulowaniu wczytywania bufora odczytanego
	 * wcześniej. Dzięki temu jest możliwe udostępnienie "przyszłych danych".
	 */
	private static class InputDataBuffer
	{
		/**
		 * Reprezentuje podwójny bufor danych, który jest odczytywany
		 * z wyprzedzeniem jednego bufora.
		 * Utworzenie bufora powoduje od razu wczytanie pierwszego bufora,
		 * aby każda następna operacja wczytania odczytywała wyprzedzająco
		 * jeden dodatkowy bufor danych.
		 * @param input strumień wejściowy
		 * @param bufferSize maksymalny rozmiar buforów roboczych
		 * @throws IOException błąd odczytu ze strumienia
		 */
		public InputDataBuffer(final InputStream input,
			final int bufferSize) throws IOException
		{
			this.input = input;
			buffer1 = new byte[bufferSize];
			buffer2 = new byte[bufferSize];
			internalRead();
		}

		/**
		 * Zwraca zawsze najstarszy wypełniony bufor.
		 * @return najstarszy bufor.
		 */
		public final byte[] get() { return get(false); }

		/**
		 * Zwraca bufor, który jest odczytywany później lub wcześniej zależnie
		 * od prawdziwości parametru lastBuffer.
		 * @param lastBuffer najnowszy wczytany bufor (wyprzedzający)
		 * @return wybrany bufor
		 */
		public final byte[] get(final boolean lastBuffer)
		{
			return lastBuffer == firstBuffer ? buffer1 : buffer2;
		}

		/**
		 * Zwraca zawsze rozmiar najstarszego odczytanego bufora.
		 * @return liczba bajtów bufora.
		 */
		public final int getReadBytes() { return getReadBytes(false); }

		/**
		 * Zwraca wielkość bufora odczytywanego później lub wcześniej zależnie
		 * od prawdziwości parametru lastBuffer.
		 * @param lastBuffer najnowszy wczytany bufor (wyprzedzający)
		 * @return liczba bajtów wybranego bufora
		 */
		public final int getReadBytes(final boolean lastBuffer)
		{
			return lastBuffer == firstBuffer ? bytesRead1 : bytesRead2;
		}

		/**
		 * Odczytuje dane strumienia do bufora bieżącego i zwraca ilość
		 * danych wcześniej odczytanego bufora zapasowego.
		 * @return liczba bajtów najstarszego wypełnionego bufora
		 * @throws IOException blablabla
		 */
		public final int read() throws IOException { return internalRead(); }

		/**
		 * Odczytuje dane strumienia do bufora bieżącego i zwraca ilość
		 * danych wcześniej odczytanego bufora zapasowego.
		 * Wersja dla inicjacji danych konstruktora.
		 * @return liczba bajtów najstarszego wypełnionego bufora
		 * @throws IOException blablabla
		 */
		private int internalRead() throws IOException
		{	//wypełnia kolejny, inny bufor niż poprzednio wypełniony
			//DEBUG. Odczytywanie losowych ilości danych
			final int debugRozmBufora = DEBUG ?
				los.nextInt(BUFFER_SIZE) : BUFFER_SIZE;
			final int bytesRead = input.read(nextBuffer(), 0, debugRozmBufora);
			//ilość odczytanych bajtów przypisuje odczytanemu buforowi
			if(firstBuffer)
				bytesRead1 = bytesRead;
			else
				bytesRead2 = bytesRead;
			return getReadBytes(); //rozmiar wcześniejszego odczytu
		}

		/**
		 * Zmienia bufor podstawowy na zapasowy i odwrotnie oraz
		 * zwraca ten bufor jako rezultat.
		 * Efektem ubocznym jest ustawienie zmiennej firstBuffer
		 * zgodnie z właśnie zwróconym buforem.
		 * @return bufor zgodny z ustawioną wartością firstBuffer
		 */
		private byte[] nextBuffer()
		{	//uwaga, przypisanie w warunku
			return (firstBuffer = !firstBuffer) ? buffer1 : buffer2;
		}
		private final InputStream input; //strumień danych wejściowych
		private final byte[] buffer1, buffer2; //dwa bufory danych
		private static final Random los = new Random(); //debug
		//zmienne stanu
		private boolean firstBuffer; //ostatnio wypełniony bufor to buffer1
		private int bytesRead1, bytesRead2; //wielkości odczytanych danych

	} //static class InputDataBuffer

	/**
	 * Zwraca tablicę będącą argumentem jeżeli ilość podanych danych
	 * jest nie mniejsza od jej rzeczywistej długości, albo lub krótszą
	 * kopię tej tablicy jeżeli podana ilość danych jest mniejsza niż
	 * jej długość.
	 * Procedura unika kopiowania danych w typowej sytuacji gdy bufor
	 * jest całkowicie wypełniony danymi.
	 * @param dataBuffer bufor z danymi bajtowymi
	 * @param dataLength rzeczywista ilość danych
	 * @return to co widać
	 */
	private static byte[] getData(final byte[] dataBuffer,
		final int dataLength)
	{
		return dataLength >= dataBuffer.length ?
			dataBuffer : Arrays.copyOf(dataBuffer, dataLength);
	}

	//debug
	@SuppressWarnings("UseOfSystemOutOrSystemErr")
	private void debugCheckDataIdx()
	{
		final int start = pack.getDataStartIdx();
		final int end = pack.getDataEndIdx();
		if(start <= end && start >= 0 && start <= bytesRead
			&& end >= 0 && end <= bytesRead)
			return;
		else
			throw new IllegalStateException(String.format(
				"%nbytesRead=%d (%x)%n"
				+ "size=%d (%x)%n"
				+ "sizeIdx=%d (%x)%n"
				+ "start=%d (%x)%n"
				+ "end=%d (%x)",
				bytesRead, bytesRead,
				pack.size, pack.size,
				pack.headFieldIdx, pack.headFieldIdx,
				start, start,
				end, end));
	}

	@SuppressWarnings("UseOfSystemOutOrSystemErr")
	private void debugShowPacks()
	{
		System.out.printf("Wczytano %d paczek.%n", output.size());
		if(pack.getPackSize() >= 0)
			System.out.println(
				"Projektowany Rozmiar ostatniej, niedokończonej paczki = "
				+ pack.getPackSize());
	}

	@SuppressWarnings("UseOfSystemOutOrSystemErr")
	private void debugPackSize()
	{
		//gdy rozmiar paczki w strumieniu obejmuje rozmiar nagłówka paczki
		if(pack.size < 0)
			throw new IllegalStateException("Bad input data: "
				+ "Negative pack size = " + pack.size
				+ String.format(
				"%nbytesRead=%d (%x)%n"
				+ "size=%d (%x)%n"
				+ "sizeIdx=%d (%x)%n"
				+ "start=%d (%x)%n"
				+ "end=%d (%x)",
				bytesRead, bytesRead,
				pack.size, pack.size,
				pack.headFieldIdx, pack.headFieldIdx,
				pack.dataStartIdx, pack.dataStartIdx,
				pack.dataEndIdx, pack.dataEndIdx));
		System.out.printf("Paczka %03d: %d B (%03X)%n",
			output.size(), pack.size, pack.size);
	}
	//debug end

	/** Sposób odczytu liczb wielobajtowych w strumieniu. */
	private final boolean bigEndian; //kodowanie liczb
	private final InputStream input; //strumień wejściowy
	private final TransferQueue<Pack> output; //interfejs wyjściowy
	private final PackReader pack; //mechanizm obsługujący paczki
	private final Set<PackReadListener> listeners; //obserwatorzy paczek
	//zmienne stanu
	private PackInternal lastPack; //bieżąca zbierana paczka danych
	private InputDataBuffer buffer; //bufor danych wejściowych
	private int bytesRead; //ilość pobranych danych w buforze
} //class FreeCarousel
0

Witam

Sorry ze tak wracam do wątku, ale temat powrócił jak bumerang i chciałbym zdiagnozować gdzie paczki się kumulują. Tak DEBUG został wyłączony, profilerem namierzyłam ze tworzą się instancje w klasach** FreeCarousel$PackInternal** oraz LinkedTransferQueue$Node.

Pozostałe obiekty są czyszczone.

Moja implementacja wygląda


InputStream in = new BufferedInputStream(conn.getInputStream());

TransferQueue<FreeCarousel.Pack> out = new LinkedTransferQueue<>();

                FreeCarousel paczki = new FreeCarousel(true, in, out,
                        new FreeCarousel.PackHeadFinder() {

                            @Override
                            public int findHeadInBuffer(byte[] buffer,
                                    byte[] nextBuffer) {
                                return 0;
                            } //zawsze początek bufora
                        });



                Thread streamDecode = new StreamSave(paczki);
                streamDecode.start();

                paczki.packsRead();

 

i same odbieranie w wątku

public class StreamSave extends Thread {

    public FreeCarousel paczki;

    public StreamSave(FreeCarousel paczki) {
        this.paczki = paczki;

    }

    public FreeCarousel getPaczki() {
        return paczki;
    }

    @Override
    public void run() {
        super.run();

            final StreamSaveThread save = new StreamSaveThread();
            save.start();

        try {




            paczki.addPackReadListener(new FreeCarousel.PackReadListener() {

                @Override
                public void packReadCompleted(Pack completedPack) {
                    //osobnym wątku a nie tutaj.

                    Pack p = completedPack;

                    //ByteBuffer bytBuff = ByteBuffer.allocate(p.length());
                    
                    Iterator<byte[]> ps = p.iterator();

                    byte[] paczka = ps.next();
                    ps.remove();
                    p.clear();


                    save.savePackage(paczka);
                    paczka = null;
                    // rozpoznawanie paczek


                }
            });

        } catch (java.lang.OutOfMemoryError outm) {

            System.exit(0);

        } finally {

        }

    }
}

 

Nie wiem jak przeprowadzić czyszczenie tych instancji obiektu.

0

Twój problem to PackListener. Nie służy on w żadnym przypadku do odczytywania paczek. Otrzymanie wywołania informuje tylko o tym, że pojawił się nowy element do odczytania np. za pomocą TransferQueue<E>.transfer(E e) (lub niepewnego tryTransfer). No i w żadnym wypadku nie należy referencji z listenera (do paczki) przekazywać do jakiegokolwiek innego wątku ponieważ ta sama referencja wędruje do kolejki transferowej (out). Referencja ta służy tylko do łatwiejszej identyfikacji właśnie dołożonej paczki w przypadku gdyby kolejka transferowa nie była jeszcze pusta (a powinna jeżeli dane będą ściągane na bieżąco)
Obiekt może być zniszczony przez śmieciarkę tylko wtedy kiedy zniknie ostatnia twarda referencja do niego. Dlatego jeżeli referencje do paczek znajdą się gdzieś poza kolejką transferową to paczki te nigdy nie zostaną zniszczone lub stanie się to wtedy kiedy znikną kopie tych referencji (lub zostaną zamazane czymś innym).
Z tego co widzę, to w listenerze, który z niewiadomych powodów znajduje się w osobnym wątku (a więc chyba jakiejś n-tej kopii), robisz coś jeszcze gorszego bo kasujesz zawartość paczki, która poszła do kolejki transferowej. Stąd kolejka ta będzie zawierać puste paczki i właściwie nie ma prawa normalnie działać. W poście z 2011-10-11 22:59 pokazałem przykład jak należy użyć odczytu paczek. Tyle, że zamiast System.out.println należałoby przekazać sygnał o nowej paczce do osobnego wątku, w którym będziesz odczytywał kolejkę transferową (out) - (coś jak invokeLater w Swingu, czy publish w SwingWorkerze) i co najważniejsze NATYCHMIAST kończyć packReadCompleted.

W większości wypadków parametr completedPack w metodzie packReadCompleted powinien być zasadniczo nieużywany bo z założenia po każdym takim odczycie kolejka out powinna być opróżniana z jedynego elementu jaki w niej powinien zostać właśnie umieszczony. Istnieje on tylko dla rzadkiego przypadku kiedy będziesz miał kilka różnych odczytów paczek jednocześnie (czyli wielu producentów) z kilku karuzeli i wszystkie będą pchane do tej samej kolejki transferowej.

Jeżeli nie wykorzystujesz kolejki out do ściągania z niej elementów to:

  1. Kolejka ta cały czas pęcznieje, a kolejne jej węzły przechowują twarde referencje do wszystkich już odczytanych paczek (za to pustych bo skasowanych przez Ciebie w listenerze, więc problem wychodzi po milionach odczytów).
  2. Śmieciarka, ani nie kasuje węzłów kolejki, ani nie likwiduje obiektów typu Pack (czyli ich implementacji PackInternal) ponieważ twarde referencje do nich przechowują węzły kolejki, których nigdy nie usuwasz. Dodatkowo powoduje to ogromną fragmentację pamięci dzięki czemu samo odczytywanie paczek będzie szło coraz wolniej. I nie da się tego wychwycić profilerem bo będzie to na niedostępnym poziomie zarządcy pamięci.
    Masz więc dokładnie to czego należałoby się spodziewać, czyli niby wyciek pamięci. Piszę "niby" ponieważ to żaden wyciek skoro de facto zapychasz kolekcję elementami, których nigdy nie usuwasz.

Użyj więc kolejki out, a do odbierania z niej paczek zastosuj take lub poll. Parametr completedPack w listenerze zostaw nieużywany. Ewentualnie dla testu możesz sobie sprawdzać czy referencja uzyskana z take/poll jest tożsamościowo tą samą co completedPack (ale to raczej bezużyteczne).

Pamiętaj, że metoda obsługi zdarzenia packReadCompleted jest wywoływana z wątku karuzeli, więc przekazywanie paczek (savePackage) będzie spowalniało (zatrzymywało na chwilę) równomierne odczytywanie paczek przez karuzelę. Tak jak każda metoda zdarzeniowa powinna ona się kończyć tak szybko jak to możliwe przekazując tylko sam sygnał do innego wątku o nadejściu nowej paczki do out (zamiast jej odczytywania). Taki sygnał powinien działać podobnie jak w Swingu invokeAndLater lub w egzekutorach metoda cancel.
Najlepiej może w ogóle zrezygnować z wywołania addPackReadListener, a do odczytu kolejki out użyć jej metod (kolejki transferowej) czyli take/poll/isEmpty albo transfer/tryTransfer (oczywiście po zarejestrowaniu odbiorcy jako konsumenta).

0

Hej

Zrobiłem odczyt z kolejki out, wygląda ok, śmieciarz niszczy obiekty. Zrobiłem to w ten sposób:

public class StreamSaveOut extends Thread {

    public TransferQueue<FreeCarousel.Pack> out;

    public StreamSaveOut(TransferQueue<FreeCarousel.Pack> out) {
        this.out = out;

    }

    public TransferQueue<FreeCarousel.Pack> getOut() {
        return out;
    }

    @Override
    public void run() {
        super.run();

        final StreamSaveThread save = new StreamSaveThread();
        save.start();

        try {


            while (true) {


                if (!out.isEmpty()) {
                    
                    Iterator<byte[]> ps = out.take().iterator();
                    byte[] paczka = ps.next();
                    ps.remove();
                    save.savePackage(paczka);
                    paczka = null;
                }

            }


        } catch (InterruptedException ex) {
            System.out.println(ex.toString());
        } catch (java.lang.OutOfMemoryError outm) {

            System.exit(0);

        } finally {
        }

    }
}

 

Dzieki.

0

OK, tylko łapanie OutOfMemoryError samo w sobie jest nie w porządku (błędów się z definicji nie łapie), a "eliminowanie" go za pomocą System.exit to już grube nieporozumienie. To tak jakbyś do jazdy samochodem zakładał na oczy czarną opaskę i liczył, że szczęśliwie dojedziesz do celu. :) Zaprogramuj porządnie soft, to nigdy nie będziesz musiał go restartować w razie wysypki (bo ta nigdy nie nastąpi).

Poza tym ja bym to zrobił po prostu zwykłą pętlą for-each bo samo take eliminuje element z kolejki więc wszystko co jest w nim zawarte mogłoby być usunięte hurtem, a potem hurtem odmiecone. Rozdrabnianie usuwania powoduje tylko, że gc ma więcej do roboty (a nie masz nad nim kontroli) i na dodatek trudniej mu zarządzać zwracaną pamięcią (fragmentacja RAMu).

0

Hej

Rzeczywiście

 OutOfMemoryError

to poroniony pomysł, wyjątek usunołem z klasy, sprobowałem z for-ach w ten sposób:

 
for (Pack pack : out) {

                if (!out.isEmpty()) {

                    Iterator<byte[]> ps = out.take().iterator();
                    byte[] paczka = ps.next();
                    ps.remove();
                    save.savePackage(paczka);
                    paczka = null;
                }

 }

Ale po jakimś czasie odbieranie się zatrzymuje. Może masz propozycje jak to zrobić.

Zresztą z while (true){} zauważyłem ze procesor jest mocno obciążany 30-40%.

Pozdrawiam

1 użytkowników online, w tym zalogowanych: 0, gości: 1