haste_fhir_search/elastic_search/
mod.rs1use crate::{
2 IndexResource, SearchEngine, SearchEntry, SearchOptions, SearchReturn,
3 SuccessfullyIndexedCount,
4 indexing_conversion::{self, InsertableIndex},
5};
6use elasticsearch::{
7 BulkOperation, BulkParts, Elasticsearch, SearchParts,
8 auth::Credentials,
9 cert::CertificateValidation,
10 http::{
11 Url,
12 transport::{BuildError, SingleNodeConnectionPool, TransportBuilder},
13 },
14};
15use haste_fhir_client::request::SearchRequest;
16use haste_fhir_model::r4::generated::{
17 resources::{Resource, ResourceType, SearchParameter},
18 terminology::IssueType,
19};
20use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
21use haste_fhirpath::FPEngine;
22use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
23use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
24use serde::Deserialize;
25use std::{collections::HashMap, sync::Arc};
26
27mod migration;
28mod search;
29
30#[derive(Deserialize, Debug)]
31struct SearchEntryPrivate {
32 pub id: Vec<ResourceId>,
33 pub resource_type: Vec<ResourceType>,
34 pub version_id: Vec<VersionId>,
35}
36
37#[derive(OperationOutcomeError, Debug)]
38pub enum SearchError {
39 #[fatal(
40 code = "exception",
41 diagnostic = "Failed to evaluate fhirpath expression."
42 )]
43 FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
44 #[fatal(
45 code = "exception",
46 diagnostic = "Search does not support the fhir method: '{arg0:?}'"
47 )]
48 UnsupportedFHIRMethod(FHIRMethod),
49 #[fatal(
50 code = "exception",
51 diagnostic = "Failed to index resources server responded with status code: '{arg0}'"
52 )]
53 Fatal(u16),
54 #[fatal(
55 code = "exception",
56 diagnostic = "Elasticsearch server failed to index."
57 )]
58 ElasticsearchError(#[from] elasticsearch::Error),
59 #[fatal(
60 code = "exception",
61 diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
62 )]
63 ElasticSearchResponseError(u16),
64 NotConnected,
65}
66
67#[derive(OperationOutcomeError, Debug)]
68pub enum SearchConfigError {
69 #[fatal(code = "exception", diagnostic = "Failed to parse URL: '{arg0}'.")]
70 UrlParseError(String),
71 #[fatal(
72 code = "exception",
73 diagnostic = "Elasticsearch client creation failed."
74 )]
75 ElasticSearchConfigError(#[from] BuildError),
76 #[fatal(
77 code = "exception",
78 diagnostic = "Unsupported FHIR version for index: '{arg0}'"
79 )]
80 UnsupportedIndex(SupportedFHIRVersions),
81}
82
83#[derive(Clone)]
84pub struct ElasticSearchEngine {
85 fp_engine: Arc<FPEngine>,
86 client: Arc<Elasticsearch>,
87}
88
89impl ElasticSearchEngine {
90 pub fn new(
91 fp_engine: Arc<FPEngine>,
92 url: &str,
93 username: String,
94 password: String,
95 ) -> Result<Self, SearchConfigError> {
96 let url =
97 Url::parse(url).map_err(|_e| SearchConfigError::UrlParseError(url.to_string()))?;
98 let conn_pool = SingleNodeConnectionPool::new(url);
99 let transport = TransportBuilder::new(conn_pool)
100 .cert_validation(CertificateValidation::None)
101 .auth(Credentials::Basic(username, password))
102 .build()?;
103
104 let elasticsearch_client = Elasticsearch::new(transport);
105 Ok(ElasticSearchEngine {
106 fp_engine,
107 client: Arc::new(elasticsearch_client),
108 })
109 }
110
111 pub async fn is_connected(&self) -> Result<(), SearchError> {
112 let res = self.client.ping().send().await.map_err(SearchError::from)?;
113
114 if res.status_code().is_success() {
115 Ok(())
116 } else {
117 Err(SearchError::NotConnected)
118 }
119 }
120}
121
122async fn resource_to_elastic_index(
123 fp_engine: Arc<FPEngine>,
124 parameters: &Vec<Arc<SearchParameter>>,
125 resource: &Resource,
126) -> Result<HashMap<String, InsertableIndex>, OperationOutcomeError> {
127 let mut map = HashMap::new();
128 for param in parameters.iter() {
129 if let Some(expression) = param.expression.as_ref().and_then(|e| e.value.as_ref())
130 && let Some(url) = param.url.value.as_ref()
131 {
132 let result = fp_engine
133 .evaluate(expression, vec![resource])
134 .await
135 .map_err(SearchError::from);
136
137 if let Err(err) = result {
138 tracing::error!(
139 "Failed to evaluate FHIRPath expression: '{}' for resource.",
140 expression,
141 );
142
143 return Err(SearchError::from(err).into());
144 }
145
146 let result_vec = indexing_conversion::to_insertable_index(
147 param,
148 result?.iter().collect::<Vec<_>>(),
149 )?;
150
151 map.insert(url.clone(), result_vec);
152 }
153 }
154
155 Ok(map)
156}
157
158static R4_FHIR_INDEX: &str = "r4_search_index";
159
160pub fn get_index_name(
161 fhir_version: &SupportedFHIRVersions,
162) -> Result<&'static str, SearchConfigError> {
163 match fhir_version {
164 SupportedFHIRVersions::R4 => Ok(R4_FHIR_INDEX),
165 }
167}
168
169#[derive(serde::Deserialize, Debug)]
170struct ElasticSearchHitResult {
171 _index: String,
172 _id: String,
173 _score: Option<f64>,
174 fields: SearchEntryPrivate,
175}
176
177#[derive(serde::Deserialize, Debug)]
178struct ElasticSearchHitTotalMeta {
179 value: i64,
180 }
182
183#[derive(serde::Deserialize, Debug)]
184struct ElasticSearchHit {
185 total: Option<ElasticSearchHitTotalMeta>,
186 hits: Vec<ElasticSearchHitResult>,
187}
188
189#[derive(serde::Deserialize, Debug)]
190struct ElasticSearchResponse {
191 hits: ElasticSearchHit,
192}
193
194fn unique_index_id(
195 tenant: &TenantId,
196 project: &ProjectId,
197 resource_type: &ResourceType,
198 id: &ResourceId,
199) -> String {
200 let unique_index_id = format!(
201 "{}/{}/{}/{}",
202 tenant.as_ref(),
203 project.as_ref(),
204 resource_type.as_ref(),
205 id.as_ref()
206 );
207
208 unique_index_id
209}
210
211impl SearchEngine for ElasticSearchEngine {
212 async fn search(
213 &self,
214 fhir_version: &SupportedFHIRVersions,
215 tenant: &TenantId,
216 project: &ProjectId,
217 search_request: &SearchRequest,
218 options: Option<SearchOptions>,
219 ) -> Result<SearchReturn, haste_fhir_operation_error::OperationOutcomeError> {
220 let query = search::build_elastic_search_query(tenant, project, &search_request, &options)?;
221
222 let search_response = self
223 .client
224 .search(SearchParts::Index(&[get_index_name(&fhir_version)?]))
225 .body(query)
226 .send()
227 .await
228 .map_err(SearchError::from)?;
229
230 if !search_response.status_code().is_success() {
231 return Err(SearchError::ElasticSearchResponseError(
232 search_response.status_code().as_u16(),
233 )
234 .into());
235 }
236
237 let search_results = search_response
238 .json::<ElasticSearchResponse>()
239 .await
240 .map_err(SearchError::from)?;
241
242 Ok(SearchReturn {
243 total: search_results.hits.total.as_ref().map(|t| t.value),
244 entries: search_results
245 .hits
246 .hits
247 .into_iter()
248 .map(|mut hit| SearchEntry {
249 id: hit.fields.id.pop().unwrap(),
250 resource_type: hit.fields.resource_type.pop().unwrap(),
251 version_id: hit.fields.version_id.pop().unwrap(),
252 })
253 .collect(),
254 })
255 }
256
257 fn index(
258 &self,
259 fhir_version: SupportedFHIRVersions,
260 tenant: TenantId,
261 resources: Vec<IndexResource>,
262 ) -> impl Future<Output = Result<SuccessfullyIndexedCount, OperationOutcomeError>> + Send + Sync
263 {
264 async move {
265 let mut tasks = Vec::with_capacity(resources.len());
268 let resources_total = resources.len();
269 let search_index_name = get_index_name(&fhir_version)?;
270
271 for r in resources.into_iter().filter(|r| match r.fhir_method {
272 FHIRMethod::Create | FHIRMethod::Update | FHIRMethod::Delete => true,
273 _ => false,
274 }) {
275 let engine = self.fp_engine.clone();
276 let tenant = tenant.clone();
277
278 tasks.push(tokio::spawn(async move {
279 match &r.fhir_method {
280 FHIRMethod::Create | FHIRMethod::Update => {
281 let index_id =
283 unique_index_id(&tenant, &r.project, &r.resource_type, &r.id);
284 let params =
285 haste_artifacts::search_parameters::get_search_parameters_for_resource(
286 &r.resource_type,
287 );
288
289 let mut elastic_index =
290 resource_to_elastic_index(engine, ¶ms, &r.resource).await?;
291
292 elastic_index.insert(
293 "resource_type".to_string(),
294 InsertableIndex::Meta(r.resource_type.as_ref().to_string()),
295 );
296
297 elastic_index.insert(
298 "id".to_string(),
299 InsertableIndex::Meta(r.id.as_ref().to_string()),
300 );
301
302 elastic_index.insert(
303 "version_id".to_string(),
304 InsertableIndex::Meta(r.version_id.to_string()),
305 );
306 elastic_index.insert(
307 "project".to_string(),
308 InsertableIndex::Meta(r.project.as_ref().to_string()),
309 );
310 elastic_index.insert(
311 "tenant".to_string(),
312 InsertableIndex::Meta(tenant.as_ref().to_string()),
313 );
314 Ok(BulkOperation::index(elastic_index)
315 .id(index_id)
316 .index(search_index_name)
317 .into())
318 }
319 FHIRMethod::Delete => Ok(BulkOperation::delete(unique_index_id(
320 &tenant,
321 &r.project,
322 &r.resource_type,
323 &r.id,
324 ))
325 .index(search_index_name)
326 .into()),
327 method => Err(SearchError::UnsupportedFHIRMethod((*method).clone()))
328 .map_err(OperationOutcomeError::from),
329 }
330 }));
331 }
332
333 let client = self.client.clone();
334
335 let mut bulk_ops: Vec<BulkOperation<HashMap<String, InsertableIndex>>> =
336 Vec::with_capacity(resources_total);
337
338 for task in tasks {
339 let res = task.await.map_err(|e| {
340 OperationOutcomeError::fatal(IssueType::Exception(None), e.to_string())
341 })??;
342 bulk_ops.push(res);
343 }
344
345 if !bulk_ops.is_empty() {
346 let res = client
347 .bulk(BulkParts::Index(search_index_name))
348 .body(bulk_ops)
349 .send()
350 .await
351 .map_err(SearchError::from)?;
352
353 let response_body = res.json::<serde_json::Value>().await.map_err(|_e| {
354 OperationOutcomeError::fatal(
355 IssueType::Exception(None),
356 "Failed to parse response body.".to_string(),
357 )
358 })?;
359
360 if response_body["errors"].as_bool().unwrap() == true {
361 tracing::error!(
362 "Failed to index resources for tenant: '{}'. Response: '{:?}'",
363 tenant.as_ref(),
364 response_body
365 );
366 return Err(SearchError::Fatal(500).into());
367 }
368 Ok(SuccessfullyIndexedCount(
369 response_body["items"].as_array().unwrap().len(),
370 ))
371 } else {
372 Ok(SuccessfullyIndexedCount(0))
373 }
374 }
375 }
376
377 async fn migrate(
378 &self,
379 _fhir_version: &SupportedFHIRVersions,
380 ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
381 migration::create_mapping(&self.client, get_index_name(_fhir_version)?).await?;
382 Ok(())
383 }
384}