bpo-17560: Too small type for struct.pack/unpack in mutliprocessing.Connection by ahcub · Pull Request #10305 · python/cpython
Hi @yangyxt, I would love to help, but I would probably need to see the code to advise anything meaningful.
Thanks. Glad to paste the code here. This is how I set up the mp.Pool:
```python
pool = mp.Pool(processes=int(estimate_cpus()))
logger.warn("\n\nThe number of usable CPUs is: " + str(int(estimate_cpus())))
# Read the large bam using a self-defined iterator based on pysam, yielding chunk dfs
iterator = read_bam_qname_groups(bam, chunksize=chunksize, headers=headers, sep='\t')
output = map(lambda chunk: pool.apply_async(func_return_unit_df, args=(chunk, *func_args)), iterator)
results = map(lambda r: r.get(), output)
```
Note that read_bam_qname_groups() is a self-defined generator that reads big tables in chunks, yielding 5000 lines at a time.
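For reference, the shape of such a chunked reader can be sketched in plain Python. The name `read_in_chunks` and its line-based chunking are hypothetical stand-ins; the real read_bam_qname_groups groups bam rows by qname via pysam and yields DataFrames:

```python
def read_in_chunks(path, chunksize=5000):
    # Hypothetical stand-in for read_bam_qname_groups: yields lists of up to
    # `chunksize` lines, so the whole file never sits in memory at once.
    chunk = []
    with open(path) as fh:
        for line in fh:
            chunk.append(line)
            if len(chunk) == chunksize:
                yield chunk
                chunk = []
    if chunk:  # emit the final, possibly short, chunk
        yield chunk
```

Because it is a generator, each chunk is produced lazily, only when the consumer asks for the next one.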
And func_return_unit_df is a placeholder for the function that processes each 5000-line chunk DataFrame. Since I need to pass *args here, I have to use the built-in map(), a lambda, and pool.apply_async() instead of just pool.map_async().
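As an aside, the lambda is only needed because the extra arguments come after the chunk. If the argument order can be swapped (or a small module-level wrapper added), `functools.partial` gives a callable that, unlike a lambda, is picklable and can be handed straight to `pool.map_async`. The wrapper `process_chunk` and its arguments below are hypothetical:

```python
from functools import partial

def process_chunk(func_args, chunk):
    # Hypothetical wrapper: the fixed arguments come first and the chunk last,
    # so functools.partial can bind them. A partial of a module-level function
    # is picklable, which a lambda is not, so it can be sent to pool workers.
    return (chunk, func_args)

worker = partial(process_chunk, ("a", "b"))
# With a real pool this would be: pool.map_async(worker, iterator)
results = list(map(worker, [1, 2, 3]))
```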
This is how I consume the "results" iterator:
```python
chunk_df = next(results)
logger.info("From {}, the returned result df's shape is ".format(func_return_unit_df.__name__)
            + str(chunk_df.shape) + "\n")
chunk_df.to_csv(output_path, sep='\t', index=False, header=False, mode='a', encoding='utf-8')
```
So the workflow is: read the big file in chunks, pass each chunk to func_return_unit_df, then append each processed result to an output file. Doing this keeps the script from eating too much memory.
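That read/process/append loop can be sketched as follows. The function name `run_pipeline` and the string-based processing are placeholders; the real code processes pandas DataFrames and appends with `to_csv(..., mode='a')`:

```python
def run_pipeline(chunks, process, output_path):
    # Consume one processed chunk at a time and append it to the output file,
    # so only a single chunk's worth of data is ever held in memory.
    with open(output_path, "a") as out:
        for chunk in chunks:
            result = process(chunk)
            out.write(result)
```

Opening the file once and appending inside the loop preserves the low-memory property the text describes.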
Furthermore, I added a logging call in the self-defined read_bam_qname_groups:
```python
logger.info("We check that the next row's qname is different from the returned table's last row; "
            "the yielded table shape is: " + str(chunk_df.shape) + str(chunk_df.head()))
yield chunk_df
```
Before yielding chunk_df, I output a logging info with a timestamp.
Then I put a logging call at the beginning of func_return_unit_df:
```python
def fetch_multi_with_boolarray(chunk, uqname):
    logger.info("The input chunk shape is: " + str(chunk.shape))
```
Once the function starts executing, this logging message is emitted with a timestamp.
Here is the key part: when I test the code on a table with 10 million rows, the interval between the two timestamps is close to 1 minute; when I test it on a table with 100 thousand rows, the interval is only around 20 ms.
I don't know why this happens, and it is really important to me. Please let me know your thoughts. Thanks!
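Not a confirmed diagnosis, but one factor worth measuring: every chunk sent to a worker and every DataFrame sent back is pickled through a multiprocessing Connection, and that serialization cost grows with the size of the objects involved. A quick stdlib-only way to see the scaling (the list sizes here are arbitrary):

```python
import pickle

# Two payloads differing in size by 100x; pickle size (and the time to
# serialize and push it through a pipe) grows with the payload.
small = list(range(10_000))
big = list(range(1_000_000))

small_bytes = pickle.dumps(small)
big_bytes = pickle.dumps(big)
```

If the pickled chunks or results are large, the per-chunk round trip through the pool can dominate the timestamps you are comparing.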