backend: Use tokio broadcast channels

This commit is contained in:
Elias Projahn 2021-04-25 12:26:43 +02:00
parent 3e2f73fc56
commit cb3e958f1b
5 changed files with 42 additions and 38 deletions

View file

@ -5,8 +5,6 @@ edition = "2018"
[dependencies]
fragile = "1.0.0"
futures = "0.3.6"
futures-channel = "0.3.5"
gio = "0.9.1"
glib = "0.10.3"
gstreamer = "0.16.4"
@ -16,6 +14,7 @@ musicus_client = { version = "0.1.0", path = "../client" }
musicus_database = { version = "0.1.0", path = "../database" }
musicus_import = { version = "0.1.0", path = "../import" }
thiserror = "1.0.23"
tokio = { version = "1.4.0", features = ["sync"] }
[target.'cfg(target_os = "linux")'.dependencies]
mpris-player = "0.6.0"

View file

@ -11,12 +11,12 @@ pub enum Error {
#[error("An error happened using the SecretService.")]
SecretServiceError(#[from] secret_service::Error),
#[error("A channel was canceled.")]
ChannelError(#[from] futures_channel::oneshot::Canceled),
#[error("An error happened while decoding to UTF-8.")]
Utf8Error(#[from] std::str::Utf8Error),
#[error("Failed to receive an event.")]
RecvError(#[from] tokio::sync::broadcast::error::RecvError),
#[error("An error happened: {0}")]
Other(String),
}

View file

@ -1,5 +1,3 @@
use futures::prelude::*;
use futures_channel::mpsc;
use gio::prelude::*;
use log::warn;
use musicus_client::{Client, LoginData};
@ -7,6 +5,10 @@ use musicus_database::DbThread;
use std::cell::RefCell;
use std::path::PathBuf;
use std::rc::Rc;
use tokio::sync::{
broadcast,
broadcast::Sender,
};
pub use musicus_client as client;
pub use musicus_database as db;
@ -25,6 +27,7 @@ pub use player::*;
mod secure;
/// General states the application can be in.
#[derive(Debug, Clone)]
pub enum BackendState {
/// The backend is not set up yet. This means that no backend methods except for setting the
/// music library path should be called. The user interface should adapt and only present this
@ -41,12 +44,8 @@ pub enum BackendState {
/// A collection of all backend state and functionality.
pub struct Backend {
/// A future resolving to the next state of the backend. Initially, this should be assumed to
/// be BackendState::Loading. Changes should be awaited before calling init().
state_stream: RefCell<mpsc::Receiver<BackendState>>,
/// The internal sender to publish the state via state_stream.
state_sender: RefCell<mpsc::Sender<BackendState>>,
state_sender: Sender<BackendState>,
/// Access to GSettings.
settings: gio::Settings,
@ -55,11 +54,8 @@ pub struct Backend {
/// is guaranteed to be Some, when the state is set to BackendState::Ready.
music_library_path: RefCell<Option<PathBuf>>,
/// The receiver to which library update notifications are sent.
library_updated_receiver: RefCell<mpsc::Receiver<()>>,
/// The sender for sending library update notifications.
library_updated_sender: RefCell<mpsc::Sender<()>>,
library_updated_sender: Sender<()>,
/// The database. This can be assumed to exist, when the state is set to BackendState::Ready.
database: RefCell<Option<Rc<DbThread>>>,
@ -76,16 +72,14 @@ impl Backend {
/// Create a new backend initerface. The user interface should subscribe to the state stream
/// and call init() afterwards.
pub fn new() -> Self {
let (state_sender, state_stream) = mpsc::channel(1024);
let (library_updated_sender, library_updated_receiver) = mpsc::channel(1024);
let (state_sender, _) = broadcast::channel(1024);
let (library_updated_sender, _) = broadcast::channel(1024);
Backend {
state_stream: RefCell::new(state_stream),
state_sender: RefCell::new(state_sender),
state_sender,
settings: gio::Settings::new("de.johrpan.musicus"),
music_library_path: RefCell::new(None),
library_updated_sender: RefCell::new(library_updated_sender),
library_updated_receiver: RefCell::new(library_updated_receiver),
library_updated_sender,
database: RefCell::new(None),
player: RefCell::new(None),
client: Client::new(),
@ -94,8 +88,8 @@ impl Backend {
/// Wait for the next state change. Initially, the state should be assumed to be
/// BackendState::Loading. Changes should be awaited before calling init().
pub async fn next_state(&self) -> Option<BackendState> {
self.state_stream.borrow_mut().next().await
pub async fn next_state(&self) -> Result<BackendState> {
Ok(self.state_sender.subscribe().recv().await?)
}
/// Initialize the backend updating the state accordingly.
@ -111,8 +105,11 @@ impl Backend {
#[cfg(all(feature = "dbus"))]
match Self::load_login_data().await {
Ok(Some(data)) => self.client.set_login_data(Some(data)),
Err(err) => warn!("The login data could not be loaded from SecretService. It will not \
be available. Error message: {}", err),
Err(err) => warn!(
"The login data could not be loaded from SecretService. It will not \
be available. Error message: {}",
err
),
_ => (),
}
@ -128,8 +125,11 @@ impl Backend {
/// Set the URL of the Musicus server to connect to.
pub fn set_server_url(&self, url: &str) {
if let Err(err) = self.settings.set_string("server-url", url) {
warn!("An error happened while trying to save the server URL to GSettings. Most \
likely it will not be available at the next startup. Error message: {}", err);
warn!(
"An error happened while trying to save the server URL to GSettings. Most \
likely it will not be available at the next startup. Error message: {}",
err
);
}
self.client.set_server_url(url);
@ -145,15 +145,21 @@ impl Backend {
#[cfg(all(feature = "dbus"))]
if let Some(data) = &data {
if let Err(err) = Self::store_login_data(data.clone()).await {
warn!("An error happened while trying to store the login data using SecretService. \
warn!(
"An error happened while trying to store the login data using SecretService. \
This means, that they will not be available at the next startup most likely. \
Error message: {}", err);
Error message: {}",
err
);
}
} else {
if let Err(err) = Self::delete_secrets().await {
warn!("An error happened while trying to delete the login data from SecretService. \
warn!(
"An error happened while trying to delete the login data from SecretService. \
This may result in the login data being reloaded at the next startup. Error \
message: {}", err);
message: {}",
err
);
}
}
@ -171,6 +177,6 @@ impl Backend {
/// Set the current state and notify the user interface.
fn set_state(&self, state: BackendState) {
self.state_sender.borrow_mut().try_send(state).unwrap();
self.state_sender.send(state).unwrap();
}
}

View file

@ -1,5 +1,4 @@
use crate::{Backend, BackendState, Player, Result};
use futures::prelude::*;
use gio::prelude::*;
use log::warn;
use musicus_database::DbThread;
@ -74,13 +73,13 @@ impl Backend {
}
/// Wait for the next library update.
pub async fn library_update(&self) {
self.library_updated_receiver.borrow_mut().next().await;
pub async fn library_update(&self) -> Result<()> {
Ok(self.library_updated_sender.subscribe().recv().await?)
}
/// Notify the frontend that the library was changed.
pub fn library_changed(&self) {
self.library_updated_sender.borrow_mut().try_send(()).unwrap();
self.library_updated_sender.send(()).unwrap();
}
/// Get an interface to the player and panic if there is none.

View file

@ -52,7 +52,7 @@ impl Window {
});
spawn!(@clone this, async move {
while let Some(state) = this.backend.next_state().await {
while let Ok(state) = this.backend.next_state().await {
match state {
BackendState::Loading => this.navigator.reset(),
BackendState::NoMusicLibrary => this.show_welcome_screen(),