Streamlining Salesforce Data Migration Using Python ETL Within Google Colab

By Bassem Marji

In today’s data-driven business landscape, organizations frequently need to migrate data between Salesforce instances, whether moving from sandbox to production environments, consolidating multiple orgs, or performing data synchronization tasks.

Manual data migration processes are not only time-consuming but also prone to human error and difficult to scale. 

This article explores a practical, automated approach to Salesforce data migration using Python’s ETL (Extract, Transform, Load) capabilities within Google Colab, leveraging the simple-salesforce Python library.

The ETL process demonstrated here provides a systematic framework for extracting data from a source Salesforce instance, transforming it according to business requirements, and loading it into a destination instance with comprehensive error handling and logging capabilities.

Getting Started With Google Colab

Creating a Google Colab Account

Google Colab is a cloud-based platform for running Python code without any local setup. It provides a free Jupyter notebook environment accessible with just a Google account, making it ideal for data workflows such as ETL. To get started:

  1. Navigate to colab.research.google.com in your web browser.
  2. Sign in with your Google account credentials.
  3. If you don’t have a Google account, create one at accounts.google.com.
  4. Once signed in, you’ll have access to the Colab interface.

Creating a New Notebook

Creating a new notebook in Colab is straightforward:

  1. From the Colab homepage, click File > New notebook in Drive. This opens a blank Jupyter Notebook .ipynb file where you can add code cells, markdown for documentation, and execute Python scripts interactively.
  2. The notebook opens with a default untitled name – rename it by clicking on the title.
  3. Notebooks are automatically saved to your Google Drive in a “Colab Notebooks” folder.

You can also upload an existing .ipynb file using File > Upload notebook, or share a notebook via a link for collaboration.

Key Google Colab Features for Data Migration

Google Colab provides several benefits that make it well-suited for Salesforce ETL workflows:

  • Free GPU/TPU access: While not required for Salesforce API calls, these can accelerate heavy data processing tasks.
  • Pre-installed libraries: Many popular Python packages are available out of the box, reducing setup time.
  • Google Drive integration: Enables easy storage, access, and sharing of datasets and results.
  • Real-time collaboration: Multiple users can edit and run the same notebook simultaneously.
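  • Session persistence: A Colab runtime keeps variables and installed packages in memory while the session is active. On the free tier, sessions are time-limited (typically up to about 12 hours) and idle runtimes are disconnected sooner, so long-running migrations should be planned accordingly.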
  • No local installation: Everything runs in the browser, making it accessible from any device.
  • Flexibility: Additional Python packages can be installed on demand with pip, so the environment is easy to extend.

Understanding the Simple-Salesforce Library

Library Overview

simple-salesforce is a Python library that acts as a wrapper around the Salesforce REST API, making it significantly easier to interact with Salesforce data.

Key Features and Capabilities

  • Flexible Authentication: Supports various authentication methods, including username/password with security token, OAuth2, and JWT bearer tokens, making it adaptable to different Salesforce environments.
  • Comprehensive Data Operations: Provides full support for create, read, update, and delete (CRUD) actions, along with advanced querying using SOQL (Salesforce Object Query Language) and SOSL (Salesforce Object Search Language).
  • Bulk API Integration: Enables efficient handling of large-scale data operations through Salesforce’s Bulk API, allowing the processing of thousands or even millions of records while staying within API limits.
  • Metadata Access: Enables the retrieval of Salesforce metadata, allowing developers to dynamically explore object schemas, field definitions, and organizational structure.

Diving Into the ETL Notebook Architecture

Before exploring our notebook, it is important to understand how sensitive information such as usernames, passwords, and security tokens is securely managed within the Colab environment. Google Colab provides a Secrets feature that allows users to safely store and retrieve sensitive credentials without exposing them directly in the notebook code.

Note: Authorizing an integration using a username, password, and security token is a legacy Salesforce authentication method and does not use OAuth. It is not recommended for production environments due to security and manageability limitations. However, for demonstration purposes, it can simplify the setup and make the process easier to follow.

To configure the credentials required for our ETL workflow:

  1. Open the notebook sidebar and navigate to the Secrets section.
  2. Click Add a new secret and define the following variables with their respective values and ensure Notebook access is enabled for each.

Name          | Description                                             | Notebook Access
src_username  | Username for the source Salesforce instance.            | True
src_password  | Password for the source Salesforce instance.            | True
src_token     | Security token for the source Salesforce instance.      | True
dst_username  | Username for the destination Salesforce instance.       | True
dst_password  | Password for the destination Salesforce instance.       | True
dst_token     | Security token for the destination Salesforce instance. | True
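
One way to catch a missing or misnamed secret early is to validate all three values for an org before attempting a login. The sketch below is only an illustration: load_credentials is a hypothetical helper, and it takes the secret getter as an argument so that it can be tried outside Colab with a plain dictionary.

```python
def load_credentials(get_secret, prefix):
    """Collect the username, password, and token secrets for one org.

    get_secret: a callable such as google.colab.userdata.get.
    prefix: 'src' or 'dst', matching the secret names in the table above.
    Raises ValueError listing every secret that is missing or empty.
    """
    credentials, missing = {}, []
    for key in ("username", "password", "token"):
        name = f"{prefix}_{key}"
        try:
            value = get_secret(name)
        except Exception:  # the getter may raise when a secret is absent or access is off
            value = None
        if not value:
            missing.append(name)
        credentials[key] = value
    if missing:
        raise ValueError(f"Missing Colab secrets: {', '.join(missing)}")
    return credentials


# Demonstration with a plain dict standing in for the Colab Secrets store.
fake_store = {"src_username": "user@example.com",
              "src_password": "not-a-real-password",
              "src_token": "not-a-real-token"}
print(sorted(load_credentials(fake_store.get, "src")))
```

Inside Colab, the equivalent call would be load_credentials(userdata.get, 'src').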

With credentials securely configured, let’s now dissect our notebook called “ETL Process.ipynb,” which is designed to perform ETL operations between two Salesforce orgs.

Below is a comprehensive breakdown of each section of this notebook:

  1. Install Libraries: This section installs the simple_salesforce library along with its dependencies and displays its version to ensure reproducibility across environments.
#Installing Salesforce Libraries
!pip install simple_salesforce
#Showing Libraries Versions
!pip show simple_salesforce
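
If the version needs to be captured programmatically, for example to write it into the run log, the standard library can report it. This is a small sketch rather than part of the original notebook; installed_version is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version string of a package, or None if it is absent."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


print("simple-salesforce:", installed_version("simple-salesforce"))
```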
  2. Import Libraries: This section imports essential libraries for the notebook:
    • simple_salesforce: to connect and interact with Salesforce APIs.
    • pandas: for efficient data manipulation using DataFrames.
    • numpy: for numerical operations.
    • uuid: to generate unique identifiers, useful for external ID fields.
    • time (imported as t): to measure how long queries and loads take.
    • google.colab.userdata: to securely access sensitive credentials (secrets) like username, password, and security token within Google Colab.
#Importing Other Libraries
from simple_salesforce import Salesforce,SalesforceLogin,SalesforceAuthenticationFailed,SFType
import pandas as pd
import json
import numpy as np
import time as t
import uuid
from google.colab import userdata
  3. Configure Logging: This section initializes a logging framework to monitor the ETL workflow’s execution and facilitate debugging by:
    • Creating an isolated logger named ETL_Salesforce.
    • Setting the log level to INFO so that INFO, WARNING, and ERROR messages are captured.
    • Outputting formatted messages (with timestamps and levels) to the console.
    • Disabling propagation to prevent duplicate log entries.
# Configuring logging
import logging

# Create a dedicated logger for the ETL process
logger = logging.getLogger('ETL_Salesforce')
logger.setLevel(logging.INFO)

# Clear any existing handlers to avoid duplicates
logger.handlers.clear()  # Remove all handlers from this logger

# Create a new StreamHandler with the desired format
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logger.addHandler(handler)

# Disable propagation to prevent root logger interference
logger.propagate = False
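
Notebook cells are often re-run, which is why the handlers are cleared before a new one is attached. The sketch below wraps the same configuration in a function and calls it twice to show that the logger still ends up with a single handler. The function name get_etl_logger is hypothetical.

```python
import logging


def get_etl_logger(name="ETL_Salesforce", level=logging.INFO):
    """Return a console logger that is safe to configure repeatedly."""
    etl_logger = logging.getLogger(name)
    etl_logger.setLevel(level)
    etl_logger.handlers.clear()          # drop handlers left by earlier cell runs
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
    etl_logger.addHandler(handler)
    etl_logger.propagate = False         # keep the root logger from echoing messages
    return etl_logger


first = get_etl_logger()
second = get_etl_logger()                # simulates re-running the cell
second.info("Logger configured with %d handler(s)", len(second.handlers))
```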
  4. Define Core Functions: This section defines the core reusable functions for the ETL process. They are defined directly in the notebook here; in a production use case, you could instead package them as your own Python library:
    • connect_to_salesforce: Establishes a secure connection to a Salesforce instance using provided credentials (username, password, security token, instance URL, and domain). It returns an authenticated Salesforce API client and logs success or failure events with appropriate error handling.
    • extract_query: Executes a SOQL query against the connected Salesforce instance, handling pagination to retrieve all records. It returns the query results as a pandas.DataFrame, excluding metadata columns, and logs query progress and execution time.
    • load_data: Performs bulk upsert operations from a DataFrame into a specified Salesforce object. It accepts parameters for the target object, upsert reference field, fields to include, and batch size. The function logs the start and end of the upsert, summarizes success and failure counts, and re-raises any exception it encounters.
#Defining Main Functions
def connect_to_salesforce(username,password,token,url,domain):
    """
    Establish a connection to a Salesforce instance using provided credentials.


    Parameters:
    - username (str): Salesforce username for authentication.
    - password (str): Salesforce password for authentication.
    - token (str): Salesforce security token for authentication.
    - url (str): Salesforce instance URL (e.g., 'https://instance.my.salesforce.com').
    - domain (str): Salesforce domain, typically 'login' or 'test' for sandbox.


    Returns:
    - Salesforce: An authenticated Salesforce instance object for API operations.
    """


    try:
        sf = Salesforce(
                username       = username
               ,password       = password
               ,security_token = token
               ,instance_url   = url
               ,domain         = domain
               )


        logger.info(f"Connected successfully to {url}")
        return sf
    except SalesforceAuthenticationFailed as e:
        logger.error(f"Authentication failed: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during connection: {e}")
        raise


def extract_query(sf_instance, soql_query):
    """
    Execute a SOQL query on a Salesforce instance and return results as a DataFrame.


    Parameters:
    - sf_instance (Salesforce): Authenticated Salesforce instance to query.
    - soql_query (str): SOQL query string to execute (e.g., 'SELECT Id, Name FROM Account').


    Returns:
    - pandas.DataFrame: DataFrame containing query results, with 'attributes' column dropped.
    """
    try:
        start_time = t.time()
        logger.info(f"Executing SOQL query: {soql_query}")
        result = sf_instance.query(soql_query)
        records = result['records']

        # Follow nextRecordsUrl until Salesforce reports the result set is complete
        while not result['done']:
            logger.info(f"Fetching next batch: {result['nextRecordsUrl']}")
            result = sf_instance.query_more(result['nextRecordsUrl'], identifier_is_url=True)
            records.extend(result['records'])

        # Drop the per-record 'attributes' metadata column if present
        df = pd.DataFrame(records).drop('attributes', axis=1, errors='ignore')
        logger.info(f"Query complete. Rows extracted: {len(df)}. Time taken: {t.time() - start_time:.2f}s")
        return df
    except Exception as e:
        logger.error(f"Error during query execution: {e}")
        raise


def load_data(sf_instance, df, object_name, reference_field,fields, batch_size=1000):
    """
    Load (Upsert) data to Salesforce with error handling and summary.


    Parameters:
    - sf_instance: Salesforce instance.
    - df: pandas DataFrame containing the data to upsert.
    - object_name: Salesforce object name (e.g., 'Account').
    - reference_field: Field used for upsert matching (e.g., 'External_ID__c').
    - fields: List of column names to include in the upsert (required; each must exist in df).
    - batch_size: Number of records per batch for upsert (default: 1000).


    Returns:
    - pandas DataFrame containing upsert results.
    """


    try:
       df = df[fields]
    except KeyError as e:
       logger.error(f"Error selecting fields {fields}: {e}")
       raise


    records = df.to_dict(orient='records')
    logger.info(f"Preparing to upsert {len(records)} records to {object_name} with fields {fields}")


    try:
        start_time = t.time()
        results = getattr(sf_instance.bulk, object_name).upsert(
            records,
            external_id_field=reference_field,
            batch_size=batch_size,
            use_serial=False  # For Parallel Processing - True for Serial Processing
        )
        logger.info(f"Upsert complete. Time taken: {t.time() - start_time:.2f}s")


        # Summarize results
        success_count = sum(1 for r in results if r['success'])
        failure_count = len(results) - success_count
        logger.info(f"Success: {success_count}, Failures: {failure_count}")
        if failure_count > 0:
            logger.warning(f"Failures: {[r for r in results if not r['success']]}")


        return pd.DataFrame(results)
    except Exception as e:
        logger.error(f"Error during upsert: {e}")
        raise
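
One detail worth guarding against before a load: missing values in a DataFrame can surface as float NaN after to_dict, and NaN is not a valid JSON value. The sketch below, which assumes None is the desired representation for a missing value, converts NaN to None in a list of record dictionaries. The helper name clean_records is hypothetical and is not part of the original notebook.

```python
import math


def clean_records(records):
    """Replace float NaN values with None in a list of record dicts."""
    def _clean(value):
        if isinstance(value, float) and math.isnan(value):
            return None
        return value

    return [{key: _clean(value) for key, value in record.items()} for record in records]


sample = [{"Name": "Acme", "Phone": float("nan")},
          {"Name": "Globex", "Phone": "555-0100"}]
print(clean_records(sample))
```

Such a helper could be applied to the output of df.to_dict(orient='records') inside load_data before the upsert call.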
  5. Establish Connections: This section establishes connections to the source and destination Salesforce environments using credentials securely fetched from Google Colab’s Secrets storage.

Connecting to the Source Salesforce Instance:

#Connecting To The Source Salesforce Instance
SOURCE_SF_INSTANCE = {
    'username': userdata.get('src_username')
   ,'password': userdata.get('src_password')
   ,'token': userdata.get('src_token')
   ,'url': 'https://secureone--partial.sandbox.my.salesforce.com'
   ,'domain': 'test'
}
src_sf_instance = connect_to_salesforce(**SOURCE_SF_INSTANCE)

# Log the instance host rather than the session ID, which acts as an access token
logger.info(f"Connected to source instance: {src_sf_instance.sf_instance}")

To maintain security, credentials are not hardcoded. Instead, they are stored in Google Colab’s Secrets vault, and the notebook retrieves the stored values (src_username, src_password, and src_token) via the userdata.get('secret_name') method. The fetched credentials are passed to the connect_to_salesforce function to authenticate and create a Salesforce client instance.

Connecting to the Destination Salesforce Instance:

#Connecting To The Destination Salesforce Instance
DESTINATION_SF_INSTANCE = {
    'username': userdata.get('dst_username')
   ,'password': userdata.get('dst_password')
   ,'token': userdata.get('dst_token')
   ,'url': 'https://fullnsa-dev-ed.develop.my.salesforce.com/'
   ,'domain': 'login'
}
dst_sf_instance = connect_to_salesforce(**DESTINATION_SF_INSTANCE)

# Log the instance host rather than the session ID, which acts as an access token
logger.info(f"Connected to destination instance: {dst_sf_instance.sf_instance}")
  6. Data Extraction Phase: This section handles the data extraction process by:
    • Defining the Salesforce object to query (Account) and the key reference column (Name) used for data validation and uniqueness checks.
    • Constructing a SOQL query string to retrieve the Id, Name, and Phone fields, limited to three records to provide a small sample for testing.
    • Calling the extract_query function with the Salesforce connection and SOQL query; this function executes the query and loads the results into a pandas DataFrame (object_df).
    • Logging the count of non-null values per column to identify missing data.
    • Logging the total number of records extracted.
    • Logging the count of unique values in the reference column (Name).
    • Checking for duplicates in the reference column and logging a warning if any are found.
    • Displaying the first five rows of the extracted DataFrame to give a preview of the data structure and content.
objectName = 'Account'
referenceColumnName = 'Name'

querySOQL = """
SELECT Id, Name, Phone FROM Account ORDER BY Name LIMIT 3
"""

object_df = extract_query(src_sf_instance,querySOQL)

# Data Quality Check
logger.info(f"Value counts:\n{object_df.count()}")
logger.info(f"Total Records: {object_df.shape[0]}")
logger.info(f"Unique in {referenceColumnName}: {object_df[referenceColumnName].nunique()}")
if object_df.duplicated(subset=[referenceColumnName]).any():
   logger.warning(f"Duplicates found in reference column {referenceColumnName}")

object_df.head(5)
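
When the same extraction is repeated for several objects, the query string can be assembled from its parts instead of being written by hand each time. The following is a minimal sketch with a hypothetical helper named build_soql. It performs no escaping, so it should only be given trusted object and field names.

```python
def build_soql(object_name, fields, order_by=None, limit=None):
    """Assemble a simple SELECT statement from its parts."""
    if not fields:
        raise ValueError("At least one field is required")
    query = f"SELECT {', '.join(fields)} FROM {object_name}"
    if order_by:
        query += f" ORDER BY {order_by}"
    if limit is not None:
        query += f" LIMIT {int(limit)}"
    return query


print(build_soql("Account", ["Id", "Name", "Phone"], order_by="Name", limit=3))
# SELECT Id, Name, Phone FROM Account ORDER BY Name LIMIT 3
```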
  7. Data Transformation Phase: This section transforms the extracted Salesforce data and prepares it for integration by:
    • Setting External_ID__c as the reference column. This is a custom Salesforce field typically used to uniquely identify and cross-reference records with external data sources or across Salesforce orgs; it must exist on the destination object and be marked as an External ID for the upsert to match on it.
    • Creating a new DataFrame df_transformed containing:
      • Name: copied from the original data.
      • Phone: copied from the original data, with missing values replaced by an empty string ('') to prevent data quality issues during loading.
      • External_ID__c: a new column populated by generating a unique UUID for each record, ensuring every row has a distinct identifier. Because these UUIDs are generated fresh on every run, re-running the notebook inserts new records rather than updating the ones loaded earlier.
    • Logging the total number of records and confirming that all values in External_ID__c are unique, which is required for a reliable upsert and helps prevent data collisions.
    • Displaying the first five rows of the transformed DataFrame for review.
referenceColumnName = 'External_ID__c'

df_transformed = pd.DataFrame()
df_transformed['Name']                           = object_df['Name']
df_transformed['Phone']                          = object_df['Phone'].fillna('')
# Generate a unique UUID for each record
df_transformed['External_ID__c']                 = [str(uuid.uuid4()) for _ in range(len(object_df))]

logger.info(f"Total Records: {df_transformed.shape[0]}")
logger.info(f"Unique in {referenceColumnName}: {df_transformed[referenceColumnName].nunique()}")
df_transformed.head(5)
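
Random UUIDs differ on every run, so a repeated load cannot match records created by an earlier one. If repeatable runs matter, one alternative (not what the notebook above does) is to derive the external ID deterministically from the source record's Id using uuid5. In this sketch the namespace URL, the helper name stable_external_id, and the sample Id are all hypothetical.

```python
import uuid

# Hypothetical namespace for this migration; any fixed UUID works as long as it never changes.
MIGRATION_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://example.com/salesforce-migration")


def stable_external_id(source_record_id):
    """Derive a repeatable external ID from a source record's Salesforce Id."""
    return str(uuid.uuid5(MIGRATION_NAMESPACE, source_record_id))


first = stable_external_id("001000000000001AAA")   # made-up Id for illustration
second = stable_external_id("001000000000001AAA")
print(first == second)   # True: the same input always yields the same ID
```

In the notebook, this could be applied as object_df['Id'].map(stable_external_id), assuming the External_ID__c field is long enough to hold a 36-character string.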

  8. Data Loading Phase: This section loads the transformed data into the destination Salesforce instance. The following steps are taken:
    • Defines the list of fields ['Name', 'Phone', 'External_ID__c'] which will be included in the upsert operation.
    • Calculates the batch size for the bulk operation as the minimum of 1000 and the total number of records in df_transformed.
    • Calls the load_data function with parameters:
      • Destination Salesforce instance connection.
      • Transformed data DataFrame.
      • Target object Account.
      • Upsert reference field External_ID__c.
      • Fields to include.
      • Batch size for efficiency and API compliance.
    • The results of the load operation are collected into the df_load_results DataFrame, which typically contains success/failure info for each record processed.
    • Displays the first five rows of the load results for quick inspection and verification.
fields_to_load = ['Name', 'Phone','External_ID__c']
batch_size = max(1, min(1000, len(df_transformed)))  # Dynamic batch size; never below 1
df_load_results = load_data(dst_sf_instance, df_transformed, object_name = 'Account', reference_field = 'External_ID__c',fields=fields_to_load,batch_size=batch_size)
df_load_results.head(5)
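
To see which rows failed and why, the load results can be paired with the submitted records. The sketch below assumes that results come back in the same order as the input records and that each failed result carries an errors list whose entries include a statusCode. Both points are assumptions to verify against your own output. The helper name summarize_failures and the sample data are hypothetical.

```python
from collections import Counter


def summarize_failures(records, results):
    """Pair each input record with its result and count failures by status code.

    Assumes results are ordered like the submitted records and that failed
    results carry an 'errors' list of dicts with a 'statusCode' key.
    """
    failed_rows, codes = [], Counter()
    for record, result in zip(records, results):
        if result.get("success"):
            continue
        failed_rows.append(record)
        for error in result.get("errors") or []:
            codes[error.get("statusCode", "UNKNOWN")] += 1
    return failed_rows, codes


# Hypothetical inputs and results for demonstration only.
records = [{"Name": "Acme"}, {"Name": ""}]
results = [
    {"success": True, "created": True, "id": "001000000000001AAA", "errors": []},
    {"success": False, "created": False, "id": None,
     "errors": [{"statusCode": "REQUIRED_FIELD_MISSING",
                 "message": "Required fields are missing: [Name]"}]},
]
failed_rows, codes = summarize_failures(records, results)
print(failed_rows, dict(codes))
```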

Final Thoughts

The modular ETL design ensures that even complex migration scenarios can be supported without sacrificing simplicity and maintainability. When building similar ETL solutions, keep the following principles in mind:

  • Comprehensive Logging: Use structured logging to enable easier debugging, monitoring, and compliance tracking.
  • Error Handling and Recovery: Provide clear, detailed error messages to simplify troubleshooting and accelerate issue resolution.
  • Scalability Planning: Incorporate batching and parallel processing to efficiently handle large-scale data migrations.
  • Security Best Practices: Manage credentials securely and implement robust authentication methods.
  • Data Quality Assurance: Perform validation early in the process to prevent downstream failures and ensure reliable results.

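The presented Colab notebook demonstrates a lightweight ETL pattern for Salesforce migrations. Before using it in production, replace the username, password, and security token login with an OAuth-based authentication method, as noted earlier.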

By combining Google Colab’s collaborative environment with the simple-salesforce library, teams can securely and efficiently migrate CRM data without investing in complex infrastructure.

The Author

Bassem Marji

Bassem is a certified Salesforce Administrator, a Project Implementation Manager at BLOM Bank Lebanon, and a Technical Author at Educative. He is a big fan of the Salesforce ecosystem.
