Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 162 additions & 25 deletions crates/forge_repo/src/database/pool.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#![allow(dead_code)]
use std::path::PathBuf;
use std::sync::Mutex;
use std::time::Duration;

use anyhow::Result;
use backon::{BlockingRetryable, ExponentialBuilder};
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, CustomizeConnection, Pool, PooledConnection};
use diesel::sqlite::SqliteConnection;
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
Expand All @@ -29,8 +29,8 @@
pub fn new(database_path: PathBuf) -> Self {
Self {
max_size: 5,
min_idle: Some(1),
connection_timeout: Duration::from_secs(5),
min_idle: None,
connection_timeout: Duration::from_secs(15),
idle_timeout: Some(Duration::from_secs(600)), // 10 minutes
max_retries: 5,
database_path,
Expand All @@ -39,8 +39,8 @@
}

pub struct DatabasePool {
pool: DbPool,
max_retries: usize,
pool: Mutex<DbPool>,
config: PoolConfig,
}

impl DatabasePool {
Expand All @@ -53,6 +53,7 @@
let pool = Pool::builder()
.max_size(1) // Single connection for in-memory testing
.connection_timeout(Duration::from_secs(30))
.connection_customizer(Box::new(SqliteCustomizer))
.build(manager)
.map_err(|e| anyhow::anyhow!("Failed to create in-memory connection pool: {e}"))?;

Expand All @@ -65,19 +66,70 @@
.run_pending_migrations(MIGRATIONS)
.map_err(|e| anyhow::anyhow!("Failed to run database migrations: {e}"))?;

Ok(Self { pool, max_retries: 5 })
let config = PoolConfig::new(PathBuf::from(":memory:"));
Ok(Self { pool: Mutex::new(pool), config })
}

/// Gets a connection from the pool with retry and self-healing.
///
/// If all retries fail, the pool is recreated from scratch as a last resort
/// before attempting one final connection checkout.
pub fn get_connection(&self) -> Result<PooledSqliteConnection> {
Self::retry_with_backoff(
self.max_retries,
let max_retries = self.config.max_retries;
let pool = self.pool.lock().expect("DatabasePool mutex poisoned");

let result = Self::retry_with_backoff(
max_retries,
"Failed to get connection from pool, retrying",
|| {
self.pool
.get()
pool.get()
.map_err(|e| anyhow::anyhow!("Failed to get connection from pool: {e}"))
},
)
);

match result {
Ok(conn) => Ok(conn),
Err(original_error) => {
warn!(
error = %original_error,
"All retries exhausted, attempting pool recreation as last resort"
);
drop(pool);
self.recreate_pool()?;
let pool = self.pool.lock().expect("DatabasePool mutex poisoned");
pool.get().map_err(|e| {
anyhow::anyhow!("Failed to get connection after pool recreation: {e}")
})
}
}
}

/// Recreates the connection pool from scratch using the stored
/// configuration.
///
/// This is used as a last-resort recovery mechanism when all retry attempts
/// have failed, typically due to stale or corrupted connections after long
/// idle periods.
fn recreate_pool(&self) -> Result<()> {
debug!(
database_path = %self.config.database_path.display(),
"Recreating database pool from scratch"
);

let new_database_pool = Self::retry_with_backoff(
self.config.max_retries,
"Failed to recreate database pool, retrying",
|| Self::build_pool(&self.config),
)?;

let new_pool = new_database_pool
.pool
.into_inner()
.expect("DatabasePool mutex should not be poisoned during recreation");

let mut guard = self.pool.lock().expect("DatabasePool mutex poisoned");
*guard = new_pool;
Ok(())
}

/// Retries a blocking database pool operation with exponential backoff.
Expand Down Expand Up @@ -111,19 +163,14 @@

impl CustomizeConnection<SqliteConnection, diesel::r2d2::Error> for SqliteCustomizer {
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
diesel::sql_query("PRAGMA busy_timeout = 30000;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA journal_mode = WAL;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA synchronous = NORMAL;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA wal_autocheckpoint = 1000;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
Ok(())
use diesel::connection::SimpleConnection;
conn.batch_execute(
"PRAGMA busy_timeout = 30000;
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA wal_autocheckpoint = 1000;",
)
.map_err(diesel::r2d2::Error::QueryError)
}
}

Expand Down Expand Up @@ -184,6 +231,96 @@
})?;

debug!(database_path = %config.database_path.display(), "created connection pool");
Ok(Self { pool, max_retries: config.max_retries })
Ok(Self { pool: Mutex::new(pool), config: config.clone() })
}
}

#[cfg(test)]
mod tests {
use pretty_assertions::assert_eq;

use super::*;

fn pool_with_short_idle_timeout() -> anyhow::Result<DatabasePool> {
let dir = tempfile::tempdir()?;
let db_path = dir.path().join("test.sqlite");
let config = PoolConfig {
max_size: 5,
min_idle: None,
connection_timeout: Duration::from_secs(15),
idle_timeout: Some(Duration::from_millis(100)),
max_retries: 3,
database_path: db_path,
};
DatabasePool::try_from(config)
}

#[test]
fn test_idle_eviction_recovery() -> anyhow::Result<()> {
let pool = pool_with_short_idle_timeout()?;

// Get a connection to warm up the pool
{
let mut conn = pool.get_connection()?;
diesel::sql_query("SELECT 1").execute(&mut *conn)?;

Check failure on line 265 in crates/forge_repo/src/database/pool.rs

View workflow job for this annotation

GitHub Actions / Lint Fix

no method named `execute` found for struct `diesel::query_builder::SqlQuery<Inner>` in the current scope

Check failure on line 265 in crates/forge_repo/src/database/pool.rs

View workflow job for this annotation

GitHub Actions / Build and Test

no method named `execute` found for struct `SqlQuery<Inner>` in the current scope
}

// Wait for idle timeout to evict connections
std::thread::sleep(Duration::from_millis(300));

// After eviction, get_connection should still succeed by creating a fresh
// connection
let mut conn = pool.get_connection()?;
let actual = diesel::sql_query("SELECT 1 AS result")
.execute(&mut *conn)

Check failure on line 275 in crates/forge_repo/src/database/pool.rs

View workflow job for this annotation

GitHub Actions / Lint Fix

no method named `execute` found for struct `diesel::query_builder::SqlQuery<Inner>` in the current scope

Check failure on line 275 in crates/forge_repo/src/database/pool.rs

View workflow job for this annotation

GitHub Actions / Build and Test

no method named `execute` found for struct `SqlQuery<Inner>` in the current scope
.unwrap();
let expected = 1;
assert_eq!(actual, expected);
Ok(())
}

#[test]
fn test_pool_config_defaults() {
let config = PoolConfig::new(PathBuf::from("/tmp/test.sqlite"));

assert_eq!(config.max_size, 5);
assert_eq!(config.min_idle, None);
assert_eq!(config.connection_timeout, Duration::from_secs(15));
assert_eq!(config.idle_timeout, Some(Duration::from_secs(600)));
assert_eq!(config.max_retries, 5);
}

#[test]
fn test_pool_recreation_after_simulated_failure() -> anyhow::Result<()> {
let dir = tempfile::tempdir()?;
let db_path = dir.path().join("test_recreate.sqlite");
let config = PoolConfig {
max_size: 2,
min_idle: None,
connection_timeout: Duration::from_secs(15),
idle_timeout: Some(Duration::from_millis(100)),
max_retries: 3,
database_path: db_path.clone(),
};
let pool = DatabasePool::try_from(config)?;

// Use the pool normally
{
let mut conn = pool.get_connection()?;
diesel::sql_query("SELECT 1").execute(&mut *conn)?;

Check failure on line 310 in crates/forge_repo/src/database/pool.rs

View workflow job for this annotation

GitHub Actions / Lint Fix

no method named `execute` found for struct `diesel::query_builder::SqlQuery<Inner>` in the current scope

Check failure on line 310 in crates/forge_repo/src/database/pool.rs

View workflow job for this annotation

GitHub Actions / Build and Test

no method named `execute` found for struct `SqlQuery<Inner>` in the current scope
}

// Wait for idle eviction
std::thread::sleep(Duration::from_millis(300));

// Recreate the pool manually to verify it works
pool.recreate_pool()?;

// Verify the recreated pool works by running a query
let mut conn = pool.get_connection()?;
let result: Result<i32, _> =
diesel::select(diesel::dsl::sql::<diesel::sql_types::Integer>("1")).first(&mut *conn);

Check failure on line 322 in crates/forge_repo/src/database/pool.rs

View workflow job for this annotation

GitHub Actions / Lint Fix

no method named `first` found for struct `SelectStatement<From, Select, Distinct, ..., ..., ..., ..., ..., ...>` in the current scope

Check failure on line 322 in crates/forge_repo/src/database/pool.rs

View workflow job for this annotation

GitHub Actions / Build and Test

no method named `first` found for struct `SelectStatement<From, Select, Distinct, ..., ..., ..., ..., ..., ...>` in the current scope
assert!(result.is_ok(), "Pool should be usable after recreation");
Ok(())
}
}
Loading