aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/epub.rs62
1 file changed, 47 insertions, 15 deletions
diff --git a/src/epub.rs b/src/epub.rs
index 735bb1d..1e7402e 100644
--- a/src/epub.rs
+++ b/src/epub.rs
@@ -3,7 +3,7 @@ use crate::{
xml::{build_epub_chapter, write_modified_opf},
};
use anyhow::{Context, Result};
-use futures_util::TryStreamExt;
+use futures_util::{StreamExt, TryStreamExt, stream::FuturesUnordered};
use ogrim::xml;
use relative_path::{RelativePath, RelativePathBuf};
use reqwest::Client;
@@ -16,6 +16,9 @@ use tokio::fs::{self, File};
use tokio_util::io::StreamReader;
use zip::{CompressionMethod, ZipWriter, write::FileOptions};
+// TODO: make configurable.
+const MAX_CONCURRENT: usize = 4;
+
/// Creates and writes container.xml.
fn write_container_xml<W: Write>(out: &mut W, opf_full_path: &RelativePathBuf) -> Result<()> {
// Prepare file contents.
@@ -33,30 +36,59 @@ fn write_container_xml<W: Write>(out: &mut W, opf_full_path: &RelativePathBuf) -
Ok(())
}
-/// Downloads files to the relative location specified in full_path.
+/// Downloads files in parallel to the relative location specified in full_path.
pub async fn download_all_files(
client: &Client,
file_entries: &[FileEntry],
dest_root: &Path,
) -> Result<()> {
- for entry in file_entries {
- let dest_path = entry.full_path.to_path(dest_root);
+ let mut downloading = FuturesUnordered::new();
+ let mut files_iter = file_entries.iter();
+
+ // Start downloading the first n files.
+ for entry in files_iter.by_ref().take(MAX_CONCURRENT) {
+ downloading.push(download_one_file(client, entry, dest_root));
+ }
- if let Some(parent_dir) = dest_path.parent() {
- fs::create_dir_all(parent_dir).await?;
+ // Obtain completed files from the list as they finish downloading, until empty.
+ while let Some(result) = downloading.next().await {
+ // Make sure they didn't fail first. Propagate any errors immediately.
+ result?;
+        // Refill the freed slot (if any files remain to be downloaded).
+ if let Some(entry) = files_iter.next() {
+ downloading.push(download_one_file(client, entry, dest_root));
}
+ }
- let mut file = File::create(dest_path).await?;
- let bytes_stream = client
- .get(entry.url.clone())
- .send()
- .await?
- .error_for_status()?
- .bytes_stream();
- let mut reader = StreamReader::new(bytes_stream.map_err(std::io::Error::other));
+ Ok(())
+}
- tokio::io::copy(&mut reader, &mut file).await?;
+/// Downloads the given file to the relative location specified in full_path.
+pub async fn download_one_file(
+ client: &Client,
+ file_entry: &FileEntry,
+ dest_root: &Path,
+) -> Result<()> {
+ let dest_path = file_entry.full_path.to_path(dest_root);
+
+ // Ensure the directory exists and open the file.
+ if let Some(parent_dir) = dest_path.parent() {
+ fs::create_dir_all(parent_dir).await?;
}
+ let mut file = File::create(dest_path).await?;
+
+ // Obtain the resource as a stream of bytes.
+ let bytes_stream = client
+ .get(file_entry.url.clone())
+ .send()
+ .await?
+ .error_for_status()?
+ .bytes_stream();
+    // Convert the byte stream into a reader. Errors must be mapped to io errors.
+ let mut reader = StreamReader::new(bytes_stream.map_err(std::io::Error::other));
+
+ // Pipe bytes from the stream to the file.
+ tokio::io::copy(&mut reader, &mut file).await?;
Ok(())
}