RabbitMQ consumer is not working

I created a simple registration and ordering program with customers and products. My goal is to implement a CQRS application: when a new order is added, a message is published to RabbitMQ and then consumed to add that order to a separate NoSQL database, which is used to display the orders on the user's screen.

The rest of the program works well, and publishing to the RabbitMQ queue is fine, but the message is never consumed. The class that consumes the message currently looks like this:

public class AdicionarPedidoCommandHandler
    {
        private readonly RabbitMqConfiguration _configuration;
        private readonly IPedidoDocumentMapper _pedidoDocumentMapper;
        private readonly IPedidoMongoRepository _repository;
        private readonly ConnectionFactory _factory;
        public AdicionarPedidoCommandHandler(IOptions<RabbitMqConfiguration> option, IPedidoDocumentMapper pedidoDocumentMapper, IPedidoMongoRepository repository)
        {
            _configuration = option.Value;

            _factory = new ConnectionFactory
            {
                UserName = _configuration.UserName,
                Password = _configuration.Password,
                VirtualHost = _configuration.VirtualHost,
                HostName = _configuration.Host
            };

            _pedidoDocumentMapper = pedidoDocumentMapper;
            _repository = repository;
        }
        //public void ExecuteAsync()
        //{
        //    using (var bus = RabbitHutch.CreateBus("host=localhost;virtualHost=localhost"))
        //    {
        //        bus.PubSub.Subscribe<AdicionarPedidoCommand>("adicionar", msg => Handle(msg));
        //    }
        //}
        public Task ExecuteAsync()
        {
            using (var connection = _factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(
                                queue: _configuration.Queue,
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, eventArgs) =>
                    {
                        var contentArray = eventArgs.Body.ToArray();
                        var contentString = Encoding.UTF8.GetString(contentArray);
                        var message = JsonConvert.DeserializeObject<AdicionarPedidoCommand>(contentString);

                        Handle(message).Wait();
                        channel.BasicAck(eventArgs.DeliveryTag, false);
                    };

                    channel.BasicConsume(_configuration.Queue, true, consumer);
                }

            }
            return Task.CompletedTask;
        }

        public async Task Handle(AdicionarPedidoCommand command)
        {
            var pedidoDocument = await _pedidoDocumentMapper.ConverterAdicionar(command);

            try
            {
                await _repository.CreateAsync(pedidoDocument);
            }
            catch
            {
            }
        }
    }

I based this program on the basic RabbitMQ tutorial, but something seems to be missing. Can someone help me figure out what’s going wrong?

  • There isn’t enough information here for anyone to help you. What do you mean by "it doesn’t seem to be working"?

  • The message stays stuck in the queue and is never consumed, so nothing is saved to the NoSQL database.

1 answer

Man, there are quite a few opportunities for improvement in these 65 lines:

  1. There is no prefetch/QoS configuration (BasicQos). Without it, a single consumer can pull every message in the queue and you cannot scale out.

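  2. Your handler class creates the ConnectionFactory, the connection, and the model (channel) itself. The life cycle is completely wrong here: the using blocks dispose the channel and the connection as soon as ExecuteAsync returns, right after BasicConsume is called, so the consumer is gone before any message can be delivered.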

  3. Handle is an asynchronous method, and to make it "work" you called Wait() on it. Instead, you should set DispatchConsumersAsync on the ConnectionFactory so the connection dispatches to async consumers, and use AsyncEventingBasicConsumer to work with async/await.

  4. When calling BasicConsume you passed autoAck = true and still called BasicAck after the Handle call. Use one or the other; to acknowledge manually, pass autoAck = false.

  5. You do not handle exceptions thrown by the business logic (BasicNack/BasicReject).

  6. You do not handle exceptions during deserialization (BasicReject).

  7. The try/catch in Handle has an empty catch block, so any failure there is silently swallowed.
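
To make the points above concrete, here is a rough sketch of how the consumer could be restructured. It is not a drop-in replacement: it assumes RabbitMQ.Client 6.x (the API is different in 7.x) and Newtonsoft.Json, and the `QueueConsumer<TMessage>` class, its constructor parameters, and the prefetch count of 10 are all hypothetical choices made for illustration.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

// Hypothetical long-lived consumer. Create it once (for example as a singleton
// or inside a hosted service) and dispose it when the application shuts down.
public sealed class QueueConsumer<TMessage> : IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;

    public QueueConsumer(ConnectionFactory factory, string queue, Func<TMessage, Task> handle)
    {
        // Point 3: needed for AsyncEventingBasicConsumer in RabbitMQ.Client 5.x/6.x.
        factory.DispatchConsumersAsync = true;

        // Point 2: connection and channel are kept in fields, not in using blocks,
        // so they stay open for as long as this object lives.
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.QueueDeclare(queue: queue, durable: true, exclusive: false,
                              autoDelete: false, arguments: null);

        // Point 1: at most 10 unacknowledged messages per consumer (arbitrary value).
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);

        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += async (sender, ea) =>
        {
            TMessage message;
            try
            {
                var json = Encoding.UTF8.GetString(ea.Body.ToArray());
                message = JsonConvert.DeserializeObject<TMessage>(json);
            }
            catch (JsonException)
            {
                // Point 6: a malformed message will never succeed, so do not requeue it.
                _channel.BasicReject(ea.DeliveryTag, requeue: false);
                return;
            }

            try
            {
                await handle(message);
                _channel.BasicAck(ea.DeliveryTag, multiple: false);
            }
            catch (Exception)
            {
                // Point 5: hand the message back to the broker. A real application
                // should log the error and limit retries (for example with a dead-letter queue).
                _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        // Point 4: manual acknowledgements, so autoAck must be false.
        _channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
    }

    public void Dispose()
    {
        _channel.Dispose();
        _connection.Dispose();
    }
}
```

With something like this, the existing handler could be wired up as `new QueueConsumer<AdicionarPedidoCommand>(_factory, _configuration.Queue, Handle)`. For point 7, the empty catch in Handle would also have to go, otherwise exceptions never reach the BasicNack branch.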

  • Drop by @rabbitmqbr on Telegram, we’ll help you there.
