Impleme library downloads

This commit is contained in:
Elias Projahn 2025-03-23 14:57:43 +01:00
parent a21a63e4b8
commit bf1ffef05a
13 changed files with 1231 additions and 46 deletions

View file

@ -16,12 +16,17 @@ use adw::{
use anyhow::{anyhow, Context, Result};
use chrono::prelude::*;
use diesel::{dsl::exists, prelude::*, sql_types, QueryDsl, SqliteConnection};
use formatx::formatx;
use futures_util::StreamExt;
use gettextrs::gettext;
use once_cell::sync::Lazy;
use tempfile::NamedTempFile;
use tokio::io::AsyncWriteExt;
use zip::{write::SimpleFileOptions, ZipWriter};
use crate::{
db::{self, models::*, schema::*, tables, TranslatedString},
process::ProcessMsg,
program::Program,
};
@ -73,17 +78,17 @@ impl Library {
}
/// Import from a library archive.
pub fn import(
pub fn import_archive(
&self,
path: impl AsRef<Path>,
) -> Result<async_channel::Receiver<LibraryProcessMsg>> {
) -> Result<async_channel::Receiver<ProcessMsg>> {
let path = path.as_ref().to_owned();
let library_folder = PathBuf::from(&self.folder());
let this_connection = self.imp().connection.get().unwrap().clone();
let (sender, receiver) = async_channel::unbounded::<LibraryProcessMsg>();
let (sender, receiver) = async_channel::unbounded::<ProcessMsg>();
thread::spawn(move || {
if let Err(err) = sender.send_blocking(LibraryProcessMsg::Result(import_from_zip(
if let Err(err) = sender.send_blocking(ProcessMsg::Result(import_from_zip(
path,
library_folder,
this_connection,
@ -96,21 +101,43 @@ impl Library {
Ok(receiver)
}
/// Import from a library archive at `url`.
pub fn import_url(&self, url: &str) -> Result<async_channel::Receiver<ProcessMsg>> {
let url = url.to_owned();
let library_folder = PathBuf::from(&self.folder());
let this_connection = self.imp().connection.get().unwrap().clone();
let (sender, receiver) = async_channel::unbounded::<ProcessMsg>();
thread::spawn(move || {
if let Err(err) = sender.send_blocking(ProcessMsg::Result(import_from_url(
url,
library_folder,
this_connection,
&sender,
))) {
log::error!("Failed to send library action result: {err:?}");
}
});
Ok(receiver)
}
/// Export the whole music library to an archive at `path`. If `path` already exists, it will
/// be overwritten. The work will be done in a background thread.
pub fn export(
pub fn export_archive(
&self,
path: impl AsRef<Path>,
) -> Result<async_channel::Receiver<LibraryProcessMsg>> {
) -> Result<async_channel::Receiver<ProcessMsg>> {
let connection = &mut *self.imp().connection.get().unwrap().lock().unwrap();
let path = path.as_ref().to_owned();
let library_folder = PathBuf::from(&self.folder());
let tracks = tracks::table.load::<tables::Track>(connection)?;
let (sender, receiver) = async_channel::unbounded::<LibraryProcessMsg>();
let (sender, receiver) = async_channel::unbounded::<ProcessMsg>();
thread::spawn(move || {
if let Err(err) = sender.send_blocking(LibraryProcessMsg::Result(write_zip(
if let Err(err) = sender.send_blocking(ProcessMsg::Result(write_zip(
path,
library_folder,
tracks,
@ -1790,7 +1817,7 @@ fn write_zip(
zip_path: impl AsRef<Path>,
library_folder: impl AsRef<Path>,
tracks: Vec<tables::Track>,
sender: &async_channel::Sender<LibraryProcessMsg>,
sender: &async_channel::Sender<ProcessMsg>,
) -> Result<()> {
let mut zip = zip::ZipWriter::new(BufWriter::new(fs::File::create(zip_path)?));
@ -1804,9 +1831,7 @@ fn write_zip(
add_file_to_zip(&mut zip, &library_folder, &track.path)?;
// Ignore if the reveiver has been dropped.
let _ = sender.send_blocking(LibraryProcessMsg::Progress(
(index + 1) as f64 / n_tracks as f64,
));
let _ = sender.send_blocking(ProcessMsg::Progress((index + 1) as f64 / n_tracks as f64));
}
zip.finish()?;
@ -1837,7 +1862,7 @@ fn import_from_zip(
zip_path: impl AsRef<Path>,
library_folder: impl AsRef<Path>,
this_connection: Arc<Mutex<SqliteConnection>>,
sender: &async_channel::Sender<LibraryProcessMsg>,
sender: &async_channel::Sender<ProcessMsg>,
) -> Result<()> {
let now = Local::now().naive_local();
@ -2065,16 +2090,79 @@ fn import_from_zip(
}
// Ignore if the reveiver has been dropped.
let _ = sender.send_blocking(LibraryProcessMsg::Progress(
(index + 1) as f64 / n_tracks as f64,
));
let _ = sender.send_blocking(ProcessMsg::Progress((index + 1) as f64 / n_tracks as f64));
}
Ok(())
}
#[derive(Debug)]
pub enum LibraryProcessMsg {
Progress(f64),
Result(Result<()>),
fn import_from_url(
url: String,
library_folder: impl AsRef<Path>,
this_connection: Arc<Mutex<SqliteConnection>>,
sender: &async_channel::Sender<ProcessMsg>,
) -> Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let _ = sender.send_blocking(ProcessMsg::Message(
formatx!(gettext("Downloading {}"), &url).unwrap(),
));
let archive_file = runtime.block_on(download_tmp_file(&url, &sender));
match archive_file {
Ok(archive_file) => {
let _ = sender.send_blocking(ProcessMsg::Message(
formatx!(gettext("Importing downloaded library"), &url).unwrap(),
));
let _ = sender.send_blocking(ProcessMsg::Result(import_from_zip(
archive_file.path(),
library_folder,
this_connection,
&sender,
)));
}
Err(err) => {
let _ = sender.send_blocking(ProcessMsg::Result(Err(err)));
}
}
Ok(())
}
async fn download_tmp_file(
url: &str,
sender: &async_channel::Sender<ProcessMsg>,
) -> Result<NamedTempFile> {
let client = reqwest::Client::builder()
.connect_timeout(std::time::Duration::from_secs(10))
.build()?;
let response = client.get(url).send().await?;
let total_size = response.content_length();
let mut body_stream = response.bytes_stream();
let file = NamedTempFile::new()?;
let mut writer =
tokio::io::BufWriter::new(tokio::fs::File::from_std(file.as_file().try_clone()?));
let mut downloaded = 0;
while let Some(chunk) = body_stream.next().await {
let chunk: Vec<u8> = chunk?.into();
let chunk_size = chunk.len();
writer.write_all(&chunk).await?;
if let Some(total_size) = total_size {
downloaded += chunk_size as u64;
let _ = sender
.send(ProcessMsg::Progress(downloaded as f64 / total_size as f64))
.await;
}
}
Ok(file)
}