Skip to main content

haste_fhir_search/elastic_search/
search_parameter_resolver.rs

1use elasticsearch::Elasticsearch;
2use haste_fhir_client::{
3    request::{FHIRSearchTypeRequest, SearchRequest},
4    url::ParsedParameters,
5};
6use haste_fhir_model::r4::generated::resources::{Resource, ResourceType};
7use haste_fhir_operation_error::OperationOutcomeError;
8use haste_jwt::{ProjectId, TenantId};
9use haste_repository::{Repository, fhir::CachePolicy};
10use moka::future::{Cache, CacheBuilder};
11use std::sync::{Arc, LazyLock};
12
13use crate::{
14    ResolvedParameter, SearchOptions, SearchParameterResolve,
15    elastic_search::search,
16    memory::{R4_SEARCH_PARAMETERS_INDEX, SearchParametersIndex, create_index_map},
17};
18
19#[derive(Clone)]
20pub struct ElasticSearchParameterResolver<Repo: Repository + Send + Sync> {
21    es: Arc<Elasticsearch>,
22    repo: Arc<Repo>,
23}
24
25static SEARCHPARAMETER_CACHE: LazyLock<Cache<(TenantId, ProjectId), Arc<SearchParametersIndex>>> =
26    LazyLock::new(|| {
27        CacheBuilder::new(50_000)
28            // Duration for 2 hour for search parameters.
29            .time_to_idle(std::time::Duration::from_secs(2 * 60 * 60))
30            .build()
31    });
32
33impl<Repo: Repository + Send + Sync> ElasticSearchParameterResolver<Repo> {
34    pub fn new(es: Arc<Elasticsearch>, repo: Arc<Repo>) -> Self {
35        ElasticSearchParameterResolver { es, repo }
36    }
37}
38
39async fn create_project_sp_index<Repo: Repository + Send + Sync>(
40    es: Arc<Elasticsearch>,
41    repo: &Repo,
42    tenant: &TenantId,
43    project: &ProjectId,
44) -> Result<SearchParametersIndex, OperationOutcomeError> {
45    let result = search::execute_search(
46        es,
47        R4_SEARCH_PARAMETERS_INDEX.clone(),
48        &haste_repository::types::SupportedFHIRVersions::R4,
49        tenant,
50        project,
51        &SearchRequest::Type(FHIRSearchTypeRequest {
52            resource_type: ResourceType::SearchParameter,
53            parameters: ParsedParameters::new(vec![]),
54        }),
55        &Some(SearchOptions { count_limit: false }),
56    )
57    .await?;
58
59    let version_ids = result
60        .entries
61        .iter()
62        .map(|r| &r.version_id)
63        .collect::<Vec<_>>();
64
65    let project_sps = repo
66        .read_by_version_ids(tenant, project, &version_ids, CachePolicy::Cache)
67        .await?
68        .into_iter()
69        .filter_map(|r| match r {
70            Resource::SearchParameter(sp) => Some(sp),
71            _ => None,
72        })
73        .collect::<Vec<_>>();
74
75    Ok(create_index_map(
76        &crate::ParameterLevel::Project,
77        project_sps,
78    ))
79}
80
81async fn get_or_create_sp_index_for_project<Repo: Repository + Send + Sync>(
82    es: Arc<Elasticsearch>,
83    repo: &Repo,
84    tenant: TenantId,
85    project: ProjectId,
86) -> Result<Option<Arc<SearchParametersIndex>>, OperationOutcomeError> {
87    match (&tenant, &project) {
88        (TenantId::System, ProjectId::System) => Ok(None),
89        _ => {
90            let index_key = (tenant, project);
91            if let Some(index) = SEARCHPARAMETER_CACHE.get(&index_key).await {
92                Ok(Some(index))
93            } else {
94                let index =
95                    Arc::new(create_project_sp_index(es, repo, &index_key.0, &index_key.1).await?);
96                SEARCHPARAMETER_CACHE.insert(index_key, index.clone()).await;
97
98                Ok(Some(index))
99            }
100        }
101    }
102}
103
104impl<Repo: Repository + Send + Sync> SearchParameterResolve
105    for ElasticSearchParameterResolver<Repo>
106{
107    async fn by_resource_type(
108        &self,
109        tenant: &haste_jwt::TenantId,
110        project: &haste_jwt::ProjectId,
111        resource_type: &haste_fhir_model::r4::generated::resources::ResourceType,
112    ) -> Result<Vec<ResolvedParameter>, OperationOutcomeError> {
113        let mut sps_by_resource_type = R4_SEARCH_PARAMETERS_INDEX
114            .by_resource_type(tenant, project, resource_type)
115            .await?;
116
117        if let Some(project_index) = get_or_create_sp_index_for_project(
118            self.es.clone(),
119            self.repo.as_ref(),
120            tenant.clone(),
121            project.clone(),
122        )
123        .await?
124        {
125            let project_sps = project_index
126                .by_resource_type(tenant, project, resource_type)
127                .await?;
128
129            sps_by_resource_type.extend(project_sps);
130        }
131
132        Ok(sps_by_resource_type)
133    }
134
135    async fn by_name(
136        &self,
137        tenant: &haste_jwt::TenantId,
138        project: &haste_jwt::ProjectId,
139        resource_type: Option<&haste_fhir_model::r4::generated::resources::ResourceType>,
140        code: &str,
141    ) -> Result<Option<ResolvedParameter>, OperationOutcomeError> {
142        if let Some(parameter) = R4_SEARCH_PARAMETERS_INDEX
143            .by_name(tenant, project, resource_type, code)
144            .await?
145        {
146            Ok(Some(parameter))
147        } else if let Some(project_index) = get_or_create_sp_index_for_project(
148            self.es.clone(),
149            self.repo.as_ref(),
150            tenant.clone(),
151            project.clone(),
152        )
153        .await?
154        {
155            project_index
156                .by_name(tenant, project, resource_type, code)
157                .await
158        } else {
159            Ok(None)
160        }
161    }
162
163    async fn all(
164        &self,
165        tenant: &haste_jwt::TenantId,
166        project: &haste_jwt::ProjectId,
167    ) -> Result<Vec<ResolvedParameter>, OperationOutcomeError> {
168        let mut all_sps = R4_SEARCH_PARAMETERS_INDEX.all(tenant, project).await?;
169
170        if let Some(project_index) = get_or_create_sp_index_for_project(
171            self.es.clone(),
172            self.repo.as_ref(),
173            tenant.clone(),
174            project.clone(),
175        )
176        .await?
177        {
178            all_sps.extend(project_index.all(tenant, project).await?);
179        }
180
181        Ok(all_sps)
182    }
183}