I am working on a tool that accepts device connections, reads their messages, and publishes them to a RabbitMQ queue.
For the RabbitMQ queues I used aio_pika (a simpler Pika) and created two functions: a subscriber that keeps reading from one queue, and a publisher that puts messages on another queue. To start both and keep them running, I used the create_task method. This appears to work well.
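To make the pattern concrete, here is a minimal sketch of two coroutines kept alive with create_task. It is not my real code: an asyncio.Queue stands in for RabbitMQ, and the names consumer, publisher and run_demo are placeholders chosen for the example.

```python
import asyncio


async def consumer(queue: asyncio.Queue, received: list) -> None:
    """Keep reading from the queue until cancelled (stand-in for the subscriber)."""
    while True:
        message = await queue.get()
        received.append(message)
        queue.task_done()


async def publisher(queue: asyncio.Queue, messages: list) -> None:
    """Put each message on the queue (stand-in for the publisher)."""
    for message in messages:
        await queue.put(message)


async def run_demo() -> list:
    queue = asyncio.Queue()
    received = []
    # create_task schedules each coroutine on the running loop.
    consumer_task = asyncio.create_task(consumer(queue, received))
    publisher_task = asyncio.create_task(publisher(queue, ["a", "b", "c"]))
    await publisher_task
    await queue.join()      # wait until every message has been processed
    consumer_task.cancel()  # the consumer loops forever, so stop it explicitly
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass
    return received


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

The point is only that both coroutines are wrapped in tasks, which is what makes the loop actually run them.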
My problem comes when I try to add the "servers" to the loop. Their number can vary from environment to environment, so I have an API that returns the list of servers I should start. What I am trying to do is use a for loop to start these servers with the create_server method.
My server is nothing more than a subclass of asyncio.Protocol.
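For context, this is roughly what such a server looks like. It is a sketch under assumptions: the Central class below is a hypothetical echo protocol, not my real test.Central, and it binds to 127.0.0.1 on an ephemeral port just for the demonstration. Since create_server calls protocol_factory with no arguments, the sketch binds the constructor arguments with functools.partial.

```python
import asyncio
import functools


class Central(asyncio.Protocol):
    """Hypothetical stand-in for test.Central: echoes whatever it receives."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.transport = None

    def connection_made(self, transport) -> None:
        self.transport = transport

    def data_received(self, data: bytes) -> None:
        self.transport.write(data)


async def run_demo() -> bytes:
    loop = asyncio.get_running_loop()
    # protocol_factory is called without arguments, so bind them up front.
    factory = functools.partial(Central, "teste")
    server = await loop.create_server(factory, host="127.0.0.1", port=0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        # Connect as a client and check that the protocol echoes the data back.
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"ping")
        await writer.drain()
        reply = await reader.readexactly(4)
        writer.close()
        await writer.wait_closed()
    return reply


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```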
Below is a simplified version of my code and the error I get when I try to run it.
# -*- coding: utf-8 -*-
import sys
import json
import asyncio

from myapp import log, amqp
from myapp.protocol import test

LOG = log.create_logging_system('myapp')


def auth():
    # TODO: REQUEST!
    response = json.loads(
        '{"type":"auth","data":{"status":"valid","amqp":{"user":"myapp_cli","password":"password","virtual_host":"test"},"central":[{"name":"teste","port":"9090","protocol":"test"}]}}')
    return response['data']['central'], response['data']['amqp']


async def start_central_ip(port, loop, name, protocol):
    logger = log.create_logging_system(name)

    # Lambda to use arguments
    def central(n, l): return test.Central(name, logger)

    server = await loop.create_server(protocol_factory=central,
                                      host='0.0.0.0',
                                      port=port)
    await server.serve_forever()


if __name__ == '__main__':
    # Auth
    centrals, amqp_params = auth()

    # Async loop
    loop = asyncio.get_event_loop()

    # RabbitMQ
    rabbit_url = amqp.build_url(amqp_user=amqp_params['user'],
                                amqp_pass=amqp_params['password'],
                                virtual_host=amqp_params['virtual_host'])
    loop.create_task(amqp.consumer(rabbit_url))
    loop.create_task(amqp.publisher(rabbit_url))

    for central in centrals:
        protocol = central['protocol']
        start_central_ip(central['port'], loop, central['name'], protocol)

    loop.run_forever()
Error:
myapp.py:49: RuntimeWarning: coroutine 'start_central_ip' was never awaited
  start_central_ip(central['port'], loop, central['name'], protocol)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
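As far as I understand the warning, calling a coroutine function only creates a coroutine object, and nothing runs until it is awaited or wrapped in a task. The sketch below illustrates that difference in isolation. The start_server function is a hypothetical stand-in for start_central_ip that just records its name, and I am not certain this is the right way to apply it to my real servers.

```python
import asyncio

started = []


async def start_server(name: str) -> None:
    """Hypothetical stand-in for start_central_ip: only records that it ran."""
    started.append(name)


async def run_demo() -> list:
    # Calling the coroutine function builds a coroutine object; no code runs yet.
    coro = start_server("not-scheduled")
    ran_before = list(started)
    coro.close()  # discard it explicitly so no RuntimeWarning is emitted

    # Wrapping each call in a task schedules it on the loop,
    # the same way the AMQP coroutines are scheduled.
    tasks = [asyncio.create_task(start_server(name)) for name in ("a", "b")]
    await asyncio.gather(*tasks)
    return ran_before


if __name__ == "__main__":
    print(asyncio.run(run_demo()), started)
```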