import: Use watch channel for state updates

This commit is contained in:
Elias Projahn 2021-03-28 17:28:49 +02:00
parent e43486367b
commit 690a150727
4 changed files with 61 additions and 27 deletions

View file

@ -1,14 +1,18 @@
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::session::{ImportSession, ImportTrack}; use crate::session::{ImportSession, ImportTrack, State};
use gstreamer::prelude::*; use gstreamer::prelude::*;
use gstreamer::{ClockTime, ElementFactory, MessageType, MessageView, TocEntryType}; use gstreamer::{ClockTime, ElementFactory, MessageType, MessageView, TocEntryType};
use gstreamer::tags::{Duration, TrackNumber}; use gstreamer::tags::{Duration, TrackNumber};
use log::info;
use sha2::{Sha256, Digest}; use sha2::{Sha256, Digest};
use std::path::PathBuf; use std::path::PathBuf;
use log::info; use std::sync::Mutex;
use tokio::sync::watch;
/// Create a new import session for the default disc drive. /// Create a new import session for the default disc drive.
pub(super) fn new() -> Result<ImportSession> { pub(super) fn new() -> Result<ImportSession> {
let (state_sender, state_receiver) = watch::channel(State::Waiting);
let mut tracks = Vec::new(); let mut tracks = Vec::new();
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
@ -145,6 +149,8 @@ pub(super) fn new() -> Result<ImportSession> {
source_id, source_id,
tracks, tracks,
copy: Some(Box::new(copy)), copy: Some(Box::new(copy)),
state_sender,
state_receiver: Mutex::new(state_receiver),
}; };
Ok(session) Ok(session)

View file

@ -1,13 +1,17 @@
use crate::error::{Error, Result}; use crate::error::{Error, Result};
use crate::session::{ImportSession, ImportTrack}; use crate::session::{ImportSession, ImportTrack, State};
use gstreamer::ClockTime; use gstreamer::ClockTime;
use gstreamer_pbutils::Discoverer; use gstreamer_pbutils::Discoverer;
use log::{warn, info}; use log::{warn, info};
use sha2::{Sha256, Digest}; use sha2::{Sha256, Digest};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex;
use tokio::sync::watch;
/// Create a new import session for the specified folder. /// Create a new import session for the specified folder.
pub(super) fn new(path: PathBuf) -> Result<ImportSession> { pub(super) fn new(path: PathBuf) -> Result<ImportSession> {
let (state_sender, state_receiver) = watch::channel(State::Ready);
let mut tracks = Vec::new(); let mut tracks = Vec::new();
let mut number: u32 = 1; let mut number: u32 = 1;
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
@ -58,6 +62,8 @@ pub(super) fn new(path: PathBuf) -> Result<ImportSession> {
source_id, source_id,
tracks, tracks,
copy: None, copy: None,
state_sender,
state_receiver: Mutex::new(state_receiver),
}; };
Ok(session) Ok(session)

View file

@ -2,8 +2,24 @@ use crate::{disc, folder};
use crate::error::Result; use crate::error::Result;
use std::path::PathBuf; use std::path::PathBuf;
use std::thread; use std::thread;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use tokio::sync::oneshot; use tokio::sync::{oneshot, watch};
/// The current state of the import process.
#[derive(Clone, Debug)]
pub enum State {
/// The import process has not been started yet.
Waiting,
/// The audio is copied from the source.
Copying,
/// The audio files are ready to be imported into the music library.
Ready,
/// An error has happened.
Error,
}
/// Interface for importing audio tracks from a medium or folder. /// Interface for importing audio tracks from a medium or folder.
pub struct ImportSession { pub struct ImportSession {
@ -15,10 +31,16 @@ pub struct ImportSession {
/// A closure that has to be called to copy the tracks if set. /// A closure that has to be called to copy the tracks if set.
pub(super) copy: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>, pub(super) copy: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
/// Sender through which listeners are notified of state changes.
pub(super) state_sender: watch::Sender<State>,
/// Receiver for state changes.
pub(super) state_receiver: Mutex<watch::Receiver<State>>,
} }
impl ImportSession { impl ImportSession {
/// Create a new import session for a audio CD. /// Create a new import session for an audio CD.
pub async fn audio_cd() -> Result<Arc<Self>> { pub async fn audio_cd() -> Result<Arc<Self>> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
@ -52,20 +74,33 @@ impl ImportSession {
&self.tracks &self.tracks
} }
/// Copy the tracks to their advertised locations, if neccessary. /// Retrieve the current state of the import process.
pub async fn copy(self: &Arc<Self>) -> Result<()> { pub fn state(&self) -> State {
self.state_receiver.lock().unwrap().borrow().clone()
}
/// Wait for the next state change and get the new state.
pub async fn state_change(&self) -> State {
match self.state_receiver.lock().unwrap().changed().await {
Ok(()) => self.state(),
Err(_) => State::Error,
}
}
/// Copy the tracks to their advertised locations in the background, if neccessary. The state
/// will be updated as the import is done.
pub fn copy(self: &Arc<Self>) {
if self.copy.is_some() { if self.copy.is_some() {
let clone = Arc::clone(self); let clone = Arc::clone(self);
let (sender, receiver) = oneshot::channel();
thread::spawn(move || { thread::spawn(move || {
let copy = clone.copy.as_ref().unwrap(); let copy = clone.copy.as_ref().unwrap();
sender.send(copy()).unwrap();
});
receiver.await? match copy() {
} else { Ok(()) => clone.state_sender.send(State::Ready).unwrap(),
Ok(()) Err(_) => clone.state_sender.send(State::Error).unwrap(),
}
});
} }
} }
} }

View file

@ -135,19 +135,6 @@ impl Screen<Arc<ImportSession>, ()> for MediumEditor {
this.handle.pop(None); this.handle.pop(None);
})); }));
spawn!(@clone this, async move {
match this.session.copy().await {
Err(err) => {
this.disc_status_page.set_description(Some(&err.to_string()));
this.widget.set_visible_child_name("disc_error");
},
Ok(_) => {
this.done_stack.set_visible_child(&this.done);
this.done_button.set_sensitive(true);
}
}
});
this this
} }
} }