diff options
| author | A Farzat <a@farzat.xyz> | 2026-03-10 13:26:56 +0300 |
|---|---|---|
| committer | A Farzat <a@farzat.xyz> | 2026-03-10 13:26:56 +0300 |
| commit | 1383c7be1b559520cebb3240f835d938c197d49f (patch) | |
| tree | 91f591635dbe3f03f893b52a55bafa1f032cc638 /src | |
| parent | 6bcc81d4fc2c3f4e8330398859bee75166387bab (diff) | |
| download | oreilly-epub-1383c7be1b559520cebb3240f835d938c197d49f.tar.gz oreilly-epub-1383c7be1b559520cebb3240f835d938c197d49f.zip | |
Add parallel downloading
This can be helpful when files are small in size, like the components of
an EPUB.
Diffstat (limited to 'src')
| -rw-r--r-- | src/epub.rs | 62 |
1 file changed, 47 insertions, 15 deletions
diff --git a/src/epub.rs b/src/epub.rs index 735bb1d..1e7402e 100644 --- a/src/epub.rs +++ b/src/epub.rs @@ -3,7 +3,7 @@ use crate::{ xml::{build_epub_chapter, write_modified_opf}, }; use anyhow::{Context, Result}; -use futures_util::TryStreamExt; +use futures_util::{StreamExt, TryStreamExt, stream::FuturesUnordered}; use ogrim::xml; use relative_path::{RelativePath, RelativePathBuf}; use reqwest::Client; @@ -16,6 +16,9 @@ use tokio::fs::{self, File}; use tokio_util::io::StreamReader; use zip::{CompressionMethod, ZipWriter, write::FileOptions}; +// TODO: make configurable. +const MAX_CONCURRENT: usize = 4; + /// Creates and writes container.xml. fn write_container_xml<W: Write>(out: &mut W, opf_full_path: &RelativePathBuf) -> Result<()> { // Prepare file contents. @@ -33,30 +36,59 @@ fn write_container_xml<W: Write>(out: &mut W, opf_full_path: &RelativePathBuf) - Ok(()) } -/// Downloads files to the relative location specified in full_path. +/// Downloads files in parallel to the relative location specified in full_path. pub async fn download_all_files( client: &Client, file_entries: &[FileEntry], dest_root: &Path, ) -> Result<()> { - for entry in file_entries { - let dest_path = entry.full_path.to_path(dest_root); + let mut downloading = FuturesUnordered::new(); + let mut files_iter = file_entries.iter(); + + // Start downloading the first n files. + for entry in files_iter.by_ref().take(MAX_CONCURRENT) { + downloading.push(download_one_file(client, entry, dest_root)); + } - if let Some(parent_dir) = dest_path.parent() { - fs::create_dir_all(parent_dir).await?; + // Obtain completed files from the list as they finish downloading, until empty. + while let Some(result) = downloading.next().await { + // Make sure they didn't fail first. Propagate any errors immediately. + result?; + // Refill the slot (if there are any remaining). 
+ if let Some(entry) = files_iter.next() { + downloading.push(download_one_file(client, entry, dest_root)); } + } - let mut file = File::create(dest_path).await?; - let bytes_stream = client - .get(entry.url.clone()) - .send() - .await? - .error_for_status()? - .bytes_stream(); - let mut reader = StreamReader::new(bytes_stream.map_err(std::io::Error::other)); + Ok(()) +} - tokio::io::copy(&mut reader, &mut file).await?; +/// Downloads the given file to the relative location specified in full_path. +pub async fn download_one_file( + client: &Client, + file_entry: &FileEntry, + dest_root: &Path, +) -> Result<()> { + let dest_path = file_entry.full_path.to_path(dest_root); + + // Ensure the directory exists and open the file. + if let Some(parent_dir) = dest_path.parent() { + fs::create_dir_all(parent_dir).await?; } + let mut file = File::create(dest_path).await?; + + // Obtain the resource as a stream of bytes. + let bytes_stream = client + .get(file_entry.url.clone()) + .send() + .await? + .error_for_status()? + .bytes_stream(); + // Convert the bytes stream into a reader. Must map errors to io errors. + let mut reader = StreamReader::new(bytes_stream.map_err(std::io::Error::other)); + + // Pipe bytes from the stream to the file. + tokio::io::copy(&mut reader, &mut file).await?; Ok(()) } |
