haste_fhir_search/elastic_search/
mod.rs1use crate::{
2 IndexResource, ParameterLevel, ResolvedParameter, SearchEngine, SearchOptions,
3 SearchParameterResolve, SearchReturn, SuccessfullyIndexedCount,
4 indexing_conversion::{self, InsertableIndex},
5};
6use elasticsearch::{
7 BulkOperation, BulkParts, Elasticsearch,
8 auth::Credentials,
9 cert::CertificateValidation,
10 http::{
11 Url,
12 transport::{BuildError, SingleNodeConnectionPool, TransportBuilder},
13 },
14};
15use haste_fhir_client::request::SearchRequest;
16use haste_fhir_model::r4::generated::{
17 resources::{Resource, ResourceType},
18 terminology::IssueType,
19};
20use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
21use haste_fhirpath::FPEngine;
22use haste_jwt::{ProjectId, ResourceId, TenantId, VersionId};
23use haste_repository::types::{FHIRMethod, SupportedFHIRVersions};
24use serde::Deserialize;
25use std::{collections::HashMap, sync::Arc};
26
27mod migration;
28mod search;
29pub mod search_parameter_resolver;
30
31#[derive(Deserialize, Debug)]
32struct SearchEntryPrivate {
33 pub id: Vec<ResourceId>,
34 pub resource_type: Vec<ResourceType>,
35 pub version_id: Vec<VersionId>,
36 pub project: Vec<ProjectId>,
37}
38
39static DYNAMIC_PARAMETER_INDEX_FIELD: &str = "dynamic_parameters";
40
41#[derive(OperationOutcomeError, Debug)]
42pub enum SearchError {
43 #[fatal(
44 code = "exception",
45 diagnostic = "Failed to evaluate fhirpath expression."
46 )]
47 FHIRPathError(#[from] haste_fhirpath::FHIRPathError),
48 #[fatal(
49 code = "exception",
50 diagnostic = "Search does not support the fhir method: '{arg0:?}'"
51 )]
52 UnsupportedFHIRMethod(FHIRMethod),
53 #[fatal(
54 code = "exception",
55 diagnostic = "Failed to index resources server responded with status code: '{arg0}'"
56 )]
57 Fatal(u16),
58 #[fatal(
59 code = "exception",
60 diagnostic = "Elasticsearch server failed to index."
61 )]
62 ElasticsearchError(#[from] elasticsearch::Error),
63 #[fatal(
64 code = "exception",
65 diagnostic = "Elasticsearch server responded with an error: '{arg0}'"
66 )]
67 ElasticSearchResponseError(u16),
68 NotConnected,
69}
70
71#[derive(OperationOutcomeError, Debug)]
72pub enum SearchConfigError {
73 #[fatal(code = "exception", diagnostic = "Failed to parse URL: '{arg0}'.")]
74 UrlParseError(String),
75 #[fatal(
76 code = "exception",
77 diagnostic = "Elasticsearch client creation failed."
78 )]
79 ElasticSearchConfigError(#[from] BuildError),
80 #[fatal(
81 code = "exception",
82 diagnostic = "Unsupported FHIR version for index: '{arg0}'"
83 )]
84 UnsupportedIndex(SupportedFHIRVersions),
85}
86
87#[derive(Clone)]
88pub struct ElasticSearchEngine<SearchParameterResolver: SearchParameterResolve + 'static> {
89 parameter_resolver: Arc<SearchParameterResolver>,
90 fp_engine: Arc<FPEngine>,
91 client: Arc<Elasticsearch>,
92}
93
94pub fn create_es_client(
95 url: &str,
96 username: String,
97 password: String,
98) -> Result<Arc<Elasticsearch>, SearchConfigError> {
99 let url = Url::parse(url).map_err(|_e| SearchConfigError::UrlParseError(url.to_string()))?;
100 let conn_pool = SingleNodeConnectionPool::new(url);
101 let transport = TransportBuilder::new(conn_pool)
102 .cert_validation(CertificateValidation::None)
103 .auth(Credentials::Basic(username, password))
104 .build()?;
105
106 let elasticsearch_client = Elasticsearch::new(transport);
107
108 Ok(Arc::new(elasticsearch_client))
109}
110
111impl<SearchParameterResolver: SearchParameterResolve + 'static>
112 ElasticSearchEngine<SearchParameterResolver>
113{
114 pub fn new(
115 parameter_resolver: Arc<SearchParameterResolver>,
116 fp_engine: Arc<FPEngine>,
117 es_client: Arc<Elasticsearch>,
118 ) -> Self {
119 ElasticSearchEngine {
120 parameter_resolver,
121 fp_engine,
122 client: es_client,
123 }
124 }
125
126 pub async fn is_connected(&self) -> Result<(), SearchError> {
127 let res = self.client.ping().send().await.map_err(SearchError::from)?;
128
129 if res.status_code().is_success() {
130 Ok(())
131 } else {
132 Err(SearchError::NotConnected)
133 }
134 }
135}
136
137async fn resource_to_elastic_index(
138 fp_engine: Arc<FPEngine>,
139 parameters: &Vec<ResolvedParameter>,
140 resource: &Resource,
141) -> Result<HashMap<String, InsertableIndex>, OperationOutcomeError> {
142 let mut map = HashMap::new();
143 let mut dynamic_parameters = HashMap::new();
144 for param in parameters.iter() {
145 if let Some(expression) = param
146 .search_parameter
147 .expression
148 .as_ref()
149 .and_then(|e| e.value.as_ref())
150 && let Some(url) = param.search_parameter.url.value.as_ref()
151 {
152 let result = fp_engine
153 .evaluate(expression, vec![resource])
154 .await
155 .map_err(SearchError::from);
156
157 if let Err(err) = result {
158 tracing::error!(
159 "Failed to evaluate FHIRPath expression: '{}' for resource.",
160 expression,
161 );
162
163 return Err(SearchError::from(err).into());
164 }
165
166 let result_vec = indexing_conversion::to_insertable_index(
167 param,
168 result?.iter().collect::<Vec<_>>(),
169 )?;
170
171 match param.level {
172 ParameterLevel::System => {
173 map.insert(url.clone(), result_vec);
174 }
175 ParameterLevel::Project => {
177 dynamic_parameters.insert(url.clone(), result_vec);
178 }
179 }
180 }
181 }
182
183 map.insert(
185 DYNAMIC_PARAMETER_INDEX_FIELD.to_string(),
186 InsertableIndex::DynamicParameters(dynamic_parameters),
187 );
188
189 Ok(map)
190}
191
192static R4_FHIR_INDEX: &str = "r4_search_index";
193
194pub fn get_index_name(
195 fhir_version: &SupportedFHIRVersions,
196) -> Result<&'static str, SearchConfigError> {
197 match fhir_version {
198 SupportedFHIRVersions::R4 => Ok(R4_FHIR_INDEX),
199 }
201}
202
203#[derive(serde::Deserialize, Debug)]
204struct ElasticSearchHitResult {
205 _index: String,
206 _id: String,
207 _score: Option<f64>,
208 fields: SearchEntryPrivate,
209}
210
211#[derive(serde::Deserialize, Debug)]
212struct ElasticSearchHitTotalMeta {
213 value: i64,
214 }
216
217#[derive(serde::Deserialize, Debug)]
218struct ElasticSearchHit {
219 total: Option<ElasticSearchHitTotalMeta>,
220 hits: Vec<ElasticSearchHitResult>,
221}
222
223#[derive(serde::Deserialize, Debug)]
224struct ElasticSearchResponse {
225 hits: ElasticSearchHit,
226}
227
228fn unique_index_id(
229 tenant: &TenantId,
230 project: &ProjectId,
231 resource_type: &ResourceType,
232 id: &ResourceId,
233) -> String {
234 let unique_index_id = format!(
235 "{}/{}/{}/{}",
236 tenant.as_ref(),
237 project.as_ref(),
238 resource_type.as_ref(),
239 id.as_ref()
240 );
241
242 unique_index_id
243}
244
245impl<SearchParameterResolver: SearchParameterResolve + 'static> SearchEngine
246 for ElasticSearchEngine<SearchParameterResolver>
247{
248 async fn search(
249 &self,
250 fhir_version: &SupportedFHIRVersions,
251 tenant: &TenantId,
252 project: &ProjectId,
253 search_request: &SearchRequest,
254 options: Option<SearchOptions>,
255 ) -> Result<SearchReturn, haste_fhir_operation_error::OperationOutcomeError> {
256 search::execute_search(
257 self.client.clone(),
258 self.parameter_resolver.clone(),
259 fhir_version,
260 tenant,
261 project,
262 search_request,
263 &options,
264 )
265 .await
266 }
267
268 fn index(
269 &self,
270 fhir_version: SupportedFHIRVersions,
271 resources: Vec<IndexResource>,
272 ) -> impl Future<Output = Result<SuccessfullyIndexedCount, OperationOutcomeError>> + Send {
273 async move {
274 let mut tasks = Vec::with_capacity(resources.len());
277 let resources_total = resources.len();
278 let search_index_name = get_index_name(&fhir_version)?;
279
280 for r in resources.into_iter().filter(|r| match r.fhir_method {
281 FHIRMethod::Create | FHIRMethod::Update | FHIRMethod::Delete => true,
282 _ => false,
283 }) {
284 let engine = self.fp_engine.clone();
285 let parameter_resolver = self.parameter_resolver.clone();
286 tasks.push(tokio::spawn(async move {
287 match &r.fhir_method {
288 FHIRMethod::Create | FHIRMethod::Update => {
289 let index_id =
292 unique_index_id(&r.tenant, &r.project, &r.resource_type, &r.id);
293 let params = parameter_resolver
294 .by_resource_type(&r.tenant, &r.project, &r.resource_type)
295 .await?;
296
297 let mut elastic_index =
298 resource_to_elastic_index(engine, ¶ms, &r.resource).await?;
299
300 elastic_index.insert(
301 "resource_type".to_string(),
302 InsertableIndex::Meta(r.resource_type.as_ref().to_string()),
303 );
304
305 elastic_index.insert(
306 "id".to_string(),
307 InsertableIndex::Meta(r.id.as_ref().to_string()),
308 );
309
310 elastic_index.insert(
311 "version_id".to_string(),
312 InsertableIndex::Meta(r.version_id.as_ref().to_string()),
313 );
314 elastic_index.insert(
315 "project".to_string(),
316 InsertableIndex::Meta(r.project.as_ref().to_string()),
317 );
318 elastic_index.insert(
319 "tenant".to_string(),
320 InsertableIndex::Meta(r.tenant.as_ref().to_string()),
321 );
322
323 Ok(BulkOperation::index(elastic_index)
324 .id(index_id)
325 .index(search_index_name)
326 .into())
327 }
328 FHIRMethod::Delete => Ok(BulkOperation::delete(unique_index_id(
329 &r.tenant,
330 &r.project,
331 &r.resource_type,
332 &r.id,
333 ))
334 .index(search_index_name)
335 .into()),
336 method => Err(SearchError::UnsupportedFHIRMethod((*method).clone()))
337 .map_err(OperationOutcomeError::from),
338 }
339 }));
340 }
341
342 let client = self.client.clone();
343
344 let mut bulk_ops: Vec<BulkOperation<HashMap<String, InsertableIndex>>> =
345 Vec::with_capacity(resources_total);
346
347 for task in tasks {
348 let res = task.await.map_err(|e| {
349 OperationOutcomeError::fatal(IssueType::Exception(None), e.to_string())
350 })??;
351 bulk_ops.push(res);
352 }
353
354 if !bulk_ops.is_empty() {
355 let res = client
356 .bulk(BulkParts::Index(search_index_name))
357 .body(bulk_ops)
358 .send()
359 .await
360 .map_err(SearchError::from)?;
361
362 let response_body = res.json::<serde_json::Value>().await.map_err(|_e| {
363 OperationOutcomeError::fatal(
364 IssueType::Exception(None),
365 "Failed to parse response body.".to_string(),
366 )
367 })?;
368
369 if response_body["errors"].as_bool().unwrap() == true {
370 tracing::error!("Failed to index resources. Response: '{:?}'", response_body);
371 return Err(SearchError::Fatal(500).into());
372 }
373 Ok(SuccessfullyIndexedCount(
374 response_body["items"].as_array().unwrap().len(),
375 ))
376 } else {
377 Ok(SuccessfullyIndexedCount(0))
378 }
379 }
380 }
381
382 async fn migrate(
383 &self,
384 _fhir_version: &SupportedFHIRVersions,
385 ) -> Result<(), haste_fhir_operation_error::OperationOutcomeError> {
386 migration::create_mapping(
387 self.parameter_resolver.clone(),
388 &self.client,
389 get_index_name(_fhir_version)?,
390 )
391 .await?;
392 Ok(())
393 }
394}