Skip to main content

haste_fhir_search/elastic_search/
mod.rs

1use crate::{
2    IndexResource, SearchEngine, SearchEntry, SearchOptions, SearchReturn,
3    SuccessfullyIndexedCount,
4    indexing_conversion::{self, InsertableIndex},
5};
6use elasticsearch::{
7    BulkOperation, BulkParts, Elasticsearch, SearchParts,
8    auth::Credentials,
9    cert::CertificateValidation,
10    http::{
11        Url,
12        transport::{BuildError, SingleNodeConnectionPool, TransportBuilder},
13    },
14};
15use haste_fhir_client::request::SearchRequest;
16use haste_fhir_model::r4::generated::{
17    resources::{Resource, ResourceType, SearchParameter},
18    terminology::IssueType,
19};
20use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
21use haste_fhirpath::FPEngine;
22use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
23use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
24use serde::Deserialize;
25use std::{collections::HashMap, sync::Arc};
26
27mod migration;
28mod search;
29
30#[derive(Deserialize, Debug)]
31struct SearchEntryPrivate {
32    pub id: Vec<ResourceId>,
33    pub resource_type: Vec<ResourceType>,
34    pub version_id: Vec<VersionId>,
35}
36
37#[derive(OperationOutcomeError, Debug)]
38pub enum SearchError {
39    #[fatal(
40        code = "exception",
41        diagnostic = "Failed to evaluate fhirpath expression."
42    )]
43    FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
44    #[fatal(
45        code = "exception",
46        diagnostic = "Search does not support the fhir method: '{arg0:?}'"
47    )]
48    UnsupportedFHIRMethod(FHIRMethod),
49    #[fatal(
50        code = "exception",
51        diagnostic = "Failed to index resources server responded with status code: '{arg0}'"
52    )]
53    Fatal(u16),
54    #[fatal(
55        code = "exception",
56        diagnostic = "Elasticsearch server failed to index."
57    )]
58    ElasticsearchError(#[from] elasticsearch::Error),
59    #[fatal(
60        code = "exception",
61        diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
62    )]
63    ElasticSearchResponseError(u16),
64    NotConnected,
65}
66
67#[derive(OperationOutcomeError, Debug)]
68pub enum SearchConfigError {
69    #[fatal(code = "exception", diagnostic = "Failed to parse URL: '{arg0}'.")]
70    UrlParseError(String),
71    #[fatal(
72        code = "exception",
73        diagnostic = "Elasticsearch client creation failed."
74    )]
75    ElasticSearchConfigError(#[from] BuildError),
76    #[fatal(
77        code = "exception",
78        diagnostic = "Unsupported FHIR version for index: '{arg0}'"
79    )]
80    UnsupportedIndex(SupportedFHIRVersions),
81}
82
83#[derive(Clone)]
84pub struct ElasticSearchEngine {
85    fp_engine: Arc<FPEngine>,
86    client: Arc<Elasticsearch>,
87}
88
89impl ElasticSearchEngine {
90    pub fn new(
91        fp_engine: Arc<FPEngine>,
92        url: &str,
93        username: String,
94        password: String,
95    ) -> Result<Self, SearchConfigError> {
96        let url =
97            Url::parse(url).map_err(|_e| SearchConfigError::UrlParseError(url.to_string()))?;
98        let conn_pool = SingleNodeConnectionPool::new(url);
99        let transport = TransportBuilder::new(conn_pool)
100            .cert_validation(CertificateValidation::None)
101            .auth(Credentials::Basic(username, password))
102            .build()?;
103
104        let elasticsearch_client = Elasticsearch::new(transport);
105        Ok(ElasticSearchEngine {
106            fp_engine,
107            client: Arc::new(elasticsearch_client),
108        })
109    }
110
111    pub async fn is_connected(&self) -> Result<(), SearchError> {
112        let res = self.client.ping().send().await.map_err(SearchError::from)?;
113
114        if res.status_code().is_success() {
115            Ok(())
116        } else {
117            Err(SearchError::NotConnected)
118        }
119    }
120}
121
122async fn resource_to_elastic_index(
123    fp_engine: Arc<FPEngine>,
124    parameters: &Vec<Arc<SearchParameter>>,
125    resource: &Resource,
126) -> Result<HashMap<String, InsertableIndex>, OperationOutcomeError> {
127    let mut map = HashMap::new();
128    for param in parameters.iter() {
129        if let Some(expression) = param.expression.as_ref().and_then(|e| e.value.as_ref())
130            && let Some(url) = param.url.value.as_ref()
131        {
132            let result = fp_engine
133                .evaluate(expression, vec![resource])
134                .await
135                .map_err(SearchError::from);
136
137            if let Err(err) = result {
138                tracing::error!(
139                    "Failed to evaluate FHIRPath expression: '{}' for resource.",
140                    expression,
141                );
142
143                return Err(SearchError::from(err).into());
144            }
145
146            let result_vec = indexing_conversion::to_insertable_index(
147                param,
148                result?.iter().collect::<Vec<_>>(),
149            )?;
150
151            map.insert(url.clone(), result_vec);
152        }
153    }
154
155    Ok(map)
156}
157
158static R4_FHIR_INDEX: &str = "r4_search_index";
159
160pub fn get_index_name(
161    fhir_version: &SupportedFHIRVersions,
162) -> Result<&'static str, SearchConfigError> {
163    match fhir_version {
164        SupportedFHIRVersions::R4 => Ok(R4_FHIR_INDEX),
165        // _ => Err(SearchConfigError::UnsupportedIndex(fhir_version.clone())),
166    }
167}
168
169#[derive(serde::Deserialize, Debug)]
170struct ElasticSearchHitResult {
171    _index: String,
172    _id: String,
173    _score: Option<f64>,
174    fields: SearchEntryPrivate,
175}
176
177#[derive(serde::Deserialize, Debug)]
178struct ElasticSearchHitTotalMeta {
179    value: i64,
180    // relation: String,
181}
182
183#[derive(serde::Deserialize, Debug)]
184struct ElasticSearchHit {
185    total: Option<ElasticSearchHitTotalMeta>,
186    hits: Vec<ElasticSearchHitResult>,
187}
188
189#[derive(serde::Deserialize, Debug)]
190struct ElasticSearchResponse {
191    hits: ElasticSearchHit,
192}
193
194fn unique_index_id(
195    tenant: &TenantId,
196    project: &ProjectId,
197    resource_type: &ResourceType,
198    id: &ResourceId,
199) -> String {
200    let unique_index_id = format!(
201        "{}/{}/{}/{}",
202        tenant.as_ref(),
203        project.as_ref(),
204        resource_type.as_ref(),
205        id.as_ref()
206    );
207
208    unique_index_id
209}
210
211impl SearchEngine for ElasticSearchEngine {
212    async fn search(
213        &self,
214        fhir_version: &SupportedFHIRVersions,
215        tenant: &TenantId,
216        project: &ProjectId,
217        search_request: &SearchRequest,
218        options: Option<SearchOptions>,
219    ) -> Result<SearchReturn, haste_fhir_operation_error::OperationOutcomeError> {
220        let query = search::build_elastic_search_query(tenant, project, &search_request, &options)?;
221
222        let search_response = self
223            .client
224            .search(SearchParts::Index(&[get_index_name(&fhir_version)?]))
225            .body(query)
226            .send()
227            .await
228            .map_err(SearchError::from)?;
229
230        if !search_response.status_code().is_success() {
231            return Err(SearchError::ElasticSearchResponseError(
232                search_response.status_code().as_u16(),
233            )
234            .into());
235        }
236
237        let search_results = search_response
238            .json::<ElasticSearchResponse>()
239            .await
240            .map_err(SearchError::from)?;
241
242        Ok(SearchReturn {
243            total: search_results.hits.total.as_ref().map(|t| t.value),
244            entries: search_results
245                .hits
246                .hits
247                .into_iter()
248                .map(|mut hit| SearchEntry {
249                    id: hit.fields.id.pop().unwrap(),
250                    resource_type: hit.fields.resource_type.pop().unwrap(),
251                    version_id: hit.fields.version_id.pop().unwrap(),
252                })
253                .collect(),
254        })
255    }
256
257    fn index(
258        &self,
259        fhir_version: SupportedFHIRVersions,
260        resources: Vec<IndexResource>,
261    ) -> impl Future<Output = Result<SuccessfullyIndexedCount, OperationOutcomeError>> + Send + Sync
262    {
263        async move {
264            // Iterator used to evaluate all of the search expressions for indexing.
265
266            let mut tasks = Vec::with_capacity(resources.len());
267            let resources_total = resources.len();
268            let search_index_name = get_index_name(&fhir_version)?;
269
270            for r in resources.into_iter().filter(|r| match r.fhir_method {
271                FHIRMethod::Create | FHIRMethod::Update | FHIRMethod::Delete => true,
272                _ => false,
273            }) {
274                let engine = self.fp_engine.clone();
275                tasks.push(tokio::spawn(async move {
276                    match &r.fhir_method {
277                        FHIRMethod::Create | FHIRMethod::Update => {
278                            // Id is not sufficient because different Resourcetypes may have the same id.
279                            let index_id =
280                                unique_index_id(&r.tenant, &r.project, &r.resource_type, &r.id);
281                            let params =
282                            haste_artifacts::search_parameters::get_search_parameters_for_resource(
283                                &r.resource_type,
284                            );
285
286                            let mut elastic_index =
287                                resource_to_elastic_index(engine, &params, &r.resource).await?;
288
289                            elastic_index.insert(
290                                "resource_type".to_string(),
291                                InsertableIndex::Meta(r.resource_type.as_ref().to_string()),
292                            );
293
294                            elastic_index.insert(
295                                "id".to_string(),
296                                InsertableIndex::Meta(r.id.as_ref().to_string()),
297                            );
298
299                            elastic_index.insert(
300                                "version_id".to_string(),
301                                InsertableIndex::Meta(r.version_id.as_ref().to_string()),
302                            );
303                            elastic_index.insert(
304                                "project".to_string(),
305                                InsertableIndex::Meta(r.project.as_ref().to_string()),
306                            );
307                            elastic_index.insert(
308                                "tenant".to_string(),
309                                InsertableIndex::Meta(r.tenant.as_ref().to_string()),
310                            );
311                            Ok(BulkOperation::index(elastic_index)
312                                .id(index_id)
313                                .index(search_index_name)
314                                .into())
315                        }
316                        FHIRMethod::Delete => Ok(BulkOperation::delete(unique_index_id(
317                            &r.tenant,
318                            &r.project,
319                            &r.resource_type,
320                            &r.id,
321                        ))
322                        .index(search_index_name)
323                        .into()),
324                        method => Err(SearchError::UnsupportedFHIRMethod((*method).clone()))
325                            .map_err(OperationOutcomeError::from),
326                    }
327                }));
328            }
329
330            let client = self.client.clone();
331
332            let mut bulk_ops: Vec<BulkOperation<HashMap<String, InsertableIndex>>> =
333                Vec::with_capacity(resources_total);
334
335            for task in tasks {
336                let res = task.await.map_err(|e| {
337                    OperationOutcomeError::fatal(IssueType::Exception(None), e.to_string())
338                })??;
339                bulk_ops.push(res);
340            }
341
342            if !bulk_ops.is_empty() {
343                let res = client
344                    .bulk(BulkParts::Index(search_index_name))
345                    .body(bulk_ops)
346                    .send()
347                    .await
348                    .map_err(SearchError::from)?;
349
350                let response_body = res.json::<serde_json::Value>().await.map_err(|_e| {
351                    OperationOutcomeError::fatal(
352                        IssueType::Exception(None),
353                        "Failed to parse response body.".to_string(),
354                    )
355                })?;
356
357                if response_body["errors"].as_bool().unwrap() == true {
358                    tracing::error!("Failed to index resources. Response: '{:?}'", response_body);
359                    return Err(SearchError::Fatal(500).into());
360                }
361                Ok(SuccessfullyIndexedCount(
362                    response_body["items"].as_array().unwrap().len(),
363                ))
364            } else {
365                Ok(SuccessfullyIndexedCount(0))
366            }
367        }
368    }
369
370    async fn migrate(
371        &self,
372        _fhir_version: &SupportedFHIRVersions,
373    ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
374        migration::create_mapping(&self.client, get_index_name(_fhir_version)?).await?;
375        Ok(())
376    }
377}