Skip to main content

haste_fhir_search/elastic_search/
search_parameter_resolver.rs

1use elasticsearch::Elasticsearch;
2use haste_fhir_client::{
3    request::{FHIRSearchTypeRequest, SearchRequest},
4    url::{Parameter, ParsedParameter, ParsedParameters},
5};
6use haste_fhir_model::r4::generated::resources::{Resource, ResourceType};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_jwt::{ProjectId, TenantId};
9use haste_repository::{Repository, fhir::CachePolicy};
10use moka::future::{Cache, CacheBuilder};
11use std::sync::{Arc, LazyLock};
12
13use crate::{
14    ResolvedParameter, SearchOptions, SearchParameterResolve,
15    elastic_search::search,
16    memory::{R4_SEARCH_PARAMETERS_INDEX, SearchParametersIndex, create_index_map},
17};
18
19#[derive(Clone)]
20pub struct ElasticSearchParameterResolver<Repo: Repository + Send + Sync> {
21    es: Arc<Elasticsearch>,
22    repo: Arc<Repo>,
23}
24
25static SEARCHPARAMETER_CACHE: LazyLock<Cache<(TenantId, ProjectId), Arc<SearchParametersIndex>>> =
26    LazyLock::new(|| {
27        CacheBuilder::new(50_000)
28            // Duration for 2 hour for search parameters.
29            .time_to_idle(std::time::Duration::from_secs(2 * 60 * 60))
30            .build()
31    });
32
33impl<Repo: Repository + Send + Sync> ElasticSearchParameterResolver<Repo> {
34    pub fn new(es: Arc<Elasticsearch>, repo: Arc<Repo>) -> Self {
35        ElasticSearchParameterResolver { es, repo }
36    }
37}
38
39async fn create_project_sp_index<Repo: Repository + Send + Sync>(
40    es: Arc<Elasticsearch>,
41    repo: &Repo,
42    tenant: &TenantId,
43    project: &ProjectId,
44) -> Result<SearchParametersIndex, OperationOutcomeError> {
45    let result = search::execute_search(
46        es,
47        R4_SEARCH_PARAMETERS_INDEX.clone(),
48        &haste_repository::types::SupportedFHIRVersions::R4,
49        tenant,
50        project,
51        &SearchRequest::Type(FHIRSearchTypeRequest {
52            resource_type: ResourceType::SearchParameter,
53            parameters: ParsedParameters::new(vec![ParsedParameter::Resource(Parameter {
54                name: "status".to_string(),
55                value: vec!["active".to_string()],
56                chains: None,
57                modifier: None,
58            })]),
59        }),
60        &Some(SearchOptions {
61            count_limit: Some(10_000),
62        }),
63    )
64    .await?;
65
66    let version_ids = result
67        .entries
68        .iter()
69        .map(|r| &r.version_id)
70        .collect::<Vec<_>>();
71
72    let project_sps = repo
73        .read_by_version_ids(tenant, project, &version_ids, CachePolicy::Cache)
74        .await?
75        .into_iter()
76        .filter_map(|r| match r {
77            Resource::SearchParameter(sp) => Some(sp),
78            _ => None,
79        })
80        .collect::<Vec<_>>();
81
82    Ok(create_index_map(
83        &crate::ParameterLevel::Project,
84        project_sps,
85    ))
86}
87
88async fn get_or_create_sp_index_for_project<Repo: Repository + Send + Sync>(
89    es: Arc<Elasticsearch>,
90    repo: &Repo,
91    tenant: TenantId,
92    project: ProjectId,
93) -> Result<Option<Arc<SearchParametersIndex>>, OperationOutcomeError> {
94    match (&tenant, &project) {
95        (TenantId::System, ProjectId::System) => Ok(None),
96        _ => {
97            let index_key = (tenant, project);
98            if let Some(index) = SEARCHPARAMETER_CACHE.get(&index_key).await {
99                Ok(Some(index))
100            } else {
101                let index =
102                    Arc::new(create_project_sp_index(es, repo, &index_key.0, &index_key.1).await?);
103                SEARCHPARAMETER_CACHE.insert(index_key, index.clone()).await;
104
105                Ok(Some(index))
106            }
107        }
108    }
109}
110
111impl<Repo: Repository + Send + Sync> SearchParameterResolve
112    for ElasticSearchParameterResolver<Repo>
113{
114    async fn by_resource_type(
115        &self,
116        tenant: &haste_jwt::TenantId,
117        project: &haste_jwt::ProjectId,
118        resource_type: &haste_fhir_model::r4::generated::resources::ResourceType,
119    ) -> Result<Vec<ResolvedParameter>, OperationOutcomeError> {
120        let mut sps_by_resource_type = R4_SEARCH_PARAMETERS_INDEX
121            .by_resource_type(tenant, project, resource_type)
122            .await?;
123
124        if let Some(project_index) = get_or_create_sp_index_for_project(
125            self.es.clone(),
126            self.repo.as_ref(),
127            tenant.clone(),
128            project.clone(),
129        )
130        .await?
131        {
132            let project_sps = project_index
133                .by_resource_type(tenant, project, resource_type)
134                .await?;
135
136            sps_by_resource_type.extend(project_sps);
137        }
138
139        Ok(sps_by_resource_type)
140    }
141
142    async fn by_name(
143        &self,
144        tenant: &haste_jwt::TenantId,
145        project: &haste_jwt::ProjectId,
146        resource_type: Option<&haste_fhir_model::r4::generated::resources::ResourceType>,
147        code: &str,
148    ) -> Result<Option<ResolvedParameter>, OperationOutcomeError> {
149        if let Some(parameter) = R4_SEARCH_PARAMETERS_INDEX
150            .by_name(tenant, project, resource_type, code)
151            .await?
152        {
153            Ok(Some(parameter))
154        } else if let Some(project_index) = get_or_create_sp_index_for_project(
155            self.es.clone(),
156            self.repo.as_ref(),
157            tenant.clone(),
158            project.clone(),
159        )
160        .await?
161        {
162            project_index
163                .by_name(tenant, project, resource_type, code)
164                .await
165        } else {
166            Ok(None)
167        }
168    }
169
170    async fn all(
171        &self,
172        tenant: &haste_jwt::TenantId,
173        project: &haste_jwt::ProjectId,
174    ) -> Result<Vec<ResolvedParameter>, OperationOutcomeError> {
175        let mut all_sps = R4_SEARCH_PARAMETERS_INDEX.all(tenant, project).await?;
176
177        if let Some(project_index) = get_or_create_sp_index_for_project(
178            self.es.clone(),
179            self.repo.as_ref(),
180            tenant.clone(),
181            project.clone(),
182        )
183        .await?
184        {
185            all_sps.extend(project_index.all(tenant, project).await?);
186        }
187
188        Ok(all_sps)
189    }
190}