SnowflakeSearchTool

Description

The SnowflakeSearchTool connects to Snowflake data warehouses and executes SQL queries, with support for connection pooling, retry logic, and asynchronous execution. It lets CrewAI agents query Snowflake databases, which suits data analysis, reporting, and business intelligence tasks that depend on enterprise data stored in Snowflake.

Installation

To use this tool, you need to install the required dependencies:

uv add cryptography snowflake-connector-python snowflake-sqlalchemy

Alternatively, install them through the snowflake extra:

uv sync --extra snowflake

Steps to Get Started

To effectively use the SnowflakeSearchTool, follow these steps:

  1. Install Dependencies: Install the required packages using one of the commands above.
  2. Configure Snowflake Connection: Create a SnowflakeConfig object with your Snowflake credentials.
  3. Initialize the Tool: Create an instance of the tool with the necessary configuration.
  4. Execute Queries: Use the tool to run SQL queries against your Snowflake database.

Example

The following example demonstrates how to use the SnowflakeSearchTool to query data from a Snowflake database:

Code
from crewai import Agent, Task, Crew
from crewai_tools import SnowflakeSearchTool, SnowflakeConfig

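# Create Snowflake configuration. Replace the placeholders with your own values
# and avoid committing real credentials to source control.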
config = SnowflakeConfig(
    account="your_account",
    user="your_username",
    password="your_password",
    warehouse="COMPUTE_WH",
    database="your_database",
    snowflake_schema="your_schema"
)

# Initialize the tool
snowflake_tool = SnowflakeSearchTool(config=config)

# Define an agent that uses the tool
data_analyst_agent = Agent(
    role="Data Analyst",
    goal="Analyze data from Snowflake database",
    backstory="An expert data analyst who can extract insights from enterprise data.",
    tools=[snowflake_tool],
    verbose=True,
)

# Example task to query sales data
query_task = Task(
    description="Query the sales data for the last quarter and summarize the top 5 products by revenue.",
    expected_output="A summary of the top 5 products by revenue for the last quarter.",
    agent=data_analyst_agent,
)

# Create and run the crew
crew = Crew(
    agents=[data_analyst_agent],
    tasks=[query_task],
)
result = crew.kickoff()

You can also customize the tool with additional parameters:

Code
# Initialize the tool with custom parameters
snowflake_tool = SnowflakeSearchTool(
    config=config,
    pool_size=10,
    max_retries=5,
    retry_delay=2.0,
    enable_caching=True
)

Parameters

SnowflakeConfig Parameters

The SnowflakeConfig class accepts the following parameters:

  • account: Required. Snowflake account identifier.
  • user: Required. Snowflake username.
  • password: Optional*. Snowflake password.
  • private_key_path: Optional*. Path to private key file (alternative to password).
  • warehouse: Required. Snowflake warehouse name.
  • database: Required. Default database.
  • snowflake_schema: Required. Default schema.
  • role: Optional. Snowflake role.
  • session_parameters: Optional. Custom session parameters as a dictionary.

*Either password or private_key_path must be provided.
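
The either/or rule above can be checked before the config is built. The following is a minimal sketch, not part of crewai_tools: the helper name snowflake_kwargs_from_env and the SNOWFLAKE_* environment variable names are assumptions made for this example. It reads the connection settings from the environment, which keeps credentials out of source code, and returns keyword arguments that match the SnowflakeConfig parameters listed above.

```python
import os
from typing import Mapping, Optional


def snowflake_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build keyword arguments for SnowflakeConfig from environment variables.

    The SNOWFLAKE_* variable names are hypothetical, chosen for this sketch.
    """
    env = os.environ if env is None else env

    # Required connection settings; a missing one raises KeyError.
    kwargs = {
        "account": env["SNOWFLAKE_ACCOUNT"],
        "user": env["SNOWFLAKE_USER"],
        "warehouse": env["SNOWFLAKE_WAREHOUSE"],
        "database": env["SNOWFLAKE_DATABASE"],
        "snowflake_schema": env["SNOWFLAKE_SCHEMA"],
    }

    # Enforce the rule that a password or a private key path must be present.
    password = env.get("SNOWFLAKE_PASSWORD")
    private_key_path = env.get("SNOWFLAKE_PRIVATE_KEY_PATH")
    if not password and not private_key_path:
        raise ValueError("Set SNOWFLAKE_PASSWORD or SNOWFLAKE_PRIVATE_KEY_PATH")

    # Prefer the key pair when both are set (a choice made for this sketch).
    if private_key_path:
        kwargs["private_key_path"] = private_key_path
    else:
        kwargs["password"] = password

    # The role is optional, so it is only included when provided.
    if env.get("SNOWFLAKE_ROLE"):
        kwargs["role"] = env["SNOWFLAKE_ROLE"]
    return kwargs
```

The returned dictionary can then be unpacked into the config, for example SnowflakeConfig(**snowflake_kwargs_from_env()).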

SnowflakeSearchTool Parameters

The SnowflakeSearchTool accepts the following parameters during initialization:

  • config: Required. A SnowflakeConfig object containing connection details.
  • pool_size: Optional. Number of connections in the pool. Default is 5.
  • max_retries: Optional. Maximum retry attempts for failed queries. Default is 3.
  • retry_delay: Optional. Delay between retries in seconds. Default is 1.0.
  • enable_caching: Optional. Whether to enable query result caching. Default is True.

Usage

When an agent calls the SnowflakeSearchTool, it can pass the following parameters. Only query is required:

  • query: Required. The SQL query to execute.
  • database: Optional. Override the default database specified in the config.
  • snowflake_schema: Optional. Override the default schema specified in the config.
  • timeout: Optional. Query timeout in seconds. Default is 300.

The tool returns the query results as a list of dictionaries, where each dictionary represents one row and maps column names to values.
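
Because each row is a plain dictionary, results can be post-processed with ordinary Python. The sketch below uses a hand-written rows list as a stand-in for a real result; the column names and values are invented for illustration, and the helper total_revenue_by_product is hypothetical.

```python
from collections import defaultdict

# Stand-in for a query result: one dictionary per row, keyed by column name.
# The columns and values here are invented for illustration.
rows = [
    {"PRODUCT_NAME": "Widget", "REVENUE": 120.0},
    {"PRODUCT_NAME": "Gadget", "REVENUE": 80.0},
    {"PRODUCT_NAME": "Widget", "REVENUE": 30.0},
]


def total_revenue_by_product(rows):
    """Sum the REVENUE column per PRODUCT_NAME, highest total first."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["PRODUCT_NAME"]] += row["REVENUE"]
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


print(total_revenue_by_product(rows))
# [('Widget', 150.0), ('Gadget', 80.0)]
```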

Code
# Example of using the tool with an agent
data_analyst = Agent(
    role="Data Analyst",
    goal="Analyze sales data from Snowflake",
    backstory="An expert data analyst with experience in SQL and data visualization.",
    tools=[snowflake_tool],
    verbose=True
)

# The agent will use the tool with parameters like:
# query="SELECT product_name, SUM(revenue) as total_revenue FROM sales GROUP BY product_name ORDER BY total_revenue DESC LIMIT 5"
# timeout=600

# Create a task for the agent
analysis_task = Task(
    description="Query the sales database and identify the top 5 products by revenue for the last quarter.",
    expected_output="A detailed analysis of the top 5 products by revenue.",
    agent=data_analyst
)

# Run the task
crew = Crew(
    agents=[data_analyst],
    tasks=[analysis_task]
)
result = crew.kickoff()

Advanced Features

Connection Pooling

The SnowflakeSearchTool implements connection pooling to improve performance by reusing database connections. You can control the pool size with the pool_size parameter.
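
To illustrate the idea, here is a minimal sketch of a fixed-size pool built on the standard library. It is not the tool's actual implementation: the ConnectionPool class and its connect argument are hypothetical, and this version opens every connection up front, which the real tool may not do.

```python
import queue
from contextlib import contextmanager


class ConnectionPool:
    """Fixed-size pool that hands out and takes back reusable connections."""

    def __init__(self, connect, pool_size=5):
        # `connect` is any zero-argument callable that returns a new connection.
        self._idle = queue.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            self._idle.put(connect())

    @contextmanager
    def connection(self, timeout=None):
        # Block until a connection is free; raises queue.Empty on timeout.
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            # Always return the connection so later queries can reuse it.
            self._idle.put(conn)
```

With a pool like this, at most pool_size queries hold a connection at once, and additional callers wait until one is returned.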

Automatic Retries

The tool automatically retries failed queries with exponential backoff. You can configure the retry behavior with the max_retries and retry_delay parameters.
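
As a rough sketch of how these two parameters could interact, the code below doubles the wait after each failed attempt, starting from retry_delay. The doubling factor, the helper names backoff_delays and run_with_retries, and the choice to count max_retries as retries after the first attempt are assumptions for this example, not the tool's documented behavior.

```python
import time


def backoff_delays(max_retries=3, retry_delay=1.0):
    """Waits in seconds before each retry, doubling every time (assumed schedule)."""
    return [retry_delay * (2 ** attempt) for attempt in range(max_retries)]


def run_with_retries(operation, max_retries=3, retry_delay=1.0, sleep=time.sleep):
    """Call `operation`; on failure wait and try again, re-raising the last error."""
    delays = backoff_delays(max_retries, retry_delay)
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except Exception:
            # Out of retries: surface the original error to the caller.
            if attempt == max_retries:
                raise
            sleep(delays[attempt])


print(backoff_delays(max_retries=3, retry_delay=1.0))
# [1.0, 2.0, 4.0]
```

Under this schedule, the custom settings shown earlier (max_retries=5, retry_delay=2.0) would wait 2, 4, 8, 16, and 32 seconds between attempts.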

Query Result Caching

To improve performance for repeated queries, the tool can cache query results. This feature is enabled by default but can be disabled by setting enable_caching=False.
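
The sketch below shows one way such a cache could work: results are stored in memory under a key built from the database, schema, and query text. The QueryCache class, the cache key, and the execute callable are hypothetical and only illustrate the concept; the tool's own cache may differ.

```python
class QueryCache:
    """In-memory cache keyed by (database, schema, query text)."""

    def __init__(self, execute):
        # `execute` is any callable (query, database, schema) -> list of row dicts.
        self._execute = execute
        self._results = {}

    def run(self, query, database=None, schema=None):
        key = (database, schema, query)
        # Only hit the warehouse when this exact query has not been seen before.
        if key not in self._results:
            self._results[key] = self._execute(query, database, schema)
        return self._results[key]
```

A cache of this kind returns stale rows if the underlying tables change between calls, so disabling caching is worth considering for queries that must reflect current data.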

Key-Pair Authentication

In addition to password authentication, the tool supports key-pair authentication for enhanced security:

Code
config = SnowflakeConfig(
    account="your_account",
    user="your_username",
    private_key_path="/path/to/your/private/key.p8",
    warehouse="COMPUTE_WH",
    database="your_database",
    snowflake_schema="your_schema"
)

Error Handling

The SnowflakeSearchTool handles the following common Snowflake errors:

  • Connection failures
  • Query timeouts
  • Authentication errors
  • Database and schema errors

When an error occurs, the tool retries the operation according to the max_retries and retry_delay settings and provides detailed error information.

Conclusion

The SnowflakeSearchTool integrates Snowflake data warehouses with CrewAI agents. Connection pooling, automatic retries, and query caching make access to enterprise data efficient and reliable, which is particularly useful for data analysis, reporting, and business intelligence tasks that rely on structured data stored in Snowflake.