Skip to main content

fhir_subscription_processor/
lib.rs

1use std::sync::Arc;
2
3use haste_fhir_client::url::{ParsedParameter, ParsedParameters};
4use haste_fhir_model::r4::generated::{
5    resources::{Resource, ResourceType, SearchParameter, Subscription},
6    terminology::IssueType,
7};
8use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
9use haste_fhir_search::indexing_conversion::{self, InsertableIndex};
10
11pub mod traits;
12
13#[derive(OperationOutcomeError, Debug)]
14pub enum SubscriptionFilterError {
15    #[fatal(
16        code = "exception",
17        diagnostic = "Failed to evaluate fhirpath expression."
18    )]
19    FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
20}
21
22#[allow(dead_code)]
23pub struct SubscriptionParameter {
24    search_parameter: Arc<SearchParameter>,
25    fp_extract_expression: String,
26    value: Vec<String>,
27    modifier: Option<String>,
28}
29
30pub enum SubscriptionTrigger {
31    // Based around simple Subscription.criteria.
32    QueryFilter {
33        resource_type: ResourceType,
34        parameters: Vec<SubscriptionParameter>,
35    },
36    // This could come from a subscriptiontopic which
37    // allows arbitrary FHIRPath expressions, or from more complex criteria in the future.
38    FHIRPathFilter {
39        expression: String,
40    },
41}
42
43/// In memory representation of a subscription filter.
44/// This is what we will use to evaluate whether a given subscription matches an incoming event.
45#[allow(dead_code)]
46pub struct MemorySubscriptionFilter {
47    fp_engine: haste_fhirpath::FPEngine,
48    triggers: Vec<SubscriptionTrigger>,
49}
50
51impl TryFrom<Subscription> for MemorySubscriptionFilter {
52    type Error = OperationOutcomeError;
53
54    fn try_from(value: Subscription) -> Result<Self, Self::Error> {
55        if let Some(criteria) = value.criteria.value {
56            let criteria_pieces = criteria.split('?').collect::<Vec<_>>();
57            let [path, parameters] = criteria_pieces.as_slice() else {
58                return Err(OperationOutcomeError::error(
59                    IssueType::Exception(None),
60                    "Invalid subscription criteria format".to_string(),
61                ));
62            };
63
64            let resource_type = ResourceType::try_from(*path).map_err(|_| {
65                OperationOutcomeError::error(
66                    IssueType::Exception(None),
67                    "Invalid resource type".to_string(),
68                )
69            })?;
70
71            let parsed_parameters = ParsedParameters::try_from(*parameters)?;
72
73            let subscription_parsed_parameters = parsed_parameters
74                .owned_parameters()
75                .into_iter()
76                .map(|parameter| match parameter {
77                    ParsedParameter::Resource(resource_param) => {
78                        let Some(search_parameter) =
79                            haste_artifacts::search_parameters::get_search_parameter_for_name(
80                                Some(&resource_type),
81                                &resource_param.name,
82                            )
83                        else {
84                            return Err(OperationOutcomeError::error(
85                                IssueType::Exception(None),
86                                format!(
87                                    "Invalid search parameter in subscription criteria: {}",
88                                    resource_param.name
89                                ),
90                            ));
91                        };
92
93                        if resource_param.chains.is_some() {
94                            return Err(OperationOutcomeError::error(
95                                IssueType::Exception(None),
96                                format!(
97                                    "Chained parameters are not supported in subscription criteria: {}",
98                                    resource_param.name
99                                ),
100                            ));
101                        }
102
103                        let Some(fp_expression) = search_parameter
104                            .expression
105                            .as_ref()
106                            .and_then(|expr| expr.value.as_ref())
107                        else {
108                            return Err(OperationOutcomeError::error(
109                                IssueType::Exception(None),
110                                format!(
111                                    "Search parameter does not have an expression: {}",
112                                    resource_param.name
113                                ),
114                            ));
115                        };
116
117                        Ok(SubscriptionParameter {
118                            search_parameter: search_parameter.clone(),
119                            fp_extract_expression: fp_expression.clone(),
120                            value: resource_param.value,
121                            modifier: resource_param.modifier,
122                        })
123
124                    }
125                    ParsedParameter::Result(result_param) => {
126                        return Err(OperationOutcomeError::error(
127                            IssueType::Exception(None),
128                            format!(
129                                "Unsupported parameter in subscription criteria: {}",
130                                result_param.name
131                            ),
132                        ));
133                    }
134                })
135                .collect::<Result<Vec<_>, OperationOutcomeError>>()?;
136
137            Ok(MemorySubscriptionFilter {
138                fp_engine: haste_fhirpath::FPEngine::new(),
139                triggers: vec![SubscriptionTrigger::QueryFilter {
140                    resource_type,
141                    parameters: subscription_parsed_parameters,
142                }],
143            })
144        } else {
145            Err(OperationOutcomeError::error(
146                IssueType::Exception(None),
147                "SubscriptionFilter conversion not implemented".to_string(),
148            ))
149        }
150    }
151}
152
153async fn fits_subscription_parameter(
154    fp_engine: &haste_fhirpath::FPEngine,
155    subscription_parameter: &SubscriptionParameter,
156    resource: &Resource,
157) -> Result<bool, OperationOutcomeError> {
158    let result = fp_engine
159        .evaluate(
160            &subscription_parameter.fp_extract_expression,
161            vec![resource],
162        )
163        .await
164        .map_err(SubscriptionFilterError::from)?;
165
166    let conversions = indexing_conversion::to_insertable_index(
167        &subscription_parameter.search_parameter.as_ref(),
168        result.iter().collect::<Vec<_>>(),
169    )?;
170
171    match conversions {
172        InsertableIndex::String(resource_values) => {
173            Ok(resource_values.iter().any(|resource_value| {
174                subscription_parameter
175                    .value
176                    .iter()
177                    .any(|v| resource_value.to_lowercase().starts_with(&v.to_lowercase()))
178            }))
179        }
180        InsertableIndex::Number(_) => Err(OperationOutcomeError::error(
181            IssueType::Exception(None),
182            "Number search parameters are not supported in subscription criteria".to_string(),
183        ))?,
184        InsertableIndex::URI(_) => Err(OperationOutcomeError::error(
185            IssueType::Exception(None),
186            "URI search parameters are not supported in subscription criteria".to_string(),
187        ))?,
188        InsertableIndex::Token(_) => Err(OperationOutcomeError::error(
189            IssueType::Exception(None),
190            "Token search parameters are not supported in subscription criteria".to_string(),
191        ))?,
192        InsertableIndex::Date(_) => Err(OperationOutcomeError::error(
193            IssueType::Exception(None),
194            "Date search parameters are not supported in subscription criteria".to_string(),
195        ))?,
196
197        InsertableIndex::Reference(_) => Err(OperationOutcomeError::error(
198            IssueType::Exception(None),
199            "Reference search parameters are not supported in subscription criteria".to_string(),
200        ))?,
201        InsertableIndex::Quantity(_) => Err(OperationOutcomeError::error(
202            IssueType::Exception(None),
203            "Quantity search parameters are not supported in subscription criteria".to_string(),
204        ))?,
205
206        InsertableIndex::Composite(_) => Err(OperationOutcomeError::error(
207            IssueType::Exception(None),
208            "Composite search parameters are not supported in subscription criteria".to_string(),
209        ))?,
210        InsertableIndex::Special(_) => Err(OperationOutcomeError::error(
211            IssueType::Exception(None),
212            "Special search parameters are not supported in subscription criteria".to_string(),
213        ))?,
214        InsertableIndex::Meta(_) => Err(OperationOutcomeError::error(
215            IssueType::Exception(None),
216            "Meta search parameters are not supported in subscription criteria".to_string(),
217        ))?,
218    }
219}
220
221impl traits::SubscriptionFilter for MemorySubscriptionFilter {
222    async fn matches(&self, resource: &Resource) -> Result<bool, OperationOutcomeError> {
223        let resource_resource_type = resource.resource_type();
224
225        for trigger in self.triggers.iter() {
226            match trigger {
227                SubscriptionTrigger::QueryFilter {
228                    resource_type,
229                    parameters,
230                } => {
231                    if *resource_type != resource_resource_type {
232                        return Ok(false);
233                    }
234
235                    for sub_parameter in parameters {
236                        let fits_criteria =
237                            fits_subscription_parameter(&self.fp_engine, sub_parameter, resource)
238                                .await?;
239                        if !fits_criteria {
240                            return Ok(false);
241                        }
242                    }
243
244                    return Ok(true);
245                }
246                SubscriptionTrigger::FHIRPathFilter { .. } => {
247                    return Err(OperationOutcomeError::error(
248                        IssueType::Exception(None),
249                        "FHIRPathFilter triggers are not yet supported".to_string(),
250                    ))?;
251                }
252            }
253        }
254
255        Ok(false)
256    }
257}
258
#[cfg(test)]
mod tests {
    use haste_fhir_model::r4::generated::{
        resources::Patient,
        types::{FHIRString, HumanName},
    };

    use crate::traits::SubscriptionFilter;

    use super::*;

    /// Builds a filter from a subscription whose only populated field is `criteria`.
    fn filter_for(criteria: &str) -> MemorySubscriptionFilter {
        let subscription = Subscription {
            criteria: Box::new(FHIRString {
                value: Some(criteria.to_string()),
                ..Default::default()
            }),
            ..Default::default()
        };

        MemorySubscriptionFilter::try_from(subscription).unwrap()
    }

    /// Builds a patient resource with a single name carrying the given family name.
    fn patient_with_family(family: &str) -> Resource {
        let name = HumanName {
            family: Some(Box::new(FHIRString {
                value: Some(family.to_string()),
                ..Default::default()
            })),
            ..Default::default()
        };

        Resource::Patient(Patient {
            name: Some(vec![Box::new(name)]),
            ..Default::default()
        })
    }

    /// Asserts the filter holds exactly one trigger and that it is a `QueryFilter`,
    /// returning its resource type and parameters.
    fn single_query_filter(
        filter: &MemorySubscriptionFilter,
    ) -> (&ResourceType, &[SubscriptionParameter]) {
        assert_eq!(filter.triggers.len(), 1);

        match &filter.triggers[0] {
            SubscriptionTrigger::QueryFilter {
                resource_type,
                parameters,
            } => (resource_type, parameters.as_slice()),
            _ => panic!("Expected QueryFilter trigger"),
        }
    }

    #[test]
    fn quick_test_derive() {
        let sub_filter = filter_for("Patient?name=Smith");
        let (resource_type, parameters) = single_query_filter(&sub_filter);

        assert_eq!(resource_type, &ResourceType::Patient);
        assert_eq!(parameters[0].fp_extract_expression, "Patient.name");
        assert_eq!(parameters[0].value, vec!["Smith".to_string()]);
    }

    #[test]
    fn modifier_check() {
        let sub_filter = filter_for("Observation?category:missing=true");
        let (resource_type, parameters) = single_query_filter(&sub_filter);

        assert_eq!(resource_type, &ResourceType::Observation);
        assert_eq!(parameters[0].fp_extract_expression, "Observation.category");
        assert_eq!(parameters[0].value, vec!["true".to_string()]);
        assert_eq!(parameters[0].modifier, Some("missing".to_string()));
    }

    #[tokio::test]
    async fn test_run_fhirpath() {
        let smith = patient_with_family("Smith");

        // Full value.
        let sub_filter = filter_for("Patient?name=Smith");
        assert!(sub_filter.matches(&smith).await.unwrap());

        // Leading fragment of the value.
        let sub_filter_partial = filter_for("Patient?name=Sm");
        assert!(sub_filter_partial.matches(&smith).await.unwrap());

        // Leading fragment in a different letter case.
        let sub_filter_casing = filter_for("Patient?name=sm");
        assert!(sub_filter_casing.matches(&smith).await.unwrap());

        // A family name that merely ends with the criteria value must not match.
        let not_smith = patient_with_family("NotSmith");
        assert!(!sub_filter.matches(&not_smith).await.unwrap());
    }
}