mirror of
https://github.com/johrpan/musicus.git
synced 2025-10-26 19:57:25 +01:00
Move crates to subdirectory
This commit is contained in:
parent
1db96062fb
commit
ac4b29e86d
115 changed files with 10 additions and 5 deletions
16
crates/import/Cargo.toml
Normal file
16
crates/import/Cargo.toml
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
[package]
|
||||
name = "musicus_import"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
base64 = "0.13.0"
|
||||
glib = "0.15.11"
|
||||
gstreamer = "0.18.8"
|
||||
gstreamer-pbutils = "0.18.7"
|
||||
log = "0.4.16"
|
||||
once_cell = "1.10.0"
|
||||
rand = "0.8.5"
|
||||
thiserror = "1.0.31"
|
||||
sha2 = "0.10.2"
|
||||
tokio = { version = "1.18.0", features = ["sync"] }
|
||||
174
crates/import/src/disc.rs
Normal file
174
crates/import/src/disc.rs
Normal file
|
|
@ -0,0 +1,174 @@
|
|||
use crate::error::{Error, Result};
|
||||
use crate::session::{ImportSession, ImportTrack, State};
|
||||
use gstreamer::prelude::*;
|
||||
use gstreamer::tags::{Duration, TrackNumber};
|
||||
use gstreamer::{ClockTime, ElementFactory, MessageType, MessageView, TocEntryType};
|
||||
use log::info;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::watch;
|
||||
|
||||
/// Create a new import session for the default disc drive.
|
||||
pub(super) fn new() -> Result<ImportSession> {
|
||||
let (state_sender, state_receiver) = watch::channel(State::Waiting);
|
||||
|
||||
let mut tracks = Vec::new();
|
||||
let mut hasher = Sha256::new();
|
||||
|
||||
// Build the GStreamer pipeline. It will contain a fakesink initially to be able to run it
|
||||
// forward to the paused state without specifying a file name before knowing the tracks.
|
||||
|
||||
let cdparanoiasrc = ElementFactory::make("cdparanoiasrc", None)?;
|
||||
let queue = ElementFactory::make("queue", None)?;
|
||||
let audioconvert = ElementFactory::make("audioconvert", None)?;
|
||||
let flacenc = ElementFactory::make("flacenc", None)?;
|
||||
let fakesink = gstreamer::ElementFactory::make("fakesink", None)?;
|
||||
|
||||
let pipeline = gstreamer::Pipeline::new(None);
|
||||
pipeline.add_many(&[&cdparanoiasrc, &queue, &audioconvert, &flacenc, &fakesink])?;
|
||||
gstreamer::Element::link_many(&[&cdparanoiasrc, &queue, &audioconvert, &flacenc, &fakesink])?;
|
||||
|
||||
let bus = pipeline
|
||||
.bus()
|
||||
.ok_or_else(|| Error::u(String::from("Failed to get bus from pipeline.")))?;
|
||||
|
||||
// Run the pipeline into the paused state and wait for the resulting TOC message on the bus.
|
||||
|
||||
pipeline.set_state(gstreamer::State::Paused)?;
|
||||
|
||||
let msg = bus.timed_pop_filtered(
|
||||
ClockTime::from_seconds(5),
|
||||
&[MessageType::Toc, MessageType::Error],
|
||||
);
|
||||
|
||||
let toc = match msg {
|
||||
Some(msg) => match msg.view() {
|
||||
MessageView::Error(err) => Err(Error::os(err.error())),
|
||||
MessageView::Toc(toc) => Ok(toc.toc().0),
|
||||
_ => Err(Error::u(format!(
|
||||
"Unexpected message from GStreamer: {:?}",
|
||||
msg
|
||||
))),
|
||||
},
|
||||
None => Err(Error::Timeout(
|
||||
"Timeout while waiting for first message from GStreamer.".to_string(),
|
||||
)),
|
||||
}?;
|
||||
|
||||
pipeline.set_state(gstreamer::State::Ready)?;
|
||||
|
||||
// Replace the fakesink with the real filesink. This won't need to be synced to the pipeline
|
||||
// state, because we will set the whole pipeline's state to playing later.
|
||||
|
||||
gstreamer::Element::unlink(&flacenc, &fakesink);
|
||||
fakesink.set_state(gstreamer::State::Null)?;
|
||||
pipeline.remove(&fakesink)?;
|
||||
|
||||
let filesink = gstreamer::ElementFactory::make("filesink", None)?;
|
||||
pipeline.add(&filesink)?;
|
||||
gstreamer::Element::link(&flacenc, &filesink)?;
|
||||
|
||||
// Get track data from the toc message that was received above.
|
||||
|
||||
let tmp_dir = create_tmp_dir()?;
|
||||
|
||||
for entry in toc.entries() {
|
||||
if entry.entry_type() == TocEntryType::Track {
|
||||
let duration = entry
|
||||
.tags()
|
||||
.ok_or_else(|| Error::u(String::from("No tags in TOC entry.")))?
|
||||
.get::<Duration>()
|
||||
.ok_or_else(|| Error::u(String::from("No duration tag found in TOC entry.")))?
|
||||
.get()
|
||||
.mseconds();
|
||||
|
||||
let number = entry
|
||||
.tags()
|
||||
.ok_or_else(|| Error::u(String::from("No tags in TOC entry.")))?
|
||||
.get::<TrackNumber>()
|
||||
.ok_or_else(|| Error::u(String::from("No track number tag found in TOC entry.")))?
|
||||
.get();
|
||||
|
||||
hasher.update(duration.to_le_bytes());
|
||||
|
||||
let name = format!("Track {}", number);
|
||||
|
||||
let file_name = format!("track_{:02}.flac", number);
|
||||
let mut path = tmp_dir.clone();
|
||||
path.push(file_name);
|
||||
|
||||
let track = ImportTrack {
|
||||
number,
|
||||
name,
|
||||
path,
|
||||
duration,
|
||||
};
|
||||
|
||||
tracks.push(track);
|
||||
}
|
||||
}
|
||||
|
||||
let source_id = base64::encode_config(hasher.finalize(), base64::URL_SAFE);
|
||||
|
||||
info!("Successfully loaded audio CD with {} tracks.", tracks.len());
|
||||
info!("Source ID: {}", source_id);
|
||||
|
||||
let tracks_clone = tracks.clone();
|
||||
let copy = move || {
|
||||
for track in &tracks_clone {
|
||||
info!("Starting to rip track {}.", track.number);
|
||||
|
||||
cdparanoiasrc.set_property("track", &track.number);
|
||||
|
||||
// The filesink needs to be reset to be able to change the file location.
|
||||
filesink.set_state(gstreamer::State::Null)?;
|
||||
|
||||
let path = track.path.to_str().unwrap();
|
||||
filesink.set_property("location", &path);
|
||||
|
||||
// This will also affect the filesink as expected.
|
||||
pipeline.set_state(gstreamer::State::Playing)?;
|
||||
|
||||
for msg in bus.iter_timed(None) {
|
||||
match msg.view() {
|
||||
MessageView::Eos(..) => {
|
||||
info!("Finished ripping track {}.", track.number);
|
||||
pipeline.set_state(gstreamer::State::Ready)?;
|
||||
break;
|
||||
}
|
||||
MessageView::Error(err) => {
|
||||
pipeline.set_state(gstreamer::State::Null)?;
|
||||
return Err(Error::os(err.error()));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pipeline.set_state(gstreamer::State::Null)?;
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let session = ImportSession {
|
||||
source_id,
|
||||
tracks,
|
||||
copy: Some(Box::new(copy)),
|
||||
state_sender,
|
||||
state_receiver,
|
||||
};
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
/// Create a new temporary directory and return its path.
|
||||
fn create_tmp_dir() -> Result<PathBuf> {
|
||||
let mut tmp_dir = glib::tmp_dir();
|
||||
|
||||
let dir_name = format!("musicus-{}", rand::random::<u64>());
|
||||
tmp_dir.push(dir_name);
|
||||
|
||||
std::fs::create_dir(&tmp_dir)?;
|
||||
|
||||
Ok(tmp_dir)
|
||||
}
|
||||
84
crates/import/src/error.rs
Normal file
84
crates/import/src/error.rs
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
use std::error;
|
||||
|
||||
/// An error within an import session.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
/// A timeout was reached.
|
||||
#[error("{0}")]
|
||||
Timeout(String),
|
||||
|
||||
/// Some common error.
|
||||
#[error("{msg}")]
|
||||
Other {
|
||||
/// The error message.
|
||||
msg: String,
|
||||
|
||||
#[source]
|
||||
source: Option<Box<dyn error::Error + Send + Sync>>,
|
||||
},
|
||||
|
||||
/// Something unexpected happened.
|
||||
#[error("{msg}")]
|
||||
Unexpected {
|
||||
/// The error message.
|
||||
msg: String,
|
||||
|
||||
#[source]
|
||||
source: Option<Box<dyn error::Error + Send + Sync>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Error {
|
||||
/// Create a new error with an explicit source.
|
||||
pub(super) fn os(source: impl error::Error + Send + Sync + 'static) -> Self {
|
||||
Self::Unexpected {
|
||||
msg: format!("An error has happened: {}", source),
|
||||
source: Some(Box::new(source)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new unexpected error without an explicit source.
|
||||
pub(super) fn u(msg: String) -> Self {
|
||||
Self::Unexpected { msg, source: None }
|
||||
}
|
||||
|
||||
/// Create a new unexpected error with an explicit source.
|
||||
pub(super) fn us(source: impl error::Error + Send + Sync + 'static) -> Self {
|
||||
Self::Unexpected {
|
||||
msg: format!("An unexpected error has happened: {}", source),
|
||||
source: Some(Box::new(source)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<tokio::sync::oneshot::error::RecvError> for Error {
|
||||
fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
|
||||
Self::us(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<gstreamer::glib::Error> for Error {
|
||||
fn from(err: gstreamer::glib::Error) -> Self {
|
||||
Self::us(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<gstreamer::glib::BoolError> for Error {
|
||||
fn from(err: gstreamer::glib::BoolError) -> Self {
|
||||
Self::us(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<gstreamer::StateChangeError> for Error {
|
||||
fn from(err: gstreamer::StateChangeError) -> Self {
|
||||
Self::us(err)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(err: std::io::Error) -> Self {
|
||||
Self::us(err)
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
80
crates/import/src/folder.rs
Normal file
80
crates/import/src/folder.rs
Normal file
|
|
@ -0,0 +1,80 @@
|
|||
use crate::error::{Error, Result};
|
||||
use crate::session::{ImportSession, ImportTrack, State};
|
||||
use gstreamer::ClockTime;
|
||||
use gstreamer_pbutils::Discoverer;
|
||||
use log::{info, warn};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::fs::DirEntry;
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::watch;
|
||||
|
||||
/// Create a new import session for the specified folder.
|
||||
pub(super) fn new(path: PathBuf) -> Result<ImportSession> {
|
||||
let (state_sender, state_receiver) = watch::channel(State::Ready);
|
||||
|
||||
let mut tracks = Vec::new();
|
||||
let mut number: u32 = 1;
|
||||
let mut hasher = Sha256::new();
|
||||
let discoverer = Discoverer::new(ClockTime::from_seconds(1))?;
|
||||
|
||||
let mut entries =
|
||||
std::fs::read_dir(path)?.collect::<std::result::Result<Vec<DirEntry>, std::io::Error>>()?;
|
||||
entries.sort_by_key(|entry| entry.file_name());
|
||||
|
||||
for entry in entries {
|
||||
if entry.file_type()?.is_file() {
|
||||
let path = entry.path();
|
||||
|
||||
let uri = glib::filename_to_uri(&path, None)
|
||||
.map_err(|_| Error::u(format!("Failed to create URI from path: {:?}", path)))?;
|
||||
|
||||
let info = discoverer.discover_uri(&uri)?;
|
||||
|
||||
if !info.audio_streams().is_empty() {
|
||||
let duration = info
|
||||
.duration()
|
||||
.ok_or_else(|| Error::u(format!("Failed to get duration for {}.", uri)))?
|
||||
.mseconds();
|
||||
|
||||
let file_name = entry.file_name();
|
||||
let name = file_name.into_string().map_err(|_| {
|
||||
Error::u(format!(
|
||||
"Failed to convert OsString to String: {:?}",
|
||||
entry.file_name()
|
||||
))
|
||||
})?;
|
||||
|
||||
hasher.update(duration.to_le_bytes());
|
||||
|
||||
let track = ImportTrack {
|
||||
number,
|
||||
name,
|
||||
path,
|
||||
duration,
|
||||
};
|
||||
|
||||
tracks.push(track);
|
||||
number += 1;
|
||||
} else {
|
||||
warn!(
|
||||
"File {} skipped, because it doesn't contain any audio streams.",
|
||||
uri
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let source_id = base64::encode_config(hasher.finalize(), base64::URL_SAFE);
|
||||
|
||||
info!("Source ID: {}", source_id);
|
||||
|
||||
let session = ImportSession {
|
||||
source_id,
|
||||
tracks,
|
||||
copy: None,
|
||||
state_sender,
|
||||
state_receiver,
|
||||
};
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
8
crates/import/src/lib.rs
Normal file
8
crates/import/src/lib.rs
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
pub use error::{Error, Result};
|
||||
pub use session::{ImportSession, ImportTrack, State};
|
||||
|
||||
pub mod error;
|
||||
pub mod session;
|
||||
|
||||
mod disc;
|
||||
mod folder;
|
||||
127
crates/import/src/session.rs
Normal file
127
crates/import/src/session.rs
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
use crate::error::Result;
|
||||
use crate::{disc, folder};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use tokio::sync::{oneshot, watch};
|
||||
|
||||
/// The current state of the import process.
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum State {
|
||||
/// The import process has not been started yet.
|
||||
Waiting,
|
||||
|
||||
/// The audio is copied from the source.
|
||||
Copying,
|
||||
|
||||
/// The audio files are ready to be imported into the music library.
|
||||
Ready,
|
||||
|
||||
/// An error has happened.
|
||||
Error,
|
||||
}
|
||||
|
||||
/// Interface for importing audio tracks from a medium or folder.
|
||||
pub struct ImportSession {
|
||||
/// A string identifying the source as specific as possible across platforms and formats.
|
||||
pub(super) source_id: String,
|
||||
|
||||
/// The tracks that are available on the source.
|
||||
pub(super) tracks: Vec<ImportTrack>,
|
||||
|
||||
/// A closure that has to be called to copy the tracks if set.
|
||||
pub(super) copy: Option<Box<dyn Fn() -> Result<()> + Send + Sync>>,
|
||||
|
||||
/// Sender through which listeners are notified of state changes.
|
||||
pub(super) state_sender: watch::Sender<State>,
|
||||
|
||||
/// Receiver for state changes.
|
||||
pub(super) state_receiver: watch::Receiver<State>,
|
||||
}
|
||||
|
||||
impl ImportSession {
|
||||
/// Create a new import session for an audio CD.
|
||||
pub async fn audio_cd() -> Result<Arc<Self>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let result = disc::new();
|
||||
let _ = sender.send(result);
|
||||
});
|
||||
|
||||
Ok(Arc::new(receiver.await??))
|
||||
}
|
||||
|
||||
/// Create a new import session for a folder.
|
||||
pub async fn folder(path: PathBuf) -> Result<Arc<Self>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let result = folder::new(path);
|
||||
let _ = sender.send(result);
|
||||
});
|
||||
|
||||
Ok(Arc::new(receiver.await??))
|
||||
}
|
||||
|
||||
/// Get a string identifying the source as specific as possible across platforms and mediums.
|
||||
pub fn source_id(&self) -> &str {
|
||||
&self.source_id
|
||||
}
|
||||
|
||||
/// Get the tracks that are available on the source.
|
||||
pub fn tracks(&self) -> &[ImportTrack] {
|
||||
&self.tracks
|
||||
}
|
||||
|
||||
/// Retrieve the current state of the import process.
|
||||
pub fn state(&self) -> State {
|
||||
self.state_receiver.borrow().clone()
|
||||
}
|
||||
|
||||
/// Wait for the next state change and get the new state.
|
||||
pub async fn state_change(&self) -> State {
|
||||
let mut receiver = self.state_receiver.clone();
|
||||
match receiver.changed().await {
|
||||
Ok(()) => self.state(),
|
||||
Err(_) => State::Error,
|
||||
}
|
||||
}
|
||||
|
||||
/// Copy the tracks to their advertised locations in the background, if neccessary. The state
|
||||
/// will be updated as the import is done.
|
||||
pub fn copy(self: &Arc<Self>) {
|
||||
if self.copy.is_some() {
|
||||
let clone = Arc::clone(self);
|
||||
|
||||
thread::spawn(move || {
|
||||
let copy = clone.copy.as_ref().unwrap();
|
||||
|
||||
match copy() {
|
||||
Ok(()) => clone.state_sender.send(State::Ready).unwrap(),
|
||||
Err(_) => clone.state_sender.send(State::Error).unwrap(),
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A track on an import source.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ImportTrack {
|
||||
/// The track number.
|
||||
pub number: u32,
|
||||
|
||||
/// A human readable identifier for the track. This will be used to present the track for
|
||||
/// selection.
|
||||
pub name: String,
|
||||
|
||||
/// The path to the file where the corresponding audio file is. This file is only required to
|
||||
/// exist, once the import was successfully completed. This will not be the actual file within
|
||||
/// the user's music library, but the temporary location from which it can be copied to the
|
||||
/// music library.
|
||||
pub path: PathBuf,
|
||||
|
||||
/// The track's duration in milliseconds.
|
||||
pub duration: u64,
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue