When working with large datasets in Python, memory errors can be a common issue, especially when fetching data from external sources like Salesforce. This is exactly what happened to me when I attempted to pull a large set of Lead data from Salesforce, process it, and save it to a CSV file. The error I encountered was a Python memory error, which occurred because I was trying to load all the data into memory at once.
In this post, I’ll walk you through the issue and show how to fix the memory error by splitting the output into smaller CSV files, processing the data in chunks, and using Salesforce’s pagination feature.
The Problem: Memory Error
The error I was facing (TypeError: 'iterator' is an invalid keyword argument for open()) happened because I passed unsupported parameters, iterator and chunksize, to the open() function. These parameters are useful in some cases, especially with libraries like pandas, but not with basic file operations in Python.
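For context, here is where those keyword arguments actually belong. The snippet below is a minimal sketch with pandas, assuming a hypothetical leads.csv; chunksize tells read_csv to return an iterator of DataFrames instead of loading the whole file at once:

import pandas as pd

# chunksize is a pandas.read_csv option, not an open() option: it returns
# an iterator of DataFrames instead of one big DataFrame in memory.
for chunk in pd.read_csv('leads.csv', chunksize=1000):
    print(len(chunk))  # process each chunk of up to 1000 rows here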
In addition to this, the root of the problem was that I was trying to fetch all the data from Salesforce at once using sf.query_all(sosl). When the dataset is large (1,000+ records), this results in a significant memory load, which can easily cause a memory error.
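To see why query_all() is the memory problem, it helps to look at the shape of the response simple_salesforce returns. The sketch below is purely illustrative (the field values and URL are made up): query() returns one batch plus a pointer to the next, while query_all() follows every nextRecordsUrl for you and accumulates all records into this single in-memory structure.

# Approximate shape of the dict returned by sf.query(...)
{
    'totalSize': 54321,
    'done': False,                 # False means more batches remain on the server
    'nextRecordsUrl': '/services/data/vXX.X/query/01g...-2000',
    'records': [
        {'attributes': {'type': 'Lead', 'url': '...'},
         'CreatedDate': '2023-01-01T00:00:00.000+0000'},
        # ... one entry per record in this batch
    ]
}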
Understanding the Original Code
Before we fix the problem, let’s first look at the original code and break it down:
from simple_salesforce import Salesforce
from datetime import datetime
import csv
import os
import json
import account

SALESFORCE_USERNAME = '123'
PASSWORD = '123'
SECURITY_TOKEN = '123'


def main():
    # Authentication settings
    sf = Salesforce(username=SALESFORCE_USERNAME,
                    password=PASSWORD,
                    security_token=SECURITY_TOKEN)

    # Lead Column setting to be acquired
    columns = [
        "CreatedDate"
    ]
    sosl = 'SELECT {0[0]} FROM Lead'.format(columns)

    # Data acquisition with SOSL
    data = sf.query_all(sosl)

    # Delete CSV file if it exists
    output_csv = 'output.csv'
    if os.path.exists(output_csv):
        os.remove(output_csv)

    # Write to CSV file
    for k, v in data.items():
        if type(v) is list:
            with open(output_csv, 'w', newline="", iterator=True, chunksize=1000) as f:
                writer = csv.DictWriter(f, fieldnames=columns)
                writer.writeheader()
                for d in v:
                    data = json.loads(json.dumps(d))
                    del data['attributes']
                    writer.writerow(data)


if __name__ == '__main__':
    main()
Key Issues in the Original Code
- Memory Overload: The use of sf.query_all() pulls the entire dataset into memory at once, which can lead to a memory overload when dealing with large datasets.
- Unsupported Parameters for open(): The attempt to use iterator=True and chunksize=1000 with the open() function results in a TypeError, because these are not valid parameters for open() in Python (a short reproduction follows this list).
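You can reproduce the second issue in isolation, with no Salesforce involved at all. open() accepts only a fixed set of keyword arguments (mode, buffering, encoding, errors, newline, closefd, opener), so anything else raises a TypeError:

# Minimal reproduction of the error
with open('test.csv', 'w', newline="", iterator=True, chunksize=1000) as f:
    pass
# TypeError: 'iterator' is an invalid keyword argument for open()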
Process Data in Chunks and Split the CSV Files
The key to fixing the memory error is to implement query pagination, process the data in smaller batches, and save the output as smaller, more manageable CSV files.
Here’s how we can approach this:
- Use Pagination: Instead of loading all the records at once, we will use Salesforce’s pagination feature by calling the query() method. This retrieves one portion of the data at a time and provides a nextRecordsUrl to fetch the next batch of records.
- Write in Chunks: Instead of writing all the data to a single CSV file, we will split the data across multiple CSV files, starting a new file after every 1000 rows.
Here’s the updated and improved code that implements this solution:
Fixed and Improved Code
from simple_salesforce import Salesforce
import csv
import os
import json

SALESFORCE_USERNAME = '123'
PASSWORD = '123'
SECURITY_TOKEN = '123'


def main():
    # Authentication settings
    sf = Salesforce(username=SALESFORCE_USERNAME,
                    password=PASSWORD,
                    security_token=SECURITY_TOKEN)

    # Lead Column setting to be acquired
    columns = [
        "CreatedDate"
    ]

    # Create a query string to select the desired columns
    sosl = 'SELECT {0[0]} FROM Lead'.format(columns)

    # Initial query: returns only the first batch of records
    data = sf.query(sosl)

    # Counters for CSV file naming and rows per file
    file_counter = 1
    rows_written = 0

    # Delete any existing CSV file with the same name
    output_csv = f'output{file_counter}.csv'
    if os.path.exists(output_csv):
        os.remove(output_csv)

    # Open the first CSV file and write the header
    f = open(output_csv, 'w', newline="")
    writer = csv.DictWriter(f, fieldnames=columns)
    writer.writeheader()

    try:
        while data:
            # Write the current batch of records to the CSV
            for record in data['records']:
                # Remove the Salesforce metadata attributes
                record_data = json.loads(json.dumps(record))
                del record_data['attributes']
                writer.writerow(record_data)
                rows_written += 1

                # After 1000 rows, close the current file and start a new one
                if rows_written >= 1000:
                    f.close()
                    file_counter += 1
                    output_csv = f'output{file_counter}.csv'
                    if os.path.exists(output_csv):
                        os.remove(output_csv)
                    f = open(output_csv, 'w', newline="")
                    writer = csv.DictWriter(f, fieldnames=columns)
                    writer.writeheader()
                    # Reset the per-file row count
                    rows_written = 0

            # Fetch the next batch if Salesforce reports more data
            if 'nextRecordsUrl' in data:
                data = sf.query_more(data['nextRecordsUrl'], True)
            else:
                break
    finally:
        f.close()


if __name__ == '__main__':
    main()
Key Changes and Explanations
- Query Pagination: The code uses sf.query() instead of sf.query_all(). This method retrieves a smaller portion of the data at a time and provides a nextRecordsUrl to get the next set of records. The while loop keeps fetching more data through sf.query_more() until all records are processed (a reusable version of this loop is sketched after this list).
- Chunked File Writing: Every time 1000 rows are written, the program closes the current CSV file and starts a new one (output1.csv, output2.csv, etc.). This approach prevents any single file from becoming too large and keeps the output manageable.
- Memory Efficiency: By processing records batch by batch, the program never holds the entire dataset in memory at once, reducing the risk of a memory error.
- File Cleanup: Before writing to a CSV file, the code checks whether a file with that name already exists and deletes it, so each run starts from clean output files.
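If you end up reusing this pattern, the pagination logic can also be pulled out into a small generator so the CSV-writing code never has to deal with nextRecordsUrl directly. The sketch below is my own illustration (the function name and structure are not part of simple_salesforce), assuming sf is an authenticated Salesforce instance like the one above:

def query_in_batches(sf, soql):
    """Yield Salesforce records one at a time, fetching them batch by batch."""
    data = sf.query(soql)
    while True:
        for record in data['records']:
            yield record
        if data.get('nextRecordsUrl'):
            # Fetch the next batch using the URL Salesforce handed back
            data = sf.query_more(data['nextRecordsUrl'], True)
        else:
            break

# Usage: only one batch of records is held in memory at any time
# for record in query_in_batches(sf, 'SELECT CreatedDate FROM Lead'):
#     writer.writerow(...)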
Conclusion
Dealing with large datasets in Python can be tricky, especially when you’re fetching data from external sources like Salesforce. In this post, we identified the root cause of the Python memory error and fixed it with query pagination and chunked file writing. By using Salesforce’s pagination and processing data in manageable batches, we can split large datasets into smaller CSV files, reduce memory usage, and keep the program running smoothly.