We have some processing to perform on an existing DataFrame: we will add new columns derived from the values of an existing column, first serially, and then compare the runtime against a multiprocessing version.
Below is a function that takes a DataFrame as input and performs nine row-wise apply calls, each deriving a new column from the existing 'x' column via a simple mathematical expression. (Note that the column 'd' is assigned twice, so only eight new columns appear in the result.)
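Before the full program, here is a minimal sketch of the row-wise apply pattern the function relies on; with axis=1 the lambda receives one row at a time (the tiny frame below is purely illustrative):

```python
import pandas as pd

df = pd.DataFrame({'x': [1, 2, 3]})
# axis=1 passes each row to the lambda; 'z' is derived from 'x'
df['z'] = df.apply(lambda row: 2 * row['x'], axis=1)
print(df['z'].tolist())  # → [2, 4, 6]
```

This is convenient but slow for large frames, since the lambda is called once per row in Python.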
import pandas as pd
import time

def operationonDFs(dfinput):
    # derive new columns from 'x' with row-wise apply calls
    dfinput['z'] = dfinput.apply(lambda row: 2 * row['x'], axis=1)
    dfinput['a'] = dfinput.apply(lambda row: 3.4 * row['x'], axis=1)
    dfinput['b'] = dfinput.apply(lambda row: 2.1 * row['x'], axis=1)
    dfinput['c'] = dfinput.apply(lambda row: 5.9 * row['x'], axis=1)
    dfinput['d'] = dfinput.apply(lambda row: 7.3 * row['x'], axis=1)
    dfinput['d'] = dfinput.apply(lambda row: 3.3 * row['x'], axis=1)  # overwrites the 7.3 * x values
    dfinput['e'] = dfinput.apply(lambda row: 7.1 * row['x'], axis=1)
    dfinput['f'] = dfinput.apply(lambda row: 4.3 * row['x'], axis=1)
    dfinput['g'] = dfinput.apply(lambda row: 5.3 * row['x'], axis=1)
    return dfinput
In the second part we create a DataFrame and call operationonDFs with it. Note that we use the time package to record the total execution time of the program.
if __name__ == '__main__':
    start_time = time.time()
    dfinput = pd.DataFrame({'x': range(1, 100000),
                            'y': range(1, 100000)})
    df = operationonDFs(dfinput)
    print(df)
    print("--- %s seconds ---" % (time.time() - start_time))
Let's see the output:
C:\Users\Atoshi\mypy>python operationonDFWOMP.py
           x      y       z         a         b         c         d         e         f         g
0          1      1       2       3.4       2.1       5.9       3.3       7.1       4.3       5.3
1          2      2       4       6.8       4.2      11.8       6.6      14.2       8.6      10.6
2          3      3       6      10.2       6.3      17.7       9.9      21.3      12.9      15.9
3          4      4       8      13.6       8.4      23.6      13.2      28.4      17.2      21.2
4          5      5      10      17.0      10.5      29.5      16.5      35.5      21.5      26.5
...      ...    ...     ...       ...       ...       ...       ...       ...       ...       ...
99994  99995  99995  199990  339983.0  209989.5  589970.5  329983.5  709964.5  429978.5  529973.5
99995  99996  99996  199992  339986.4  209991.6  589976.4  329986.8  709971.6  429982.8  529978.8
99996  99997  99997  199994  339989.8  209993.7  589982.3  329990.1  709978.7  429987.1  529984.1
99997  99998  99998  199996  339993.2  209995.8  589988.2  329993.4  709985.8  429991.4  529989.4
99998  99999  99999  199998  339996.6  209997.9  589994.1  329996.7  709992.9  429995.7  529994.7

[99999 rows x 10 columns]
--- 24.10855269432068 seconds ---
So it took roughly 24 seconds to execute the program serially.
Let's rewrite the program to use multiprocessing. First we import the multiprocessing package and check the number of CPU cores available on our system with its cpu_count method.
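For instance, a quick check (the printed number depends on the machine):

```python
import multiprocessing as mp

cores = mp.cpu_count()  # logical CPU cores visible to the OS
print("CPU cores:", cores)
```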
import multiprocessing as mp
import time

import numpy as np
import pandas as pd

def operationonDFs(dfinput):
    # same per-chunk work as before: derive new columns from 'x'
    dfinput['z'] = dfinput.apply(lambda row: 2 * row['x'], axis=1)
    dfinput['a'] = dfinput.apply(lambda row: 3.4 * row['x'], axis=1)
    dfinput['b'] = dfinput.apply(lambda row: 2.1 * row['x'], axis=1)
    dfinput['c'] = dfinput.apply(lambda row: 5.9 * row['x'], axis=1)
    dfinput['d'] = dfinput.apply(lambda row: 7.3 * row['x'], axis=1)
    dfinput['d'] = dfinput.apply(lambda row: 3.3 * row['x'], axis=1)  # overwrites the 7.3 * x values
    dfinput['e'] = dfinput.apply(lambda row: 7.1 * row['x'], axis=1)
    dfinput['f'] = dfinput.apply(lambda row: 4.3 * row['x'], axis=1)
    dfinput['g'] = dfinput.apply(lambda row: 5.3 * row['x'], axis=1)
    return dfinput
if __name__ == '__main__':
    start_time = time.time()
    cpu_count = mp.cpu_count()
    no_of_split = 50
    dfinput = pd.DataFrame({'x': range(1, 100000),
                            'y': range(1, 100000)})
    dfinput_split = np.array_split(dfinput, no_of_split)
    pool = mp.Pool(cpu_count)
    df = pd.concat(pool.map(operationonDFs, dfinput_split))
    pool.close()
    pool.join()
    print(df)
    print("--- %s seconds ---" % (time.time() - start_time))
We have used the Pool class. The pool distributes tasks to the available processors using FIFO scheduling, working much like a map-reduce architecture: it maps the inputs across the worker processes and collects the outputs from all of them.
The input to pool.map is the function we want to execute in parallel, together with the list of sub-frames produced by np.array_split.
Once execution is finished, pd.concat joins all the partial outputs back into a single DataFrame.
Let’s see how much we save with this architecture:
C:\Users\Atoshi\mypy>python operationonDF.py
           x      y       z         a         b         c         d         e         f         g
0          1      1       2       3.4       2.1       5.9       3.3       7.1       4.3       5.3
1          2      2       4       6.8       4.2      11.8       6.6      14.2       8.6      10.6
2          3      3       6      10.2       6.3      17.7       9.9      21.3      12.9      15.9
3          4      4       8      13.6       8.4      23.6      13.2      28.4      17.2      21.2
4          5      5      10      17.0      10.5      29.5      16.5      35.5      21.5      26.5
...      ...    ...     ...       ...       ...       ...       ...       ...       ...       ...
99994  99995  99995  199990  339983.0  209989.5  589970.5  329983.5  709964.5  429978.5  529973.5
99995  99996  99996  199992  339986.4  209991.6  589976.4  329986.8  709971.6  429982.8  529978.8
99996  99997  99997  199994  339989.8  209993.7  589982.3  329990.1  709978.7  429987.1  529984.1
99997  99998  99998  199996  339993.2  209995.8  589988.2  329993.4  709985.8  429991.4  529989.4
99998  99999  99999  199998  339996.6  209997.9  589994.1  329996.7  709992.9  429995.7  529994.7

[99999 rows x 10 columns]
--- 13.999533653259277 seconds ---
That's a significant saving compared to the serial method, roughly a 1.7x speedup:
Serial method: 24 sec
With multiprocessing: 13 sec
Data Science with Python
Post Reference: Vikram Aristocratic Elfin Share