
Using Google BigQuery with Python: A Full Guide

A Google BigQuery Tutorial

By Keith Thomson β€’ 18 min read β€’ gcp

A Practical Guide

Google BigQuery is a fully-managed, serverless data warehouse that enables scalable analysis over petabytes of data. When combined with Python 🐍, it becomes a powerful tool for data engineers, analysts, and scientists.

This guide provides real-world code examples and best practices for integrating BigQuery with Python on Google Cloud Platform (GCP).


Table of Contents

  1. Prerequisites
  2. Setting Up Authentication
  3. Connecting to BigQuery
  4. Querying Data from BigQuery
  5. Loading Data into BigQuery
  6. Writing Data to BigQuery
  7. Scheduled Queries with Python
  8. Optimizing Query Performance
  9. Exporting Data from BigQuery
  10. Error Handling and Logging
  11. Cost Management
  12. Advanced Use Cases
  13. Integrating with Other GCP Services
  14. Security Best Practices
  15. Conclusion

Prerequisites

Before you begin, ensure you have the following:

  • A Google Cloud Platform (GCP) account with billing enabled.
  • A GCP project with the BigQuery API enabled.
  • Python 3.7+ installed on your local machine or cloud environment.
  • The Google Cloud SDK installed and authenticated:
    gcloud auth application-default login
    
    

  • The google-cloud-bigquery and pandas libraries installed:
    pip install google-cloud-bigquery pandas

Setting Up Authentication

To interact with BigQuery from Python, you need to authenticate using a service account:

Create a Service Account in GCP:

  1. Navigate to IAM & Admin > Service Accounts.
  2. Create a new service account and assign it the BigQuery Admin role.
  3. Generate a JSON key file and download it.

Set the Environment Variable:

import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/to/your/service-account-key.json"
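
If you prefer not to rely on the environment variable, the client can also be built directly from the key file. A minimal sketch (the key path is a placeholder):

from google.cloud import bigquery

# Build a client straight from the downloaded JSON key
client = bigquery.Client.from_service_account_json(
    "path/to/your/service-account-key.json"
)
print(client.project)  # Confirms which project the credentials belong to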

Connecting to BigQuery

Use the google-cloud-bigquery library to establish a connection:

from google.cloud import bigquery

# Initialize a BigQuery client
client = bigquery.Client()

Querying Data from BigQuery

Example: Analyzing E-Commerce Sales Data

Suppose you have a dataset containing e-commerce transactions. You want to analyze daily sales trends:

def query_daily_sales():
    query = """
        SELECT
            DATE(transaction_time) AS transaction_date,
            SUM(amount) AS total_sales,
            COUNT(DISTINCT user_id) AS unique_customers
        FROM
            `your_project.your_dataset.ecommerce_transactions`
        GROUP BY
            transaction_date
        ORDER BY
            transaction_date
    """
    query_job = client.query(query)  # Run the query
    results = query_job.result()  # Wait for the query to complete

    for row in results:
        print(f"Date: {row.transaction_date}, Sales: \${row.total_sales}, Customers: {row.unique_customers}")

query_daily_sales()

Key Points:

  • Use parameterized queries to avoid SQL injection.
  • For large datasets, use query_job.to_dataframe() to convert results to a Pandas DataFrame for further analysis.

Example: Parameterized Queries

def query_sales_by_date(start_date, end_date):
    query = """
        SELECT
            DATE(transaction_time) AS transaction_date,
            SUM(amount) AS total_sales
        FROM
            `your_project.your_dataset.ecommerce_transactions`
        WHERE
            DATE(transaction_time) BETWEEN @start_date AND @end_date
        GROUP BY
            transaction_date
        ORDER BY
            transaction_date
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("start_date", "DATE", start_date),
            bigquery.ScalarQueryParameter("end_date", "DATE", end_date),
        ]
    )
    query_job = client.query(query, job_config=job_config)
    results = query_job.result().to_dataframe()
    return results

Usage

sales_data = query_sales_by_date("2025-01-01", "2025-01-31")
print(sales_data.head())

Loading Data into BigQuery

Example: Uploading a CSV File

If you have a local CSV file (e.g., new_transactions.csv), you can load it into BigQuery:

def load_csv_to_bigquery():
    table_id = "your_project.your_dataset.new_transactions"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        write_disposition="WRITE_TRUNCATE"
    )

    with open("new_transactions.csv", "rb") as source_file:
        job = client.load_table_from_file(
            source_file, table_id, job_config=job_config
        )

    job.result()  # Wait for the job to complete
    print(f"Loaded {job.output_rows} rows into {table_id}")

load_csv_to_bigquery()

Best Practices:

  • Use WRITE_TRUNCATE to replace the table or WRITE_APPEND to add new rows.
  • For large files, consider staging them in Cloud Storage and loading from there, as sketched below.
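
A minimal sketch of that Cloud Storage route, assuming the CSV has already been uploaded to a bucket (the bucket name and table ID are placeholders):

def load_csv_from_gcs():
    uri = "gs://your-bucket/new_transactions.csv"
    table_id = "your_project.your_dataset.new_transactions"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        write_disposition="WRITE_APPEND"
    )

    # BigQuery reads the file directly from Cloud Storage; nothing passes through your machine
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()
    print(f"Loaded {load_job.output_rows} rows from {uri}")

load_csv_from_gcs()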

Example: Loading from Pandas DataFrame

import pandas as pd

def load_dataframe_to_bigquery():
    data = {
        "transaction_id": ["1001", "1002", "1003"],
        "user_id": ["user1", "user2", "user3"],
        "amount": [99.99, 149.99, 199.99]
    }
    df = pd.DataFrame(data)
    table_id = "your_project.your_dataset.new_transactions_df"

    job = client.load_table_from_dataframe(
        df, table_id, job_config=bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    )
    job.result()
    print(f"Loaded {job.output_rows} rows into {table_id}")

load_dataframe_to_bigquery()

✍ Writing Data to BigQuery

Example: Streaming Real-Time Data

If you have real-time data (e.g., from an API), you can stream it into BigQuery:

def stream_real_time_data(rows_to_insert):
    table_id = "your_project.your_dataset.real_time_transactions"
    table = client.get_table(table_id)

    errors = client.insert_rows(table, rows_to_insert)
    if errors:
        print(f"Encountered errors: {errors}")
    else:
        print("Data streamed successfully.")

# Example data
rows_to_insert = [
    {"transaction_id": "1001", "user_id": "user1", "amount": 99.99},
    {"transaction_id": "1002", "user_id": "user2", "amount": 149.99}
]

stream_real_time_data(rows_to_insert)

Note:

Streaming is ideal for low-latency use cases but incurs higher costs. For batch processing, use load_table_from_dataframe or load_table_from_file.
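
As an alternative sketch, insert_rows_json accepts the table ID directly and skips the get_table call; it takes the same list of dicts used above:

def stream_rows_json(rows_to_insert):
    table_id = "your_project.your_dataset.real_time_transactions"

    # insert_rows_json sends JSON-serializable dicts without fetching the table schema first
    errors = client.insert_rows_json(table_id, rows_to_insert)
    if errors:
        print(f"Encountered errors: {errors}")
    else:
        print("Data streamed successfully.")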

Scheduled Queries with Python

Example: Automating Daily Reports

Use Cloud Scheduler and Cloud Functions to run queries on a schedule. Here’s a Python function for a Cloud Function:

from google.cloud import bigquery

def generate_daily_report(request):
    client = bigquery.Client()
    query = """
        SELECT
            DATE(transaction_time) AS transaction_date,
            SUM(amount) AS total_sales
        FROM
            `your_project.your_dataset.ecommerce_transactions`
        WHERE
            DATE(transaction_time) = CURRENT_DATE()
        GROUP BY
            transaction_date
    """
    query_job = client.query(query)
    results = query_job.result().to_dataframe()

    # Send results via email or save to Cloud Storage
    print(results)
    return "Report generated successfully."

Deployment:

Deploy this function to Cloud Functions and trigger it daily using Cloud Scheduler.
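
If you would rather keep the schedule inside BigQuery itself, scheduled queries can also be created with the BigQuery Data Transfer Service client (pip install google-cloud-bigquery-datatransfer). A rough sketch, with placeholder project, dataset, and query:

from google.cloud import bigquery_datatransfer

def create_scheduled_query():
    transfer_client = bigquery_datatransfer.DataTransferServiceClient()
    parent = transfer_client.common_project_path("your_project")

    transfer_config = bigquery_datatransfer.TransferConfig(
        destination_dataset_id="your_dataset",
        display_name="Daily sales rollup",
        data_source_id="scheduled_query",
        params={
            "query": "SELECT DATE(transaction_time) AS transaction_date, SUM(amount) AS total_sales "
                     "FROM `your_project.your_dataset.ecommerce_transactions` "
                     "WHERE DATE(transaction_time) = CURRENT_DATE() GROUP BY transaction_date",
            "destination_table_name_template": "daily_sales_report",
            "write_disposition": "WRITE_TRUNCATE",
        },
        schedule="every 24 hours",
    )

    # Creates the scheduled query under the given project
    transfer_config = transfer_client.create_transfer_config(
        parent=parent, transfer_config=transfer_config
    )
    print(f"Created scheduled query: {transfer_config.name}")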

Optimizing Query Performance & Best Practices

  • Partition your tables by date or integer ranges to reduce query costs.
  • Use clustering for frequently filtered columns.
  • Avoid SELECT *; query only the columns you need.
  • Leverage materialized views for repetitive queries (a sketch follows the clustered-table example below).

Example: Creating a Partitioned Table

def create_partitioned_table():
    table_id = "your_project.your_dataset.partitioned_transactions"

    schema = [
        bigquery.SchemaField("transaction_id", "STRING"),
        bigquery.SchemaField("transaction_time", "TIMESTAMP"),
        bigquery.SchemaField("amount", "FLOAT64")
    ]

    table = bigquery.Table(table_id, schema=schema)
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="transaction_time"
    )

    table = client.create_table(table)
    print(f"Created partitioned table {table.table_id}")

create_partitioned_table()
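
To benefit from partitioning at query time, filter on the partitioning column so BigQuery can prune partitions and scan less data. A small sketch against the table created above:

def query_recent_partitions():
    # The filter on transaction_time (the partitioning column) lets BigQuery
    # skip partitions outside the last 7 days, reducing bytes scanned and billed
    query = """
        SELECT
            DATE(transaction_time) AS transaction_date,
            SUM(amount) AS total_sales
        FROM
            `your_project.your_dataset.partitioned_transactions`
        WHERE
            transaction_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
        GROUP BY
            transaction_date
        ORDER BY
            transaction_date
    """
    return client.query(query).result().to_dataframe()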

Example: Creating a Clustered Table

def create_clustered_table():
    table_id = "your_project.your_dataset.clustered_transactions"

    schema = [
        bigquery.SchemaField("transaction_id", "STRING"),
        bigquery.SchemaField("user_id", "STRING"),
        bigquery.SchemaField("amount", "FLOAT64")
    ]

    table = bigquery.Table(table_id, schema=schema)
    table.clustering_fields = ["user_id"]

    table = client.create_table(table)
    print(f"Created clustered table {table.table_id}")

create_clustered_table()
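
Materialized views are defined in SQL, so they can be created from Python with a DDL query. A rough sketch (the view name is a placeholder):

def create_daily_sales_view():
    # The view precomputes the aggregation and refreshes incrementally,
    # so repeated reporting queries read the stored result instead of rescanning the table
    query = """
        CREATE MATERIALIZED VIEW IF NOT EXISTS
            `your_project.your_dataset.daily_sales_mv`
        AS
        SELECT
            DATE(transaction_time) AS transaction_date,
            SUM(amount) AS total_sales
        FROM
            `your_project.your_dataset.ecommerce_transactions`
        GROUP BY
            transaction_date
    """
    client.query(query).result()
    print("Materialized view created.")

create_daily_sales_view()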

Exporting Data from BigQuery

Example: Exporting Query Results to CSV

def export_to_csv():
    query = """
        SELECT * FROM `your_project.your_dataset.ecommerce_transactions`
        WHERE transaction_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
    """
    query_job = client.query(query)
    results = query_job.result().to_dataframe()

    results.to_csv("recent_transactions.csv", index=False)
    print("Data exported to recent_transactions.csv")

export_to_csv()

Example: Exporting to Cloud Storage

def export_to_cloud_storage():
    destination_uri = "gs://your-bucket/recent_transactions.avro"
    dataset_ref = client.dataset("your_dataset", project="your_project")
    table_ref = dataset_ref.table("ecommerce_transactions")

    # Export as Avro to match the .avro destination (the default export format is CSV)
    job_config = bigquery.ExtractJobConfig(
        destination_format=bigquery.DestinationFormat.AVRO
    )

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        job_config=job_config,
        location="US"
    )
    extract_job.result()
    print(f"Exported data to {destination_uri}")

export_to_cloud_storage()

⛑️ Error Handling and Logging

Always include error handling to manage API limits, network issues, and invalid queries:

from google.api_core.exceptions import GoogleAPICallError, RetryError

def safe_query(query):
    try:
        query_job = client.query(query)
        return query_job.result()
    except GoogleAPICallError as e:
        print(f"API Error: {e}")
    except RetryError as e:
        print(f"Retry Error: {e}")
    except Exception as e:
        print(f"Unexpected Error: {e}")

Cost Management

  • Monitor usage in the BigQuery UI under Query History.
  • Set up alerts for unusual spending in Cloud Billing.
  • Use flat-rate pricing for predictable workloads.
  • Optimize queries to reduce data scanned (see the dry-run sketch below).
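
Two client-side guards help with the last point. A dry run reports how many bytes a query would scan without running it, and maximum_bytes_billed makes BigQuery reject any query that would scan more than a set budget. A minimal sketch:

def estimate_query_cost(query):
    # Dry run: the query is validated and priced but never executed, so it costs nothing
    dry_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    job = client.query(query, job_config=dry_config)
    gb_scanned = job.total_bytes_processed / 1024 ** 3
    print(f"This query would scan {gb_scanned:.2f} GB")
    return job.total_bytes_processed

def run_with_budget(query, max_bytes=10 * 1024 ** 3):
    # The job fails up front if it would scan more than max_bytes (10 GB by default here)
    config = bigquery.QueryJobConfig(maximum_bytes_billed=max_bytes)
    return client.query(query, job_config=config).result()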

Advanced Use Cases

Example: Using BigQuery ML

def create_ml_model():
    query = """
        CREATE OR REPLACE MODEL `your_project.your_dataset.sales_forecast_model`
        OPTIONS(
            model_type = 'ARIMA_PLUS',
            time_series_timestamp_col = 'transaction_date',
            time_series_data_col = 'total_sales'
        ) AS
        SELECT
            DATE(transaction_time) AS transaction_date,
            SUM(amount) AS total_sales
        FROM
            `your_project.your_dataset.ecommerce_transactions`
        GROUP BY
            transaction_date
    """
    client.query(query).result()
    print("ML model created successfully.")

create_ml_model()
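
Once the model has trained, forecasts can be pulled back into Python with ML.FORECAST. A short sketch (the 30-day horizon and 0.9 confidence level are arbitrary choices):

def forecast_sales(horizon_days=30):
    query = f"""
        SELECT
            forecast_timestamp,
            forecast_value,
            prediction_interval_lower_bound,
            prediction_interval_upper_bound
        FROM
            ML.FORECAST(
                MODEL `your_project.your_dataset.sales_forecast_model`,
                STRUCT({horizon_days} AS horizon, 0.9 AS confidence_level)
            )
    """
    return client.query(query).result().to_dataframe()

forecast = forecast_sales()
print(forecast.head())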

Example: Integrating with Dataflow

The following Apache Beam pipeline reads transactions from BigQuery and writes the results back to another table via Dataflow:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run_dataflow_pipeline():
    options = PipelineOptions(
        project="your_project",
        runner="DataflowRunner",
        region="us-central1",
        temp_location="gs://your-bucket/temp"  # Dataflow needs a Cloud Storage temp/staging location
    )

    with beam.Pipeline(options=options) as p:
        (p
         | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
             query="SELECT * FROM `your_project.your_dataset.ecommerce_transactions`",
             use_standard_sql=True
         )
         | "Write to BigQuery" >> beam.io.WriteToBigQuery(
             table="your_project.your_dataset.processed_transactions",
             schema="transaction_id\:STRING, user_id\:STRING, amount\:FLOAT64",
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
         )
        )

run_dataflow_pipeline()

Integrating with Other GCP Services

Example:

Triggering BigQuery from Cloud Storage

from google.cloud import storage

def trigger_bigquery_on_new_file(bucket_name, file_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    if blob.exists():
        uri = f"gs://{bucket_name}/{file_name}"
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            skip_leading_rows=1,
            autodetect=True,
            write_disposition="WRITE_APPEND"
        )
        # Load the new file straight from Cloud Storage into BigQuery
        load_job = client.load_table_from_uri(
            uri, "your_project.your_dataset.new_transactions", job_config=job_config
        )
        load_job.result()
        print(f"Loaded {load_job.output_rows} rows from {uri}")
    else:
        print(f"File {file_name} not found in bucket {bucket_name}")

trigger_bigquery_on_new_file("your-bucket", "new_transactions.csv")

πŸ” Security Best Practices

  • Use IAM roles to grant least-privilege access (a dataset-level sketch follows below).
  • Encrypt sensitive data with customer-managed keys in Cloud KMS.
  • Review Cloud Audit Logs to monitor access and changes.
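
A rough sketch of the first point at the dataset level: granting a single user read-only access from Python (the dataset ID and email are placeholders; project-wide roles are usually managed in IAM & Admin instead):

def grant_dataset_reader(dataset_id, user_email):
    dataset = client.get_dataset(dataset_id)  # e.g. "your_project.your_dataset"

    # Append a READER entry for this user to the dataset's access list
    entries = list(dataset.access_entries)
    entries.append(
        bigquery.AccessEntry(
            role="READER",
            entity_type="userByEmail",
            entity_id=user_email,
        )
    )
    dataset.access_entries = entries

    client.update_dataset(dataset, ["access_entries"])
    print(f"Granted READER on {dataset_id} to {user_email}")

grant_dataset_reader("your_project.your_dataset", "analyst@example.com")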

Conclusion

Google BigQuery and Python are a powerful combination for data analysis, ETL, and real-time processing. By following the examples and best practices above, you can start building scalable, efficient data pipelines on GCP.