sppd_cli/
extractor.rs

1use crate::errors::{AppError, AppResult};
2use crate::models::ProcurementType;
3use crate::utils::{format_duration, mb_from_bytes, round_two_decimals};
4use rayon::{prelude::*, ThreadPoolBuilder};
5use std::collections::{BTreeMap, HashSet};
6use std::fs::{self, File};
7use std::io::{copy, BufWriter, Write};
8use std::path::{Path, PathBuf};
9use std::sync::Arc;
10use std::time::Instant;
11use tracing::{debug, info, warn};
12use zip::ZipArchive;
13
14/// Extracts ZIP files from the specified directory into subdirectories.
15///
16/// This function processes ZIP files that correspond to periods in `target_links`.
17/// For each period (e.g., "202301"), it looks for a corresponding ZIP file (`202301.zip`)
18/// in the extraction directory and extracts its contents into a subdirectory named
19/// after the period (`202301/`).
20///
21/// # Behavior
22///
23/// - **Skip existing**: If an extraction directory already exists for a period, that
24///   ZIP file is skipped.
25/// - **Missing files**: Missing ZIP files are logged as warnings but don't fail the
26///   operation.
27/// - **Progress tracking**: Elapsed time and throughput are logged after extraction.
28///
29/// # Arguments
30///
31/// * `target_links` - Map of period strings to URLs (used to determine which ZIPs to extract)
32/// * `procurement_type` - Procurement type determining the extraction directory
33///
34/// # Directory Structure
35///
36/// For a period "202301", the function expects:
37/// - Input: `{extract_dir}/202301.zip`
38/// - Output: `{extract_dir}/202301/` (contains extracted XML/Atom files)
39///
40/// # Errors
41///
42/// Returns an error if:
43/// - The extraction directory doesn't exist
44/// - ZIP file extraction fails for any file
45///
46pub async fn extract_all_zips(
47    target_links: &BTreeMap<String, String>,
48    procurement_type: &ProcurementType,
49    config: &crate::config::ResolvedConfig,
50) -> AppResult<()> {
51    let extract_dir = procurement_type.extract_dir(config);
52    if !extract_dir.exists() {
53        return Err(AppError::IoError(format!(
54            "Directory does not exist: {}",
55            extract_dir.display()
56        )));
57    }
58
59    // Collect ZIP files that need extraction
60    // Pre-allocate with known upper bound (bounded by target_links.len())
61    let capacity = target_links.len();
62    let mut zips_to_extract: Vec<PathBuf> = Vec::with_capacity(capacity);
63    let mut missing_zips = Vec::with_capacity(capacity);
64
65    for period in target_links.keys() {
66        let zip_path = extract_dir.join(format!("{period}.zip"));
67        if !zip_path.exists() {
68            missing_zips.push((period.clone(), zip_path));
69            continue;
70        }
71
72        // Check if extraction directory already exists
73        let extract_dir_path = zip_path
74            .parent()
75            .ok_or_else(|| {
76                AppError::InvalidInput(format!(
77                    "ZIP file has no parent directory: {}",
78                    zip_path.display()
79                ))
80            })?
81            .join(period);
82
83        if !extract_dir_path.exists() {
84            zips_to_extract.push(zip_path);
85        }
86    }
87
88    let total_zips = zips_to_extract.len();
89    let skipped_count = target_links.len() - total_zips - missing_zips.len();
90
91    if total_zips == 0 {
92        info!(
93            total = target_links.len(),
94            skipped = skipped_count,
95            missing = missing_zips.len(),
96            "All ZIP files already extracted, skipping extraction"
97        );
98        return Ok(());
99    }
100
101    // Log warnings for missing ZIP files
102    for (period, zip_path) in &missing_zips {
103        warn!(
104            zip_file = %zip_path.display(),
105            period = period,
106            "ZIP file not found, skipping"
107        );
108    }
109
110    info!(
111        total = total_zips,
112        skipped = skipped_count,
113        missing = missing_zips.len(),
114        "Starting extraction"
115    );
116
117    let start = Instant::now();
118
119    let cpu_count = std::thread::available_parallelism()
120        .map(|p| p.get())
121        .unwrap_or(1);
122    let thread_count = cpu_count.saturating_mul(2);
123    let rayon_pool = ThreadPoolBuilder::new()
124        .num_threads(thread_count)
125        .build()
126        .map_err(|e| AppError::IoError(format!("Failed to configure rayon thread pool: {e}")))?;
127
128    // Run parallel extraction using rayon within spawn_blocking
129    let results = tokio::task::spawn_blocking(move || {
130        rayon_pool.install(|| {
131            zips_to_extract
132                .par_iter()
133                .map(|zip_path| {
134                    let result = extract_zip_sync(zip_path);
135                    (zip_path.clone(), result)
136                })
137                .collect::<Vec<(PathBuf, AppResult<()>)>>()
138        })
139    })
140    .await
141    .map_err(|e| AppError::IoError(format!("Task join error: {e}")))?;
142
143    // Collect errors
144    let mut errors = Vec::new();
145    let mut extracted_bytes = 0u64;
146    for (zip_path, result) in results {
147        if let Err(e) = result {
148            let error_msg = format!("Failed to extract {}: {}", zip_path.display(), e);
149            warn!(
150                zip_file = %zip_path.display(),
151                error = %e,
152                "Failed to extract ZIP file"
153            );
154            errors.push(error_msg);
155            continue;
156        }
157
158        if let Some(extract_dir) = extracted_dir_for_zip(&zip_path) {
159            extracted_bytes += directory_size(&extract_dir);
160        }
161    }
162
163    if !errors.is_empty() {
164        return Err(AppError::IoError(format!(
165            "Failed to extract {} ZIP file(s): {}",
166            errors.len(),
167            errors.join("; ")
168        )));
169    }
170
171    if skipped_count > 0 {
172        debug!(skipped = skipped_count, "Skipped already extracted files");
173    }
174
175    let elapsed = start.elapsed();
176    let elapsed_str = format_duration(elapsed);
177    let total_mb = mb_from_bytes(extracted_bytes);
178    let throughput = if elapsed.as_secs_f64() > 0.0 {
179        total_mb / elapsed.as_secs_f64()
180    } else {
181        total_mb
182    };
183    let size_mb = round_two_decimals(total_mb);
184    let throughput_mb_s = round_two_decimals(throughput);
185
186    info!(
187        extracted = total_zips,
188        skipped = skipped_count,
189        missing = missing_zips.len(),
190        elapsed = elapsed_str,
191        size_mb = size_mb,
192        throughput_mb_s = throughput_mb_s,
193        "Extraction completed"
194    );
195
196    Ok(())
197}
198
199/// Synchronous function to extract a single ZIP file.
200/// This is used by rayon for parallel processing.
201fn extract_zip_sync(zip_path: &Path) -> AppResult<()> {
202    let zip_file_name = zip_path
203        .file_stem()
204        .and_then(|s| s.to_str())
205        .ok_or_else(|| {
206            AppError::InvalidInput(format!("Invalid ZIP file name: {}", zip_path.display()))
207        })?;
208
209    let extract_dir = zip_path
210        .parent()
211        .ok_or_else(|| {
212            AppError::InvalidInput(format!(
213                "ZIP file has no parent directory: {}",
214                zip_path.display()
215            ))
216        })?
217        .join(zip_file_name);
218
219    // Skip if extraction directory already exists
220    if extract_dir.exists() {
221        debug!(
222            zip_file = %zip_path.display(),
223            extract_dir = %extract_dir.display(),
224            "Skipping extraction, directory already exists"
225        );
226        return Ok(());
227    }
228
229    // Create extraction directory
230    std::fs::create_dir_all(&extract_dir).map_err(|e| {
231        AppError::IoError(format!(
232            "Failed to create extraction directory {}: {}",
233            extract_dir.display(),
234            e
235        ))
236    })?;
237
238    // Open and extract ZIP file
239    let file = File::open(zip_path).map_err(|e| {
240        AppError::IoError(format!(
241            "Failed to open ZIP file {}: {}",
242            zip_path.display(),
243            e
244        ))
245    })?;
246
247    let mut archive = ZipArchive::new(file).map_err(|e| {
248        AppError::ParseError(format!(
249            "Failed to read ZIP archive {}: {}",
250            zip_path.display(),
251            e
252        ))
253    })?;
254
255    let mut entries = Vec::with_capacity(archive.len());
256    let mut created_dirs = HashSet::new();
257
258    for i in 0..archive.len() {
259        let file = archive.by_index(i).map_err(|e| {
260            AppError::ParseError(format!(
261                "Failed to read file {} from ZIP {}: {}",
262                i,
263                zip_path.display(),
264                e
265            ))
266        })?;
267
268        let out_path = match file.enclosed_name() {
269            Some(path) => extract_dir.join(path),
270            None => continue,
271        };
272
273        if file.name().ends_with('/') {
274            continue;
275        }
276
277        if let Some(parent) = out_path.parent() {
278            if created_dirs.insert(parent.to_path_buf()) {
279                std::fs::create_dir_all(parent).map_err(|e| {
280                    AppError::IoError(format!(
281                        "Failed to create directory {}: {}",
282                        parent.display(),
283                        e
284                    ))
285                })?;
286            }
287        }
288
289        entries.push((i, out_path));
290    }
291
292    drop(archive);
293
294    let zip_path_arc = Arc::new(zip_path.to_path_buf());
295    entries
296        .par_iter()
297        .map(|(index, out_path)| {
298            let zip_path = zip_path_arc.clone();
299            let file = File::open(&*zip_path).map_err(|e| {
300                AppError::IoError(format!(
301                    "Failed to open ZIP file {}: {}",
302                    zip_path.display(),
303                    e
304                ))
305            })?;
306
307            let mut archive = ZipArchive::new(file).map_err(|e| {
308                AppError::ParseError(format!(
309                    "Failed to read ZIP archive {}: {}",
310                    zip_path.display(),
311                    e
312                ))
313            })?;
314
315            let mut file = archive.by_index(*index).map_err(|e| {
316                AppError::ParseError(format!(
317                    "Failed to read file {} from ZIP {}: {}",
318                    index,
319                    zip_path.display(),
320                    e
321                ))
322            })?;
323
324            let out_file = std::fs::File::create(out_path).map_err(|e| {
325                AppError::IoError(format!(
326                    "Failed to create file {}: {}",
327                    out_path.display(),
328                    e
329                ))
330            })?;
331
332            let mut writer = BufWriter::with_capacity(32 * 1024, out_file);
333            copy(&mut file, &mut writer).map_err(|e| {
334                AppError::IoError(format!(
335                    "Failed to copy file from ZIP {} to {}: {}",
336                    zip_path.display(),
337                    out_path.display(),
338                    e
339                ))
340            })?;
341            writer.flush().map_err(|e| {
342                AppError::IoError(format!(
343                    "Failed to flush file {}: {}",
344                    out_path.display(),
345                    e
346                ))
347            })?;
348
349            Ok(())
350        })
351        .collect::<AppResult<Vec<()>>>()?;
352
353    Ok(())
354}
355
/// Returns the directory a ZIP is extracted into: the ZIP's parent directory joined
/// with the ZIP's file stem. Yields `None` when the path has no parent or no file stem.
fn extracted_dir_for_zip(zip_path: &Path) -> Option<PathBuf> {
    zip_path
        .parent()
        .zip(zip_path.file_stem())
        .map(|(containing_dir, archive_stem)| containing_dir.join(archive_stem))
}
361
/// Returns the total size in bytes of all entries below `dir`, recursing into
/// subdirectories.
///
/// Entry kinds come from `DirEntry::file_type`, which does not follow symlinks.
/// A symlink to a directory is therefore counted as a single entry instead of
/// being traversed, so linked trees are not double counted and symlink cycles
/// cannot inflate the result.
///
/// The walk is best effort: unreadable directories and entries whose metadata
/// cannot be read contribute 0 bytes rather than failing.
fn directory_size(dir: &Path) -> u64 {
    fs::read_dir(dir)
        .map(|entries| {
            entries
                .flatten()
                .map(|entry| match entry.file_type() {
                    Ok(kind) if kind.is_dir() => directory_size(&entry.path()),
                    // Files, symlinks, and entries of unknown kind: use the entry's own size.
                    _ => entry.metadata().map(|meta| meta.len()).unwrap_or(0),
                })
                .sum::<u64>()
        })
        .unwrap_or(0)
}
376
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// The extraction directory is the ZIP's parent joined with the ZIP's stem.
    #[test]
    fn extracted_dir_for_zip_returns_parent_stem() {
        let expected = PathBuf::from("/tmp/data/202401");
        let actual = extracted_dir_for_zip(Path::new("/tmp/data/202401.zip"));
        assert_eq!(actual, Some(expected));
    }

    /// Sizes of files at the top level and in a nested directory are summed.
    #[test]
    fn directory_size_counts_nested_files() {
        let scratch = TempDir::new().unwrap();
        let root = scratch.path().join("dir");
        let nested = root.join("nested");
        fs::create_dir_all(&nested).unwrap();
        fs::write(root.join("file1.txt"), [0u8; 10]).unwrap();
        fs::write(nested.join("file2.txt"), [0u8; 20]).unwrap();

        assert_eq!(directory_size(&root), 30);
    }
}
403}