sppd_cli/parser/parquet_writer.rs

use crate::errors::{AppError, AppResult};
use crate::models::Entry;
use crate::utils::{format_duration, mb_from_bytes, round_two_decimals};
use futures::stream::{self, StreamExt, TryStreamExt};
use polars::lazy::prelude::{LazyFrame, ScanArgsParquet};
use polars::prelude::*;
use rayon::prelude::*;
use std::collections::BTreeMap;
use std::fs::{self as std_fs, File};
use std::path::PathBuf;
use std::time::Instant;
use tokio::fs as tokio_fs;
use tracing::info;

use super::file_finder::find_xmls;
use super::xml_parser::parse_xml_bytes;

/// Converts a vector of Entry structs into a Polars DataFrame.
///
/// This helper consumes a vector of Entry structs and builds a DataFrame
/// with a consistent schema across all DataFrame creations. Column vectors
/// are pre-allocated, and fields are moved out of each Entry rather than cloned.
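///
/// # Example
///
/// A minimal sketch (field names mirror the tests below):
///
/// ```ignore
/// let entry = Entry {
///     id: Some("id-1".into()),
///     title: Some("Example".into()),
///     link: None,
///     summary: None,
///     updated: Some("2023-01-01".into()),
///     contract_folder_status: None,
/// };
/// let df = entries_to_dataframe(vec![entry])?;
/// assert_eq!(df.height(), 1);
/// ```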
fn entries_to_dataframe(entries: Vec<Entry>) -> AppResult<DataFrame> {
    if entries.is_empty() {
        // Return empty DataFrame with correct schema
        return DataFrame::new(vec![
            Series::new("id", Vec::<Option<String>>::new()),
            Series::new("title", Vec::<Option<String>>::new()),
            Series::new("link", Vec::<Option<String>>::new()),
            Series::new("summary", Vec::<Option<String>>::new()),
            Series::new("updated", Vec::<Option<String>>::new()),
            Series::new("contract_folder_status", Vec::<Option<String>>::new()),
        ])
        .map_err(|e| AppError::ParseError(format!("Failed to create DataFrame: {e}")));
    }

    let len = entries.len();
    // Pre-allocate vectors with known capacity
    let mut ids = Vec::with_capacity(len);
    let mut titles = Vec::with_capacity(len);
    let mut links = Vec::with_capacity(len);
    let mut summaries = Vec::with_capacity(len);
    let mut updateds = Vec::with_capacity(len);
    let mut contract_folder_statuses = Vec::with_capacity(len);

    // Consuming the Vec moves each field out of the Entry instead of cloning
    for entry in entries {
        ids.push(entry.id);
        titles.push(entry.title);
        links.push(entry.link);
        summaries.push(entry.summary);
        updateds.push(entry.updated);
        contract_folder_statuses.push(entry.contract_folder_status);
    }

    DataFrame::new(vec![
        Series::new("id", ids),
        Series::new("title", titles),
        Series::new("link", links),
        Series::new("summary", summaries),
        Series::new("updated", updateds),
        Series::new("contract_folder_status", contract_folder_statuses),
    ])
    .map_err(|e| AppError::ParseError(format!("Failed to create DataFrame: {e}")))
}

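/// Reads the raw bytes of each XML file concurrently.
///
/// `buffered(read_concurrency)` keeps at most `concurrency` reads in flight
/// at a time and preserves input order in the returned vector; the first I/O
/// error aborts the whole collection.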
async fn read_xml_contents(paths: &[PathBuf], concurrency: usize) -> AppResult<Vec<Vec<u8>>> {
    let read_concurrency = concurrency.max(1);
    stream::iter(paths.iter().cloned())
        .map(|path| async move {
            tokio_fs::read(&path)
                .await
                .map_err(|e| AppError::IoError(format!("Failed to read XML file {path:?}: {e}")))
        })
        .buffered(read_concurrency)
        .try_collect()
        .await
}

/// Parses XML/Atom files and converts them to Parquet format.
///
/// This function processes extracted XML/Atom files from the extraction directory,
/// parses them into `Entry` structures, and writes the results as Parquet files.
///
/// # Workflow
///
/// 1. Finds all subdirectories in the extraction directory that contain XML/Atom files
/// 2. Filters to only process subdirectories matching periods in `target_links`
/// 3. Parses XML/Atom files in each matching subdirectory in batches, bounded by `batch_size`
/// 4. Writes each batch to Parquet and optionally concatenates the batches per period
///
/// # Directory Structure
///
/// The function expects the following structure:
/// - Input: `{extract_dir}/{period}/` (contains XML/Atom files)
/// - Output: `{parquet_dir}/{period}.parquet` when `concat_batches` is set,
///   otherwise `{parquet_dir}/{period}/batch_<n>.parquet`
///
/// # Arguments
///
/// * `target_links` - Map of period strings to URLs (used to filter which periods to process)
/// * `procurement_type` - Procurement type determining the extract and parquet directories
/// * `batch_size` - Number of XML files to process per chunk (affects memory usage)
/// * `config` - Resolved configuration containing directory paths
///
/// # Behavior
///
/// - **Filtering**: Only processes subdirectories whose names match keys in `target_links`
/// - **Skip empty**: Subdirectories with no entries are skipped (logged but not an error)
/// - **Batch output**: Each chunk results in `{parquet_dir}/{period}/batch_<n>.parquet`; `concat_batches` merges them afterwards
/// - **Memory controls**: `batch_size` bounds the in-flight DataFrame and `read_concurrency` limits parallel reads
/// - **Progress tracking**: Elapsed time and throughput are logged after parsing completes
///
/// # Errors
///
/// Returns an error if:
/// - Directory creation fails
/// - XML parsing fails
/// - DataFrame creation fails
/// - Parquet file writing fails
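///
/// # Example
///
/// A hypothetical call site (constructing `ProcurementType` and
/// `ResolvedConfig` depends on the rest of the crate):
///
/// ```ignore
/// let links = BTreeMap::from([(
///     "202301".to_string(),
///     "https://example.invalid/202301.zip".to_string(),
/// )]);
/// parse_xmls(&links, &procurement_type, 500, &config).await?;
/// ```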
pub async fn parse_xmls(
    target_links: &BTreeMap<String, String>,
    procurement_type: &crate::models::ProcurementType,
    batch_size: usize,
    config: &crate::config::ResolvedConfig,
) -> AppResult<()> {
    let extract_dir = procurement_type.extract_dir(config);
    let parquet_dir = procurement_type.parquet_dir(config);

    // Create parquet directory if it doesn't exist
    std_fs::create_dir_all(&parquet_dir)
        .map_err(|e| AppError::IoError(format!("Failed to create parquet directory: {e}")))?;

    // Find all subdirectories with XML/atom files
    let subdirs = find_xmls(&extract_dir)?;

    // Filter subdirectories that match keys in target_links
    let subdirs_to_process: Vec<_> = subdirs
        .into_iter()
        .filter(|(subdir_name, _)| target_links.contains_key(subdir_name))
        .collect();

    let total_subdirs = subdirs_to_process.len();

    if total_subdirs == 0 {
        info!("No matching subdirectories found for parsing");
        return Ok(());
    }

    // Calculate total XML files across all periods for logging
    let total_xml_files: usize = subdirs_to_process
        .iter()
        .map(|(_, files)| files.len())
        .sum();

    let start = Instant::now();
    let mut total_parquet_bytes = 0u64;
    let mut total_output_files = 0usize;

    info!(total = total_subdirs, "Starting XML parsing");

    let mut processed_count = 0;
    let mut skipped_count = 0;

    // Process each subdirectory
    for (subdir_name, xml_files) in subdirs_to_process {
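        // chunks() panics on a chunk size of 0, so clamp batch_size to at least 1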
        let chunk_size = batch_size.max(1);
        let mut has_entries = false;
        let mut batch_index = 0;
        let period_dir = parquet_dir.join(&subdir_name);
        let mut period_dir_created = false;
        let mut batch_paths: Vec<PathBuf> = Vec::new();

        for xml_chunk in xml_files.chunks(chunk_size) {
            let xml_contents = read_xml_contents(xml_chunk, config.read_concurrency).await?;
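            // Parse each file's bytes in parallel on the rayon thread pool;
            // collecting into AppResult short-circuits on the first parse error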
            let parsed_entry_batches: Vec<Vec<Entry>> = xml_contents
                .par_iter()
                .map(|content| parse_xml_bytes(content))
                .collect::<AppResult<Vec<_>>>()?;

            let mut chunk_entries = Vec::new();
            for mut entries in parsed_entry_batches {
                if entries.is_empty() {
                    continue;
                }
                chunk_entries.append(&mut entries);
            }

            if chunk_entries.is_empty() {
                continue;
            }

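            // Lazily (re)create the period directory on the first non-empty chunk,
            // so periods that yield no entries never leave a directory behind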
            if !period_dir_created {
                if period_dir.exists() {
                    std_fs::remove_dir_all(&period_dir).map_err(|e| {
                        AppError::IoError(format!(
                            "Failed to remove previous parquet directory {period_dir:?}: {e}"
                        ))
                    })?;
                }
                std_fs::create_dir_all(&period_dir).map_err(|e| {
                    AppError::IoError(format!(
                        "Failed to create parquet period directory {period_dir:?}: {e}"
                    ))
                })?;
                period_dir_created = true;
            }

            has_entries = true;
            let mut chunk_df = entries_to_dataframe(chunk_entries)?;
            let batch_path = period_dir.join(format!("batch_{batch_index}.parquet"));
            let mut file = File::create(&batch_path).map_err(|e| {
                AppError::IoError(format!(
                    "Failed to create Parquet batch file {batch_path:?}: {e}"
                ))
            })?;

            ParquetWriter::new(&mut file)
                .finish(&mut chunk_df)
                .map_err(|e| AppError::ParseError(format!("Failed to write Parquet batch: {e}")))?;

            batch_paths.push(batch_path);
            batch_index += 1;
        }

        if !has_entries {
            skipped_count += 1;
            if period_dir_created {
                std_fs::remove_dir_all(&period_dir).map_err(|e| {
                    AppError::IoError(format!(
                        "Failed to remove empty parquet directory {period_dir:?}: {e}"
                    ))
                })?;
            }
            continue;
        }

        let mut output_paths = Vec::new();
        if config.concat_batches {
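            // Merge all batch files into one DataFrame via a parquet glob scan,
            // rewrite them as a single {period}.parquet, then drop the batch dir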
            let glob_path = period_dir.join("batch_*.parquet");
            let glob_str = glob_path.to_string_lossy().into_owned();
            let mut combined = LazyFrame::scan_parquet(&glob_str, ScanArgsParquet::default())
                .map_err(|e| {
                    AppError::ParseError(format!(
                        "Failed to scan parquet batches for {subdir_name}: {e}"
                    ))
                })?
                .collect()
                .map_err(|e| {
                    AppError::ParseError(format!(
                        "Failed to collect combined DataFrame for {subdir_name}: {e}"
                    ))
                })?;

            let final_path = parquet_dir.join(format!("{subdir_name}.parquet"));
            let mut final_file = File::create(&final_path).map_err(|e| {
                AppError::IoError(format!(
                    "Failed to create final Parquet file {final_path:?}: {e}"
                ))
            })?;

            ParquetWriter::new(&mut final_file)
                .finish(&mut combined)
                .map_err(|e| {
                    AppError::ParseError(format!("Failed to write final Parquet file: {e}"))
                })?;

            output_paths.push(final_path);
            std_fs::remove_dir_all(&period_dir).map_err(|e| {
                AppError::IoError(format!(
                    "Failed to remove temporary parquet directory {period_dir:?}: {e}"
                ))
            })?;
        } else {
            output_paths.extend(batch_paths);
        }

        total_output_files += output_paths.len();
        for output_path in output_paths {
            let metadata = std_fs::metadata(&output_path).map_err(|e| {
                AppError::IoError(format!(
                    "Failed to read Parquet file metadata {output_path:?}: {e}"
                ))
            })?;
            total_parquet_bytes += metadata.len();
        }

        processed_count += 1;
    }

    let elapsed = start.elapsed();
    let elapsed_str = format_duration(elapsed);
    let total_mb = mb_from_bytes(total_parquet_bytes);
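    // Guard against a zero elapsed time (division by zero); in that degenerate
    // case the raw size is reported as the throughput figure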
    let throughput = if elapsed.as_secs_f64() > 0.0 {
        total_mb / elapsed.as_secs_f64()
    } else {
        total_mb
    };
    let size_mb = round_two_decimals(total_mb);
    let throughput_mb_s = round_two_decimals(throughput);

    info!(
        processed = processed_count,
        skipped = skipped_count,
        xml_files = total_xml_files,
        parquet_files = total_output_files,
        elapsed = elapsed_str,
        output_size_mb = size_mb,
        throughput_mb_s = throughput_mb_s,
        "Parsing completed"
    );

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn entries_to_dataframe_empty_yields_zero_rows() {
        let df = entries_to_dataframe(vec![]).unwrap();
        assert_eq!(df.height(), 0);
        assert_eq!(df.width(), 6);
    }

    #[test]
    fn entries_to_dataframe_single_entry() {
        let entry = Entry {
            id: Some("id".to_string()),
            title: Some("title".to_string()),
            link: Some("link".to_string()),
            summary: Some("summary".to_string()),
            updated: Some("2023-01-01".to_string()),
            contract_folder_status: Some("{}".to_string()),
        };

        let df = entries_to_dataframe(vec![entry]).unwrap();
        assert_eq!(df.height(), 1);
        let value = df.column("id").unwrap().get(0).unwrap();
        assert_eq!(value, AnyValue::String("id"));
    }
}