haste_fhir_search/elastic_search/
mod.rs

1use crate::{
2    IndexResource, SearchEngine, SearchEntry, SearchOptions, SearchReturn,
3    SuccessfullyIndexedCount,
4    indexing_conversion::{self, InsertableIndex},
5};
6use elasticsearch::{
7    BulkOperation, BulkParts, Elasticsearch, SearchParts,
8    auth::Credentials,
9    cert::CertificateValidation,
10    http::{
11        Url,
12        transport::{BuildError, SingleNodeConnectionPool, TransportBuilder},
13    },
14};
15use haste_fhir_client::request::SearchRequest;
16use haste_fhir_model::r4::generated::{
17    resources::{Resource, ResourceType, SearchParameter},
18    terminology::IssueType,
19};
20use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
21use haste_fhirpath::FPEngine;
22use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
23use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
24use rayon::prelude::*;
25use serde::Deserialize;
26use std::{collections::HashMap, sync::Arc};
27
28mod migration;
29mod search;
30
31#[derive(Deserialize, Debug)]
32struct SearchEntryPrivate {
33    pub id: Vec<ResourceId>,
34    pub resource_type: Vec<ResourceType>,
35    pub version_id: Vec<VersionId>,
36}
37
38#[derive(OperationOutcomeError, Debug)]
39pub enum SearchError {
40    #[fatal(
41        code = "exception",
42        diagnostic = "Failed to evaluate fhirpath expression."
43    )]
44    FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
45    #[fatal(
46        code = "exception",
47        diagnostic = "Search does not support the fhir method: '{arg0:?}'"
48    )]
49    UnsupportedFHIRMethod(FHIRMethod),
50    #[fatal(
51        code = "exception",
52        diagnostic = "Failed to index resources server responded with status code: '{arg0}'"
53    )]
54    Fatal(u16),
55    #[fatal(
56        code = "exception",
57        diagnostic = "Elasticsearch server failed to index."
58    )]
59    ElasticsearchError(#[from] elasticsearch::Error),
60    #[fatal(
61        code = "exception",
62        diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
63    )]
64    ElasticSearchResponseError(u16),
65    NotConnected,
66}
67
68#[derive(OperationOutcomeError, Debug)]
69pub enum SearchConfigError {
70    #[fatal(code = "exception", diagnostic = "Failed to parse URL: '{arg0}'.")]
71    UrlParseError(String),
72    #[fatal(
73        code = "exception",
74        diagnostic = "Elasticsearch client creation failed."
75    )]
76    ElasticSearchConfigError(#[from] BuildError),
77    #[fatal(
78        code = "exception",
79        diagnostic = "Unsupported FHIR version for index: '{arg0}'"
80    )]
81    UnsupportedIndex(SupportedFHIRVersions),
82}
83
84#[derive(Clone)]
85pub struct ElasticSearchEngine {
86    fp_engine: Arc<FPEngine>,
87    client: Elasticsearch,
88}
89
90impl ElasticSearchEngine {
91    pub fn new(
92        fp_engine: Arc<FPEngine>,
93        url: &str,
94        username: String,
95        password: String,
96    ) -> Result<Self, SearchConfigError> {
97        let url =
98            Url::parse(url).map_err(|_e| SearchConfigError::UrlParseError(url.to_string()))?;
99        let conn_pool = SingleNodeConnectionPool::new(url);
100        let transport = TransportBuilder::new(conn_pool)
101            .cert_validation(CertificateValidation::None)
102            .auth(Credentials::Basic(username, password))
103            .build()?;
104
105        let elasticsearch_client = Elasticsearch::new(transport);
106        Ok(ElasticSearchEngine {
107            fp_engine,
108            client: elasticsearch_client,
109        })
110    }
111
112    pub async fn is_connected(&self) -> Result<(), SearchError> {
113        let res = self.client.ping().send().await.map_err(SearchError::from)?;
114
115        if res.status_code().is_success() {
116            Ok(())
117        } else {
118            Err(SearchError::NotConnected)
119        }
120    }
121}
122
123fn resource_to_elastic_index(
124    fp_engine: Arc<FPEngine>,
125    parameters: &Vec<Arc<SearchParameter>>,
126    resource: &Resource,
127) -> Result<HashMap<String, InsertableIndex>, OperationOutcomeError> {
128    let mut map = HashMap::new();
129    for param in parameters.iter() {
130        if let Some(expression) = param.expression.as_ref().and_then(|e| e.value.as_ref())
131            && let Some(url) = param.url.value.as_ref()
132        {
133            let result = fp_engine
134                .evaluate(expression, vec![resource])
135                .map_err(SearchError::from);
136
137            if let Err(err) = result {
138                tracing::error!(
139                    "Failed to evaluate FHIRPath expression: '{}' for resource.",
140                    expression,
141                );
142
143                return Err(SearchError::from(err).into());
144            }
145
146            let result_vec = indexing_conversion::to_insertable_index(
147                param,
148                result?.iter().collect::<Vec<_>>(),
149            )?;
150
151            map.insert(url.clone(), result_vec);
152        }
153    }
154
155    Ok(map)
156}
157
158static R4_FHIR_INDEX: &str = "r4_search_index";
159
160pub fn get_index_name(
161    fhir_version: &SupportedFHIRVersions,
162) -> Result<&'static str, SearchConfigError> {
163    match fhir_version {
164        SupportedFHIRVersions::R4 => Ok(R4_FHIR_INDEX),
165        // _ => Err(SearchConfigError::UnsupportedIndex(fhir_version.clone())),
166    }
167}
168
169#[derive(serde::Deserialize, Debug)]
170struct ElasticSearchHitResult {
171    _index: String,
172    _id: String,
173    _score: Option<f64>,
174    fields: SearchEntryPrivate,
175}
176
177#[derive(serde::Deserialize, Debug)]
178struct ElasticSearchHitTotalMeta {
179    value: i64,
180    // relation: String,
181}
182
183#[derive(serde::Deserialize, Debug)]
184struct ElasticSearchHit {
185    total: Option<ElasticSearchHitTotalMeta>,
186    hits: Vec<ElasticSearchHitResult>,
187}
188
189#[derive(serde::Deserialize, Debug)]
190struct ElasticSearchResponse {
191    hits: ElasticSearchHit,
192}
193
194fn unique_index_id(
195    tenant: &TenantId,
196    project: &ProjectId,
197    resource_type: &ResourceType,
198    id: &ResourceId,
199) -> String {
200    let unique_index_id = format!(
201        "{}/{}/{}/{}",
202        tenant.as_ref(),
203        project.as_ref(),
204        resource_type.as_ref(),
205        id.as_ref()
206    );
207
208    unique_index_id
209}
210
211impl SearchEngine for ElasticSearchEngine {
212    async fn search(
213        &self,
214        fhir_version: &SupportedFHIRVersions,
215        tenant: &TenantId,
216        project: &ProjectId,
217        search_request: &SearchRequest,
218        options: Option<SearchOptions>,
219    ) -> Result<SearchReturn, haste_fhir_operation_error::OperationOutcomeError> {
220        let query = search::build_elastic_search_query(tenant, project, &search_request, &options)?;
221
222        let search_response = self
223            .client
224            .search(SearchParts::Index(&[get_index_name(&fhir_version)?]))
225            .body(query)
226            .send()
227            .await
228            .map_err(SearchError::from)?;
229
230        if !search_response.status_code().is_success() {
231            return Err(SearchError::ElasticSearchResponseError(
232                search_response.status_code().as_u16(),
233            )
234            .into());
235        }
236
237        let search_results = search_response
238            .json::<ElasticSearchResponse>()
239            .await
240            .map_err(SearchError::from)?;
241
242        Ok(SearchReturn {
243            total: search_results.hits.total.as_ref().map(|t| t.value),
244            entries: search_results
245                .hits
246                .hits
247                .into_iter()
248                .map(|mut hit| SearchEntry {
249                    id: hit.fields.id.pop().unwrap(),
250                    resource_type: hit.fields.resource_type.pop().unwrap(),
251                    version_id: hit.fields.version_id.pop().unwrap(),
252                })
253                .collect(),
254        })
255    }
256
257    async fn index<'a>(
258        &self,
259        _fhir_version: &SupportedFHIRVersions,
260        tenant: &TenantId,
261
262        resources: Vec<IndexResource<'a>>,
263    ) -> Result<SuccessfullyIndexedCount, haste_fhir_operation_error::OperationOutcomeError> {
264        // Iterator used to evaluate all of the search expressions for indexing.
265
266        let bulk_ops: Vec<BulkOperation<HashMap<String, InsertableIndex>>> = resources
267            .par_iter()
268            .filter(|r| match r.fhir_method {
269                FHIRMethod::Create | FHIRMethod::Update | FHIRMethod::Delete => true,
270                _ => false,
271            })
272            .map(|r| match &r.fhir_method {
273                FHIRMethod::Create | FHIRMethod::Update => {
274                    // Id is not sufficient because different Resourcetypes may have the same id.
275                    let index_id = unique_index_id(tenant, r.project, &r.resource_type, &r.id);
276                    let params =
277                        haste_artifacts::search_parameters::get_search_parameters_for_resource(
278                            &r.resource_type,
279                        );
280
281                    let mut elastic_index =
282                        resource_to_elastic_index(self.fp_engine.clone(), &params, &r.resource)?;
283
284                    elastic_index.insert(
285                        "resource_type".to_string(),
286                        InsertableIndex::Meta(r.resource_type.as_ref().to_string()),
287                    );
288
289                    elastic_index.insert(
290                        "id".to_string(),
291                        InsertableIndex::Meta(r.id.as_ref().to_string()),
292                    );
293
294                    elastic_index.insert(
295                        "version_id".to_string(),
296                        InsertableIndex::Meta(r.version_id.to_string()),
297                    );
298                    elastic_index.insert(
299                        "project".to_string(),
300                        InsertableIndex::Meta(r.project.as_ref().to_string()),
301                    );
302                    elastic_index.insert(
303                        "tenant".to_string(),
304                        InsertableIndex::Meta(tenant.as_ref().to_string()),
305                    );
306
307                    Ok(BulkOperation::index(elastic_index)
308                        .id(index_id)
309                        .index(get_index_name(_fhir_version)?)
310                        .into())
311                }
312                FHIRMethod::Delete => Ok(BulkOperation::delete(unique_index_id(
313                    tenant,
314                    r.project,
315                    &r.resource_type,
316                    &r.id,
317                ))
318                .index(get_index_name(_fhir_version)?)
319                .into()),
320                method => Err(SearchError::UnsupportedFHIRMethod((*method).clone()).into()),
321            })
322            .collect::<Result<Vec<_>, OperationOutcomeError>>()?;
323
324        if !bulk_ops.is_empty() {
325            let res = self
326                .client
327                .bulk(BulkParts::Index(get_index_name(_fhir_version)?))
328                .body(bulk_ops)
329                .send()
330                .await
331                .map_err(SearchError::from)?;
332
333            let response_body = res.json::<serde_json::Value>().await.map_err(|_e| {
334                OperationOutcomeError::fatal(
335                    IssueType::Exception(None),
336                    "Failed to parse response body.".to_string(),
337                )
338            })?;
339
340            if response_body["errors"].as_bool().unwrap() == true {
341                tracing::error!(
342                    "Failed to index resources for tenant: '{}'. Response: '{:?}'",
343                    tenant.as_ref(),
344                    response_body
345                );
346                return Err(SearchError::Fatal(500).into());
347            }
348            Ok(SuccessfullyIndexedCount(
349                response_body["items"].as_array().unwrap().len(),
350            ))
351        } else {
352            Ok(SuccessfullyIndexedCount(0))
353        }
354    }
355
356    async fn migrate(
357        &self,
358        _fhir_version: &SupportedFHIRVersions,
359    ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
360        migration::create_mapping(&self.client, get_index_name(_fhir_version)?).await?;
361        Ok(())
362    }
363}