Java jest językiem obiektowym, a każdy wątek też jest obiektem. Metody nie są samodzielnym bytem, za to są nimi zadania typu Runnable, Callable<V>, ForkJoinTask<V>, ponieważ po utworzeniu są obiektami. Żeby wrzucić wykonanie każdej metody w osobnym wątku tworzysz tyle pod-zadań1 jednego obiektu ile potrzebujesz i w nich umieszczasz kod wykonawczy takich metod. Twoje metody redukują się wtedy do wykorzystania jednego z executorów do wykonania tych zadań.
Jeżeli chcesz:
- wymusić jednowątkowość (nawet na wielordzeniowym CPU), to używasz
- Executors.newSingleThreadExecutor(),
- wymusić konkretną liczbę wątków (bez względu na typ maszyny, w tym jeden wątek), to używasz
- Executors.newFixedThreadPool(),
- wymusić nieograniczoną wielowątkowość (nawet na jednordzeniowym CPU), to używasz
- Executors.newCachedThreadPool(),
- żeby te same zadania były wykonywane wielokrotnie przez te same wątki, to używasz
- Executors.newScheduledThreadPool()2,
- użyć całej mocy maszyny jedno/wieloprocesorowej do jak najszybszego wykonania zadań, to używasz
- new ForkJoinThreadPool()3.
Dzisiaj odchodzi się od bezpośredniego sterowania przez aplikację wątkami bo jest to nieoptymalne i wrażliwe na błędy. Te pule wątków są zupełnie wystarczające, żeby objąć wszystko co do tej pory we współbieżności wymyślono.
Co do Twojej potrzeby, to zauważ, że wielowątkowe wykonanie połączenia z bazą oraz wysyłanie do niej danych może spowodować, że wysyłka może nastąpić w czasie przeprowadzania połączenia. Jednak możliwość wysyłania pojawi się dopiero wtedy kiedy zadanie połączenia zostanie zakończone i to z sukcesem.
Krótko mówiąc możliwość rozpoczęcia tych zadań zależy od udanego zakończenia innych zadań. Nie wiem więc czy przemyślałeś to co chcesz osiągnąć.
Nie musisz też używać nieczytelnych anonimowych klas wewnętrznych wewnątrz metod (tak jak to pokazałeś z wątkami w pseudokodzie). Wszystkie zadania mogą być jawnymi klasami wewnętrznymi ze swoimi referencjami w postaci pól w obiekcie.
W metodach klasy można je wtedy przejrzyście i elegancko używać.
1 - Runnable, Callable<V> lub ForkJoinTask<V> (RecursiveAction/RecursiveTask<V>)
2 - w przerwach między wykonaniami zadania każdy wątek będzie w stanie idle/halt/lock.
3 - dostępna tylko w Javie 1.7; całkowicie zrywa powiązanie zadania z wątkiem/procesorem.
Szkielet przykładu użycia puli wątków:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MySqlConnect
{
public interface SendProgress
{
int getProgress();
}
public MySqlConnect()
{
//...
executor = Executors.newFixedThreadPool(3);
connected = tryConnect();
}
private Future<Boolean> tryConnect()
{
return executor.submit(mysql_test_connect);
}
public void dispose() { executor.shutdown(); }
//...
public Future<Boolean> send(final byte[] data) //przykład danych
{
final SendTask task = new SendTask(data);
sending.add(task);
return executor.submit(task);
}
public List<SendProgress> getTaskProgress()
{
List<SendProgress> progress = new ArrayList<>(sending.size());
for(SendTask sendTask: sending)
progress.add(sendTask);
return progress;
}
/**
* Get result from asynchronic task.
* Waiting for the task end.
* @param <T> result type
* @param future asynchronic result
* @return synchronic result
*/
public <T> T getResultFromFuture(final Future<T> future)
{
T result = null;
try { result = future.get(); } //possible to wait for task end
catch(InterruptedException | ExecutionException
| CancellationException ignore)
{ result = null; }
return result;
}
//jawna
private class SendTask implements Callable<Boolean>, SendProgress
{
private SendTask(final byte[] data) { this.data = data; }
@Override public Boolean call() throws Exception
{
boolean result = false;
if(isConnected())
{
System.out.println("[SendTask] Bytes to send: " + data.length);
//polecenia...
++progress;
//...
}
return Boolean.valueOf(result);
}
@Override public int getProgress() { return progress; }
private int progress = 0;
private final byte[] data;
}
//anonimowa z jawną referencją
private Callable<Boolean> mysql_test_connect = new Callable<Boolean>()
{
@Override public Boolean call() throws Exception
{
boolean result = false;
//polecenia...
return Boolean.valueOf(result);
}
};
//...
private boolean isConnected()
{
final Boolean result = getResultFromFuture(connected);
return result != null && result.booleanValue();
}
//lista do uzyskiwania informacji o postępie
private final List<SendTask> sending = new ArrayList<>();
private Future<Boolean> connected;
private ExecutorService executor;
}