haste_fhir_search/elastic_search/
mod.rs1use crate::{
2 IndexResource, SearchEngine, SearchEntry, SearchOptions, SearchReturn,
3 SuccessfullyIndexedCount,
4 indexing_conversion::{self, InsertableIndex},
5};
6use elasticsearch::{
7 BulkOperation, BulkParts, Elasticsearch, SearchParts,
8 auth::Credentials,
9 cert::CertificateValidation,
10 http::{
11 Url,
12 transport::{BuildError, SingleNodeConnectionPool, TransportBuilder},
13 },
14};
15use haste_fhir_client::request::SearchRequest;
16use haste_fhir_model::r4::generated::{
17 resources::{Resource, ResourceType, SearchParameter},
18 terminology::IssueType,
19};
20use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
21use haste_fhirpath::FPEngine;
22use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
23use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
24use rayon::prelude::*;
25use serde::Deserialize;
26use std::{collections::HashMap, sync::Arc};
27
28mod migration;
29mod search;
30
31#[derive(Deserialize, Debug)]
32struct SearchEntryPrivate {
33 pub id: Vec<ResourceId>,
34 pub resource_type: Vec<ResourceType>,
35 pub version_id: Vec<VersionId>,
36}
37
38#[derive(OperationOutcomeError, Debug)]
39pub enum SearchError {
40 #[fatal(
41 code = "exception",
42 diagnostic = "Failed to evaluate fhirpath expression."
43 )]
44 FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
45 #[fatal(
46 code = "exception",
47 diagnostic = "Search does not support the fhir method: '{arg0:?}'"
48 )]
49 UnsupportedFHIRMethod(FHIRMethod),
50 #[fatal(
51 code = "exception",
52 diagnostic = "Failed to index resources server responded with status code: '{arg0}'"
53 )]
54 Fatal(u16),
55 #[fatal(
56 code = "exception",
57 diagnostic = "Elasticsearch server failed to index."
58 )]
59 ElasticsearchError(#[from] elasticsearch::Error),
60 #[fatal(
61 code = "exception",
62 diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
63 )]
64 ElasticSearchResponseError(u16),
65 NotConnected,
66}
67
68#[derive(OperationOutcomeError, Debug)]
69pub enum SearchConfigError {
70 #[fatal(code = "exception", diagnostic = "Failed to parse URL: '{arg0}'.")]
71 UrlParseError(String),
72 #[fatal(
73 code = "exception",
74 diagnostic = "Elasticsearch client creation failed."
75 )]
76 ElasticSearchConfigError(#[from] BuildError),
77 #[fatal(
78 code = "exception",
79 diagnostic = "Unsupported FHIR version for index: '{arg0}'"
80 )]
81 UnsupportedIndex(SupportedFHIRVersions),
82}
83
84#[derive(Clone)]
85pub struct ElasticSearchEngine {
86 fp_engine: Arc<FPEngine>,
87 client: Elasticsearch,
88}
89
90impl ElasticSearchEngine {
91 pub fn new(
92 fp_engine: Arc<FPEngine>,
93 url: &str,
94 username: String,
95 password: String,
96 ) -> Result<Self, SearchConfigError> {
97 let url =
98 Url::parse(url).map_err(|_e| SearchConfigError::UrlParseError(url.to_string()))?;
99 let conn_pool = SingleNodeConnectionPool::new(url);
100 let transport = TransportBuilder::new(conn_pool)
101 .cert_validation(CertificateValidation::None)
102 .auth(Credentials::Basic(username, password))
103 .build()?;
104
105 let elasticsearch_client = Elasticsearch::new(transport);
106 Ok(ElasticSearchEngine {
107 fp_engine,
108 client: elasticsearch_client,
109 })
110 }
111
112 pub async fn is_connected(&self) -> Result<(), SearchError> {
113 let res = self.client.ping().send().await.map_err(SearchError::from)?;
114
115 if res.status_code().is_success() {
116 Ok(())
117 } else {
118 Err(SearchError::NotConnected)
119 }
120 }
121}
122
123fn resource_to_elastic_index(
124 fp_engine: Arc<FPEngine>,
125 parameters: &Vec<Arc<SearchParameter>>,
126 resource: &Resource,
127) -> Result<HashMap<String, InsertableIndex>, OperationOutcomeError> {
128 let mut map = HashMap::new();
129 for param in parameters.iter() {
130 if let Some(expression) = param.expression.as_ref().and_then(|e| e.value.as_ref())
131 && let Some(url) = param.url.value.as_ref()
132 {
133 let result = fp_engine
134 .evaluate(expression, vec![resource])
135 .map_err(SearchError::from);
136
137 if let Err(err) = result {
138 tracing::error!(
139 "Failed to evaluate FHIRPath expression: '{}' for resource.",
140 expression,
141 );
142
143 return Err(SearchError::from(err).into());
144 }
145
146 let result_vec = indexing_conversion::to_insertable_index(
147 param,
148 result?.iter().collect::<Vec<_>>(),
149 )?;
150
151 map.insert(url.clone(), result_vec);
152 }
153 }
154
155 Ok(map)
156}
157
158static R4_FHIR_INDEX: &str = "r4_search_index";
159
160pub fn get_index_name(
161 fhir_version: &SupportedFHIRVersions,
162) -> Result<&'static str, SearchConfigError> {
163 match fhir_version {
164 SupportedFHIRVersions::R4 => Ok(R4_FHIR_INDEX),
165 }
167}
168
169#[derive(serde::Deserialize, Debug)]
170struct ElasticSearchHitResult {
171 _index: String,
172 _id: String,
173 _score: Option<f64>,
174 fields: SearchEntryPrivate,
175}
176
177#[derive(serde::Deserialize, Debug)]
178struct ElasticSearchHitTotalMeta {
179 value: i64,
180 }
182
183#[derive(serde::Deserialize, Debug)]
184struct ElasticSearchHit {
185 total: Option<ElasticSearchHitTotalMeta>,
186 hits: Vec<ElasticSearchHitResult>,
187}
188
189#[derive(serde::Deserialize, Debug)]
190struct ElasticSearchResponse {
191 hits: ElasticSearchHit,
192}
193
194fn unique_index_id(
195 tenant: &TenantId,
196 project: &ProjectId,
197 resource_type: &ResourceType,
198 id: &ResourceId,
199) -> String {
200 let unique_index_id = format!(
201 "{}/{}/{}/{}",
202 tenant.as_ref(),
203 project.as_ref(),
204 resource_type.as_ref(),
205 id.as_ref()
206 );
207
208 unique_index_id
209}
210
211impl SearchEngine for ElasticSearchEngine {
212 async fn search(
213 &self,
214 fhir_version: &SupportedFHIRVersions,
215 tenant: &TenantId,
216 project: &ProjectId,
217 search_request: &SearchRequest,
218 options: Option<SearchOptions>,
219 ) -> Result<SearchReturn, haste_fhir_operation_error::OperationOutcomeError> {
220 let query = search::build_elastic_search_query(tenant, project, &search_request, &options)?;
221
222 let search_response = self
223 .client
224 .search(SearchParts::Index(&[get_index_name(&fhir_version)?]))
225 .body(query)
226 .send()
227 .await
228 .map_err(SearchError::from)?;
229
230 if !search_response.status_code().is_success() {
231 return Err(SearchError::ElasticSearchResponseError(
232 search_response.status_code().as_u16(),
233 )
234 .into());
235 }
236
237 let search_results = search_response
238 .json::<ElasticSearchResponse>()
239 .await
240 .map_err(SearchError::from)?;
241
242 Ok(SearchReturn {
243 total: search_results.hits.total.as_ref().map(|t| t.value),
244 entries: search_results
245 .hits
246 .hits
247 .into_iter()
248 .map(|mut hit| SearchEntry {
249 id: hit.fields.id.pop().unwrap(),
250 resource_type: hit.fields.resource_type.pop().unwrap(),
251 version_id: hit.fields.version_id.pop().unwrap(),
252 })
253 .collect(),
254 })
255 }
256
257 async fn index<'a>(
258 &self,
259 _fhir_version: &SupportedFHIRVersions,
260 tenant: &TenantId,
261
262 resources: Vec<IndexResource<'a>>,
263 ) -> Result<SuccessfullyIndexedCount, haste_fhir_operation_error::OperationOutcomeError> {
264 let bulk_ops: Vec<BulkOperation<HashMap<String, InsertableIndex>>> = resources
267 .par_iter()
268 .filter(|r| match r.fhir_method {
269 FHIRMethod::Create | FHIRMethod::Update | FHIRMethod::Delete => true,
270 _ => false,
271 })
272 .map(|r| match &r.fhir_method {
273 FHIRMethod::Create | FHIRMethod::Update => {
274 let index_id = unique_index_id(tenant, r.project, &r.resource_type, &r.id);
276 let params =
277 haste_artifacts::search_parameters::get_search_parameters_for_resource(
278 &r.resource_type,
279 );
280
281 let mut elastic_index =
282 resource_to_elastic_index(self.fp_engine.clone(), ¶ms, &r.resource)?;
283
284 elastic_index.insert(
285 "resource_type".to_string(),
286 InsertableIndex::Meta(r.resource_type.as_ref().to_string()),
287 );
288
289 elastic_index.insert(
290 "id".to_string(),
291 InsertableIndex::Meta(r.id.as_ref().to_string()),
292 );
293
294 elastic_index.insert(
295 "version_id".to_string(),
296 InsertableIndex::Meta(r.version_id.to_string()),
297 );
298 elastic_index.insert(
299 "project".to_string(),
300 InsertableIndex::Meta(r.project.as_ref().to_string()),
301 );
302 elastic_index.insert(
303 "tenant".to_string(),
304 InsertableIndex::Meta(tenant.as_ref().to_string()),
305 );
306
307 Ok(BulkOperation::index(elastic_index)
308 .id(index_id)
309 .index(get_index_name(_fhir_version)?)
310 .into())
311 }
312 FHIRMethod::Delete => Ok(BulkOperation::delete(unique_index_id(
313 tenant,
314 r.project,
315 &r.resource_type,
316 &r.id,
317 ))
318 .index(get_index_name(_fhir_version)?)
319 .into()),
320 method => Err(SearchError::UnsupportedFHIRMethod((*method).clone()).into()),
321 })
322 .collect::<Result<Vec<_>, OperationOutcomeError>>()?;
323
324 if !bulk_ops.is_empty() {
325 let res = self
326 .client
327 .bulk(BulkParts::Index(get_index_name(_fhir_version)?))
328 .body(bulk_ops)
329 .send()
330 .await
331 .map_err(SearchError::from)?;
332
333 let response_body = res.json::<serde_json::Value>().await.map_err(|_e| {
334 OperationOutcomeError::fatal(
335 IssueType::Exception(None),
336 "Failed to parse response body.".to_string(),
337 )
338 })?;
339
340 if response_body["errors"].as_bool().unwrap() == true {
341 tracing::error!(
342 "Failed to index resources for tenant: '{}'. Response: '{:?}'",
343 tenant.as_ref(),
344 response_body
345 );
346 return Err(SearchError::Fatal(500).into());
347 }
348 Ok(SuccessfullyIndexedCount(
349 response_body["items"].as_array().unwrap().len(),
350 ))
351 } else {
352 Ok(SuccessfullyIndexedCount(0))
353 }
354 }
355
356 async fn migrate(
357 &self,
358 _fhir_version: &SupportedFHIRVersions,
359 ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
360 migration::create_mapping(&self.client, get_index_name(_fhir_version)?).await?;
361 Ok(())
362 }
363}