Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/matrix-sdk-base/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ All notable changes to this project will be documented in this file.

### Features

- [**breaking**] Enforce atomic and synchronized updates to `RoomInfo`. Requires
`StateStore::save_changes` to acquire state store lock and replaces `Room::set_room_info`
with an atomic version, `Room::update_room_info`, which is also synchronized by
the state store lock.
([#6478](https://github.com/matrix-org/matrix-rust-sdk/pull/6478))
- Add `RoomMember::is_service_member` that automatically checks the room info and retrieves this info.
([#6536](https://github.com/matrix-org/matrix-rust-sdk/pull/6536))
- [**breaking**] Add `DmRoomDefinition` enum, allowing clients to specify what a DM
Expand Down
96 changes: 43 additions & 53 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use tracing::{Level, debug, enabled, info, instrument, warn};
#[cfg(feature = "e2e-encryption")]
use crate::RoomMemberships;
use crate::{
RoomStateFilter, SessionMeta,
RoomStateFilter, SessionMeta, StateStore,
deserialized_responses::DisplayName,
error::{Error, Result},
event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState},
Expand Down Expand Up @@ -405,24 +405,22 @@ impl BaseClient {
let room = self.state_store.get_or_create_room(room_id, RoomState::Knocked);

if room.state() != RoomState::Knocked {
let _state_store_lock = self.state_store_lock().lock().await;

let mut room_info = room.clone_info();
room_info.mark_as_knocked();
room_info.mark_state_partially_synced();
room_info.mark_members_missing(); // the own member event changed
let store_guard = self.state_store.lock().lock().await;

// We are no longer joined to the room, so the invite acceptance details are no
// longer relevant.
#[cfg(feature = "e2e-encryption")]
if let Some(olm_machine) = self.olm_machine().await.as_ref() {
olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
olm_machine.store().clear_room_pending_key_bundle(room_id).await?
}

let mut changes = StateChanges::default();
changes.add_room(room_info.clone());
self.state_store.save_changes(&changes).await?; // Update the store
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
info.mark_as_knocked();
info.mark_state_partially_synced();
info.mark_members_missing(); // the own member event changed
(info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
})
.await?;
}

Ok(room)
Expand Down Expand Up @@ -481,13 +479,7 @@ impl BaseClient {
// If the state isn't `RoomState::Joined` then this means that we knew about
// this room before. Let's modify the existing state now.
if room.state() != RoomState::Joined {
let _state_store_lock = self.state_store_lock().lock().await;

let mut room_info = room.clone_info();

room_info.mark_as_joined();
room_info.mark_state_partially_synced();
room_info.mark_members_missing(); // the own member event changed
let store_guard = self.state_store_lock().lock().await;

#[cfg(feature = "e2e-encryption")]
{
Expand Down Expand Up @@ -515,12 +507,13 @@ impl BaseClient {
let _ = inviter;
}

let mut changes = StateChanges::default();
changes.add_room(room_info.clone());

self.state_store.save_changes(&changes).await?; // Update the store

room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
info.mark_as_joined();
info.mark_state_partially_synced();
info.mark_members_missing(); // the own member event changed
(info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
})
.await?;
}

Ok(room)
Expand All @@ -533,24 +526,22 @@ impl BaseClient {
let room = self.state_store.get_or_create_room(room_id, RoomState::Left);

if room.state() != RoomState::Left {
let _state_store_lock = self.state_store_lock().lock().await;

let mut room_info = room.clone_info();
room_info.mark_as_left();
room_info.mark_state_partially_synced();
room_info.mark_members_missing(); // the own member event changed
let store_guard = self.state_store.lock().lock().await;

// We are no longer joined to the room, so the invite acceptance details are no
// longer relevant.
#[cfg(feature = "e2e-encryption")]
if let Some(olm_machine) = self.olm_machine().await.as_ref() {
olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
olm_machine.store().clear_room_pending_key_bundle(room_id).await?
}

let mut changes = StateChanges::default();
changes.add_room(room_info.clone());
self.state_store.save_changes(&changes).await?; // Update the store
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
info.mark_as_left();
info.mark_state_partially_synced();
info.mark_members_missing(); // the own member event changed
(info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
})
.await?;
}

Ok(())
Expand Down Expand Up @@ -769,17 +760,14 @@ impl BaseClient {

context.state_changes.ambiguity_maps = ambiguity_cache.cache;

{
let _state_store_lock = self.state_store_lock().lock().await;

processors::changes::save_and_apply(
context,
&self.state_store,
&self.ignore_user_list_changes,
Some(response.next_batch.clone()),
)
.await?;
}
processors::changes::save_and_apply(
context,
&self.state_store,
&self.state_store_lock().lock().await,
&self.ignore_user_list_changes,
Some(response.next_batch.clone()),
)
.await?;

let mut context = Context::default();

Expand All @@ -793,11 +781,12 @@ impl BaseClient {
.await;

// Save the new display name updates if any.
{
let _state_store_lock = self.state_store_lock().lock().await;

processors::changes::save_only(context, &self.state_store).await?;
}
processors::changes::save_only(
context,
&self.state_store,
&self.state_store_lock().lock().await,
)
.await?;

for (room_id, member_ids) in updated_members_in_room {
if let Some(room) = self.get_room(&room_id) {
Expand Down Expand Up @@ -919,7 +908,7 @@ impl BaseClient {
context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);

{
let _state_store_lock = self.state_store_lock().lock().await;
let state_store_guard = self.state_store_lock().lock().await;

let mut room_info = room.clone_info();
room_info.mark_members_synced();
Expand All @@ -928,6 +917,7 @@ impl BaseClient {
processors::changes::save_and_apply(
context,
&self.state_store,
&state_store_guard,
&self.ignore_user_list_changes,
None,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use ruma::{
use tracing::{debug, instrument, trace, warn};

use super::super::Context;
use crate::{RoomInfo, StateChanges, store::BaseStateStore};
use crate::{RoomInfo, StateChanges, StateStore, store::BaseStateStore};

/// Create the [`Global`] account data processor.
pub fn global(events: &[Raw<AnyGlobalAccountDataEvent>]) -> Global {
Expand Down
34 changes: 24 additions & 10 deletions crates/matrix-sdk-base/src/response_processors/changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,27 @@ use ruma::{
events::{GlobalAccountDataEventType, ignored_user_list::IgnoredUserListEvent},
serde::Raw,
};
use tokio::sync::MutexGuard;
use tracing::{error, instrument, trace};

use super::Context;
use crate::{
Result,
Result, StoreError,
store::{BaseStateStore, StateStoreExt as _},
};

/// Save the [`StateChanges`] from the [`Context`] inside the [`BaseStateStore`]
/// only! The changes aren't applied on the in-memory rooms.
#[instrument(skip_all)]
pub async fn save_only(context: Context, state_store: &BaseStateStore) -> Result<()> {
pub async fn save_only(
context: Context,
state_store: &BaseStateStore,
state_store_guard: &MutexGuard<'_, ()>,
) -> Result<()> {
let _timer = timer!(tracing::Level::TRACE, "_method");

save_changes(&context, state_store, None).await?;
broadcast_room_info_notable_updates(&context, state_store);
save_changes(&context, state_store, state_store_guard, None).await?;
broadcast_room_info_notable_updates(&context, state_store, state_store_guard)?;

Ok(())
}
Expand All @@ -44,6 +49,7 @@ pub async fn save_only(context: Context, state_store: &BaseStateStore) -> Result
pub async fn save_and_apply(
context: Context,
state_store: &BaseStateStore,
state_store_guard: &MutexGuard<'_, ()>,
ignore_user_list_changes: &SharedObservable<Vec<String>>,
sync_token: Option<String>,
) -> Result<()> {
Expand All @@ -54,9 +60,9 @@ pub async fn save_and_apply(
let previous_ignored_user_list =
state_store.get_account_data_event_static().await.ok().flatten();

save_changes(&context, state_store, sync_token).await?;
save_changes(&context, state_store, state_store_guard, sync_token).await?;
apply_changes(&context, ignore_user_list_changes, previous_ignored_user_list);
broadcast_room_info_notable_updates(&context, state_store);
broadcast_room_info_notable_updates(&context, state_store, state_store_guard)?;

trace!("applied changes");

Expand All @@ -66,9 +72,10 @@ pub async fn save_and_apply(
async fn save_changes(
context: &Context,
state_store: &BaseStateStore,
state_store_guard: &MutexGuard<'_, ()>,
sync_token: Option<String>,
) -> Result<()> {
state_store.save_changes(&context.state_changes).await?;
state_store.save_changes_with_guard(state_store_guard, &context.state_changes).await?;

if let Some(sync_token) = sync_token {
*state_store.sync_token.write().await = Some(sync_token);
Expand Down Expand Up @@ -118,13 +125,20 @@ fn apply_changes(
}
}

fn broadcast_room_info_notable_updates(context: &Context, state_store: &BaseStateStore) {
fn broadcast_room_info_notable_updates(
context: &Context,
state_store: &BaseStateStore,
state_store_guard: &MutexGuard<'_, ()>,
) -> Result<()> {
for (room_id, room_info) in &context.state_changes.room_infos {
if let Some(room) = state_store.room(room_id) {
let room_info_notable_update_reasons =
context.room_info_notable_updates.get(room_id).copied().unwrap_or_default();

room.set_room_info(room_info.clone(), room_info_notable_update_reasons)
room.update_room_info_with_store_guard(state_store_guard, |_| {
(room_info.clone(), room_info_notable_update_reasons)
})
.map_err(StoreError::from)?;
}
}
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use matrix_sdk_common::timer;
use matrix_sdk_crypto::OlmMachine;
use ruma::{OwnedUserId, RoomId};

use crate::{EncryptionState, Result, RoomMemberships, store::BaseStateStore};
use crate::{EncryptionState, Result, RoomMemberships, StateStore, store::BaseStateStore};

/// Update tracked users, if the room is encrypted.
pub async fn update(
Expand Down
10 changes: 8 additions & 2 deletions crates/matrix-sdk-base/src/room/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,21 @@ mod tests {
super::{Room, RoomState},
CallIntentConsensus,
};
use crate::{store::MemoryStore, utils::RawStateEventWithKeys};
use crate::{
store::{MemoryStore, SaveLockedStateStore},
utils::RawStateEventWithKeys,
};

fn make_room_test_helper(room_type: RoomState) -> (Arc<MemoryStore>, Room) {
let store = Arc::new(MemoryStore::new());
let user_id = user_id!("@me:example.org");
let room_id = room_id!("!test:localhost");
let (sender, _receiver) = tokio::sync::broadcast::channel(1);

(store.clone(), Room::new(user_id, store, room_id, room_type, sender))
(
store.clone(),
Room::new(user_id, SaveLockedStateStore::new(store), room_id, room_type, sender),
)
}

fn timestamp(minutes_ago: u32) -> MilliSecondsSinceUnixEpoch {
Expand Down
10 changes: 7 additions & 3 deletions crates/matrix-sdk-base/src/room/display_name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use tracing::{debug, trace, warn};

use super::{Room, RoomMemberships};
use crate::{
RoomMember, RoomState,
RoomMember, RoomState, StateStore,
deserialized_responses::SyncOrStrippedState,
store::{Result as StoreResult, StateStoreExt},
};
Expand Down Expand Up @@ -559,7 +559,8 @@ mod tests {

use super::{Room, RoomDisplayName, compute_display_name_from_heroes};
use crate::{
MinimalStateEvent, RoomHero, RoomState, StateChanges, StateStore, store::MemoryStore,
MinimalStateEvent, RoomHero, RoomState, StateChanges, StateStore,
store::{MemoryStore, SaveLockedStateStore},
};

fn make_room_test_helper(room_type: RoomState) -> (Arc<MemoryStore>, Room) {
Expand All @@ -568,7 +569,10 @@ mod tests {
let room_id = room_id!("!test:localhost");
let (sender, _receiver) = tokio::sync::broadcast::channel(1);

(store.clone(), Room::new(user_id, store, room_id, room_type, sender))
(
store.clone(),
Room::new(user_id, SaveLockedStateStore::new(store), room_id, room_type, sender),
)
}

fn make_stripped_member_event(user_id: &UserId, name: &str) -> Raw<StrippedRoomMemberEvent> {
Expand Down
11 changes: 9 additions & 2 deletions crates/matrix-sdk-base/src/room/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,22 @@ mod tests {
};

use super::{EncryptionState, Room};
use crate::{RoomState, store::MemoryStore, utils::RawStateEventWithKeys};
use crate::{
RoomState,
store::{MemoryStore, SaveLockedStateStore},
utils::RawStateEventWithKeys,
};

fn make_room_test_helper(room_type: RoomState) -> (Arc<MemoryStore>, Room) {
let store = Arc::new(MemoryStore::new());
let user_id = user_id!("@me:example.org");
let room_id = room_id!("!test:localhost");
let (sender, _receiver) = tokio::sync::broadcast::channel(1);

(store.clone(), Room::new(user_id, store, room_id, room_type, sender))
(
store.clone(),
Room::new(user_id, SaveLockedStateStore::new(store), room_id, room_type, sender),
)
}

fn timestamp(minutes_ago: u32) -> MilliSecondsSinceUnixEpoch {
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-base/src/room/knock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tracing::warn;

use super::Room;
use crate::{
StateStoreDataKey, StateStoreDataValue, StoreError,
StateStore, StateStoreDataKey, StateStoreDataValue, StoreError,
deserialized_responses::{MemberEvent, RawMemberEvent, SyncOrStrippedState},
store::{Result as StoreResult, StateStoreExt},
};
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-base/src/room/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tracing::debug;

use super::Room;
use crate::{
MinimalRoomMemberEvent, StoreError,
MinimalRoomMemberEvent, StateStore, StoreError,
deserialized_responses::{DisplayName, MemberEvent},
store::{Result as StoreResult, StateStoreExt, ambiguity_map::is_display_name_ambiguous},
};
Expand Down
Loading
Loading