Difficulty using tasks and create_server together (asyncio)

Asked

Viewed 17 times

1

I am working on a tool that receives device connections, reads your messages and publishes these to a Rabbitmq queue.

For the Rabbitmq queues I used aio_pika (simplest Pika) and created 2 different functions. A Sub who keeps reading a queue, and a Post who puts messages in another queue. To run and keep both running, I used the method create_task. This apparently works well.

My problem is when I try to add the "servers" to the loop. The amount of these can vary from environment to environment, so I have an API that returns me a list of servers that I should start. What I’m trying to do is use a loop to start these servers with the method create_server.

My server is nothing more than an extension of asyncio. Protocol...

Below follows a simplified version of my code and the error I get when trying to run it.

# -*- coding: utf-8 -*-

import sys
import json
import asyncio
from myapp import log, amqp
from myapp.protocol import test

LOG = log.create_logging_system('myapp')


def auth():
    # TODO: REQUEST!
    response = json.loads(
        '{"type":"auth","data":{"status":"valid","amqp":{"user":"myapp_cli","password":"password","virtual_host":"test"},"central":[{"name":"teste","port":"9090","protocol":"test"}]}}')
    
    return response['data']['central'], response['data']['amqp']


async def start_central_ip(port, loop, name, protocol):
    logger = log.create_logging_system(name)
    # Lambda to use arguments
    def central(n, l): return test.Central(name, logger)
    server = await loop.create_server(protocol_factory=central,
                                      host='0.0.0.0',
                                      port=port)
    await server.serve_forever()


if __name__ == '__main__':
    #   Auth
    centrals, amqp_params = auth()
    #   Loop do Async
    loop = asyncio.get_event_loop()
    #   RabbitMQ
    rabbit_url = amqp.build_url(amqp_user=amqp_params['user'],
                                amqp_pass=amqp_params['password'],
                                virtual_host=amqp_params['virtual_host'])
    loop.create_task(amqp.consumer(rabbit_url))
    loop.create_task(amqp.publisher(rabbit_url))
    for central in centrals:
        protocol = central['protocol']
        start_central_ip(central['port'], loop, central['name'], protocol)
    loop.run_forever()

Error:

myapp.py:49: RuntimeWarning: coroutine 'start_central_ip' was never awaited
  start_central_ip(central['port'], loop, central['name'], protocol)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
No answers

Browser other questions tagged

You are not signed in. Login or sign up in order to post.