About Me

My photo
Mumbai, Maharastra, India
He has more than 7.6 years of experience in the software development. He has spent most of the times in web/desktop application development. He has sound knowledge in various database concepts. You can reach him at viki.keshari@gmail.com https://www.linkedin.com/in/vikrammahapatra/ https://twitter.com/VikramMahapatra http://www.facebook.com/viki.keshari

Search This Blog

Sunday, November 3, 2019

Python: Multiprocessing with 4 Core CPU

We have some processing to perform on existing Dataframe, where will try to add few columns on the bases of existing columns values, this we will try to do it serially and then compare the output performance with multiprocessing

Below we have a definition in the file, which takes a dataframe as an input and add 9 further columns in it based on existing column value and some mathematical expression.

import pandas as pd
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import multiprocessing as mp
import time


def operationonDFs(dfinput):
    dfinput['z'] = dfinput.apply(lambda newCol: 2 * newCol['x'] , axis = 1)
    dfinput['a'] = dfinput.apply(lambda newCol: 3.4 * newCol['x'] , axis = 1)
    dfinput['b'] = dfinput.apply(lambda newCol: 2.1 * newCol['x'] , axis = 1)
    dfinput['c'] = dfinput.apply(lambda newCol: 5.9 * newCol['x'] , axis = 1)
    dfinput['d'] = dfinput.apply(lambda newCol: 7.3 * newCol['x'] , axis = 1)
    dfinput['d'] = dfinput.apply(lambda newCol: 3.3 * newCol['x'] , axis = 1)
    dfinput['e'] = dfinput.apply(lambda newCol: 7.1 * newCol['x'] , axis = 1)
    dfinput['f'] = dfinput.apply(lambda newCol: 4.3 * newCol['x'] , axis = 1)
    dfinput['g'] = dfinput.apply(lambda newCol: 5.3 * newCol['x'] , axis = 1)
    return dfinput

Now in second part I have created a dataframe and call the def operationDfs by passing the newly created dataframe, here if you see the we have used time package to record the total execution time to run the entire program.

if __name__ ==  '__main__':
    start_time = time.time()   

    dfinput = pd.DataFrame({'x':range(1,100000),
                       'y':range(1,100000)})

    df = operationonDFs(dfinput)
   
    print(df)
    print("--- %s seconds ---" % (time.time() - start_time))

Lets see the output

C:\Users\Atoshi\mypy>python operationonDFWOMP.py
           x      y       z         a         b         c         d         e         f         g
0          1      1       2       3.4       2.1       5.9       3.3       7.1       4.3       5.3
1          2      2       4       6.8       4.2      11.8       6.6      14.2       8.6      10.6
2          3      3       6      10.2       6.3      17.7       9.9      21.3      12.9      15.9
3          4      4       8      13.6       8.4      23.6      13.2      28.4      17.2      21.2
4          5      5      10      17.0      10.5      29.5      16.5      35.5      21.5      26.5
...      ...    ...     ...       ...       ...       ...       ...       ...       ...       ...
99994  99995  99995  199990  339983.0  209989.5  589970.5  329983.5  709964.5  429978.5  529973.5
99995  99996  99996  199992  339986.4  209991.6  589976.4  329986.8  709971.6  429982.8  529978.8
99996  99997  99997  199994  339989.8  209993.7  589982.3  329990.1  709978.7  429987.1  529984.1
99997  99998  99998  199996  339993.2  209995.8  589988.2  329993.4  709985.8  429991.4  529989.4
99998  99999  99999  199998  339996.6  209997.9  589994.1  329996.7  709992.9  429995.7  529994.7

[99999 rows x 10 columns]
--- 24.10855269432068 seconds ---
So it took 24 second to execute the program.

Lets re-write the program to use multiprocessing, first importing multiprocessing package in our program and checking the number of cpu core available in our system using cpu_count method of multiprocessing.

import pandas as pd
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def operationonDFs(dfinput):
    dfinput['z'] = dfinput.apply(lambda newCol: 2 * newCol['x'] , axis = 1)
    dfinput['a'] = dfinput.apply(lambda newCol: 3.4 * newCol['x'] , axis = 1)
    dfinput['b'] = dfinput.apply(lambda newCol: 2.1 * newCol['x'] , axis = 1)
    dfinput['c'] = dfinput.apply(lambda newCol: 5.9 * newCol['x'] , axis = 1)
    dfinput['d'] = dfinput.apply(lambda newCol: 7.3 * newCol['x'] , axis = 1)
    dfinput['d'] = dfinput.apply(lambda newCol: 3.3 * newCol['x'] , axis = 1)
    dfinput['e'] = dfinput.apply(lambda newCol: 7.1 * newCol['x'] , axis = 1)
    dfinput['f'] = dfinput.apply(lambda newCol: 4.3 * newCol['x'] , axis = 1)
    dfinput['g'] = dfinput.apply(lambda newCol: 5.3 * newCol['x'] , axis = 1)
    return dfinput


if __name__ ==  '__main__':
    start_time = time.time()
   
    cpu_count = mp.cpu_count()
    no_of_split = 50

    dfinput = pd.DataFrame({'x':range(1,100000),
                       'y':range(1,100000)})

    dfinput_split = np.array_split(dfinput, no_of_split)
    pool = mp.Pool(cpu_count)
    df = pd.concat(pool.map(operationonDFs, dfinput_split))
    pool.close()
    pool.join()

    print(df)
    print("--- %s seconds ---" % (time.time() - start_time))

We have used pool class, the pool distributes the tasks to the available processors using a FIFO scheduling. It works like a map reduce architecture. It maps the input to the different processors and collects the output from all the processors.
The input to the pool.map method is definition which we want to execute in parallel with the splited dataframe.
Once the execution is finished, it joins all the output to form a single set of dataframe.

Let’s see how much we save with this architecture:

 C:\Users\Atoshi\mypy>python operationonDF.py
           x      y       z         a         b         c         d         e         f         g
0          1      1       2       3.4       2.1       5.9       3.3       7.1       4.3       5.3
1          2      2       4       6.8       4.2      11.8       6.6      14.2       8.6      10.6
2          3      3       6      10.2       6.3      17.7       9.9      21.3      12.9      15.9
3          4      4       8      13.6       8.4      23.6      13.2      28.4      17.2      21.2
4          5      5      10      17.0      10.5      29.5      16.5      35.5      21.5      26.5
...      ...    ...     ...       ...       ...       ...       ...       ...       ...       ...
99994  99995  99995  199990  339983.0  209989.5  589970.5  329983.5  709964.5  429978.5  529973.5
99995  99996  99996  199992  339986.4  209991.6  589976.4  329986.8  709971.6  429982.8  529978.8
99996  99997  99997  199994  339989.8  209993.7  589982.3  329990.1  709978.7  429987.1  529984.1
99997  99998  99998  199996  339993.2  209995.8  589988.2  329993.4  709985.8  429991.4  529989.4
99998  99999  99999  199998  339996.6  209997.9  589994.1  329996.7  709992.9  429995.7  529994.7

[99999 rows x 10 columns]
--- 13.999533653259277 seconds ---

That’s really a good save compare to serial method.
Serial Method: 24 sec
With Multiprocessing: 13 sec


Data Science with…Python J
Post Reference: Vikram Aristocratic Elfin Share

No comments:

Post a Comment