Java odbieranie danych InputStream

0

Mam taki problem

Odbieram do bufora dane binarne z socketu.


InputStream in = new BufferedInputStream(conn.getInputStream());

                byte[] fileStream = new byte[1024]; //buffor 4KB

               while ((bytesRead = in.read(fileStream)) !=-1){

               System.out.println("Bytes:"+bytesRead);

               } 

Danych jest bardzo dużo "lecą bez przerwy"

Chciałbym wyodrebnić z tych danych paczki, jedyna informację o podziale tych paczek jest to ze pierwsze 4 bajty zawierają długość paczki. Powiedzmy to jest do opanowania bo można iterować poszczegolne paczki, ale w buforze nie zawsze będą pełne paczki danych, bedzie tak że na końcu bufora bedzie początek kolejnej paczki, a w nastepnym buforze koniec paczki. Czy macie jakieś sugestie. Moze sa jakies gotowe mechanizmy do obsługi tego typu danych?
Proszę o pomoc

Wojtek

0

Tu nie ma co kombinować. Najpierw musisz jakoś znaleźć początek jakiejkolwiek paczki. Zwykle są to jakieś charakterystyczne i unikalne ciągi danych, których raczej nie spotka się w paczce (zakładanie, że na początku pierwszego odczytanego bufora będzie początek paczki jest ryzykowne, ale może to być założeniem projektowym). Kiedy z któregoś odczytanego bufora namierzysz ten początek, to odczytujesz długość paczki, a następnie wczytujesz kolejne bufory i z pobranej długości odejmujesz bytesRead każdego kolejnego bufora (oczywiście w pierwszym buforze odejmujesz też ten zbędny kawałek bufora przed początkiem pierwszej namierzonej paczki.
Jeżeli kolejne odejmowanie zakończy się liczbą ujemną, to już wiesz, że w tym buforze masz koniec paczki oraz początek następnej. Na dodatek ten początek od razu namierzasz bez parsowania bufora bo będzie on w ostatnim odczytanym buforze pod indeksem o takiej wartości jak ujemna liczba w ostatnim odejmowaniu odczytanych danych do bufora.
W ten sposób namierzysz początek kolejnej paczki i cały proces się powtarza.

0

[Nie mogę zmodyfikować swojego posta powyżej.]

Ponieważ od dawna miałem zrobić sobie uniwersalną karuzelę do odczytu takich danych jak w temacie, więc po okrojeniu do powyższych potrzeb, przykładowy kod odczytujący paczki danych wyglądałby tak jak poniżej. Komentarze są tu po polsku, więc nie powinno być problemów z rozczytaniem co jest do czego. Ponieważ jest to dość uniwersalny przykład do rozbudowy, to z prostego kodu zrobił się lekko przydługawy.

[przetestowana wersja aktualna]

package com.olamagato.junk;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TransferQueue;

/**
 * Przykład odczytu prostych paczek z karuzeli.
 * @author Olamagato
 * @version 1.03
 */
public class FreeCarousel
{
    private static boolean DEBUG = true;
    /** Rozmiar nagłówka paczki. Tu składa się tylko z rozmiaru paczki */
    private static final int HEAD_SIZE = 4;
    /** rozmiar bufora nie może być mniejszy niż rozmiar nagłówka */
    private static final int BUFFER_SIZE = 512; //bufor 4KB

    /** Definiuje kryterium odszukania nagłówka paczki */
    public interface PackHeadFinder
    {
        /**
         * Zwraca index nagłówka paczki znalezionego w buforze lub
         * liczba ujemna jeżeli takiego nagłówka nie można znaleźć.
         * Bufor nextBuffer jest przeszukiwany opcjonalnie do ustalenia
         * położenia nagłówka na graniczy buforów, ale tak aby indeks
         * jego pierwszego bajtu znalazł się w zakresie
         * buffer[0..buffer.length - 1]. Argument nextBuffer nie uczestniczy
         * w rozpoznawaniu istnienia nagłówka poza tym zakresem - jeżeli
         * początek nagłówka nie uda znaleźć w zmiennej buffer, to wynikiem
         * musi być liczba ujemna bez względu na to czy udałoby się go znaleźć
         * w nextBuffer.
         * Metoda nie może w jakikolwiek sposób modyfikować danych w
         * argumentach buffer i nextBuffer.
         * @param buffer przeszukiwany bufor, całkowicie wypełniony danymi
         * @param nextBuffer następny bufor pozwalający rozpoznać nagłówek
         * podzielony między dwa bufory, całkowicie wypełniony danymi
         * @return indeks pierwszego bajtu w buforze (0..buffer.length - 1)
         * lub liczba ujemna jeżeli nagłówka w buforze nie można znaleźć
         */
        int findHeadInBuffer(final byte[] buffer, final byte[] nextBuffer);
    } //interface PackHeadFinder

    /**
     * Reprezentuje całą paczkę danych odczytaną z karuzeli.
     * Jest listą tablic bajtowych o maksymalnej wielkości BUFFER_SIZE każda.
     */
    public interface Pack extends List<byte[]>
    {
        /**
         * Rozmiar paczki w bajtach.
         * @return suma rozmiaru wszystkich fragmentów
         */
        int length();
    } //interface Pack extends List<byte[]>

    /** Obserwator dodawania nowej paczki */
    public interface PackReadListener
    {
        /**
         * Wywoływana po skompletowaniu odczytu każdej nowej paczki.
         * @param completedPack zakończona paczka danych
         */
        void packReadCompleted(final Pack completedPack);
    } //interface PackReadListener

    /**
     * Tworzy obiekt reprezentujący karuzelę danych
     * @param bigEndian
     * @param input strumień wejściowy
     * @param output kolejka wyjściowa zbierająca kolejne paczki danych
     * @param finder pozwala ustalić kryterium dla znalezienia nagłówka paczki
     */
    public FreeCarousel(final boolean bigEndian, final InputStream input,
        final TransferQueue<Pack> output, final PackHeadFinder finder)
    {
        if(input == null || finder == null)
            throw new NullPointerException("Null pointer argument.");
        if(BUFFER_SIZE < HEAD_SIZE)
            throw new IllegalStateException("BUFFER_SIZE (" + BUFFER_SIZE
                + "is lesser than pack HEAD_SIZE = " + HEAD_SIZE);
        this.bigEndian = bigEndian;
        this.input = input;
        this.output = output;
        pack = new PackHead(finder);
        listeners = Collections.synchronizedSet(
            new HashSet<PackReadListener>()); //obserwatorzy paczek
        lastPack = null;
    }

    /**
     * Dodaje nasłuchiwacza informującego o zakończeniu wczytywania paczki.
     * @param listener obiekt nasłuchujący
     * @return this dla wygodniejszego użycia
     */
    public FreeCarousel addPackReadListener(final PackReadListener listener)
    {
        listeners.add(listener);
        return this;
    }

    /**
     * Usuwa nasłuchiwacza informującego o zakończeniu wczytywania paczki.
     * @param listener obiekt nasłuchujący
     * @return this dla wygodniejszego użycia
     */
    public FreeCarousel removePackReadListener(final PackReadListener listener)
    {
        listeners.remove(listener);
        return this;
    }

    /**
     * Odczytuje kolejne paczki aż do końca strumienia.
     * Odczytane dane w postaci całych bezbłędnie odczytanych paczek
     * umieszcza w kolejce wyjściowej.
     * Pierwsza i ostatnia paczka zostaną odrzucone, jeżeli nie były
     * całe w strumieniu.
     *
     * @return this dla wygodniejszego użycia
     * @throws IOException w przypadku błędu odczytu ze strumienia
     */
    public FreeCarousel packsRead() throws IOException
    {
        if(this.buffer == null)
            this.buffer = new InputDataBuffer(input, BUFFER_SIZE);
        lastPack = new PackInternal();
        bytesRead = pack.readData(); //odczyt pierwszego bufora
        while(bytesRead >= 0 && pack.setBufferDataStart())
        {   //ustawiony początek danych w buforze
            pack.setBufferDataEnd(); //ustawienie końca danych w buforze
            if(pack.getDataStartIdx() < pack.getDataEndIdx())
            {   //są jakieś dane do dołączenia
                if(DEBUG)
                    debugCheckDataIdx();
                //dołączenie danych kolejnego bufora do paczki
                lastPack.add(Arrays.copyOfRange(buffer.get(),
                    pack.getDataStartIdx(), pack.getDataEndIdx()));
                if(pack.isTheEnd()) //zakończenie paczki, wzięcie nowej?
                    finalizeAndChangePack();
            }
            if(pack.getDataEndIdx() == bytesRead) //odczytano cały bufor
                bytesRead = pack.readData(); //reset zakresu danych
        }
        if(DEBUG)
            debugShowPacks();
        return this;
    }

    /**
     * Kończy paczkę, informuje obserwatorów o jej zakończeniu
     * oraz rozpoczyna wczytywanie następnej.
     */
    private void finalizeAndChangePack()
    {
        lastPack.finalize(pack.getPackSize());
        output.add(lastPack); //dodanie kolejnej całej paczki
        for(PackReadListener packReadListener: listeners)
            packReadListener.packReadCompleted(lastPack);
        pack.locateNextHead(); //ustala miejsce nowego nagłówka danych
        lastPack = new PackInternal(); //tworzy nową pustą paczkę danych
    }

    /**
     * Reprezentuje nagłówek paczki danych. Przeprowadza operacje na
     * danych z nagłówka (tutaj tylko size)
     */
    private class PackHead
    {
        /**
         * Ustawia warunki dla pobrania najbliższej paczki.
         * @param finder ustala kryterium dla znalezienia nagłówka paczki
         */
        private PackHead(final PackHeadFinder finder)
        {
            this.finder = finder;
            reset(dataStartIdx = dataEndIdx = -1);
        }

        /**
         * Resetuje dane dla nowej paczki danych.
         * Ustawia indeks rozmiaru paczki na podany newSizeIdx
         * @param newSizeIdx nowy indeks rozmiaru paczki w buforze lub -1
         */
        private void reset(final int newSizeIdx)
        {
            read = 0;
            size = -1;
            sizeIdx = newSizeIdx;
        }

        /**
         * Indeks początku danych w buforze.
         * @return indeks bajtu początkowego
         */
        private int getDataStartIdx() { return dataStartIdx; }

        /**
         * Indeks za końcem danych w buforze.
         * @return indeks bajtu poza buforem (exclusive)
         */
        private int getDataEndIdx() { return dataEndIdx; }

        private int getPackSize() { return size; }

        /**
         * Ustala położenie nagłówka kolejnej paczki za końcem poprzedniej.
         */
        private void locateNextHead() { reset(dataEndIdx); }

        /**
         * Ustala początek danych do odczytu w buforze.
         * Ustawia pole bufferDataStartIdx na właściwy indeks.
         * @return false jeżeli nastąpił koniec strumienia,
         * true jeżeli udało się ustalić początek danych.
         * @throws IOException błąd wczytywania danych
         */
        private boolean setBufferDataStart() throws IOException
        {
            if(size >= 0) //w trakcie wczytywania paczki
                dataStartIdx = 0; //początek danych na początku bufora
            else if(headLocated() && headLoaded()) //nowa paczka
            {
                dataStartIdx = sizeIdx; //pocz. danych za rozm. paczki
                sizeIdx = -1;
            }
            else
                return false; //nastąpił koniec strumienia
            return true;
        }

        /**
         * Ustala koniec danych do odczytu w buforze.
         * Ustawia pole bufferDataEndIdx na właściwy indeks.
         * Ustawia zmienna isTheEnd na true jeżeli wykryto koniec paczki.
         * @return true jeżeli wystąpił koniec odczytywanej paczki
         */
        private boolean setBufferDataEnd()
        {   //pozostałe w buforze dane do wczytania
            final int bufferData = bytesRead - dataStartIdx; //może być 0
            //ilość danych potrzebna do zamknięcia paczki
            //zał. read < size, zawsze dodatnia
            final int brakująceDoPaczki = size - read;
            //zawsze false jeżeli bufferData == 0
            this.isTheEnd = brakująceDoPaczki <= bufferData;
            if(isTheEnd) //czy koniec paczki w tym buforze?
            {   //doczytujemy tylko dane brakujące do paczki
                read += brakująceDoPaczki;
                //koniec danych równy z końcem paczki
                dataEndIdx = dataStartIdx + brakująceDoPaczki;
            }
            else //paczka nie skończy się w tym buforze
            {   //doczytanie do paczki całego pozostałego bufora
                read += bufferData; //może dodać 0
                //koniec danych równy z końcem bufora
                dataEndIdx = bytesRead; //dataStartIdx + bufferData;
            }
            return this.isTheEnd;
        }

        /**
         * Przeszukuje kolejne bufory w poczukiwaniu początku paczki.
         * Zmienną sizeIdx ustawia za wczytany rozmiar = początek danych.
         * @return true jeżeli indeks rozmiaru jest znaleziony,
         * false jeżeli nastąpił koniec strumienia
         * @throws IOException błąd odczytu
         */
        private boolean headLocated() throws IOException
        {
            while(sizeIdx < 0)
                if(!sizeIdxFound())
                {
                    bytesRead = buffer.read();
                    if(bytesRead == -1)
                        return false; //koniec strumienia, koniec pobierania
                }
            return true;
        }

        /**
         * Zwraca true w przypadku znalezienia początku nagłówka w buforze.
         * Ustawia zmienną sizeIdx na indeks początku nagłówka.
         * @return true jeżeli początek nagłówka jest znaleziony
         */
        private boolean sizeIdxFound()
        {   //podstawia dla findHeadInBuffer bufory dopasowane do danych
            sizeIdx = finder.findHeadInBuffer(
                getData(buffer.get(), buffer.getReadBytes()),
                getData(buffer.get(true), buffer.getReadBytes(true)));
            return sizeIdx >= 0;
        }

        /**
         * Odczyt rozmiaru paczki poprzez kolejne odczytywane bufory.
         * Ustawia pole sizeIdx na pierwszy bajt za odczytanym rozmiarem.
         * @return true jeżeli wczytano rozmiar paczki, false jeżeli
         * nastąpił koniec strumienia.
         * @throws IOException błąd odczytu
         */
        private boolean headLoaded() throws IOException
        {
            size = 0; //przygotowanie do wczytywania
            for(int sizeSh = 0; sizeSh < Integer.SIZE; sizeSh += Byte.SIZE)
                if(sizeIdx < bytesRead) //kolejny bajt do doczytania w buforze?
                    size |= buffer.get()[sizeIdx++] << sizeSh;
                else
                {   //nie pozwala pominąć bajtu rozmiaru przez anulowanie
                    sizeSh -= Byte.SIZE; //inkrementacji licznika bajtów
                    sizeIdx = 0; //pocz. nast. bufora zawiera resztę rozmiaru
                    bytesRead = buffer.read();
                    if(bytesRead == -1)
                        return false;//koniec strumienia i pobierania
                }
            if(bigEndian) //odwrócenie bajtów rozmiaru
                size = Integer.reverseBytes(size);
            if(DEBUG)
                debugPackSize();
            return true;
        }

        /**
         * Wczytuje kolejną porcję danych do bufora.
         * Resetuje położenie danych w buforze na domyślne (wszystkie dane
         * bufora należą do bieżącej paczki).
         * Resetuje położenie nagłówka jeżeli miałby się znaleźć
         * dokładnie na końcu danych poprzedniego bufora.
         * @return ilość odczytanych do bufora bajtów danych
         * @throws IOException
         */
        private int readData() throws IOException
        {
            dataStartIdx = 0; //początek nowego bufora
            if(sizeIdx == dataEndIdx && isTheEnd)
                sizeIdx = dataStartIdx; //przenosi lokację nagłówka na począt.
            //odczyt kolejnego bufora
            return dataEndIdx = buffer.read();
        }

        /**
         * Informuje o końcu paczki.
         * @return true jeżeli w aktualnym buforze jest ostatnia część paczki.
         */
        private boolean isTheEnd() { return isTheEnd; }
        private final PackHeadFinder finder;
        private int dataStartIdx; //indeks początku danych paczki w buforze
        private int dataEndIdx; //indeks za końcem danych paczki w buforze
        private int sizeIdx; //indeks bajtu rozmiaru w bieżącym buforze
        private boolean isTheEnd; //informuje czy wykryto ostatnią część w buf.
        private int size; //rozmiar bieżącej paczki
        private int read; //ilość dotychczas odczytanych danych z paczki
    } //class PackHead

    /**
     * Reprezentuje implementację całej paczki danych odczytanej z karuzeli.
     * Jest listą tablic bajtowych o maksymalnej wielkości BUFFER_SIZE każda.
     */
    private static class PackInternal extends LinkedList<byte[]>
        implements Pack
    {
        /** Tworzy nowa pustą paczkę nie zawierającą żadnego fragmentu */
        private PackInternal() { this.packSize = 0; }

        @SuppressWarnings("UseOfSystemOutOrSystemErr")
        @Override public boolean add(byte[] e)
        {
            if(DEBUG) System.out.printf("Część %03d: %03d B (%03X)%n",
                this.size(), e.length, e.length);
            return super.add(e);
        }

        /**
         * Rozmiar paczki w bajtach.
         * @return suma rozmiaru wszystkich fragmentów
         */
        @Override public int length() { return packSize; }

        /**
         * Zapisuje w obiekcie łączny rozmiar paczki.
         * @param packSize rozmiar paczki
         */
        private void finalize(final int packSize) { this.packSize = packSize; }
        private int packSize; //rozmiar w bajtach
        private static final long serialVersionUID = 197103271110071103L;
    } //class PackCls extends LinkedList<byte[]>

    /**
     * Reprezentuje wirtualny bufor danych wejściowych posługujący się dwoma
     * buforami o podanej długości, do których wczytuje dane ze strumienia.
     * Większość operacji bazuje na symulowaniu wczytywania bufora odczytanego
     * wcześniej. Dzięki temu jest możliwe udostępnienie "przyszłych danych".
     */
    private static class InputDataBuffer
    {
        /**
         * Reprezentuje podwójny bufor danych, który jest odczytywany
         * z wyprzedzeniem jednego bufora.
         * Utworzenie bufora powoduje od razu wczytanie pierwszego bufora,
         * aby każda następna operacja wczytania odczytywała wyprzedzająco
         * jeden dodatkowy bufor danych.
         * @param input strumień wejściowy
         * @param bufferSize maksymalny rozmiar buforów roboczych
         * @throws IOException błąd odczytu ze strumienia
         */
        public InputDataBuffer(final InputStream input,
            final int bufferSize) throws IOException
        {
            this.input = input;
            buffer1 = new byte[bufferSize];
            buffer2 = new byte[bufferSize];
            internalRead();
        }

        /**
         * Zwraca zawsze najstarszy wypełniony bufor.
         * @return najstarszy bufor.
         */
        public final byte[] get() { return get(false); }

        /**
         * Zwraca bufor, który jest odczytywany później lub wcześniej zależnie
         * od prawdziwości parametru lastBuffer.
         * @param lastBuffer najnowszy wczytany bufor (wyprzedzający)
         * @return wybrany bufor
         */
        public final byte[] get(final boolean lastBuffer)
        {
            return lastBuffer == firstBuffer ? buffer1 : buffer2;
        }

        /**
         * Zwraca zawsze rozmiar najstarszego odczytanego bufora.
         * @return liczba bajtów bufora.
         */
        public final int getReadBytes() { return getReadBytes(false); }

        /**
         * Zwraca wielkość bufora odczytywanego później lub wcześniej zależnie
         * od prawdziwości parametru lastBuffer.
         * @param lastBuffer najnowszy wczytany bufor (wyprzedzający)
         * @return liczba bajtów wybranego bufora
         */
        public final int getReadBytes(final boolean lastBuffer)
        {
            return lastBuffer == firstBuffer ? bytesRead1 : bytesRead2;
        }

        /**
         * Odczytuje dane strumienia do bufora bieżącego i zwraca ilość
         * danych wcześniej odczytanego bufora zapasowego.
         * @return liczba bajtów najstarszego wypełnionego bufora
         * @throws IOException blablabla
         */
        public final int read() throws IOException { return internalRead(); }

        /**
         * Odczytuje dane strumienia do bufora bieżącego i zwraca ilość
         * danych wcześniej odczytanego bufora zapasowego.
         * Wersja dla inicjacji danych konstruktora.
         * @return liczba bajtów najstarszego wypełnionego bufora
         * @throws IOException blablabla
         */
        private int internalRead() throws IOException
        {   //wypełnia kolejny, inny bufor niż poprzednio wypełniony
            //DEBUG. Odczytywanie losowych ilości danych
            final int debugRozmBufora = DEBUG ?
                los.nextInt(BUFFER_SIZE) : BUFFER_SIZE;
            final int bytesRead = input.read(nextBuffer(), 0, debugRozmBufora);
            //ilość odczytanych bajtów przypisuje odczytanemu buforowi
            if(firstBuffer)
                bytesRead1 = bytesRead;
            else
                bytesRead2 = bytesRead;
            return getReadBytes(); //rozmiar wcześniejszego odczytu
        }

        /**
         * Zmienia bufor podstawowy na zapasowy i odwrotnie oraz
         * zwraca ten bufor jako rezultat.
         * Efektem ubocznym jest ustawienie zmiennej firstBuffer
         * zgodnie z właśnie zwróconym buforem.
         * @return bufor zgodny z ustawioną wartością firstBuffer
         */
        private byte[] nextBuffer()
        {   //uwaga, przypisanie w warunku
            return (firstBuffer = !firstBuffer) ? buffer1 : buffer2;
        }
        private final InputStream input; //strumień danych wejściowych
        private final byte[] buffer1, buffer2; //dwa bufory danych
        //zmienne stanu
        private boolean firstBuffer; //ostatnio wypełniony bufor to buffer1
        private int bytesRead1, bytesRead2; //wielkości odczytanych danych
    } //static class InputDataBuffer

    /**
     * Zwraca tablicę będącą argumentem jeżeli ilość podanych danych
     * jest nie mniejsza od jej rzeczywistej długości, albo lub krótszą
     * kopię tej tablicy jeżeli podana ilość danych jest mniejsza niż
     * jej długość.
     * Procedura unika kopiowania danych w typowej sytuacji gdy bufor
     * jest całkowicie wypełniony danymi.
     * @param dataBuffer bufor z danymi bajtowymi
     * @param dataLength rzeczywista ilość danych
     * @return
     */
    private static byte[] getData(final byte[] dataBuffer,
        final int dataLength)
    {
        return dataLength >= dataBuffer.length ?
            dataBuffer : Arrays.copyOf(dataBuffer, dataLength);
    }

    //debug
    @SuppressWarnings("UseOfSystemOutOrSystemErr")
    private void debugCheckDataIdx()
    {
        final int start = pack.getDataStartIdx();
        final int end = pack.getDataEndIdx();
        if(start <= end && start >= 0 && start <= bytesRead
            && end >= 0 && end <= bytesRead)
            return;
        else
            throw new IllegalStateException(String.format(
                "%nbytesRead=%d (%x)%n"
                + "size=%d (%x)%n"
                + "sizeIdx=%d (%x)%n"
                + "start=%d (%x)%n"
                + "end=%d (%x)",
                bytesRead, bytesRead,
                pack.size, pack.size,
                pack.sizeIdx, pack.sizeIdx,
                start, start,
                end, end));
    }

    @SuppressWarnings("UseOfSystemOutOrSystemErr")
    private void debugShowPacks()
    {
        System.out.printf("Wczytano %d paczek.%n", output.size());
        if(pack.getPackSize() >= 0)
            System.out.println(
                "Projektowany Rozmiar ostatniej, niedokończonej paczki = "
                + pack.getPackSize());
    }

    @SuppressWarnings("UseOfSystemOutOrSystemErr")
    private void debugPackSize()
    {
        //gdy rozmiar paczki w strumieniu obejmuje rozmiar nagłówka paczki
        if(pack.size < 0)
            throw new IllegalStateException("Bad input data: "
                + "Negative pack size = " + pack.size
                + String.format(
                "%nbytesRead=%d (%x)%n"
                + "size=%d (%x)%n"
                + "sizeIdx=%d (%x)%n"
                + "start=%d (%x)%n"
                + "end=%d (%x)",
                bytesRead, bytesRead,
                pack.size, pack.size,
                pack.sizeIdx, pack.sizeIdx,
                pack.dataStartIdx, pack.dataStartIdx,
                pack.dataEndIdx, pack.dataEndIdx));
        System.out.printf("Paczka %03d: %d B (%03X)%n",
            output.size(), pack.size, pack.size);
    }

    private static final Random los = new Random();
    //debug end

    /** Sposób odczytu liczb wielobajtowych w strumieniu. */
    private final boolean bigEndian; //kodowanie liczb
    private final InputStream input; //strumień wejściowy
    private final TransferQueue<Pack> output; //interfejs wyjściowy
    private final PackHead pack; //mechanizm obsługujący paczki
    private final Set<PackReadListener> listeners; //obserwatorzy paczek
    //zmienne stanu
    private PackInternal lastPack; //bieżąca zbierana paczka danych
    private InputDataBuffer buffer; //bufor danych wejściowych
    private int bytesRead; //ilość pobranych danych w buforze
} //class FreeCarousel
0

Witam

Dzieki, wielkie

Zrobilem to tak mniej wiecej jak pisałeś wczesniej,

            InputStream in = new BufferedInputStream(new FileInputStream(f));

            int bytesRead = 0;
            int znak = 0;
            int countReads = 0;
            int reszta = 0;

            int globalLen = 0;

            byte[] fst = new byte[1024];
            byte[] fst2 = new byte[1024];

            byte[] tempBuffor = new byte[1024];

            List<byte[]> byteList = new ArrayList<byte[]>();

                while ((bytesRead = in.read(fst2)) != -1) {

                    globalLen = globalLen + bytesRead;

                    System.out.println("Global"+globalLen);

                    fst = fst2;

                    countReads++;

                    //System.out.println("ID read: "+countReads+" BytesRead: " + bytesRead);

                    int startByte = 0;
                    int stopByte = 0;
                    int offset = 4;
                    int len = 0;
                    int lenCount = 0;
                    int count = 0;

                    if (znak > 1) {

                        //System.out.print("temp:"+tempBuffor.length+" ");
                        System.out.println(Arrays.toString(tempBuffor));
                        fst = concat(tempBuffor, fst2);

                        //FileOutputStream fi = new FileOutputStream(new File("e:/xdr/Pack_"+System.currentTimeMillis()+"_"+countReads+".log"));
                        //fi.write(fst);

                    }
                    System.out.println("ID read: "+countReads+" BytesUpdated: " + fst.length);

                   while (fst.length >= lenCount ) {

                        count++;

                        startByte = stopByte;

                        len = ((fst[startByte] & 0xff) << 24) + ((fst[startByte + 1] & 0xff) << 16) + ((fst[startByte + 2] & 0xff) << 8) + (fst[startByte + 3] & 0xff);

                        lenCount = lenCount + len;

                        reszta = fst.length - lenCount;

                        if (reszta < len | reszta < 4) {

                             System.out.print("ID: " + count + " Len: " + len + " LenCount: " + lenCount + " Reszta: "+reszta+" StartByte: " + startByte + " StopByte: " +fst.length);
                            System.out.println();
                             tempBuffor = Arrays.copyOfRange(fst, startByte, fst.length);
                            znak = fst.length - startByte;

                            System.out.print(count);

                        } else {

                            stopByte = len + stopByte + offset;

                             System.out.print("ID: " + count + " Len: " + len + " LenCount: " + lenCount + " Reszta: "+reszta+" StartByte: " + startByte + " StopByte: " + stopByte);
                            //System.out.print(Arrays.toString(Arrays.copyOf(Arrays.copyOfRange(fst, startByte, stopByte), 5)));
                            System.out.println();

                            byteList.add(Arrays.copyOfRange(fst, startByte, stopByte));

                            znak = 0;
                        }

                    }

                }

Tak odczytuję długość paczki, stream zawsze zaczyna sie od poczatku.

len = ((fst[startByte] & 0xff) << 24) + ((fst[startByte + 1] & 0xff) << 16) + ((fst[startByte + 2] & 0xff) << 8) + (fst[startByte + 3] & 0xff);

Poczatek streamu wyglada tak

[0, 0, 3, 60, -126, 73, 0, 32.....]

Jezeli robię rozpaczkowywanie, z pliku ze zdumpowanego streamu z socketa jest ok, a jak podepne to pod stream, lubi sie wywalić.

Mozesz podać przykład dla klasy FreeCarousel, nie bardzo sie moge połapać jak jej uzyc?

0

Przede wszystkim to mi wygląda na błąd: "reszta < len | reszta < 4". Tu powinno być "||" zamiast operatora bitowego. Jednak być może nie wpływa to na wynik.
Nie wydaje mi się z kodu, żebyś uwzględniał iż rozmiar paczki może wyjść na granicy bufora, więc za jednym zamachem całej długości nie wczytasz, albo dostaniesz wyjątek wyjścia poza zakres tablicy.
Co do sposobu wykorzystania kodu wrzuconego przeze mnie, to szkielet przykładu:

        //...
        //Trzeba ustawić w klasie FreeCarousel.BIG_ENDIAN = true
        //najlepiej przez zrobienie tego parametrem konstruktora
        //użycie
        InputStream in = new BufferedInputStream(new FileInputStream(args[1]));
        TransferQueue<FreeCarousel.Pack> out = new LinkedTransferQueue<>();
        FreeCarousel paczki = new FreeCarousel(in, out,
            new FreeCarousel.PackHeadFinder()
            {
                @Override public int findHeadInBuffer(byte[] buffer,
                    byte[] nextBuffer) { return 0; } //zawsze początek bufora
            });
        //opcjonalnie
        paczki.addPackReadListener(new FreeCarousel.PackReadListener()
        {
            @Override public void packReadCompleted(Pack completedPack)
            {   //Przykład. Notyfikacja o odczytanej paczce powinna być w
                //osobnym wątku a nie tutaj.
                System.out.println("Odczytano paczkę o długości "
                    + completedPack.length());
                //...
            }
        });
        //odczytywanie paczek, synchroniczne
        paczki.packsRead(); //czyta aż nie skończy się strumień
        //wszystkie paczki odczytane
        //...
        //robisz coś z odczytanymi paczkami w kolejce out tutaj, albo
        //zanim zakończy się packsRead() w osobnym wątku.
        //w tym drugim wypadku można skorzystać z właściwości kolejki do
        //sprawdzania kolejnych paczek lub czekania na nie, ewentualnie
        //skorzystać z dorzuconego opcjonalnie listenera
0

Dostaje wyjatek, BIG_EDIAN = true;


 File f = new File("E:/xdr/1317798719031.log");
        //ByteBuffer buff = ByteBuffer.allocate(1140960);

        // Thread t = new BytesBuff(buff);
        //t.start();

        try {

            InputStream in = new BufferedInputStream(new FileInputStream(f));

                //Trzeba ustawić w klasie FreeCarousel.BIG_ENDIAN = true
                //najlepiej przez zrobienie tego parametrem konstruktora
                //użycie

                TransferQueue<FreeCarousel.Pack> out = new LinkedTransferQueue<>();

                FreeCarousel paczki = new FreeCarousel(in, out,
                        new FreeCarousel.PackHeadFinder()
                        {
                                @Override public int findHeadInBuffer(byte[] buffer,
                                        byte[] nextBuffer) { return 0; } //zawsze początek bufora
                        });
                //odczytywanie paczek, synchroniczne

                 paczki.packsRead();
...

Exception in thread "Thread-0" java.lang.ArrayIndexOutOfBoundsException
        at java.lang.System.arraycopy(Native Method)
        at java.util.Arrays.copyOfRange(Arrays.java:2551)
        at javaapplication3.FreeCarousel.packsRead(FreeCarousel.java:136)
        at javaapplication3.Main.run(Main.java:78)
0

Wyslalem Ci teraz wiadomosc z haslem

0

W trzecim poście od góry powyżej wrzuciłem poprawiony i znacznie bardziej przejrzysty kod karuzeli wczytującej takie proste paczki jakie potrzebujesz. Dołożyłem na koniec kilka metod do debugowania, ale można je szybko wyciąć lub zgasić zmienną DEBUG.
Jedyne co się zmieni w wywołaniu konstruktora, to dodatkowy pierwszy parametr dla big endian:

FreeCarousel paczki = new FreeCarousel(true, in, out,
    new FreeCarousel.PackHeadFinder()
    {
        @Override public int findHeadInBuffer(byte[] buffer,
            byte[] nextBuffer) { return 0; } //zawsze początek bufora
    });

Wczytywane są wszystkie 634 paczki, a ponieważ ich rozmiar rzadko kiedy przekracza pół kilo, więc takiej wielkości bufor jest wystarczający. Możesz oczywiście ustawić sobie bufor dowolnej wielkości - każdy zadziała poprawnie. Byle był większy od 4 bajtów. Nie jest wykluczone, że zadziałałby też bufor 1-2 bajtowy po zdjęciu blokady minimum rozmiaru nagłówka (tu 4 bajty). Z takim szalonym rozmiarem ten kod też powinien sobie poradzić.

Przekaż pozdrowienia dla pracodawcy. ;)

0

Działa !!!! Super dzieki wielkie, pracodawca pozdrowiony ;)

1 użytkowników online, w tym zalogowanych: 0, gości: 1, botów: 0