Skip to main content

haste_sql_on_fhir/
lib.rs

1use base64::{Engine as _, engine::general_purpose};
2use chrono::Utc;
3use futures::{StreamExt as _, stream::FuturesOrdered};
4use haste_fhir_client::FHIRClient;
5use haste_fhir_generated_ops::generated::ViewDefinitionRun;
6use haste_fhir_model::r4::{
7    self,
8    generated::{
9        resources::{Binary, Resource, ResourceType, ViewDefinition},
10        terminology::{IssueType, OutputFormatCodes},
11        types::{FHIRBase64Binary, FHIRBoolean},
12    },
13};
14use haste_fhir_operation_error::OperationOutcomeError;
15use haste_fhirpath::{Config, FPEngine};
16use haste_reflect::MetaValue;
17use itertools::Itertools as _;
18use ordermap::OrderMap;
19use serde::{Deserialize, Serialize};
20use std::{borrow::Cow, collections::HashMap, sync::Arc};
21
22use crate::conversions::primitives::PrimitiveValue;
23
24mod conversions;
25mod output;
26
27async fn resolve_view_definition<
28    'a,
29    CTX: Send + Sync + Clone + 'static,
30    Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
31>(
32    context: CTX,
33    client: &Client,
34    input: &'a ViewDefinitionRun::Input,
35) -> Result<Cow<'a, ViewDefinition>, OperationOutcomeError> {
36    if let Some(view_definition) = &input.viewResource {
37        Ok(Cow::Borrowed(view_definition))
38    } else if let Some(view_definition_reference) = input.viewReference.as_ref() {
39        let view_definition_reference = view_definition_reference
40            .reference
41            .as_ref()
42            .ok_or_else(|| {
43                OperationOutcomeError::error(
44                    IssueType::Invalid(None),
45                    "viewReference.reference is required".to_string(),
46                )
47            })?
48            .value
49            .as_ref()
50            .ok_or_else(|| {
51                OperationOutcomeError::error(
52                    IssueType::Invalid(None),
53                    "viewReference.reference.value is required".to_string(),
54                )
55            })?;
56
57        let reference_pieces = view_definition_reference.split('/').collect::<Vec<_>>();
58
59        let view_definition_id = reference_pieces
60            .last()
61            .ok_or_else(|| {
62                OperationOutcomeError::error(
63                    IssueType::Invalid(None),
64                    "Invalid viewReference.reference format".to_string(),
65                )
66            })?
67            .to_string();
68
69        let result = client
70            .read(
71                context,
72                ResourceType::ViewDefinition,
73                view_definition_id.clone(),
74            )
75            .await?
76            .ok_or_else(|| {
77                OperationOutcomeError::error(
78                    IssueType::NotFound(None),
79                    format!(
80                        "ViewDefinition not found with id '{:?}'",
81                        view_definition_id
82                    ),
83                )
84            })?;
85
86        if let Resource::ViewDefinition(view_definition) = result {
87            Ok(Cow::Owned(view_definition))
88        } else {
89            Err(OperationOutcomeError::error(
90                IssueType::Invalid(None),
91                "Referenced resource is not a ViewDefinition".to_string(),
92            ))
93        }
94    } else {
95        Err(OperationOutcomeError::error(
96            IssueType::Invalid(None),
97            "Either viewResource or viewReference must be provided".to_string(),
98        ))
99    }
100}
101
102async fn get_resources_to_process<
103    CTX: Send + Sync + Clone + 'static,
104    Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
105>(
106    context: CTX,
107    client: &Client,
108    view_definition: &ViewDefinition,
109    input: &ViewDefinitionRun::Input,
110) -> Result<Vec<Resource>, OperationOutcomeError> {
111    if let Some(input_resources) = input.resource.clone() {
112        Ok(input_resources)
113    } else {
114        let since = input
115            ._since
116            .as_ref()
117            .and_then(|since| since.value.clone())
118            .unwrap_or(r4::datetime::Instant::Iso8601(Utc::now()));
119
120        let Some(resource_type): Option<String> = view_definition.resource.as_ref().into() else {
121            return Err(OperationOutcomeError::error(
122                IssueType::Invalid(None),
123                "ViewDefinition.resource is required".to_string(),
124            ));
125        };
126
127        let resource_type = ResourceType::try_from(resource_type).map_err(|e| {
128            OperationOutcomeError::error(
129                IssueType::Invalid(None),
130                format!("Invalid resource type: {}", e),
131            )
132        })?;
133
134        let result = client
135            .history_type(
136                context,
137                resource_type,
138                vec![
139                    ("_since".to_string(), vec![since.to_string()]),
140                    ("_count".to_string(), vec!["1000".to_string()]),
141                ]
142                .into(),
143            )
144            .await?;
145
146        Ok(vec![Resource::Bundle(result)])
147    }
148}
149
150fn build_hashmap_fp_variables<'a>(
151    viewdefinition: &'a ViewDefinition,
152) -> HashMap<String, &'a dyn MetaValue> {
153    let mut hashmap = HashMap::new();
154
155    if let Some(constants) = &viewdefinition.constant {
156        for constant in constants {
157            if let Some(name) = &constant.name.value.as_ref() {
158                hashmap.insert((*name).clone(), &constant.value as &dyn MetaValue);
159            }
160        }
161    }
162
163    hashmap
164}
165
166fn cartesian_product(
167    select_statement_results: Vec<Vec<OrderMap<String, OutputResults>>>,
168) -> Vec<OrderMap<String, OutputResults>> {
169    let mut output_results = Vec::new();
170
171    for combination in select_statement_results
172        .into_iter()
173        .multi_cartesian_product()
174    {
175        let mut combined_result = OrderMap::new();
176
177        for result in combination {
178            for (key, value) in result {
179                combined_result.insert(key, value);
180            }
181        }
182
183        output_results.push(combined_result);
184    }
185
186    output_results
187}
188
/// Value held by one output column of one row.
///
/// A column is either a single (possibly absent) value or a list of values, and the two
/// cases have to stay distinguishable when rows are handed to the output writers.
/// `#[serde(untagged)]` keeps the variant name out of the serialized form.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(untagged)]
enum OutputResults {
    /// A single value; `None` when the column has no value for this row.
    Scalar(Option<PrimitiveValue>),
    /// All values of a column that is marked as a collection.
    Collection(Vec<Option<PrimitiveValue>>),
}
196
197async fn process_resource<
198    CTX: Send + Sync + Clone + 'static,
199    Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
200>(
201    _context: CTX,
202    _client: Arc<Client>,
203    variables: Arc<HashMap<String, &dyn MetaValue>>,
204    view_definition: &ViewDefinition,
205    input: Box<Resource>,
206) -> Result<Vec<OrderMap<String, OutputResults>>, OperationOutcomeError> {
207    let fp_engine = FPEngine::new();
208
209    let mut select_statement_results = Vec::with_capacity(view_definition.select.len());
210
211    for select_statement in view_definition.select.iter() {
212        let fp_config = Arc::new(
213            Config::builder()
214                .with_variable_resolver(haste_fhirpath::ExternalConstantResolver::Variable(
215                    variables.clone(),
216                ))
217                .with_resource_id(input.id().clone().unwrap_or("".to_string())),
218        );
219
220        let mut iterable_context = None;
221        let mut set_null = false;
222
223        if let Some(for_each_fp) = select_statement
224            .forEach
225            .as_ref()
226            .and_then(|f| f.value.as_ref())
227        {
228            iterable_context = Some(vec![
229                fp_engine
230                    .evaluate_with_config(for_each_fp, vec![&input], fp_config.clone())
231                    .await
232                    .map_err(|e| {
233                        OperationOutcomeError::error(
234                            IssueType::Exception(None),
235                            format!("Error evaluating forEach expression: {}", e),
236                        )
237                    })?,
238            ]);
239        } else if let Some(for_each_or_null_fp) = select_statement
240            .forEachOrNull
241            .as_ref()
242            .and_then(|f| f.value.as_ref())
243        {
244            iterable_context = Some(vec![
245                fp_engine
246                    .evaluate_with_config(for_each_or_null_fp, vec![&input], fp_config.clone())
247                    .await
248                    .map_err(|e| {
249                        OperationOutcomeError::error(
250                            IssueType::Exception(None),
251                            format!("Error evaluating forEachOrNull expression: {}", e),
252                        )
253                    })?,
254            ]);
255            set_null = true;
256        } else if let Some(_repeat) = select_statement
257            .repeat
258            .as_ref()
259            .map(|r| r.iter().filter_map(|r| r.value.as_ref()))
260        {
261            let mut repeat_fps = vec![];
262            for repeat_fp in _repeat {
263                let repeat = format!("$this.repeat({})", repeat_fp);
264                repeat_fps.push(
265                    fp_engine
266                        .evaluate_with_config(&repeat, vec![&input], fp_config.clone())
267                        .await
268                        .map_err(|e| {
269                            OperationOutcomeError::error(
270                                IssueType::Exception(None),
271                                format!("Error evaluating repeat expression: {}", e),
272                            )
273                        })?,
274                );
275            }
276
277            iterable_context = Some(repeat_fps);
278        }
279
280        let select_context: Vec<&dyn MetaValue> = if let Some(iterable) = iterable_context.as_ref()
281        {
282            iterable
283                .iter()
284                .flat_map(|item| item.iter())
285                .collect::<Vec<&dyn MetaValue>>()
286        } else {
287            vec![&input]
288        };
289
290        let mut select_results = Vec::with_capacity(select_context.len());
291
292        if set_null && select_context.is_empty() {
293            let mut output_result = OrderMap::new();
294            for column in select_statement.column.as_ref().into_iter().flatten() {
295                let Some(name) = column.name.value.as_ref().map(|n| n.as_str()) else {
296                    return Err(OperationOutcomeError::error(
297                        IssueType::Invalid(None),
298                        "Column name is required".to_string(),
299                    ));
300                };
301                output_result.insert(name.to_string(), OutputResults::Scalar(None));
302            }
303            select_results.push(output_result);
304        }
305
306        for context in select_context {
307            let mut output_result = OrderMap::new();
308            for column in select_statement.column.as_ref().into_iter().flatten() {
309                let Some(path) = column.path.value.as_ref().map(|p| p.as_str()) else {
310                    return Err(OperationOutcomeError::error(
311                        IssueType::Invalid(None),
312                        "Column path is required".to_string(),
313                    ));
314                };
315
316                let Some(name) = column.name.value.as_ref().map(|n| n.as_str()) else {
317                    return Err(OperationOutcomeError::error(
318                        IssueType::Invalid(None),
319                        "Column name is required".to_string(),
320                    ));
321                };
322
323                let result = fp_engine
324                    .evaluate_with_config(path, vec![context; 1], fp_config.clone())
325                    .await
326                    .map_err(|e| {
327                        OperationOutcomeError::error(
328                            IssueType::Exception(None),
329                            format!("Error evaluating expression: {}", e),
330                        )
331                    })?;
332
333                let column_type = column
334                    .type_
335                    .as_ref()
336                    .and_then(|t| t.value.as_ref())
337                    .map(|t| t.as_str())
338                    // Default to string.
339                    .unwrap_or(
340                        // If column type is not set than assume it's the first values fhir_type
341                        // or default to string if there are no values.
342                        result
343                            .iter()
344                            .peekable()
345                            .next()
346                            .map(|v| v.fhir_type())
347                            .unwrap_or("string"),
348                    );
349
350                let mut column_result = result
351                    .iter()
352                    .map(|value| conversions::primitives::convert_meta_value(column_type, value))
353                    .collect::<Result<Vec<Option<PrimitiveValue>>, OperationOutcomeError>>()?;
354
355                let is_collection = column
356                    .collection
357                    .as_ref()
358                    .and_then(|c| c.value)
359                    .unwrap_or(false);
360
361                let insert_value = if is_collection {
362                    OutputResults::Collection(column_result)
363                } else {
364                    if column_result.len() > 1 {
365                        return Err(OperationOutcomeError::error(
366                            IssueType::Invalid(None),
367                            "Column result is a collection but the column is not marked as a collection"
368                                .to_string(),
369                        ));
370                    }
371
372                    let mut singular_value = None;
373
374                    if let Some(first_value) = column_result.get_mut(0) {
375                        std::mem::swap(&mut singular_value, first_value);
376                    }
377
378                    OutputResults::Scalar(singular_value)
379                };
380
381                output_result.insert(name.to_string(), insert_value);
382            }
383            select_results.push(output_result);
384        }
385
386        select_statement_results.push(select_results);
387    }
388
389    let output_results = cartesian_product(select_statement_results);
390
391    Ok(output_results)
392}
393
394fn flatten_results(resource: Vec<Resource>) -> Vec<Box<Resource>> {
395    let mut resources = Vec::new();
396    for resource in resource {
397        match resource {
398            Resource::Bundle(bundle) => {
399                for entry in bundle.entry.into_iter().flatten() {
400                    if let Some(resource) = entry.resource {
401                        resources.push(resource);
402                    }
403                }
404            }
405            _ => {
406                resources.push(Box::new(resource));
407            }
408        }
409    }
410
411    resources
412}
413
414async fn passes_where_clauses(
415    fp_engine: &FPEngine,
416    variables: Arc<HashMap<String, &dyn MetaValue>>,
417    where_clauses: &[&str],
418    resource: &Resource,
419) -> Result<bool, OperationOutcomeError> {
420    for where_clause in where_clauses {
421        let result = fp_engine
422            .evaluate_with_config(
423                where_clause,
424                vec![resource],
425                Arc::new(Config::builder().with_variable_resolver(
426                    haste_fhirpath::ExternalConstantResolver::Variable(variables.clone()),
427                )),
428            )
429            .await
430            .map_err(|e| {
431                OperationOutcomeError::error(
432                    IssueType::Exception(None),
433                    format!("Error evaluating where clause expression: {}", e),
434                )
435            })?;
436
437        let bool_results = result
438            .iter()
439            .map(|v| match v.fhir_type() {
440                "boolean" => Ok(v
441                    .as_any()
442                    .downcast_ref::<FHIRBoolean>()
443                    .and_then(|b| b.value.as_ref())
444                    .unwrap_or(&false)),
445                "http://hl7.org/fhirpath/System.Boolean" => {
446                    Ok(v.as_any().downcast_ref::<bool>().unwrap_or(&false))
447                }
448                _ => Err(OperationOutcomeError::error(
449                    IssueType::Invalid(None),
450                    format!(
451                        "Where clause expression must evaluate to a boolean, got: {}",
452                        v.fhir_type()
453                    ),
454                )),
455            })
456            .collect::<Result<Vec<_>, _>>()?;
457
458        if bool_results.iter().any(|v| **v == false) {
459            return Ok(false);
460        }
461    }
462
463    Ok(true)
464}
465
466async fn process_view_definition<
467    CTX: Send + Sync + Clone + 'static,
468    Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
469>(
470    context: CTX,
471    output_format: &OutputFormatCodes,
472    client: Arc<Client>,
473    view_definition: &ViewDefinition,
474    input: &ViewDefinitionRun::Input,
475) -> Result<Binary, OperationOutcomeError> {
476    let variables = Arc::new(build_hashmap_fp_variables(view_definition));
477    let _limit = input
478        ._limit
479        .as_ref()
480        .and_then(|limit| limit.value.clone())
481        .unwrap_or(1000);
482
483    let input_ = flatten_results(
484        get_resources_to_process(context.clone(), client.as_ref(), view_definition, input).await?,
485    );
486
487    let mut tasks = FuturesOrdered::new();
488
489    let where_clauses = view_definition
490        .where_
491        .as_ref()
492        .map(|w| Cow::Borrowed(w))
493        .unwrap_or_else(|| Cow::Owned(vec![]));
494
495    let where_fp_clauses = where_clauses
496        .iter()
497        .filter_map(|w| w.path.value.as_ref())
498        .map(|s| s.as_str())
499        .collect::<Vec<_>>();
500
501    for resource in input_ {
502        if passes_where_clauses(
503            &FPEngine::new(),
504            variables.clone(),
505            where_fp_clauses.as_slice(),
506            resource.as_ref(),
507        )
508        .await?
509        {
510            tasks.push_back(async {
511                process_resource(
512                    context.clone(),
513                    client.clone(),
514                    variables.clone(),
515                    view_definition,
516                    resource,
517                )
518                .await
519            });
520        }
521    }
522
523    let mut results = Vec::with_capacity(tasks.len());
524
525    while let Some(result) = tasks.next().await {
526        results.push(result?);
527    }
528
529    let results = results.into_iter().flatten().collect::<Vec<_>>();
530
531    match output_format {
532        OutputFormatCodes::Csv(_) => {
533            let data = output::csv::csv(results)?;
534
535            let base64_string: String = general_purpose::STANDARD.encode(&data);
536
537            Ok(Binary {
538                data: Some(Box::new(FHIRBase64Binary {
539                    value: Some(base64_string),
540                    ..Default::default()
541                })),
542                ..Default::default()
543            })
544        }
545        OutputFormatCodes::Json(_) => {
546            let data = output::json::json(results)?;
547
548            let base64_string: String = general_purpose::STANDARD.encode(&data);
549
550            Ok(Binary {
551                data: Some(Box::new(FHIRBase64Binary {
552                    value: Some(base64_string),
553                    ..Default::default()
554                })),
555                ..Default::default()
556            })
557        }
558        OutputFormatCodes::Ndjson(_) => {
559            let data = output::ndjson::ndjson(results)?;
560            let base64_string: String = general_purpose::STANDARD.encode(&data);
561
562            Ok(Binary {
563                data: Some(Box::new(FHIRBase64Binary {
564                    value: Some(base64_string),
565                    ..Default::default()
566                })),
567                ..Default::default()
568            })
569        }
570        _ => Err(OperationOutcomeError::error(
571            IssueType::NotSupported(None),
572            format!("Output format '{:?}' is not supported", output_format),
573        )),
574    }
575}
576
577pub async fn view_definition_run<
578    CTX: Send + Sync + Clone + 'static,
579    Client: FHIRClient<CTX, OperationOutcomeError> + Send + Sync + 'static,
580>(
581    context: CTX,
582    client: Arc<Client>,
583    input: &ViewDefinitionRun::Input,
584) -> Result<ViewDefinitionRun::Output, OperationOutcomeError> {
585    let output_format = input
586        ._format
587        .as_ref()
588        .and_then(|v| v.value.as_ref())
589        .and_then(|s| OutputFormatCodes::try_from(s.to_string()).ok())
590        .unwrap_or(OutputFormatCodes::Csv(None));
591
592    let view_definition =
593        Arc::new(resolve_view_definition(context.clone(), client.as_ref(), &input).await?);
594
595    let output = process_view_definition(
596        context,
597        &output_format,
598        client,
599        view_definition.as_ref(),
600        &input,
601    )
602    .await?;
603
604    Ok(ViewDefinitionRun::Output { return_: output })
605}