How to Fix a Python Memory Error When Saving and Splitting CSV Files

When working with large datasets in Python, memory errors are a common problem, especially when fetching data from an external source like Salesforce. That is exactly what happened to me when I tried to pull a large set of Lead data from Salesforce, process it, and save it to a CSV file. The script failed with a Python memory error because it was trying to load all of the data into memory at once.

In this post, I'll walk you through the issue and the fix: using Salesforce's query pagination, processing the data in chunks, and splitting the output into smaller CSV files.

The Problem: Memory Error

The first error I hit (TypeError: 'iterator' is an invalid keyword argument for open()) happened because I passed iterator and chunksize to the built-in open() function. Those parameters belong to libraries such as pandas, not to basic file operations in Python.
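
For comparison, here is where chunksize actually belongs: a minimal sketch of chunked reading with pandas (leads.csv is just a placeholder filename). pandas.read_csv with chunksize returns an iterator of DataFrames instead of loading the whole file into memory:

import pandas as pd

# chunksize belongs to pandas.read_csv, not to the built-in open().
# It returns an iterator of DataFrames rather than loading the whole file.
for chunk in pd.read_csv('leads.csv', chunksize=1000):
    # Each chunk is a DataFrame with at most 1000 rows.
    print(len(chunk))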

On top of that, the root of the problem was that I was fetching all the data from Salesforce at once with sf.query_all(sosl) (the query itself is SOQL, despite the variable name). With a large dataset (1,000+ records in my case), everything is loaded into memory in one go, which can easily trigger a memory error.
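
As an aside, if you only need to loop over the records once, recent versions of simple_salesforce also expose query_all_iter(), which fetches batches lazily behind a generator. A rough sketch, assuming your installed version provides it (handle_record is a placeholder for your own processing):

# Rough sketch: iterate over Lead records lazily instead of loading them all.
# Assumes a simple_salesforce version that provides query_all_iter().
# sf is the authenticated Salesforce client shown later in this post.
for record in sf.query_all_iter('SELECT CreatedDate FROM Lead'):
    handle_record(record)  # handle_record is a placeholder for your own logic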

Understanding the Original Code

Before we fix the problem, let’s first look at the original code and break it down:

from simple_salesforce import Salesforce
from datetime import datetime
import csv
import os
import json
import account

SALESFORCE_USERNAME = '123'
PASSWORD = '123'
SECURITY_TOKEN = '123'

def main():
    # Authentication settings
    sf = Salesforce(username=SALESFORCE_USERNAME,
                    password=PASSWORD,
                    security_token=SECURITY_TOKEN)

    # Lead Column setting to be acquired
    columns = [
        "CreatedDate"
    ]
    sosl = 'SELECT {0[0]} FROM Lead'.format(columns)

    # Data acquisition with SOSL
    data = sf.query_all(sosl)

    # Delete CSV file if it exists
    output_csv = 'output.csv'
    if os.path.exists(output_csv):
        os.remove(output_csv)

    # Write to CSV file
    for k, v in data.items():
        if type(v) is list:
            with open(output_csv, 'w', newline="",iterator=True,chunksize=1000) as f:
                writer = csv.DictWriter(f, fieldnames=columns)
                writer.writeheader()
                for d in v:
                    data = json.loads(json.dumps(d))
                    del data['attributes']
                    writer.writerow(data)
if __name__ == '__main__':
    main()

Key Issues in the Original Code

  1. Memory Overload: The use of sf.query_all() pulls the entire dataset into memory at once, which can lead to a memory overload when dealing with large datasets.
  2. Unsupported Parameters for open(): Passing iterator=True and chunksize=1000 to the open() function raises a TypeError, because these are not valid parameters for Python's built-in open() (a short snippet reproducing the error follows this list).
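
The second issue is easy to reproduce in isolation: open() rejects keyword arguments it does not recognize, so the call fails before any file handling happens.

# Reproducing the TypeError: open() does not accept iterator or chunksize.
try:
    f = open('output.csv', 'w', newline="", iterator=True, chunksize=1000)
except TypeError as e:
    print(e)  # e.g. 'iterator' is an invalid keyword argument for open()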

Process Data in Chunks and Split the CSV Files

The key to fixing the memory error and ensuring that the CSV is saved in smaller, more manageable chunks is to implement query pagination and process the data in smaller batches.

Here’s how we can approach this:

  1. Use Pagination: Instead of loading all the records at once, we will use Salesforce’s pagination feature by calling the query() method. This method retrieves a portion of the data and provides a nextRecordsUrl to fetch the next batch of records.
  2. Write in Chunks: Instead of writing all the data to a single CSV file, we will split the data into multiple CSV files. After every 1000 rows, a new CSV file will be created.

Here’s the updated and improved code that implements this solution:

Fixed and Improved Code

from simple_salesforce import Salesforce
import csv
import os
import json

SALESFORCE_USERNAME = '123'
PASSWORD = '123'
SECURITY_TOKEN = '123'

def main():
    # Authentication settings
    sf = Salesforce(username=SALESFORCE_USERNAME,
                    password=PASSWORD,
                    security_token=SECURITY_TOKEN)

    # Lead Column setting to be acquired
    columns = [
        "CreatedDate"
    ]
    
    # Build the SOQL query string from the column list
    soql = 'SELECT {} FROM Lead'.format(', '.join(columns))

    # Initial query: returns the first batch of records plus a nextRecordsUrl if more remain
    data = sf.query(soql)

    # Counter for CSV file naming
    file_counter = 1
    rows_written = 0

    # Delete any existing CSV files with the same name
    output_csv = f'output{file_counter}.csv'
    if os.path.exists(output_csv):
        os.remove(output_csv)

    # Open the first CSV file and write its header row
    f = open(output_csv, 'w', newline="")
    writer = csv.DictWriter(f, fieldnames=columns)
    writer.writeheader()

    try:
        while data:
            # Write the current batch of records to the CSV
            for record in data['records']:
                # Copy the record and drop the Salesforce metadata entry
                record_data = json.loads(json.dumps(record))
                del record_data['attributes']
                writer.writerow(record_data)
                rows_written += 1

                # After 1000 rows, close the current file and start a new one
                if rows_written >= 1000:
                    f.close()
                    file_counter += 1
                    rows_written = 0

                    output_csv = f'output{file_counter}.csv'
                    if os.path.exists(output_csv):
                        os.remove(output_csv)

                    # Open the next CSV file and point the writer at it
                    f = open(output_csv, 'w', newline="")
                    writer = csv.DictWriter(f, fieldnames=columns)
                    writer.writeheader()

            # Fetch the next batch if Salesforce reports more records
            if 'nextRecordsUrl' in data:
                data = sf.query_more(data['nextRecordsUrl'], True)
            else:
                break
    finally:
        # Make sure the last file is closed even if something goes wrong
        f.close()

if __name__ == '__main__':
    main()
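
Once the script has run, it is worth a quick sanity check that no rows were lost in the split. Here is a small verification sketch, assuming the output1.csv, output2.csv, ... naming used above:

import csv
import glob

# Count data rows (excluding headers) across all of the split files.
total = 0
for path in sorted(glob.glob('output*.csv')):
    with open(path, newline="") as f:
        rows = sum(1 for _ in csv.reader(f)) - 1  # subtract the header row
    print(f'{path}: {rows} rows')
    total += rows
print(f'Total: {total} rows')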

Key Changes and Explanations

  1. Query Pagination:
    The code uses sf.query() instead of sf.query_all(). This method retrieves one batch of records at a time (by default up to 2,000 records per batch) and provides a nextRecordsUrl for the next batch. The while loop keeps fetching more data via sf.query_more() until all records are processed.
  2. Chunked File Writing:
    Every time 1,000 rows have been written, the program closes the current file and opens a new CSV file (output1.csv, output2.csv, and so on). This keeps each output file at a manageable size (a reusable sketch of this rotation logic follows this list).
  3. Memory Efficiency:
    By processing records in chunks, the program doesn’t load the entire dataset into memory at once, reducing the risk of a memory error.
  4. File Cleanup:
    Before writing to each output file, the code deletes any file with the same name left over from a previous run, so stale data is never mixed in with the new export.
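
If you need this pattern in more than one script, the file-rotation logic can be pulled out into a small helper. The ChunkedCsvWriter class below is my own sketch, not part of any library; with it, the main loop shrinks to calling writer.writerow(record_data) for each record and writer.close() at the end.

import csv

class ChunkedCsvWriter:
    """Writes dict rows to output1.csv, output2.csv, ..., starting a new file every max_rows rows."""

    def __init__(self, fieldnames, prefix='output', max_rows=1000):
        self.fieldnames = fieldnames
        self.prefix = prefix
        self.max_rows = max_rows
        self.file_counter = 0
        self.rows_written = 0
        self.f = None
        self.writer = None

    def _open_next_file(self):
        # Close the current file (if any) and open the next numbered one.
        if self.f:
            self.f.close()
        self.file_counter += 1
        self.rows_written = 0
        self.f = open(f'{self.prefix}{self.file_counter}.csv', 'w', newline="")
        self.writer = csv.DictWriter(self.f, fieldnames=self.fieldnames)
        self.writer.writeheader()

    def writerow(self, row):
        # Rotate to a new file on the first write and after every max_rows rows.
        if self.writer is None or self.rows_written >= self.max_rows:
            self._open_next_file()
        self.writer.writerow(row)
        self.rows_written += 1

    def close(self):
        if self.f:
            self.f.close()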

Conclusion

Dealing with large datasets in Python can be tricky, especially when fetching data from an external source like Salesforce. In this post, we identified the root cause of the Python memory error and fixed it with query pagination and chunked file writing. By using Salesforce's pagination and processing the data in manageable batches, we can split a large export into smaller CSV files, keep memory usage low, and let the program run smoothly.
