Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/matrix-sdk-base/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ All notable changes to this project will be documented in this file.

### Features

- Ensure any `SaveLockedStateStore` functions which may interfere with its implementation
of `StateStore::save_changes` are synchronized using the underlying lock.
[#6547](https://github.com/matrix-org/matrix-rust-sdk/pull/6547)
- [**breaking**] Add `RoomSummary::active_service_members` field to act as a cached value that will be computed
when we sync members. Rename `Room::is_dm` to `Room::compute_is_dm` since it will now also store the computed
active service members count in the new cached field. `Room::active_service_members` is now
Expand Down
66 changes: 65 additions & 1 deletion crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,22 @@ impl<T: StateStore> SaveLockedStateStore<T> {
self.store.save_changes(changes).await.map_err(Into::into)
}
}

/// Provides a means of calling [`StateStore::remove_room`] when the caller
/// has already acquired the underlying [`Mutex`]. Returns an error if
/// the [`MutexGuard`] provided does not reference the underlying
/// [`Mutex`].
pub async fn remove_room_with_guard(
&self,
guard: &MutexGuard<'_, ()>,
room_id: &RoomId,
) -> Result<(), StoreError> {
if !std::ptr::eq(MutexGuard::mutex(guard), self.lock()) {
Err(IncorrectMutexGuardError.into())
} else {
self.store.remove_room(room_id).await.map_err(Into::into)
}
}
}

#[cfg_attr(target_family = "wasm", async_trait(?Send))]
Expand Down Expand Up @@ -1674,6 +1690,7 @@ impl<T: StateStore> StateStore for SaveLockedStateStore<T> {
}

async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
let _guard = self.lock.lock().await;
self.store.remove_room(room_id).await
}

Expand Down Expand Up @@ -2431,6 +2448,7 @@ mod tests {
use gloo_timers::future::sleep;
use matrix_sdk_common::executor::spawn;
use matrix_sdk_test::async_test;
use ruma::room_id;
use tokio::sync::Mutex;
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
use tokio::time::sleep;
Expand All @@ -2447,7 +2465,7 @@ mod tests {
statestore_integration_tests!();

#[async_test]
async fn test_state_store_only_accepts_guard_for_underlying_mutex() {
async fn test_save_changes_only_accepts_guard_for_underlying_mutex() {
let state_store = SaveLockedStateStore::new(MemoryStore::new());
let state_changes = StateChanges::default();
state_store
Expand All @@ -2462,6 +2480,22 @@ mod tests {
.expect_err("state store does not accept guard for unknown mutex");
}

#[async_test]
async fn test_remove_room_only_accepts_guard_for_underlying_mutex() {
let state_store = SaveLockedStateStore::new(MemoryStore::new());
let room_id = room_id!("!room");
state_store
.remove_room_with_guard(&state_store.lock().lock().await, room_id)
.await
.expect("state store accepts guard for underlying mutex");

let mutex = Mutex::new(());
state_store
.remove_room_with_guard(&mutex.lock().await, room_id)
.await
.expect_err("state store does not accept guard for unknown mutex");
}

#[derive(Debug)]
struct Elapsed;

Expand Down Expand Up @@ -2510,5 +2544,35 @@ mod tests {
.expect("task saves changes");
});
}

#[async_test]
async fn test_state_store_waits_to_acquire_lock_before_removing_room() {
let state_store = SaveLockedStateStore::new(MemoryStore::new().into_state_store());

// Acquire lock and hold it for 5 seconds
let lock_task = spawn({
let state_store = state_store.clone();
async move {
let lock = state_store.lock();
let _guard = lock.lock().await;
sleep(Duration::from_secs(5)).await;
}
});

// Try to remove room from the state store while the lock is held by another
// task
let remove_task =
spawn(async move { state_store.remove_room(room_id!("!room")).await });

// Ensure that the second task does not progress until the first task has
// completed and therefore release the save lock
assert_matches!(future::select(lock_task, remove_task).await, Either::Left((_, remove_task)) => {
timeout(Duration::from_millis(100), remove_task)
.await
.expect("task completes before timeout")
.expect("task completes successfully")
.expect("task saves changes");
});
}
}
}
Loading