Połączenie do bazy danych w wątkach async

0

Dzień dobry,
jako początkujący adept w rustcie, szukam pomocy, może ktoś wie, jak wybrnąć z tego scenariusza. Cel tej aplikacji jest wstępnie prosty, robimy nasłuch na notify z bazy postgresowej i jak tylko zejdzie powiadomienie, to na koniec chciałbym zrobić jakiś update/insert na bazie. Na razie, bazując na ściągawce,naklepałem tylko tyle:

#[tokio::main]
async fn main() {
    let (client, mut connection) = tokio_postgres::connect("host=localhost user=postgres dbname=postgres password=password",NoTls).await.unwrap();

    let (tx, rx) = futures_channel::mpsc::unbounded();
    let stream =
        stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
    let connection = stream.forward(tx).map(|r| r.unwrap());
    tokio::spawn(connection);

    client
        .batch_execute(
            "LISTEN test_notifications;"
        )
        .await
        .unwrap();

    let notifications = rx
        .filter_map(|m| {

            match m {
            AsyncMessage::Notification(n) =>{
                println!("Match: {:?}",n);
                // tutaj próbuje wcisnąć cały tzw. "proces biznesowy" i wszystko jest ok, dopóki nie muszę tych wyników zapisać na bazie danych
                futures_util::future::ready(Some(n))
            },
            _ => futures_util::future::ready(None),
        }})
        .collect::<Vec<_>>()
        .await;
}

Z samym crudem nie mam problemów, żeby sobie dłubać, ale niestety odbijam się od takiej rzeczy, że jak już wpadłem w ten "async", to nie mogę z niego wyjść, próba przerobienia tego filter_mapa na asynchroniczny kończyła się tym, że pierwszy sygnał był odebrany i w zasadzie dalej już się nic nie działo, bo pewnie coś przeoczyłem, co jest istotne.
Także za wszelkie code review i sugestie, podpowiedzi z góry dziękuje.

2

(...) filter_mapa na asynchroniczny kończyła się tym, że pierwszy sygnał był odebrany i w zasadzie dalej już się nic nie działo

A co innego ma się dziać? Musisz to w pętli wykonywać. W tym momencie twój kod "async" działa tak, że przychodzi powiadomienie, konsumuje to, zwraca wynik i kod idzie dalej. Musisz zrobić np. loop w osobnym procesie i niech sobie chodzi i nasłuchuje.

Edit.
W sumie i tak collect jest blokującą operacja i dopóki rx się nie zamknie, to będzie czekać i nie zwróci nic do notifications

0

A co innego ma się dziać?

No myślałem, że mogę np. zrobić nowe połączenie wewnątrz tego match i tam po prostu domknąć formalnie te proces, ale to też jakoś nie zdaje egzaminu, choć prędzej, źle to piszę.

To ewentualnie jakbyś mi podpowiedział, jak pozbyć się collect i utrzymać to w pętli przy życiu, bo jak go tylko wywalę, to wykonuje się raz wszystko i potem już rx zwraca jako rozłączony.

0

Nie znam się więc się zapytam: a są inne wątki niż async (w Ruście czy nieruście)?

0

No możesz zrobić tak wprost, że w kółko kopać LISTEN notify_name, ale nie wiem, czy to jest w porządku na dłuższą metę.
Z tego co zauważyłem, przynajmniej w tym tokio-postgres, to jak chcesz użyć bez async, to w sumie pod spodem i tak jest zrobiony asynchronicznie tylko jakoś po swojemu poblokowali to do jednego wątku, ale zastrzegam, że tutaj mogę być chaotyczny w tej teorii.

https://docs.rs/postgres/latest/postgres/index.html#implementation

2

@BartoSAS Najlepiej "ukradnij" kod tutaj https://docs.rs/sqlx-postgres/0.7.4/src/sqlx_postgres/listener.rs.html#25.
PgListener to wszystko czego potrzebujesz IMO

Edit.
Robisz sobie connect_with, potem wołasz listen z parametrem test_notifications i w osobnym wątku w pętli coś takiego listener.try_recv().await

0

To sql-x jest wygodniejszy niż tokio? Bo google nic innego nie podpowiadało pod hasłem "postgres notify rust" 😕

1

Tokio to runtime. Mozesz użyć kodu który już napisałeś. Chodzi mi ogólnie o koncept, jaki jest w przykładzie użyty.

1 użytkowników online, w tym zalogowanych: 0, gości: 1