Import in separate crate and change source ID calculation

This commit is contained in:
Elias Projahn 2021-02-20 19:03:26 +01:00
parent c2c811e321
commit aeb7da73c9
20 changed files with 479 additions and 379 deletions

16
import/Cargo.toml Normal file
View file

@ -0,0 +1,16 @@
[package]
name = "musicus_import"
version = "0.1.0"
edition = "2018"
[dependencies]
base64 = "0.13.0"
futures-channel = "0.3.5"
glib = { git = "https://github.com/gtk-rs/gtk-rs/", features = ["v2_64"] }
gstreamer = "0.16.5"
gstreamer-pbutils = "0.16.5"
log = "0.4.14"
once_cell = "1.5.2"
rand = "0.7.3"
thiserror = "1.0.23"
sha2 = "0.9.3"

165
import/src/disc.rs Normal file
View file

@ -0,0 +1,165 @@
use crate::error::{Error, Result};
use crate::session::{ImportSession, ImportTrack};
use gstreamer::prelude::*;
use gstreamer::{ClockTime, ElementFactory, MessageType, MessageView, TocEntryType};
use gstreamer::tags::{Duration, TrackNumber};
use sha2::{Sha256, Digest};
use std::path::PathBuf;
use log::info;
/// Create a new import session for the default disc drive.
pub(super) fn new() -> Result<ImportSession> {
let mut tracks = Vec::new();
let mut hasher = Sha256::new();
// Build the GStreamer pipeline. It will contain a fakesink initially to be able to run it
// forward to the paused state without specifying a file name before knowing the tracks.
let cdparanoiasrc = ElementFactory::make("cdparanoiasrc", None)?;
let queue = ElementFactory::make("queue", None)?;
let audioconvert = ElementFactory::make("audioconvert", None)?;
let flacenc = ElementFactory::make("flacenc", None)?;
let fakesink = gstreamer::ElementFactory::make("fakesink", None)?;
let pipeline = gstreamer::Pipeline::new(None);
pipeline.add_many(&[&cdparanoiasrc, &queue, &audioconvert, &flacenc, &fakesink])?;
gstreamer::Element::link_many(&[&cdparanoiasrc, &queue, &audioconvert, &flacenc, &fakesink])?;
let bus = pipeline.get_bus().ok_or(Error::u(String::from("Failed to get bus from pipeline.")))?;
// Run the pipeline into the paused state and wait for the resulting TOC message on the bus.
pipeline.set_state(gstreamer::State::Paused)?;
let msg = bus.timed_pop_filtered(ClockTime::from_seconds(5),
&vec![MessageType::Toc, MessageType::Error]);
let toc = match msg {
Some(msg) => match msg.view() {
MessageView::Error(err) => Err(Error::os(err.get_error())),
MessageView::Toc(toc) => Ok(toc.get_toc().0),
_ => Err(Error::u(format!("Unexpected message from GStreamer: {:?}", msg))),
},
None => Err(Error::Timeout(
format!("Timeout while waiting for first message from GStreamer."))),
}?;
pipeline.set_state(gstreamer::State::Ready)?;
// Replace the fakesink with the real filesink. This won't need to be synced to the pipeline
// state, because we will set the whole pipeline's state to playing later.
gstreamer::Element::unlink(&flacenc, &fakesink);
fakesink.set_state(gstreamer::State::Null)?;
pipeline.remove(&fakesink)?;
let filesink = gstreamer::ElementFactory::make("filesink", None)?;
pipeline.add(&filesink)?;
gstreamer::Element::link(&flacenc, &filesink)?;
// Get track data from the toc message that was received above.
let tmp_dir = create_tmp_dir()?;
for entry in toc.get_entries() {
if entry.get_entry_type() == TocEntryType::Track {
let duration = entry.get_tags()
.ok_or(Error::u(String::from("No tags in TOC entry.")))?
.get::<Duration>()
.ok_or(Error::u(String::from("No duration tag found in TOC entry.")))?
.get()
.ok_or(Error::u(String::from("Failed to unwrap duration tag from TOC entry.")))?
.mseconds()
.ok_or(Error::u(String::from("Failed to unwrap track duration.")))?;
let number = entry.get_tags()
.ok_or(Error::u(String::from("No tags in TOC entry.")))?
.get::<TrackNumber>()
.ok_or(Error::u(String::from("No track number tag found in TOC entry.")))?
.get()
.ok_or(Error::u(
String::from("Failed to unwrap track number tag from TOC entry.")))?;
hasher.update(duration.to_le_bytes());
let name = format!("Track {}", number);
let file_name = format!("track_{:02}.flac", number);
let mut path = tmp_dir.clone();
path.push(file_name);
let track = ImportTrack {
number,
name,
path,
duration,
};
tracks.push(track);
}
}
let source_id = base64::encode_config(hasher.finalize(), base64::URL_SAFE);
info!("Successfully loaded audio CD with {} tracks.", tracks.len());
info!("Source ID: {}", source_id);
let tracks_clone = tracks.clone();
let copy = move || {
for track in &tracks_clone {
info!("Starting to rip track {}.", track.number);
cdparanoiasrc.set_property("track", &track.number)?;
// The filesink needs to be reset to be able to change the file location.
filesink.set_state(gstreamer::State::Null)?;
let path = track.path.to_str().unwrap();
filesink.set_property("location", &path)?;
// This will also affect the filesink as expected.
pipeline.set_state(gstreamer::State::Playing)?;
for msg in bus.iter_timed(ClockTime::none()) {
match msg.view() {
MessageView::Eos(..) => {
info!("Finished ripping track {}.", track.number);
pipeline.set_state(gstreamer::State::Ready)?;
break;
},
MessageView::Error(err) => {
pipeline.set_state(gstreamer::State::Null)?;
Err(Error::os(err.get_error()))?;
},
_ => (),
}
}
}
pipeline.set_state(gstreamer::State::Null)?;
Ok(())
};
let session = ImportSession {
source_id,
tracks,
copy: Some(Box::new(copy)),
};
Ok(session)
}
/// Create a new temporary directory and return its path.
fn create_tmp_dir() -> Result<PathBuf> {
let mut tmp_dir = glib::get_tmp_dir().ok_or(Error::u(
String::from("Failed to get temporary directory using glib::get_tmp_dir().")))?;
let dir_name = format!("musicus-{}", rand::random::<u64>());
tmp_dir.push(dir_name);
std::fs::create_dir(&tmp_dir)?;
Ok(tmp_dir)
}

96
import/src/error.rs Normal file
View file

@ -0,0 +1,96 @@
use std::error;
/// An error within an import session.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// A timeout was reached.
#[error("{0}")]
Timeout(String),
/// Some common error.
#[error("{msg}")]
Other {
/// The error message.
msg: String,
#[source]
source: Option<Box<dyn error::Error + Send + Sync>>,
},
/// Something unexpected happened.
#[error("{msg}")]
Unexpected {
/// The error message.
msg: String,
#[source]
source: Option<Box<dyn error::Error + Send + Sync>>,
},
}
impl Error {
/// Create a new error without an explicit source.
pub(super) fn o(msg: String) -> Self {
Self::Unexpected {
msg,
source: None,
}
}
/// Create a new error with an explicit source.
pub(super) fn os(source: impl error::Error + Send + Sync + 'static) -> Self {
Self::Unexpected {
msg: format!("An error has happened: {}", source),
source: Some(Box::new(source)),
}
}
/// Create a new unexpected error without an explicit source.
pub(super) fn u(msg: String) -> Self {
Self::Unexpected {
msg,
source: None,
}
}
/// Create a new unexpected error with an explicit source.
pub(super) fn us(source: impl error::Error + Send + Sync + 'static) -> Self {
Self::Unexpected {
msg: format!("An unexpected error has happened: {}", source),
source: Some(Box::new(source)),
}
}
}
impl From<futures_channel::oneshot::Canceled> for Error {
fn from(err: futures_channel::oneshot::Canceled) -> Self {
Self::us(err)
}
}
impl From<gstreamer::glib::Error> for Error {
fn from(err: gstreamer::glib::Error) -> Self {
Self::us(err)
}
}
impl From<gstreamer::glib::BoolError> for Error {
fn from(err: gstreamer::glib::BoolError) -> Self {
Self::us(err)
}
}
impl From<gstreamer::StateChangeError> for Error {
fn from(err: gstreamer::StateChangeError) -> Self {
Self::us(err)
}
}
impl From<std::io::Error> for Error {
fn from(err: std::io::Error) -> Self {
Self::us(err)
}
}
pub type Result<T> = std::result::Result<T, Error>;

64
import/src/folder.rs Normal file
View file

@ -0,0 +1,64 @@
use crate::error::{Error, Result};
use crate::session::{ImportSession, ImportTrack};
use gstreamer::ClockTime;
use gstreamer_pbutils::Discoverer;
use log::{warn, info};
use sha2::{Sha256, Digest};
use std::path::PathBuf;
/// Create a new import session for the specified folder.
pub(super) fn new(path: PathBuf) -> Result<ImportSession> {
let mut tracks = Vec::new();
let mut number: u32 = 1;
let mut hasher = Sha256::new();
let discoverer = Discoverer::new(ClockTime::from_seconds(1))?;
for entry in std::fs::read_dir(path)? {
let entry = entry?;
if entry.file_type()?.is_file() {
let path = entry.path();
let uri = glib::filename_to_uri(&path, None)
.or(Err(Error::u(format!("Failed to create URI from path: {:?}", path))))?;
let info = discoverer.discover_uri(&uri)?;
if !info.get_audio_streams().is_empty() {
let duration = info.get_duration().mseconds()
.ok_or(Error::u(format!("Failed to get duration for {}.", uri)))?;
let file_name = entry.file_name();
let name = file_name.into_string()
.or(Err(Error::u(format!(
"Failed to convert OsString to String: {:?}", entry.file_name()))))?;
hasher.update(duration.to_le_bytes());
let track = ImportTrack {
number,
name,
path,
duration,
};
tracks.push(track);
number += 1;
} else {
warn!("File {} skipped, because it doesn't contain any audio streams.", uri);
}
}
}
let source_id = base64::encode_config(hasher.finalize(), base64::URL_SAFE);
info!("Source ID: {}", source_id);
let session = ImportSession {
source_id,
tracks,
copy: None,
};
Ok(session)
}

8
import/src/lib.rs Normal file
View file

@ -0,0 +1,8 @@
pub use session::{ImportSession, ImportTrack};
pub use error::{Error, Result};
pub mod error;
pub mod session;
mod disc;
mod folder;

91
import/src/session.rs Normal file
View file

@ -0,0 +1,91 @@
use crate::{disc, folder};
use crate::error::Result;
use futures_channel::oneshot;
use std::path::PathBuf;
use std::thread;
use std::sync::Arc;
/// Interface for importing audio tracks from a medium or folder.
pub struct ImportSession {
/// A string identifying the source as specific as possible across platforms and formats.
pub(super) source_id: String,
/// The tracks that are available on the source.
pub(super) tracks: Vec<ImportTrack>,
/// A closure that has to be called to copy the tracks if set.
pub(super) copy: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
}
impl ImportSession {
/// Create a new import session for a audio CD.
pub async fn audio_cd() -> Result<Arc<Self>> {
let (sender, receiver) = oneshot::channel();
thread::spawn(move || {
let result = disc::new();
let _ = sender.send(result);
});
Ok(Arc::new(receiver.await??))
}
/// Create a new import session for a folder.
pub async fn folder(path: PathBuf) -> Result<Arc<Self>> {
let (sender, receiver) = oneshot::channel();
thread::spawn(move || {
let result = folder::new(path);
let _ = sender.send(result);
});
Ok(Arc::new(receiver.await??))
}
/// Get a string identifying the source as specific as possible across platforms and mediums.
pub fn source_id(&self) -> &str {
&self.source_id
}
/// Get the tracks that are available on the source.
pub fn tracks(&self) -> &[ImportTrack] {
&self.tracks
}
/// Copy the tracks to their advertised locations, if neccessary.
pub async fn copy(self: &Arc<Self>) -> Result<()> {
if self.copy.is_some() {
let clone = Arc::clone(self);
let (sender, receiver) = oneshot::channel();
thread::spawn(move || {
let copy = clone.copy.as_ref().unwrap();
sender.send(copy()).unwrap();
});
receiver.await?
} else {
Ok(())
}
}
}
/// A track on an import source.
#[derive(Clone, Debug)]
pub struct ImportTrack {
/// The track number.
pub number: u32,
/// A human readable identifier for the track. This will be used to present the track for
/// selection.
pub name: String,
/// The path to the file where the corresponding audio file is. This file is only required to
/// exist, once the import was successfully completed. This will not be the actual file within
/// the user's music library, but the temporary location from which it can be copied to the
/// music library.
pub path: PathBuf,
/// The track's duration in milliseconds.
pub duration: u64,
}