From 690a1507272a84556f6d4e53c04b35166983450c Mon Sep 17 00:00:00 2001 From: Elias Projahn Date: Sun, 28 Mar 2021 17:28:49 +0200 Subject: [PATCH] import: Use watch channel for state updates --- import/src/disc.rs | 10 ++++- import/src/folder.rs | 8 +++- import/src/session.rs | 57 +++++++++++++++++++++++------ musicus/src/import/medium_editor.rs | 13 ------- 4 files changed, 61 insertions(+), 27 deletions(-) diff --git a/import/src/disc.rs b/import/src/disc.rs index d433990..2d2c27b 100644 --- a/import/src/disc.rs +++ b/import/src/disc.rs @@ -1,14 +1,18 @@ use crate::error::{Error, Result}; -use crate::session::{ImportSession, ImportTrack}; +use crate::session::{ImportSession, ImportTrack, State}; use gstreamer::prelude::*; use gstreamer::{ClockTime, ElementFactory, MessageType, MessageView, TocEntryType}; use gstreamer::tags::{Duration, TrackNumber}; +use log::info; use sha2::{Sha256, Digest}; use std::path::PathBuf; -use log::info; +use std::sync::Mutex; +use tokio::sync::watch; /// Create a new import session for the default disc drive. pub(super) fn new() -> Result { + let (state_sender, state_receiver) = watch::channel(State::Waiting); + let mut tracks = Vec::new(); let mut hasher = Sha256::new(); @@ -145,6 +149,8 @@ pub(super) fn new() -> Result { source_id, tracks, copy: Some(Box::new(copy)), + state_sender, + state_receiver: Mutex::new(state_receiver), }; Ok(session) diff --git a/import/src/folder.rs b/import/src/folder.rs index f6497ae..74af3d1 100644 --- a/import/src/folder.rs +++ b/import/src/folder.rs @@ -1,13 +1,17 @@ use crate::error::{Error, Result}; -use crate::session::{ImportSession, ImportTrack}; +use crate::session::{ImportSession, ImportTrack, State}; use gstreamer::ClockTime; use gstreamer_pbutils::Discoverer; use log::{warn, info}; use sha2::{Sha256, Digest}; use std::path::PathBuf; +use std::sync::Mutex; +use tokio::sync::watch; /// Create a new import session for the specified folder. pub(super) fn new(path: PathBuf) -> Result { + let (state_sender, state_receiver) = watch::channel(State::Ready); + let mut tracks = Vec::new(); let mut number: u32 = 1; let mut hasher = Sha256::new(); @@ -58,6 +62,8 @@ pub(super) fn new(path: PathBuf) -> Result { source_id, tracks, copy: None, + state_sender, + state_receiver: Mutex::new(state_receiver), }; Ok(session) diff --git a/import/src/session.rs b/import/src/session.rs index 27c2893..d556a49 100644 --- a/import/src/session.rs +++ b/import/src/session.rs @@ -2,8 +2,24 @@ use crate::{disc, folder}; use crate::error::Result; use std::path::PathBuf; use std::thread; -use std::sync::Arc; -use tokio::sync::oneshot; +use std::sync::{Arc, Mutex}; +use tokio::sync::{oneshot, watch}; + +/// The current state of the import process. +#[derive(Clone, Debug)] +pub enum State { + /// The import process has not been started yet. + Waiting, + + /// The audio is copied from the source. + Copying, + + /// The audio files are ready to be imported into the music library. + Ready, + + /// An error has happened. + Error, +} /// Interface for importing audio tracks from a medium or folder. pub struct ImportSession { @@ -15,10 +31,16 @@ pub struct ImportSession { /// A closure that has to be called to copy the tracks if set. pub(super) copy: Option Result<()> + Send + Sync>>, + + /// Sender through which listeners are notified of state changes. + pub(super) state_sender: watch::Sender, + + /// Receiver for state changes. + pub(super) state_receiver: Mutex>, } impl ImportSession { - /// Create a new import session for a audio CD. + /// Create a new import session for an audio CD. pub async fn audio_cd() -> Result> { let (sender, receiver) = oneshot::channel(); @@ -52,20 +74,33 @@ impl ImportSession { &self.tracks } - /// Copy the tracks to their advertised locations, if neccessary. - pub async fn copy(self: &Arc) -> Result<()> { + /// Retrieve the current state of the import process. + pub fn state(&self) -> State { + self.state_receiver.lock().unwrap().borrow().clone() + } + + /// Wait for the next state change and get the new state. + pub async fn state_change(&self) -> State { + match self.state_receiver.lock().unwrap().changed().await { + Ok(()) => self.state(), + Err(_) => State::Error, + } + } + + /// Copy the tracks to their advertised locations in the background, if neccessary. The state + /// will be updated as the import is done. + pub fn copy(self: &Arc) { if self.copy.is_some() { let clone = Arc::clone(self); - let (sender, receiver) = oneshot::channel(); thread::spawn(move || { let copy = clone.copy.as_ref().unwrap(); - sender.send(copy()).unwrap(); - }); - receiver.await? - } else { - Ok(()) + match copy() { + Ok(()) => clone.state_sender.send(State::Ready).unwrap(), + Err(_) => clone.state_sender.send(State::Error).unwrap(), + } + }); } } } diff --git a/musicus/src/import/medium_editor.rs b/musicus/src/import/medium_editor.rs index ab43ac7..ecb9bf9 100644 --- a/musicus/src/import/medium_editor.rs +++ b/musicus/src/import/medium_editor.rs @@ -135,19 +135,6 @@ impl Screen, ()> for MediumEditor { this.handle.pop(None); })); - spawn!(@clone this, async move { - match this.session.copy().await { - Err(err) => { - this.disc_status_page.set_description(Some(&err.to_string())); - this.widget.set_visible_child_name("disc_error"); - }, - Ok(_) => { - this.done_stack.set_visible_child(&this.done); - this.done_button.set_sensitive(true); - } - } - }); - this } }