Browse Source

fix: unexpect stack overflow when searching a lot (#87)

pull/88/head
sigoden 3 years ago committed by GitHub
parent
commit
1e0cdafbcf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 117
      Cargo.lock
  2. 2
      Cargo.toml
  3. 12
      src/main.rs
  4. 116
      src/server.rs

117
Cargo.lock generated

@ -54,17 +54,6 @@ dependencies = [
"tempfile", "tempfile",
] ]
[[package]]
name = "async-channel"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]] [[package]]
name = "async-compression" name = "async-compression"
version = "0.3.14" version = "0.3.14"
@ -78,26 +67,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "async-fs"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b3ca4f8ff117c37c278a2f7415ce9be55560b846b5bc4412aaa5d29c1c3dae2"
dependencies = [
"async-lock",
"blocking",
"futures-lite",
]
[[package]]
name = "async-lock"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6"
dependencies = [
"event-listener",
]
[[package]] [[package]]
name = "async-stream" name = "async-stream"
version = "0.3.3" version = "0.3.3"
@ -119,12 +88,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "async-task"
version = "4.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9"
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.56" version = "0.1.56"
@ -136,16 +99,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "async-walkdir"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "826d88d73e87e7504b635b6e427561faa6a65f4a2f59e75efcbfa51a0876bb90"
dependencies = [
"async-fs",
"futures-lite",
]
[[package]] [[package]]
name = "async_io_utilities" name = "async_io_utilities"
version = "0.1.3" version = "0.1.3"
@ -169,12 +122,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "atomic-waker"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -226,20 +173,6 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "blocking"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6ccb65d468978a086b69884437ded69a90faab3bbe6e67f242173ea728acccc"
dependencies = [
"async-channel",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
"once_cell",
]
[[package]] [[package]]
name = "bstr" name = "bstr"
version = "0.2.17" version = "0.2.17"
@ -263,12 +196,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cache-padded"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.73" version = "1.0.73"
@ -316,15 +243,6 @@ dependencies = [
"os_str_bytes", "os_str_bytes",
] ]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.3" version = "0.9.3"
@ -457,7 +375,6 @@ dependencies = [
"assert_cmd", "assert_cmd",
"assert_fs", "assert_fs",
"async-stream", "async-stream",
"async-walkdir",
"async_zip", "async_zip",
"base64", "base64",
"chrono", "chrono",
@ -490,6 +407,7 @@ dependencies = [
"url", "url",
"urlencoding", "urlencoding",
"uuid", "uuid",
"walkdir",
"xml-rs", "xml-rs",
] ]
@ -508,12 +426,6 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "event-listener"
version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
[[package]] [[package]]
name = "fastrand" name = "fastrand"
version = "1.7.0" version = "1.7.0"
@ -631,21 +543,6 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.21" version = "0.3.21"
@ -1260,12 +1157,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.1" version = "0.12.1"
@ -2210,12 +2101,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]] [[package]]
name = "walkdir" name = "walkdir"
version = "2.3.2" version = "2.3.2"

2
Cargo.toml

@ -22,7 +22,6 @@ serde_json = "1"
futures = "0.3" futures = "0.3"
base64 = "0.13" base64 = "0.13"
async_zip = { version = "0.0.8", default-features = false, features = ["deflate"] } async_zip = { version = "0.0.8", default-features = false, features = ["deflate"] }
async-walkdir = "0.2"
headers = "0.3" headers = "0.3"
mime_guess = "2.0" mime_guess = "2.0"
if-addrs = "0.7" if-addrs = "0.7"
@ -37,6 +36,7 @@ xml-rs = "0.8"
log = "0.4" log = "0.4"
socket2 = "0.4" socket2 = "0.4"
async-stream = "0.3" async-stream = "0.3"
walkdir = "2.3"
[features] [features]
default = ["tls"] default = ["tls"]

12
src/main.rs

@ -16,6 +16,7 @@ use crate::server::{Request, Server};
use crate::tls::{TlsAcceptor, TlsStream}; use crate::tls::{TlsAcceptor, TlsStream};
use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener}; use std::net::{IpAddr, SocketAddr, TcpListener as StdTcpListener};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use futures::future::join_all; use futures::future::join_all;
@ -38,7 +39,8 @@ async fn run() -> BoxResult<()> {
logger::init().map_err(|e| format!("Failed to init logger, {}", e))?; logger::init().map_err(|e| format!("Failed to init logger, {}", e))?;
let args = Args::parse(matches())?; let args = Args::parse(matches())?;
let args = Arc::new(args); let args = Arc::new(args);
let handles = serve(args.clone())?; let running = Arc::new(AtomicBool::new(true));
let handles = serve(args.clone(), running.clone())?;
print_listening(args)?; print_listening(args)?;
tokio::select! { tokio::select! {
@ -51,13 +53,17 @@ async fn run() -> BoxResult<()> {
Ok(()) Ok(())
}, },
_ = shutdown_signal() => { _ = shutdown_signal() => {
running.store(false, Ordering::SeqCst);
Ok(()) Ok(())
}, },
} }
} }
fn serve(args: Arc<Args>) -> BoxResult<Vec<JoinHandle<Result<(), hyper::Error>>>> { fn serve(
let inner = Arc::new(Server::new(args.clone())); args: Arc<Args>,
running: Arc<AtomicBool>,
) -> BoxResult<Vec<JoinHandle<Result<(), hyper::Error>>>> {
let inner = Arc::new(Server::new(args.clone(), running));
let mut handles = vec![]; let mut handles = vec![];
let port = args.port; let port = args.port;
for ip in args.addrs.iter() { for ip in args.addrs.iter() {

116
src/server.rs

@ -1,13 +1,12 @@
use crate::streamer::Streamer; use crate::streamer::Streamer;
use crate::utils::{decode_uri, encode_uri, get_file_name, try_get_file_name}; use crate::utils::{decode_uri, encode_uri, get_file_name, try_get_file_name};
use crate::{Args, BoxResult}; use crate::{Args, BoxResult};
use async_walkdir::{Filtering, WalkDir}; use walkdir::WalkDir;
use xml::escape::escape_str_pcdata; use xml::escape::escape_str_pcdata;
use async_zip::write::{EntryOptions, ZipFileWriter}; use async_zip::write::{EntryOptions, ZipFileWriter};
use async_zip::Compression; use async_zip::Compression;
use chrono::{TimeZone, Utc}; use chrono::{TimeZone, Utc};
use futures::stream::StreamExt;
use futures::TryStreamExt; use futures::TryStreamExt;
use headers::{ use headers::{
AcceptRanges, AccessControlAllowCredentials, AccessControlAllowHeaders, AcceptRanges, AccessControlAllowCredentials, AccessControlAllowHeaders,
@ -24,6 +23,7 @@ use std::fs::Metadata;
use std::io::SeekFrom; use std::io::SeekFrom;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::fs::File; use tokio::fs::File;
@ -45,13 +45,15 @@ const BUF_SIZE: usize = 65536;
pub struct Server { pub struct Server {
args: Arc<Args>, args: Arc<Args>,
assets_prefix: String, assets_prefix: String,
running: Arc<AtomicBool>,
} }
impl Server { impl Server {
pub fn new(args: Arc<Args>) -> Self { pub fn new(args: Arc<Args>, running: Arc<AtomicBool>) -> Self {
let assets_prefix = format!("{}__dufs_v{}_", args.uri_prefix, env!("CARGO_PKG_VERSION")); let assets_prefix = format!("{}__dufs_v{}_", args.uri_prefix, env!("CARGO_PKG_VERSION"));
Self { Self {
args, args,
running,
assets_prefix, assets_prefix,
} }
} }
@ -331,36 +333,42 @@ impl Server {
res: &mut Response, res: &mut Response,
) -> BoxResult<()> { ) -> BoxResult<()> {
let mut paths: Vec<PathItem> = vec![]; let mut paths: Vec<PathItem> = vec![];
let path_buf = path.to_path_buf();
let hidden = self.args.hidden.to_string(); let hidden = self.args.hidden.to_string();
let search = search.to_string(); let running = self.running.clone();
let mut walkdir = WalkDir::new(path).filter(move |entry| { let search = search.to_lowercase();
let hidden_cloned = hidden.clone(); let search_paths = tokio::task::spawn_blocking(move || {
let search_cloned = search.clone(); let mut it = WalkDir::new(&path_buf).into_iter();
async move { let mut paths: Vec<PathBuf> = vec![];
while let Some(Ok(entry)) = it.next() {
if !running.load(Ordering::SeqCst) {
break;
}
let entry_path = entry.path(); let entry_path = entry.path();
let base_name = get_file_name(&entry_path); let base_name = get_file_name(entry_path);
if is_hidden(&hidden_cloned, base_name) { let file_type = entry.file_type();
return Filtering::IgnoreDir; if is_hidden(&hidden, base_name) {
if file_type.is_dir() {
it.skip_current_dir();
} }
if !base_name continue;
.to_lowercase() }
.contains(&search_cloned.to_lowercase()) if !base_name.to_lowercase().contains(&search) {
{ continue;
return Filtering::Ignore;
} }
if fs::symlink_metadata(entry.path()).await.is_err() { if entry.path().symlink_metadata().is_err() {
return Filtering::Ignore; continue;
} }
Filtering::Continue paths.push(entry_path.to_path_buf());
} }
}); paths
while let Some(entry) = walkdir.next().await { })
if let Ok(entry) = entry { .await?;
if let Ok(Some(item)) = self.to_pathitem(entry.path(), path.to_path_buf()).await { for search_path in search_paths.into_iter() {
if let Ok(Some(item)) = self.to_pathitem(search_path, path.to_path_buf()).await {
paths.push(item); paths.push(item);
} }
} }
}
self.send_index(path, paths, true, head_only, res) self.send_index(path, paths, true, head_only, res)
} }
@ -387,8 +395,9 @@ impl Server {
} }
let path = path.to_owned(); let path = path.to_owned();
let hidden = self.args.hidden.clone(); let hidden = self.args.hidden.clone();
let running = self.running.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = zip_dir(&mut writer, &path, &hidden).await { if let Err(e) = zip_dir(&mut writer, &path, &hidden, running).await {
error!("Failed to zip {}, {}", path.display(), e); error!("Failed to zip {}, {}", path.display(), e);
} }
}); });
@ -992,42 +1001,55 @@ fn res_multistatus(res: &mut Response, content: &str) {
)); ));
} }
async fn zip_dir<W: AsyncWrite + Unpin>(writer: &mut W, dir: &Path, hidden: &str) -> BoxResult<()> { async fn zip_dir<W: AsyncWrite + Unpin>(
writer: &mut W,
dir: &Path,
hidden: &str,
running: Arc<AtomicBool>,
) -> BoxResult<()> {
let mut writer = ZipFileWriter::new(writer); let mut writer = ZipFileWriter::new(writer);
let hidden = Arc::new(hidden.to_string());
let hidden = hidden.to_string(); let hidden = hidden.to_string();
let mut walkdir = WalkDir::new(dir).filter(move |entry| { let dir_path_buf = dir.to_path_buf();
let hidden = hidden.clone(); let zip_paths = tokio::task::spawn_blocking(move || {
async move { let mut it = WalkDir::new(&dir_path_buf).into_iter();
let mut paths: Vec<PathBuf> = vec![];
while let Some(Ok(entry)) = it.next() {
if !running.load(Ordering::SeqCst) {
break;
}
let entry_path = entry.path(); let entry_path = entry.path();
let base_name = get_file_name(&entry_path); let base_name = get_file_name(entry_path);
let file_type = entry.file_type();
if is_hidden(&hidden, base_name) { if is_hidden(&hidden, base_name) {
return Filtering::IgnoreDir; if file_type.is_dir() {
it.skip_current_dir();
} }
let meta = match fs::symlink_metadata(entry.path()).await { continue;
Ok(meta) => meta,
Err(_) => return Filtering::Ignore,
};
if !meta.is_file() {
return Filtering::Ignore;
} }
Filtering::Continue if entry.path().symlink_metadata().is_err() {
continue;
} }
}); if !file_type.is_file() {
while let Some(entry) = walkdir.next().await { continue;
if let Ok(entry) = entry { }
let entry_path = entry.path(); paths.push(entry_path.to_path_buf());
let filename = match entry_path.strip_prefix(dir).ok().and_then(|v| v.to_str()) { }
paths
})
.await?;
for zip_path in zip_paths.into_iter() {
let filename = match zip_path.strip_prefix(dir).ok().and_then(|v| v.to_str()) {
Some(v) => v, Some(v) => v,
None => continue, None => continue,
}; };
let entry_options = EntryOptions::new(filename.to_owned(), Compression::Deflate) let entry_options =
.unix_permissions(0o644); EntryOptions::new(filename.to_owned(), Compression::Deflate).unix_permissions(0o644);
let mut file = File::open(&entry_path).await?; let mut file = File::open(&zip_path).await?;
let mut file_writer = writer.write_entry_stream(entry_options).await?; let mut file_writer = writer.write_entry_stream(entry_options).await?;
io::copy(&mut file, &mut file_writer).await?; io::copy(&mut file, &mut file_writer).await?;
file_writer.close().await?; file_writer.close().await?;
} }
}
writer.close().await?; writer.close().await?;
Ok(()) Ok(())
} }

Loading…
Cancel
Save