concurrent.futures and Class Object

I am trying to create a class (Processor) that runs a series of functions to get results from a batch API, one US state at a time:

1. READ data from a table.
2. SEND a request to the API for processing.
3. WAIT for the job to complete.
4. DOWNLOAD the results when complete.

I would like to implement concurrency so that it can run several states at the same time. I attempted something similar to what Luciano Ramalho implements with the concurrent.futures lib in Fluent Python (Chapter 17, Concurrency with Futures), but I can't get it to work.

ERROR:
    res = executor.map(run_single(), sorted(states))
    TypeError: run_single() missing 1 required positional argument: 'state'

But when I call run_single(state) with the parentheses, it only runs sequentially, state by state.

I'm using Python 3.5. Part of my code is below. Grateful for any and all guidance.

        import datetime
        import os
        import time
        import xml.etree.ElementTree as ET
        import psycopg2
        import psycopg2.extras
        import requests
        from concurrent import futures


        class Processor(object):    
            database = 'db'
            user = 'user'
            password = 'password'

            def __init__(self, state):
                self.base_url = 'api.com'
                self.state = state
                self.status = None
                self.fetch_size = 1000000
                self.job_id = ''

                self.send_requests(self.state)

            def send_requests(self, state):
                payload = dict(params)  # params (request parameters) is defined elsewhere; omitted here

                # connection to postgres db table, fetch data
                conn = psycopg2.connect(
                    "dbname='%s' user='%s' host='host' password='%s'"
                    % (self.database, self.user, self.password))
                cursor = conn.cursor('%s' % state, cursor_factory=psycopg2.extras.DictCursor)
                sql = "select * from table where state='%s' limit 1" % state
                cursor.execute(sql)

                try:
                    # function to build/send requests fetching data by chunks of fetch_size limited.
                    while True:
                        fetchs = cursor.fetchmany(self.fetch_size)
                        if len(fetchs) != 0:
                            chunk = ''
                            for fetch in fetchs:
                                try:
                                    row = fetch[0] + '|' + fetch[1] + '|' + fetch[2] + '\n'
                                    chunk += row
                                except:
                                    print('>ERROR ->', fetch[0])
                                    pass
                            header = 'header\n'
                            row = requests.post(self.base_url, params=payload, data=header + chunk)
                            response = row.text
                            print('-> %s: response job_xml: %s' % (state, response))
                            root = ET.fromstring(response)
                            self.job_id = root.find('Response/MetaInfo/RequestId').text
                            print('-> %s: response job_id: %s' % (state, self.job_id))
                            self.check_jobs(state)
                        else:
                            break
                except Exception as e:
                    print(e)
                    pass

            # Function checking the status of the job_id: if completed, download the results; if not, wait and retry.
            def check_jobs(self, state):
                print('->>> %s: Checking job %s  <<<-' % (state, self.job_id))
                status = self.get_status(self.job_id)
                if status == 'completed':
                    print('-> %s: status: %s, job_id: %s  ' % (state, status, self.job_id))
                    self.download_results(self.job_id)
                else:
                    time.sleep(4)  # wait before polling again; use a longer delay (e.g. 480) for large, multi-million row jobs
                    self.check_jobs(state)

            # Function to return the status of a job_id
            def get_status(self, job_id):
                url_status = 'url that get status of job_id'
                req_status = requests.get(url_status)
                root = ET.fromstring(req_status.text)
                status = root.find('Response/Status').text
                return status

            # Function to download the results
            def download_results(self, job_id):
                url_download = 'url to download job_id'
                print('-> %s: downloading job_id: %s @ URL [%s]' % (self.state, job_id, url_download))
                r = requests.get(url_download, stream=True)

                # create folder for state if not exists
                download = os.path.join(self.responses_folder, self.state)  # responses_folder is set elsewhere; omitted here
                if not os.path.exists(download):
                    os.makedirs(download)

                # Save result to folder
                save_as = os.path.join(download, str(job_id + '.zip'))
                with open(save_as, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)

                print('-> %s: downloaded job_id: %s @ folder [ %s ] ' % (self.state, job_id, save_as))
                self.delete_results(job_id)  # delete_results is defined elsewhere; omitted here


        if __name__ == "__main__":
            states = ['AK', 'AL', 'AR']
            workers = 20

            def run_single(state):
                Processor(state)

            for state in states:
                with futures.ThreadPoolExecutor(workers) as executor:
                    res = executor.map(run_single(), sorted(states))

1 answer

Your problem is that when creating a task in your executor you are calling the function by using the expression run_single(). (The question is not very well posed, or is missing fragments of your code that would let us see the problem itself.) With the parentheses, the program calls the function right there, and it is the function's return value that gets passed to the call to executor.map; hence your error: the function requires a positional parameter (state), which is not passed.

What you should do is pass the function itself as a parameter to the executor, which then takes care of calling it within a separate thread. For that, you should not put parentheses after the function name; in Python, a bare function name refers to the function object, which can be treated like any other object and simply passed as a parameter.
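
A quick illustration of the difference, independent of your code:

    def greet(name):
        return 'hello ' + name

    f = greet           # no parentheses: f is the function object itself
    print(f('AK'))      # calling through the new name works: prints 'hello AK'

    # executor.map(greet, names)    # right: map calls greet(name) for each name
    # executor.map(greet(), names)  # wrong: greet() runs here, with no arguments,
    #                               # and only its return value reaches map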

In short, just rewrite your executor call line as:

    res = executor.map(run_single, states)
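
Applied to your __main__ block, that could look like the sketch below. Note that the outer "for state in states" loop can also go: executor.map already iterates over states, so keeping it would launch the whole batch once per state.

    if __name__ == "__main__":
        states = ['AK', 'AL', 'AR']
        workers = 20

        def run_single(state):
            Processor(state)

        # pass the function object; the executor calls run_single(state)
        # in a worker thread for each state in the list
        with futures.ThreadPoolExecutor(workers) as executor:
            res = executor.map(run_single, states)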

Notice that I removed the sorted as well: the executor cannot guarantee the order in which each of the tasks will be processed, so sorting the input list can only give you the false impression that the tasks would be performed in some particular order. It is better to make it clear that this order is arbitrary.

(The class name is also wrong inside run_single: it must be Processor, not Geocode.)

And finally, you are creating a new database connection for each task that runs. In general this is not good practice: opening a database connection is relatively costly. It is better to use some kind of connection pool, or to use a single global connection and create a cursor from it inside your class.
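
For example, psycopg2 ships a thread-safe pool in psycopg2.pool; a minimal sketch of the idea (the connection details are placeholders, and fetch_rows is only an illustrative helper):

    from psycopg2.pool import ThreadedConnectionPool

    # one pool shared by all worker threads: at least 1 and at most 20
    # open connections (match maxconn to your worker count)
    pool = ThreadedConnectionPool(1, 20, dbname='db', user='user',
                                  host='host', password='password')

    def fetch_rows(state):
        conn = pool.getconn()   # borrow a connection from the pool
        try:
            with conn.cursor() as cursor:
                cursor.execute("select * from table where state = %s limit 1",
                               (state,))
                return cursor.fetchall()
        finally:
            pool.putconn(conn)  # hand it back so other threads can reuse it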

  • Thanks for all the tips. I'll take a look at the code and try to implement your suggestion. Thanks also for the tip about connection pools, I'll read up on the subject. I'll be right back.

  • jsbueno - it worked, but I expected to see several python.exe processes running at the same time and I saw only 1. And in the end the runtime was no big improvement: it was equal to, or only slightly less than, the sequential run. Any hint as to what might be happening?

  • You're using ThreadPoolExecutor, so you will have a single process. To run in multiple processes, use ProcessPoolExecutor (also in concurrent.futures). Of course I can't identify the bottlenecks just by looking at your code, but it is apparently IO-bound code that should have benefited from running on parallel threads. The exception is the use of the ElementTree library for XML processing: it is pure Python, so (because of the GIL) its execution ends up not being parallelizable in threads. If that's the problem, using ProcessPoolExecutor might be enough; see the sketch below.
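
The swap itself is small; a sketch, with a placeholder worker count:

    from concurrent import futures

    # ProcessPoolExecutor runs each task in a separate Python process,
    # sidestepping the GIL; run_single must be defined at module level
    # so it can be pickled and sent to the workers
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        res = executor.map(run_single, states)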
