Working with querysets and Celery


Following up on what I asked here, I am still trying to run my code asynchronously. I'm trying to use Celery for this procedure, but I'm having some problems.

I had the following:

def get(self, request):
    # MATCH user-typed parameters against values from the database
    queryset = Model.objects.raw()

    for res in queryset:
        processamento_dos_dados

    return resultado_do_processamento

What I did was this:

With Celery, I put my function in tasks.py:

@shared_task
def teste(queryset, variavel1, lista1, variavel2, lista2):
    for res in queryset:
        processamento_de_dados  # comparison procedures my application needs

    return lista2

In my views.py:

from .tasks import teste
class CLASSE(APIView):

    def get(self, request):
        # Processing that generates variavel1, lista1 and variavel2
        queryset = Model.objects.raw() # MATCH user-typed parameters against database values

        lista2 = []
        lista2 = teste.delay(queryset, variavel1, lista1, variavel2, lista2)

This is where the problems begin. If I try to pass it as is, teste.delay(variaveis), it returns an error saying the queryset is not a JSON-serializable object. If I remove .delay, it runs synchronously, as if Celery were not being used.

While researching, I found this possible solution:

test = serializers.serialize('json', queryset)
lista2 = teste.delay(test, variavel1, lista1, variavel2, lista2)

However, the JSON serialization time is prohibitive (the code gets stuck there for a long time; in none of my tests did it get past the serialization step).
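For completeness, if the serialization route had been fast enough, the task side would need to deserialize the payload back into model instances. A minimal sketch of that counterpart, with the same placeholder loop body as above:

from celery import shared_task
from django.core import serializers

@shared_task
def teste(serialized_queryset, variavel1, lista1, variavel2, lista2):
    # deserialize() yields DeserializedObject wrappers; .object is the model instance
    for wrapper in serializers.deserialize('json', serialized_queryset):
        res = wrapper.object
        processamento_de_dados
    return lista2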

I have also tried replacing .delay() with .apply_async(), but I did not know how to pass all the necessary variables inside args=.
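For reference, .delay(*args) is just shorthand for .apply_async(args=args): the positional arguments go in a tuple or list. A sketch with the variables from the snippet above (the same serialization constraint still applies):

teste.apply_async(args=(test, variavel1, lista1, variavel2, lista2))
# keyword arguments, if any, would go in kwargs={...}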

Another solution I found was this one, but again, I do not know how to pass all the parameters the function needs to run.

As mentioned in my first question, I tried using generators to optimize my code, but I noticed no difference in execution. I followed this, doing the following:

queryset = Model.objects.raw() # MATCH of values
for res in queryset.iterator():
    processamento_dos_dados

  • Does this error only happen when you add parameters? I recently tested a background worker and it gave the same error when using HTTP requests. I'll share my implementation with Celery, which uses parameters with .delay().

  • So, I only tested with parameters, because I couldn't find a way to use my function without them... I even managed to use .apply_async(), but I get the same queryset error. Apparently there is no way to pass a queryset as a parameter to Celery.

  • Do the queries inside the task; whatever takes time belongs in the task. Do not pass the queryset.

  • Meanwhile I found a post about exactly your difficulty, with my suggestion and another one; take a look: https://stackoverflow.com/questions/34765276/celery-raise-error-while-passing-my-queryset-obj-as-parameter

2 answers

You really can't pass a queryset to Celery; you need to pass a serializable object, and you are probably using JSON as the serialization method, so you have to pass something JSON-compatible.

In your case, if your query isn't too expensive, I suggest passing a list of integers:

from .tasks import teste

class CLASSE(APIView):
    def get(self, request):
        queryset = Model.objects.raw()
        # RawQuerySet has no values_list(); collect the primary keys manually
        ids = [obj.pk for obj in queryset]
        teste.delay(ids, variavel1, lista1, variavel2, lista2)

Note that the delay method does not return the task's result: the call is asynchronous, so your lista2 would not bring back anything useful.
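What delay() does return is an AsyncResult handle. A short sketch of what you can and cannot usefully do with it (get() also requires a configured result backend):

result = teste.delay(ids, variavel1, lista1, variavel2, lista2)
result.id       # the task id: safe to store or send back to the client
result.ready()  # non-blocking check: has the task finished yet?
# result.get(timeout=10) would block waiting for the return value,
# which defeats the purpose of calling the task asynchronously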

Your task would look something like this:

@shared_task
def teste(ids, variavel1, lista1, variavel2, lista2):
    queryset = Model.objects.filter(pk__in=ids)
    for res in queryset:
        processamento_de_dados

There is also no point in returning anything from the task, because it is asynchronous. Suppose your workers are overloaded and Celery takes 1 minute to execute the task: what would be the sense in waiting 1 minute to see the result? You are using Celery precisely to avoid that ;)

If your query is expensive and you need to make it asynchronous as well, check how it is composed.

For example, if your query searches for users who logged in within some number of hours, defined in settings or in some dynamic configuration, in order to send them a newsletter, you could pass just the configuration value and leave the query to run inside Celery, understand?
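A sketch of that idea, with hypothetical names (a NEWSLETTER_HOURS setting and a send_newsletter task): the view passes only a JSON-friendly integer, and the expensive query runs inside the worker:

from datetime import timedelta

from celery import shared_task
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils import timezone

@shared_task
def send_newsletter(hours):
    # the query is composed and executed in the worker, not in the view
    cutoff = timezone.now() - timedelta(hours=hours)
    for user in get_user_model().objects.filter(last_login__gte=cutoff):
        ...  # send the newsletter to user

# in the view: only the configuration value crosses the broker
send_newsletter.delay(settings.NEWSLETTER_HOURS)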

  • Hello @Michel Sabchuk, OK, I'm doing basically what you explained, but I'm having trouble with the return. How do I receive the return value, in this case my lista2?

  • You don't, at least not synchronously. Remember: Celery operates asynchronously! Let's take a practical example: suppose your task is to send a newsletter to a set of emails. I would create a data model to store the sending instances, list the desired users in this model, run the task, and, if I needed to see the result, refer to the instance I created beforehand, which could be updated with a new status or new information when the task finishes (see the sketch after these comments).

  • So I guess Celery isn't going to solve my problem, then... The return of my data processing is a value: my algorithm calculates a value for each tuple returned from the database, I save those values in a list, sort it and take the first one (the smallest result). I wanted to make it asynchronous to try to make it faster, but I see I'm not on the right track.

  • Probably not. If I understand correctly, you have a view that processes the data and returns it in the response. That is precisely synchronous ;) What you could study is making your view trigger the task and, at the same time, record that the task has started. In the response, instead of showing the result, you would show some information warning that the data is being processed. Once the task finished, if you reloaded the view, you would have the updated data and could render it correctly. The client could reload periodically, load via Ajax, or even use sockets... (see the sketch after these comments)

  • That's right. Basically I call a view, this view fetches data from the database, processes it and returns a result. I need to speed up this processing, so I thought about parallelizing the FOR loop (because I fetch the data from the database through a queryset and process that queryset inside a FOR, as shown in the question), so that more than one iteration ran at a time. But apparently that is not possible. Thank you!
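A sketch of the pattern suggested in these comments, using hypothetical names (a ProcessingJob status model and a process_data task; in a real project the pieces would live in models.py, tasks.py and views.py):

from celery import shared_task
from django.db import models
from rest_framework.response import Response
from rest_framework.views import APIView

class ProcessingJob(models.Model):
    # one row per triggered computation; the task updates it when done
    status = models.CharField(max_length=10, default='pending')
    result = models.TextField(blank=True, default='')

@shared_task
def process_data(job_id):
    job = ProcessingJob.objects.get(pk=job_id)
    # ... run the expensive loop here and store the smallest value ...
    job.status = 'done'
    job.save()

class CLASSE(APIView):
    def get(self, request):
        job = ProcessingJob.objects.create()
        process_data.delay(job.pk)
        # respond immediately; the client polls or reloads until status is 'done'
        return Response({'job': job.pk, 'status': job.status}, status=202)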


I'll share my example, with Celery and RabbitMQ, passing parameters when invoking tasks:

serializers.py

class SensorsSerializer(serializers.HyperlinkedModelSerializer):

    class Meta:
        model = Sensor
        fields = '__all__'
        extra_kwargs = {'raw_data': {'required': False}}

    def create(self, validated_data):
        """Custom create method to allow creating"""
        try:
            user = None
            request = self.context.get("request")
            if request and hasattr(request, "user"):
                user = request.user.id

            data = self.initial_data
            save_sensors_rawdata.delay(data, user)  # delay with parameters
        except (ValueError, TypeError) as error:
            Error.objects.create(user_id=user, raw_data_original=data,
                error_details="SensorsSerializer/create: " + str(error))
        finally:
            return Sensor(**validated_data)

tasks.py

@task
def save_sensors_rawdata(raw_data, user_id):
    """Save sensors Raw data"""
    try:
        Sensor.objects.create(user_id=user_id, raw_data=raw_data)
    except (ValueError, TypeError) as error:
        Error.objects.create(user_id=user_id, raw_data_original=raw_data,
            error_details="SensorsSerializer/Task/save_sensors_rawdata: " + str(error))
    finally:
        return raw_data

requirements.txt

celery==4.2.0
django-celery-beat==1.3.0
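
For context, those pinned versions assume the usual Celery-with-Django wiring. A minimal celery.py sketch, where 'proj' and the broker URL are placeholders for this project's actual project name and RabbitMQ address:

# celery.py, next to settings.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj', broker='amqp://guest:guest@localhost//')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()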

NOTE: In this M2M approach, sensors perform a POST with data, a task is created, and the API responds immediately with status 200 and the object received, since in this case I make no changes to the response. This project had no problem working like this. That is, after the data is received, the task is created and sent to RabbitMQ; as soon as a worker is available, it picks up the task and, in it, calculates various metrics, updates some tables and inserts into others. I understand there are services where this approach is not possible or would not work, but in this case it works very well.
