// haste_fhir_search/elastic_search/mod.rs

1use crate::{
2    IndexResource, SearchEngine, SearchEntry, SearchOptions, SearchReturn,
3    SuccessfullyIndexedCount,
4    indexing_conversion::{self, InsertableIndex},
5};
6use elasticsearch::{
7    BulkOperation, BulkParts, Elasticsearch, SearchParts,
8    auth::Credentials,
9    cert::CertificateValidation,
10    http::{
11        Url,
12        transport::{BuildError, SingleNodeConnectionPool, TransportBuilder},
13    },
14};
15use haste_fhir_client::request::SearchRequest;
16use haste_fhir_model::r4::generated::{
17    resources::{Resource, ResourceType, SearchParameter},
18    terminology::IssueType,
19};
20use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
21use haste_fhirpath::FPEngine;
22use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
23use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
24use serde::Deserialize;
25use std::{collections::HashMap, sync::Arc};
26
27mod migration;
28mod search;
29
30#[derive(Deserialize, Debug)]
31struct SearchEntryPrivate {
32    pub id: Vec<ResourceId>,
33    pub resource_type: Vec<ResourceType>,
34    pub version_id: Vec<VersionId>,
35}
36
37#[derive(OperationOutcomeError, Debug)]
38pub enum SearchError {
39    #[fatal(
40        code = "exception",
41        diagnostic = "Failed to evaluate fhirpath expression."
42    )]
43    FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
44    #[fatal(
45        code = "exception",
46        diagnostic = "Search does not support the fhir method: '{arg0:?}'"
47    )]
48    UnsupportedFHIRMethod(FHIRMethod),
49    #[fatal(
50        code = "exception",
51        diagnostic = "Failed to index resources server responded with status code: '{arg0}'"
52    )]
53    Fatal(u16),
54    #[fatal(
55        code = "exception",
56        diagnostic = "Elasticsearch server failed to index."
57    )]
58    ElasticsearchError(#[from] elasticsearch::Error),
59    #[fatal(
60        code = "exception",
61        diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
62    )]
63    ElasticSearchResponseError(u16),
64    NotConnected,
65}
66
67#[derive(OperationOutcomeError, Debug)]
68pub enum SearchConfigError {
69    #[fatal(code = "exception", diagnostic = "Failed to parse URL: '{arg0}'.")]
70    UrlParseError(String),
71    #[fatal(
72        code = "exception",
73        diagnostic = "Elasticsearch client creation failed."
74    )]
75    ElasticSearchConfigError(#[from] BuildError),
76    #[fatal(
77        code = "exception",
78        diagnostic = "Unsupported FHIR version for index: '{arg0}'"
79    )]
80    UnsupportedIndex(SupportedFHIRVersions),
81}
82
83#[derive(Clone)]
84pub struct ElasticSearchEngine {
85    fp_engine: Arc<FPEngine>,
86    client: Arc<Elasticsearch>,
87}
88
89impl ElasticSearchEngine {
90    pub fn new(
91        fp_engine: Arc<FPEngine>,
92        url: &str,
93        username: String,
94        password: String,
95    ) -> Result<Self, SearchConfigError> {
96        let url =
97            Url::parse(url).map_err(|_e| SearchConfigError::UrlParseError(url.to_string()))?;
98        let conn_pool = SingleNodeConnectionPool::new(url);
99        let transport = TransportBuilder::new(conn_pool)
100            .cert_validation(CertificateValidation::None)
101            .auth(Credentials::Basic(username, password))
102            .build()?;
103
104        let elasticsearch_client = Elasticsearch::new(transport);
105        Ok(ElasticSearchEngine {
106            fp_engine,
107            client: Arc::new(elasticsearch_client),
108        })
109    }
110
111    pub async fn is_connected(&self) -> Result<(), SearchError> {
112        let res = self.client.ping().send().await.map_err(SearchError::from)?;
113
114        if res.status_code().is_success() {
115            Ok(())
116        } else {
117            Err(SearchError::NotConnected)
118        }
119    }
120}
121
122async fn resource_to_elastic_index(
123    fp_engine: Arc<FPEngine>,
124    parameters: &Vec<Arc<SearchParameter>>,
125    resource: &Resource,
126) -> Result<HashMap<String, InsertableIndex>, OperationOutcomeError> {
127    let mut map = HashMap::new();
128    for param in parameters.iter() {
129        if let Some(expression) = param.expression.as_ref().and_then(|e| e.value.as_ref())
130            && let Some(url) = param.url.value.as_ref()
131        {
132            let result = fp_engine
133                .evaluate(expression, vec![resource])
134                .await
135                .map_err(SearchError::from);
136
137            if let Err(err) = result {
138                tracing::error!(
139                    "Failed to evaluate FHIRPath expression: '{}' for resource.",
140                    expression,
141                );
142
143                return Err(SearchError::from(err).into());
144            }
145
146            let result_vec = indexing_conversion::to_insertable_index(
147                param,
148                result?.iter().collect::<Vec<_>>(),
149            )?;
150
151            map.insert(url.clone(), result_vec);
152        }
153    }
154
155    Ok(map)
156}
157
158static R4_FHIR_INDEX: &str = "r4_search_index";
159
160pub fn get_index_name(
161    fhir_version: &SupportedFHIRVersions,
162) -> Result<&'static str, SearchConfigError> {
163    match fhir_version {
164        SupportedFHIRVersions::R4 => Ok(R4_FHIR_INDEX),
165        // _ => Err(SearchConfigError::UnsupportedIndex(fhir_version.clone())),
166    }
167}
168
169#[derive(serde::Deserialize, Debug)]
170struct ElasticSearchHitResult {
171    _index: String,
172    _id: String,
173    _score: Option<f64>,
174    fields: SearchEntryPrivate,
175}
176
177#[derive(serde::Deserialize, Debug)]
178struct ElasticSearchHitTotalMeta {
179    value: i64,
180    // relation: String,
181}
182
183#[derive(serde::Deserialize, Debug)]
184struct ElasticSearchHit {
185    total: Option<ElasticSearchHitTotalMeta>,
186    hits: Vec<ElasticSearchHitResult>,
187}
188
189#[derive(serde::Deserialize, Debug)]
190struct ElasticSearchResponse {
191    hits: ElasticSearchHit,
192}
193
194fn unique_index_id(
195    tenant: &TenantId,
196    project: &ProjectId,
197    resource_type: &ResourceType,
198    id: &ResourceId,
199) -> String {
200    let unique_index_id = format!(
201        "{}/{}/{}/{}",
202        tenant.as_ref(),
203        project.as_ref(),
204        resource_type.as_ref(),
205        id.as_ref()
206    );
207
208    unique_index_id
209}
210
211impl SearchEngine for ElasticSearchEngine {
212    async fn search(
213        &self,
214        fhir_version: &SupportedFHIRVersions,
215        tenant: &TenantId,
216        project: &ProjectId,
217        search_request: &SearchRequest,
218        options: Option<SearchOptions>,
219    ) -> Result<SearchReturn, haste_fhir_operation_error::OperationOutcomeError> {
220        let query = search::build_elastic_search_query(tenant, project, &search_request, &options)?;
221
222        let search_response = self
223            .client
224            .search(SearchParts::Index(&[get_index_name(&fhir_version)?]))
225            .body(query)
226            .send()
227            .await
228            .map_err(SearchError::from)?;
229
230        if !search_response.status_code().is_success() {
231            return Err(SearchError::ElasticSearchResponseError(
232                search_response.status_code().as_u16(),
233            )
234            .into());
235        }
236
237        let search_results = search_response
238            .json::<ElasticSearchResponse>()
239            .await
240            .map_err(SearchError::from)?;
241
242        Ok(SearchReturn {
243            total: search_results.hits.total.as_ref().map(|t| t.value),
244            entries: search_results
245                .hits
246                .hits
247                .into_iter()
248                .map(|mut hit| SearchEntry {
249                    id: hit.fields.id.pop().unwrap(),
250                    resource_type: hit.fields.resource_type.pop().unwrap(),
251                    version_id: hit.fields.version_id.pop().unwrap(),
252                })
253                .collect(),
254        })
255    }
256
257    fn index(
258        &self,
259        fhir_version: SupportedFHIRVersions,
260        tenant: TenantId,
261        resources: Vec<IndexResource>,
262    ) -> impl Future<Output = Result<SuccessfullyIndexedCount, OperationOutcomeError>> + Send + Sync
263    {
264        async move {
265            // Iterator used to evaluate all of the search expressions for indexing.
266
267            let mut tasks = Vec::with_capacity(resources.len());
268            let resources_total = resources.len();
269            let search_index_name = get_index_name(&fhir_version)?;
270
271            for r in resources.into_iter().filter(|r| match r.fhir_method {
272                FHIRMethod::Create | FHIRMethod::Update | FHIRMethod::Delete => true,
273                _ => false,
274            }) {
275                let engine = self.fp_engine.clone();
276                let tenant = tenant.clone();
277
278                tasks.push(tokio::spawn(async move {
279                    match &r.fhir_method {
280                        FHIRMethod::Create | FHIRMethod::Update => {
281                            // Id is not sufficient because different Resourcetypes may have the same id.
282                            let index_id =
283                                unique_index_id(&tenant, &r.project, &r.resource_type, &r.id);
284                            let params =
285                            haste_artifacts::search_parameters::get_search_parameters_for_resource(
286                                &r.resource_type,
287                            );
288
289                            let mut elastic_index =
290                                resource_to_elastic_index(engine, &params, &r.resource).await?;
291
292                            elastic_index.insert(
293                                "resource_type".to_string(),
294                                InsertableIndex::Meta(r.resource_type.as_ref().to_string()),
295                            );
296
297                            elastic_index.insert(
298                                "id".to_string(),
299                                InsertableIndex::Meta(r.id.as_ref().to_string()),
300                            );
301
302                            elastic_index.insert(
303                                "version_id".to_string(),
304                                InsertableIndex::Meta(r.version_id.to_string()),
305                            );
306                            elastic_index.insert(
307                                "project".to_string(),
308                                InsertableIndex::Meta(r.project.as_ref().to_string()),
309                            );
310                            elastic_index.insert(
311                                "tenant".to_string(),
312                                InsertableIndex::Meta(tenant.as_ref().to_string()),
313                            );
314                            Ok(BulkOperation::index(elastic_index)
315                                .id(index_id)
316                                .index(search_index_name)
317                                .into())
318                        }
319                        FHIRMethod::Delete => Ok(BulkOperation::delete(unique_index_id(
320                            &tenant,
321                            &r.project,
322                            &r.resource_type,
323                            &r.id,
324                        ))
325                        .index(search_index_name)
326                        .into()),
327                        method => Err(SearchError::UnsupportedFHIRMethod((*method).clone()))
328                            .map_err(OperationOutcomeError::from),
329                    }
330                }));
331            }
332
333            let client = self.client.clone();
334
335            let mut bulk_ops: Vec<BulkOperation<HashMap<String, InsertableIndex>>> =
336                Vec::with_capacity(resources_total);
337
338            for task in tasks {
339                let res = task.await.map_err(|e| {
340                    OperationOutcomeError::fatal(IssueType::Exception(None), e.to_string())
341                })??;
342                bulk_ops.push(res);
343            }
344
345            if !bulk_ops.is_empty() {
346                let res = client
347                    .bulk(BulkParts::Index(search_index_name))
348                    .body(bulk_ops)
349                    .send()
350                    .await
351                    .map_err(SearchError::from)?;
352
353                let response_body = res.json::<serde_json::Value>().await.map_err(|_e| {
354                    OperationOutcomeError::fatal(
355                        IssueType::Exception(None),
356                        "Failed to parse response body.".to_string(),
357                    )
358                })?;
359
360                if response_body["errors"].as_bool().unwrap() == true {
361                    tracing::error!(
362                        "Failed to index resources for tenant: '{}'. Response: '{:?}'",
363                        tenant.as_ref(),
364                        response_body
365                    );
366                    return Err(SearchError::Fatal(500).into());
367                }
368                Ok(SuccessfullyIndexedCount(
369                    response_body["items"].as_array().unwrap().len(),
370                ))
371            } else {
372                Ok(SuccessfullyIndexedCount(0))
373            }
374        }
375    }
376
377    async fn migrate(
378        &self,
379        _fhir_version: &SupportedFHIRVersions,
380    ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
381        migration::create_mapping(&self.client, get_index_name(_fhir_version)?).await?;
382        Ok(())
383    }
384}