Przekazanie async coroutine z innego wątku

0

Hej
Przygotowuję serwis, który ma nasłuchiwać na eventy z Kafki. Dla każdego z Kafka topików uruchamiam listenera w osobnym wątku. W momencie otrzymania eventu, Kafka listener ma wywołać REST. Ponieważ REST może chwilę trwać, nie chcę blokować listenera tylko kontynuować obsługę kolejnych eventów (odpowiedź na RESTa również nie bardzo mnie interesuje - co by ją tylko wyprintować). Wydaje się to znakomitym usecasem dla Asyncio. Mój kod wygląda następująco:

W 'main' pobieram referencje do event_loop i przekazuję ją do każdego z Kafka listenerów

def run_consumer(topic, event_mapping, loop, kafka_config):
   threading.Thread(target=MyKafkaConsumer(
       topic=topic,
       event_mapping=event_mapping,
       loop=loop,
       **kafka_config
   ).consume).start()

if __name__ == '__main__':
   config = Config()
   loop = asyncio.get_event_loop()
   [run_consumer(topic, event_mapping, loop, config.get_kafka()) for topic, event_mapping in config.get('mappings').items()]
   loop.run_forever()

Następnie, w kodzie listenera czekam na eventy z Kafki, i w momencie otrzymania eventu, wrzucam go na event_loop (fragmenty kodu, żeby przedstawić ideę):

async def asynchronous_fetch(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

class MyKafkaConsumer:
    def __init__(self, bootstrap_servers, schema_registry_url, group_id, topic, event_mapping, loop, **kwargs):
        self.loop = loop

    def consume(self):
            while(True):
                try:
                    message = self.consumer.poll()
                    asyncio.run_coroutine_threadsafe(asynchronous_fetch(message.url), self.loop)
                except Exception as e:
                    logging.exception(e)
                finally:
                    self.consumer.commit()

Niestety, REST nigdy nie jest wywołany. W powyższym przypadku próbuję to zrobić za pomocą tornado.httpclient, ale próbowałem również z httpx i aiohttp. Spodziewałem się, że przekazanie asynchronous_fetch poprzez asyncio.run_coroutine_threadsafe spowoduje wykonanie asynchronous_fetch. We wszystkich przypadkach moja coroutine nie jest wykonywana. Co robię źle?

0

Coroutina wywalała się na jakimś trywialnym błędzie przy kodowaniu nagłówków do stringa, ale w żaden sposób o tym nie informowała. Trzeba było ustawić debug=True na event_loop i debugować funkcje asynchroniczną, żeby pojawił się print z errorem.

1 użytkowników online, w tym zalogowanych: 0, gości: 1