haste_fhir_search/elastic_search/
mod.rs1use crate::{
2 IndexResource, SearchEngine, SearchEntry, SearchOptions, SearchReturn,
3 SuccessfullyIndexedCount,
4 indexing_conversion::{self, InsertableIndex},
5};
6use elasticsearch::{
7 BulkOperation, BulkParts, Elasticsearch, SearchParts,
8 auth::Credentials,
9 cert::CertificateValidation,
10 http::{
11 Url,
12 transport::{BuildError, SingleNodeConnectionPool, TransportBuilder},
13 },
14};
15use haste_fhir_client::request::SearchRequest;
16use haste_fhir_model::r4::generated::{
17 resources::{Resource, ResourceType, SearchParameter},
18 terminology::IssueType,
19};
20use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
21use haste_fhirpath::FPEngine;
22use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
23use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
24use serde::Deserialize;
25use std::{collections::HashMap, sync::Arc};
26
27mod migration;
28mod search;
29
30#[derive(Deserialize, Debug)]
31struct SearchEntryPrivate {
32 pub id: Vec<ResourceId>,
33 pub resource_type: Vec<ResourceType>,
34 pub version_id: Vec<VersionId>,
35}
36
37#[derive(OperationOutcomeError, Debug)]
38pub enum SearchError {
39 #[fatal(
40 code = "exception",
41 diagnostic = "Failed to evaluate fhirpath expression."
42 )]
43 FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
44 #[fatal(
45 code = "exception",
46 diagnostic = "Search does not support the fhir method: '{arg0:?}'"
47 )]
48 UnsupportedFHIRMethod(FHIRMethod),
49 #[fatal(
50 code = "exception",
51 diagnostic = "Failed to index resources server responded with status code: '{arg0}'"
52 )]
53 Fatal(u16),
54 #[fatal(
55 code = "exception",
56 diagnostic = "Elasticsearch server failed to index."
57 )]
58 ElasticsearchError(#[from] elasticsearch::Error),
59 #[fatal(
60 code = "exception",
61 diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
62 )]
63 ElasticSearchResponseError(u16),
64 NotConnected,
65}
66
67#[derive(OperationOutcomeError, Debug)]
68pub enum SearchConfigError {
69 #[fatal(code = "exception", diagnostic = "Failed to parse URL: '{arg0}'.")]
70 UrlParseError(String),
71 #[fatal(
72 code = "exception",
73 diagnostic = "Elasticsearch client creation failed."
74 )]
75 ElasticSearchConfigError(#[from] BuildError),
76 #[fatal(
77 code = "exception",
78 diagnostic = "Unsupported FHIR version for index: '{arg0}'"
79 )]
80 UnsupportedIndex(SupportedFHIRVersions),
81}
82
83#[derive(Clone)]
84pub struct ElasticSearchEngine {
85 fp_engine: Arc<FPEngine>,
86 client: Arc<Elasticsearch>,
87}
88
89impl ElasticSearchEngine {
90 pub fn new(
91 fp_engine: Arc<FPEngine>,
92 url: &str,
93 username: String,
94 password: String,
95 ) -> Result<Self, SearchConfigError> {
96 let url =
97 Url::parse(url).map_err(|_e| SearchConfigError::UrlParseError(url.to_string()))?;
98 let conn_pool = SingleNodeConnectionPool::new(url);
99 let transport = TransportBuilder::new(conn_pool)
100 .cert_validation(CertificateValidation::None)
101 .auth(Credentials::Basic(username, password))
102 .build()?;
103
104 let elasticsearch_client = Elasticsearch::new(transport);
105 Ok(ElasticSearchEngine {
106 fp_engine,
107 client: Arc::new(elasticsearch_client),
108 })
109 }
110
111 pub async fn is_connected(&self) -> Result<(), SearchError> {
112 let res = self.client.ping().send().await.map_err(SearchError::from)?;
113
114 if res.status_code().is_success() {
115 Ok(())
116 } else {
117 Err(SearchError::NotConnected)
118 }
119 }
120}
121
122async fn resource_to_elastic_index(
123 fp_engine: Arc<FPEngine>,
124 parameters: &Vec<Arc<SearchParameter>>,
125 resource: &Resource,
126) -> Result<HashMap<String, InsertableIndex>, OperationOutcomeError> {
127 let mut map = HashMap::new();
128 for param in parameters.iter() {
129 if let Some(expression) = param.expression.as_ref().and_then(|e| e.value.as_ref())
130 && let Some(url) = param.url.value.as_ref()
131 {
132 let result = fp_engine
133 .evaluate(expression, vec![resource])
134 .await
135 .map_err(SearchError::from);
136
137 if let Err(err) = result {
138 tracing::error!(
139 "Failed to evaluate FHIRPath expression: '{}' for resource.",
140 expression,
141 );
142
143 return Err(SearchError::from(err).into());
144 }
145
146 let result_vec = indexing_conversion::to_insertable_index(
147 param,
148 result?.iter().collect::<Vec<_>>(),
149 )?;
150
151 map.insert(url.clone(), result_vec);
152 }
153 }
154
155 Ok(map)
156}
157
158static R4_FHIR_INDEX: &str = "r4_search_index";
159
160pub fn get_index_name(
161 fhir_version: &SupportedFHIRVersions,
162) -> Result<&'static str, SearchConfigError> {
163 match fhir_version {
164 SupportedFHIRVersions::R4 => Ok(R4_FHIR_INDEX),
165 }
167}
168
169#[derive(serde::Deserialize, Debug)]
170struct ElasticSearchHitResult {
171 _index: String,
172 _id: String,
173 _score: Option<f64>,
174 fields: SearchEntryPrivate,
175}
176
177#[derive(serde::Deserialize, Debug)]
178struct ElasticSearchHitTotalMeta {
179 value: i64,
180 }
182
183#[derive(serde::Deserialize, Debug)]
184struct ElasticSearchHit {
185 total: Option<ElasticSearchHitTotalMeta>,
186 hits: Vec<ElasticSearchHitResult>,
187}
188
189#[derive(serde::Deserialize, Debug)]
190struct ElasticSearchResponse {
191 hits: ElasticSearchHit,
192}
193
194fn unique_index_id(
195 tenant: &TenantId,
196 project: &ProjectId,
197 resource_type: &ResourceType,
198 id: &ResourceId,
199) -> String {
200 let unique_index_id = format!(
201 "{}/{}/{}/{}",
202 tenant.as_ref(),
203 project.as_ref(),
204 resource_type.as_ref(),
205 id.as_ref()
206 );
207
208 unique_index_id
209}
210
211impl SearchEngine for ElasticSearchEngine {
212 async fn search(
213 &self,
214 fhir_version: &SupportedFHIRVersions,
215 tenant: &TenantId,
216 project: &ProjectId,
217 search_request: &SearchRequest,
218 options: Option<SearchOptions>,
219 ) -> Result<SearchReturn, haste_fhir_operation_error::OperationOutcomeError> {
220 let query = search::build_elastic_search_query(tenant, project, &search_request, &options)?;
221
222 let search_response = self
223 .client
224 .search(SearchParts::Index(&[get_index_name(&fhir_version)?]))
225 .body(query)
226 .send()
227 .await
228 .map_err(SearchError::from)?;
229
230 if !search_response.status_code().is_success() {
231 return Err(SearchError::ElasticSearchResponseError(
232 search_response.status_code().as_u16(),
233 )
234 .into());
235 }
236
237 let search_results = search_response
238 .json::<ElasticSearchResponse>()
239 .await
240 .map_err(SearchError::from)?;
241
242 Ok(SearchReturn {
243 total: search_results.hits.total.as_ref().map(|t| t.value),
244 entries: search_results
245 .hits
246 .hits
247 .into_iter()
248 .map(|mut hit| SearchEntry {
249 id: hit.fields.id.pop().unwrap(),
250 resource_type: hit.fields.resource_type.pop().unwrap(),
251 version_id: hit.fields.version_id.pop().unwrap(),
252 })
253 .collect(),
254 })
255 }
256
257 fn index(
258 &self,
259 fhir_version: SupportedFHIRVersions,
260 resources: Vec<IndexResource>,
261 ) -> impl Future<Output = Result<SuccessfullyIndexedCount, OperationOutcomeError>> + Send + Sync
262 {
263 async move {
264 let mut tasks = Vec::with_capacity(resources.len());
267 let resources_total = resources.len();
268 let search_index_name = get_index_name(&fhir_version)?;
269
270 for r in resources.into_iter().filter(|r| match r.fhir_method {
271 FHIRMethod::Create | FHIRMethod::Update | FHIRMethod::Delete => true,
272 _ => false,
273 }) {
274 let engine = self.fp_engine.clone();
275 tasks.push(tokio::spawn(async move {
276 match &r.fhir_method {
277 FHIRMethod::Create | FHIRMethod::Update => {
278 let index_id =
280 unique_index_id(&r.tenant, &r.project, &r.resource_type, &r.id);
281 let params =
282 haste_artifacts::search_parameters::get_search_parameters_for_resource(
283 &r.resource_type,
284 );
285
286 let mut elastic_index =
287 resource_to_elastic_index(engine, ¶ms, &r.resource).await?;
288
289 elastic_index.insert(
290 "resource_type".to_string(),
291 InsertableIndex::Meta(r.resource_type.as_ref().to_string()),
292 );
293
294 elastic_index.insert(
295 "id".to_string(),
296 InsertableIndex::Meta(r.id.as_ref().to_string()),
297 );
298
299 elastic_index.insert(
300 "version_id".to_string(),
301 InsertableIndex::Meta(r.version_id.as_ref().to_string()),
302 );
303 elastic_index.insert(
304 "project".to_string(),
305 InsertableIndex::Meta(r.project.as_ref().to_string()),
306 );
307 elastic_index.insert(
308 "tenant".to_string(),
309 InsertableIndex::Meta(r.tenant.as_ref().to_string()),
310 );
311 Ok(BulkOperation::index(elastic_index)
312 .id(index_id)
313 .index(search_index_name)
314 .into())
315 }
316 FHIRMethod::Delete => Ok(BulkOperation::delete(unique_index_id(
317 &r.tenant,
318 &r.project,
319 &r.resource_type,
320 &r.id,
321 ))
322 .index(search_index_name)
323 .into()),
324 method => Err(SearchError::UnsupportedFHIRMethod((*method).clone()))
325 .map_err(OperationOutcomeError::from),
326 }
327 }));
328 }
329
330 let client = self.client.clone();
331
332 let mut bulk_ops: Vec<BulkOperation<HashMap<String, InsertableIndex>>> =
333 Vec::with_capacity(resources_total);
334
335 for task in tasks {
336 let res = task.await.map_err(|e| {
337 OperationOutcomeError::fatal(IssueType::Exception(None), e.to_string())
338 })??;
339 bulk_ops.push(res);
340 }
341
342 if !bulk_ops.is_empty() {
343 let res = client
344 .bulk(BulkParts::Index(search_index_name))
345 .body(bulk_ops)
346 .send()
347 .await
348 .map_err(SearchError::from)?;
349
350 let response_body = res.json::<serde_json::Value>().await.map_err(|_e| {
351 OperationOutcomeError::fatal(
352 IssueType::Exception(None),
353 "Failed to parse response body.".to_string(),
354 )
355 })?;
356
357 if response_body["errors"].as_bool().unwrap() == true {
358 tracing::error!("Failed to index resources. Response: '{:?}'", response_body);
359 return Err(SearchError::Fatal(500).into());
360 }
361 Ok(SuccessfullyIndexedCount(
362 response_body["items"].as_array().unwrap().len(),
363 ))
364 } else {
365 Ok(SuccessfullyIndexedCount(0))
366 }
367 }
368 }
369
370 async fn migrate(
371 &self,
372 _fhir_version: &SupportedFHIRVersions,
373 ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
374 migration::create_mapping(&self.client, get_index_name(_fhir_version)?).await?;
375 Ok(())
376 }
377}