Skip to main content

haste_fhir_search/elastic_search/
mod.rs

1use crate::{
2    IndexResource, ParameterLevel, ResolvedParameter, SearchEngine, SearchOptions,
3    SearchParameterResolve, SearchReturn, SuccessfullyIndexedCount,
4    indexing_conversion::{self, InsertableIndex},
5};
6use elasticsearch::{
7    BulkOperation, BulkParts, Elasticsearch,
8    auth::Credentials,
9    cert::CertificateValidation,
10    http::{
11        Url,
12        transport::{BuildError, SingleNodeConnectionPool, TransportBuilder},
13    },
14};
15use haste_fhir_client::request::SearchRequest;
16use haste_fhir_model::r4::generated::{
17    resources::{Resource, ResourceType},
18    terminology::IssueType,
19};
20use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
21use haste_fhirpath::FPEngine;
22use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
23use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
24use serde::Deserialize;
25use std::{collections::HashMap, sync::Arc};
26
27mod migration;
28mod search;
29pub mod search_parameter_resolver;
30
/// Per-hit metadata read from the `fields` section of an Elasticsearch search hit
/// (see `ElasticSearchHitResult::fields`).
///
/// These mirror the `id`, `resource_type`, `version_id` and `project` meta fields written
/// into every indexed document. Each is a `Vec` because Elasticsearch returns values
/// requested through `fields` as arrays, even for single-valued fields.
#[derive(Deserialize, Debug)]
struct SearchEntryPrivate {
    pub id: Vec<ResourceId>,
    pub resource_type: Vec<ResourceType>,
    pub version_id: Vec<VersionId>,
    pub project: Vec<ProjectId>,
}
38
/// Document field that holds all project-level search parameters as a single map keyed by
/// search-parameter URL (populated by `resource_to_elastic_index`).
static DYNAMIC_PARAMETER_INDEX_FIELD: &str = "dynamic_parameters";
40
// Errors raised by this module's indexing and connectivity code. The
// `OperationOutcomeError` derive turns each `#[fatal(...)]` variant into a FHIR
// OperationOutcome; plain `//` comments are used here so the derive sees no extra attributes.
#[derive(OperationOutcomeError, Debug)]
pub enum SearchError {
    // A search parameter's FHIRPath expression failed to evaluate (e.g. while computing
    // index values in `resource_to_elastic_index`).
    #[fatal(
        code = "exception",
        diagnostic = "Failed to evaluate fhirpath expression."
    )]
    FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
    // Indexing was asked to handle a FHIR method other than create, update or delete.
    #[fatal(
        code = "exception",
        diagnostic = "Search does not support the fhir method: '{arg0:?}'"
    )]
    UnsupportedFHIRMethod(FHIRMethod),
    // Generic fatal indexing failure. In this file it is raised as `Fatal(500)` when a bulk
    // response reports item errors, so the number is not necessarily the HTTP status
    // Elasticsearch returned.
    #[fatal(
        code = "exception",
        diagnostic = "Failed to index resources server responded with status code: '{arg0}'"
    )]
    Fatal(u16),
    // Transport/client error from the `elasticsearch` crate.
    // NOTE(review): also produced by the ping in `is_connected`, not only by indexing, so
    // the "failed to index" diagnostic can be misleading.
    #[fatal(
        code = "exception",
        diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
    )]
    ElasticsearchError(#[from] elasticsearch::Error),
    // Elasticsearch answered with an error; the payload is presumably the HTTP status
    // code — confirm at the construction sites.
    #[fatal(
        code = "exception",
        diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
    )]
    ElasticSearchResponseError(u16),
    // Returned by `is_connected` when the cluster ping comes back with a non-success status.
    // NOTE(review): unlike the other variants this has no `#[fatal(...)]` attribute —
    // confirm how the derive reports it.
    NotConnected,
}
70
// Errors raised while configuring the Elasticsearch client or resolving index names.
// Plain `//` comments keep the `OperationOutcomeError` derive's input unchanged.
#[derive(OperationOutcomeError, Debug)]
pub enum SearchConfigError {
    // The URL passed to `create_es_client` could not be parsed; carries the raw URL string.
    #[fatal(code = "exception", diagnostic = "Failed to parse URL: '{arg0}'.")]
    UrlParseError(String),
    // Building the Elasticsearch HTTP transport failed.
    #[fatal(
        code = "exception",
        diagnostic = "Elasticsearch client creation failed."
    )]
    ElasticSearchConfigError(#[from] BuildError),
    // No search index exists for the given FHIR version.
    // NOTE(review): not constructed in this file — `get_index_name` currently maps every
    // supported version to an index.
    #[fatal(
        code = "exception",
        diagnostic = "Unsupported FHIR version for index: '{arg0}'"
    )]
    UnsupportedIndex(SupportedFHIRVersions),
}
86
87#[derive(Clone)]
88pub struct ElasticSearchEngine<SearchParameterResolver: SearchParameterResolve + 'static> {
89    parameter_resolver: Arc<SearchParameterResolver>,
90    fp_engine: Arc<FPEngine>,
91    client: Arc<Elasticsearch>,
92}
93
94pub fn create_es_client(
95    url: &str,
96    username: String,
97    password: String,
98) -> Result<Arc<Elasticsearch>, SearchConfigError> {
99    let url = Url::parse(url).map_err(|_e| SearchConfigError::UrlParseError(url.to_string()))?;
100    let conn_pool = SingleNodeConnectionPool::new(url);
101    let transport = TransportBuilder::new(conn_pool)
102        .cert_validation(CertificateValidation::None)
103        .auth(Credentials::Basic(username, password))
104        .build()?;
105
106    let elasticsearch_client = Elasticsearch::new(transport);
107
108    Ok(Arc::new(elasticsearch_client))
109}
110
111impl<SearchParameterResolver: SearchParameterResolve + 'static>
112    ElasticSearchEngine<SearchParameterResolver>
113{
114    pub fn new(
115        parameter_resolver: Arc<SearchParameterResolver>,
116        fp_engine: Arc<FPEngine>,
117        es_client: Arc<Elasticsearch>,
118    ) -> Self {
119        ElasticSearchEngine {
120            parameter_resolver,
121            fp_engine,
122            client: es_client,
123        }
124    }
125
126    pub async fn is_connected(&self) -> Result<(), SearchError> {
127        let res = self.client.ping().send().await.map_err(SearchError::from)?;
128
129        if res.status_code().is_success() {
130            Ok(())
131        } else {
132            Err(SearchError::NotConnected)
133        }
134    }
135}
136
137async fn resource_to_elastic_index(
138    fp_engine: Arc<FPEngine>,
139    parameters: &Vec<ResolvedParameter>,
140    resource: &Resource,
141) -> Result<HashMap<String, InsertableIndex>, OperationOutcomeError> {
142    let mut map = HashMap::new();
143    let mut dynamic_parameters = HashMap::new();
144    for param in parameters.iter() {
145        if let Some(expression) = param
146            .search_parameter
147            .expression
148            .as_ref()
149            .and_then(|e| e.value.as_ref())
150            && let Some(url) = param.search_parameter.url.value.as_ref()
151        {
152            let result = fp_engine
153                .evaluate(expression, vec![resource])
154                .await
155                .map_err(SearchError::from);
156
157            if let Err(err) = result {
158                tracing::error!(
159                    "Failed to evaluate FHIRPath expression: '{}' for resource.",
160                    expression,
161                );
162
163                return Err(SearchError::from(err).into());
164            }
165
166            let result_vec = indexing_conversion::to_insertable_index(
167                param,
168                result?.iter().collect::<Vec<_>>(),
169            )?;
170
171            match param.level {
172                ParameterLevel::System => {
173                    map.insert(url.clone(), result_vec);
174                }
175                // Project Parameters are indexed using a single JS Object which gets indexed to
176                ParameterLevel::Project => {
177                    dynamic_parameters.insert(url.clone(), result_vec);
178                }
179            }
180        }
181    }
182
183    // Various project level parameters. These are indexed under a single field in elasticsearch as a nested type with url and indexed value..
184    map.insert(
185        DYNAMIC_PARAMETER_INDEX_FIELD.to_string(),
186        InsertableIndex::DynamicParameters(dynamic_parameters),
187    );
188
189    Ok(map)
190}
191
192static R4_FHIR_INDEX: &str = "r4_search_index";
193
194pub fn get_index_name(
195    fhir_version: &SupportedFHIRVersions,
196) -> Result<&'static str, SearchConfigError> {
197    match fhir_version {
198        SupportedFHIRVersions::R4 => Ok(R4_FHIR_INDEX),
199        // _ => Err(SearchConfigError::UnsupportedIndex(fhir_version.clone())),
200    }
201}
202
/// One entry of `hits.hits` in an Elasticsearch search response.
///
/// `_index`, `_id` and `_score` mirror Elasticsearch's hit metadata keys. `_score` is
/// optional because Elasticsearch may report it as `null` (e.g. for sorted queries).
/// `fields` holds the per-document metadata requested by the query.
#[derive(serde::Deserialize, Debug)]
struct ElasticSearchHitResult {
    _index: String,
    _id: String,
    _score: Option<f64>,
    fields: SearchEntryPrivate,
}

/// `hits.total` of a search response: the number of matching documents.
///
/// NOTE(review): `relation` (`"eq"` / `"gte"`) is not deserialized, so `value` may be a
/// lower bound rather than an exact count when Elasticsearch caps total-hit tracking.
#[derive(serde::Deserialize, Debug)]
struct ElasticSearchHitTotalMeta {
    value: i64,
    // relation: String,
}

/// The `hits` object of a search response. `total` is optional — presumably absent when
/// total-hit tracking is disabled for the query; confirm against `search`.
#[derive(serde::Deserialize, Debug)]
struct ElasticSearchHit {
    total: Option<ElasticSearchHitTotalMeta>,
    hits: Vec<ElasticSearchHitResult>,
}

/// Top-level search response body; only `hits` is deserialized.
#[derive(serde::Deserialize, Debug)]
struct ElasticSearchResponse {
    hits: ElasticSearchHit,
}
227
228fn unique_index_id(
229    tenant: &TenantId,
230    project: &ProjectId,
231    resource_type: &ResourceType,
232    id: &ResourceId,
233) -> String {
234    let unique_index_id = format!(
235        "{}/{}/{}/{}",
236        tenant.as_ref(),
237        project.as_ref(),
238        resource_type.as_ref(),
239        id.as_ref()
240    );
241
242    unique_index_id
243}
244
245impl<SearchParameterResolver: SearchParameterResolve + 'static> SearchEngine
246    for ElasticSearchEngine<SearchParameterResolver>
247{
248    async fn search(
249        &self,
250        fhir_version: &SupportedFHIRVersions,
251        tenant: &TenantId,
252        project: &ProjectId,
253        search_request: &SearchRequest,
254        options: Option<SearchOptions>,
255    ) -> Result<SearchReturn, haste_fhir_operation_error::OperationOutcomeError> {
256        search::execute_search(
257            self.client.clone(),
258            self.parameter_resolver.clone(),
259            fhir_version,
260            tenant,
261            project,
262            search_request,
263            &options,
264        )
265        .await
266    }
267
268    fn index(
269        &self,
270        fhir_version: SupportedFHIRVersions,
271        resources: Vec<IndexResource>,
272    ) -> impl Future<Output = Result<SuccessfullyIndexedCount, OperationOutcomeError>> + Send {
273        async move {
274            // Iterator used to evaluate all of the search expressions for indexing.
275
276            let mut tasks = Vec::with_capacity(resources.len());
277            let resources_total = resources.len();
278            let search_index_name = get_index_name(&fhir_version)?;
279
280            for r in resources.into_iter().filter(|r| match r.fhir_method {
281                FHIRMethod::Create | FHIRMethod::Update | FHIRMethod::Delete => true,
282                _ => false,
283            }) {
284                let engine = self.fp_engine.clone();
285                let parameter_resolver = self.parameter_resolver.clone();
286                tasks.push(tokio::spawn(async move {
287                    match &r.fhir_method {
288                        FHIRMethod::Create | FHIRMethod::Update => {
289                            // Id is not sufficient because different Resourcetypes may have the same id.
290                            // Additionally should be namespaced by tenant and project to avoid conflicts across tenants and projects.
291                            let index_id =
292                                unique_index_id(&r.tenant, &r.project, &r.resource_type, &r.id);
293                            let params = parameter_resolver
294                                .by_resource_type(&r.tenant, &r.project, &r.resource_type)
295                                .await?;
296
297                            let mut elastic_index =
298                                resource_to_elastic_index(engine, &params, &r.resource).await?;
299
300                            elastic_index.insert(
301                                "resource_type".to_string(),
302                                InsertableIndex::Meta(r.resource_type.as_ref().to_string()),
303                            );
304
305                            elastic_index.insert(
306                                "id".to_string(),
307                                InsertableIndex::Meta(r.id.as_ref().to_string()),
308                            );
309
310                            elastic_index.insert(
311                                "version_id".to_string(),
312                                InsertableIndex::Meta(r.version_id.as_ref().to_string()),
313                            );
314                            elastic_index.insert(
315                                "project".to_string(),
316                                InsertableIndex::Meta(r.project.as_ref().to_string()),
317                            );
318                            elastic_index.insert(
319                                "tenant".to_string(),
320                                InsertableIndex::Meta(r.tenant.as_ref().to_string()),
321                            );
322
323                            Ok(BulkOperation::index(elastic_index)
324                                .id(index_id)
325                                .index(search_index_name)
326                                .into())
327                        }
328                        FHIRMethod::Delete => Ok(BulkOperation::delete(unique_index_id(
329                            &r.tenant,
330                            &r.project,
331                            &r.resource_type,
332                            &r.id,
333                        ))
334                        .index(search_index_name)
335                        .into()),
336                        method => Err(SearchError::UnsupportedFHIRMethod((*method).clone()))
337                            .map_err(OperationOutcomeError::from),
338                    }
339                }));
340            }
341
342            let client = self.client.clone();
343
344            let mut bulk_ops: Vec<BulkOperation<HashMap<String, InsertableIndex>>> =
345                Vec::with_capacity(resources_total);
346
347            for task in tasks {
348                let res = task.await.map_err(|e| {
349                    OperationOutcomeError::fatal(IssueType::Exception(None), e.to_string())
350                })??;
351                bulk_ops.push(res);
352            }
353
354            if !bulk_ops.is_empty() {
355                let res = client
356                    .bulk(BulkParts::Index(search_index_name))
357                    .body(bulk_ops)
358                    .send()
359                    .await
360                    .map_err(SearchError::from)?;
361
362                let response_body = res.json::<serde_json::Value>().await.map_err(|_e| {
363                    OperationOutcomeError::fatal(
364                        IssueType::Exception(None),
365                        "Failed to parse response body.".to_string(),
366                    )
367                })?;
368
369                if response_body["errors"].as_bool().unwrap() == true {
370                    tracing::error!("Failed to index resources. Response: '{:?}'", response_body);
371                    return Err(SearchError::Fatal(500).into());
372                }
373                Ok(SuccessfullyIndexedCount(
374                    response_body["items"].as_array().unwrap().len(),
375                ))
376            } else {
377                Ok(SuccessfullyIndexedCount(0))
378            }
379        }
380    }
381
382    async fn migrate(
383        &self,
384        _fhir_version: &SupportedFHIRVersions,
385    ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
386        migration::create_mapping(
387            self.parameter_resolver.clone(),
388            &self.client,
389            get_index_name(_fhir_version)?,
390        )
391        .await?;
392        Ok(())
393    }
394}