python - Tracking progress of joblib.Parallel execution


Is there an easy way to track overall progress of execution?

The running execution consists of thousands of jobs, which I want to track and record in a database. To do that, whenever Parallel finishes a task, I need it to execute a callback reporting how many jobs are left.

I have accomplished a similar task before with Python's stdlib multiprocessing, by launching a thread that records the number of pending jobs in the Pool's job list.

Looking at the code, Parallel inherits from Pool, so I thought I could pull off the same trick, but it doesn't seem to use that list, and I haven't been able to figure out how else to "read" its internal state.
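For reference, the stdlib trick described above can be sketched roughly like this. This is a hypothetical minimal version (the names work and monitor are mine), using multiprocessing.pool.ThreadPool, which exposes the same Pool API; note that pool._cache is an internal attribute, not public API:

```python
import time
import threading
from multiprocessing.pool import ThreadPool  # same API as multiprocessing.Pool

def work(x):
    time.sleep(0.01)
    return x * x

def monitor(pool, total):
    # pool._cache is an internal dict of jobs that have not yet finished;
    # polling its size gives an approximate count of pending jobs.
    while pool._cache:
        print("{} of {} jobs pending".format(len(pool._cache), total))
        time.sleep(0.05)

pool = ThreadPool(4)
jobs = [pool.apply_async(work, (i,)) for i in range(20)]
t = threading.Thread(target=monitor, args=(pool, len(jobs)))
t.start()
results = [job.get() for job in jobs]
t.join()
pool.close()
pool.join()
print(results)
```

Since this polls an implementation detail, it can break between Python versions; it is shown only to illustrate the approach the question refers to.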

The documentation you linked to states that Parallel has an optional progress meter. It's implemented using the callback keyword argument provided by multiprocessing.Pool.apply_async :

    # This is inside a dispatch function
    self._lock.acquire()
    job = self._pool.apply_async(SafeFunction(func), args,
                kwargs, callback=CallBack(self.n_dispatched, self))
    self._jobs.append(job)
    self.n_dispatched += 1

    class CallBack(object):
        """ Callback used by parallel: it is used for progress reporting,
            and to add data to be processed
        """
        def __init__(self, index, parallel):
            self.parallel = parallel
            self.index = index

        def __call__(self, out):
            self.parallel.print_progress(self.index)
            if self.parallel._original_iterable:
                self.parallel.dispatch_next()

And here's print_progress :

    def print_progress(self, index):
        elapsed_time = time.time() - self._start_time

        # This is heuristic code to print only 'verbose' times a message.
        # The challenge is that we may not know the queue length.
        if self._original_iterable:
            if _verbosity_filter(index, self.verbose):
                return
            self._print('Done %3i jobs       | elapsed: %s',
                        (index + 1,
                         short_format_time(elapsed_time),
                        ))
        else:
            # We are finished dispatching
            queue_length = self.n_dispatched
            # We always display the first loop
            if not index == 0:
                # Display depends on the number of remaining items
                # A message as soon as we finish dispatching, cursor is 0
                cursor = (queue_length - index + 1
                          - self._pre_dispatch_amount)
                frequency = (queue_length // self.verbose) + 1
                is_last_item = (index + 1 == queue_length)
                if (is_last_item or cursor % frequency):
                    return
            remaining_time = (elapsed_time / (index + 1) *
                              (self.n_dispatched - index - 1.))
            self._print('Done %3i out of %3i | elapsed: %s remaining: %s',
                        (index + 1,
                         queue_length,
                         short_format_time(elapsed_time),
                         short_format_time(remaining_time),
                        ))
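The callback mechanism itself is easy to see outside of joblib. A minimal sketch (the names square and on_done are mine, not joblib's) showing that apply_async fires the callback as each individual job finishes:

```python
import threading
from multiprocessing.pool import ThreadPool

completed = []
lock = threading.Lock()

def on_done(result):
    # Called by the pool as soon as a job finishes, in completion order.
    with lock:
        completed.append(result)

def square(x):
    return x * x

pool = ThreadPool(2)
jobs = [pool.apply_async(square, (i,), callback=on_done) for i in range(5)]
for j in jobs:
    j.wait()
pool.close()
pool.join()
print(sorted(completed))
```

Each finished job triggers on_done exactly once, which is the hook joblib's CallBack class plugs into.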

The way they implement it is kind of strange, to be honest - it seems to assume that tasks will always be completed in the order they were started. The index variable that gets passed to print_progress is just the value of self.n_dispatched at the time the job was started. So the first job launched will always finish with an index of 0, even if, say, the third job finishes first. It also means they don't actually keep track of the number of jobs completed, so there is no instance variable for you to monitor.

I think your best bet is to write your own CallBack class and monkey patch Parallel:

    from math import sqrt
    from collections import defaultdict
    from joblib import Parallel, delayed

    class CallBack(object):
        completed = defaultdict(int)

        def __init__(self, index, parallel):
            self.index = index
            self.parallel = parallel

        def __call__(self, index):
            CallBack.completed[self.parallel] += 1
            print("done with {}".format(CallBack.completed[self.parallel]))
            if self.parallel._original_iterable:
                self.parallel.dispatch_next()

    import joblib.parallel
    joblib.parallel.CallBack = CallBack

    if __name__ == "__main__":
        print(Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)))

Output:

    done with 1
    done with 2
    done with 3
    done with 4
    done with 5
    done with 6
    done with 7
    done with 8
    done with 9
    done with 10
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

That way, your callback gets called whenever a job completes, instead of the default one.
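The counting trick in the patched class, a class-level defaultdict keyed by the Parallel instance, works independently of joblib, so it can be tested in isolation. A stripped-down sketch, where run is just a stand-in object for a Parallel instance:

```python
from collections import defaultdict

class CallBack(object):
    # Shared across all CallBack instances: maps each parallel-run
    # object to the number of its jobs completed so far.
    completed = defaultdict(int)

    def __init__(self, index, parallel):
        self.index = index
        self.parallel = parallel

    def __call__(self, out):
        CallBack.completed[self.parallel] += 1
        print("done with {}".format(CallBack.completed[self.parallel]))

run = object()  # stand-in for a Parallel instance
for i in range(3):
    CallBack(i, run)(None)  # simulate three jobs completing
print(CallBack.completed[run])  # -> 3
```

Keying by the Parallel instance means several concurrent Parallel runs each keep their own completion count.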

