Skip to main content

haste_server/
services.rs

1use crate::{
2    ServerEnvironmentVariables,
3    fhir_client::{FHIRServerClient, ServerClientConfig},
4};
5use haste_config::Config;
6use haste_fhir_model::r4::generated::terminology::IssueType;
7use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
8use haste_fhir_search::elastic_search::search_parameter_resolver::ElasticSearchParameterResolver;
9use haste_fhir_search::{
10    SearchEngine,
11    elastic_search::{ElasticSearchEngine, create_es_client},
12};
13use haste_fhir_terminology::{FHIRTerminology, client::FHIRCanonicalTerminology};
14use haste_fhirpath::FPEngine;
15use haste_repository::{Repository, pg::PGConnection};
16use sqlx::{Pool, Postgres};
17use sqlx_postgres::PgPoolOptions;
18use std::{env::VarError, sync::Arc};
19use tokio::sync::OnceCell;
20use tracing::info;
21
22// Singleton for the database connection pool in postgres.
23static POOL: OnceCell<Pool<Postgres>> = OnceCell::const_new();
24pub async fn get_pool(config: &dyn Config<ServerEnvironmentVariables>) -> &'static Pool<Postgres> {
25    POOL.get_or_init(async || {
26        let database_url = config
27            .get(ServerEnvironmentVariables::DataBaseURL)
28            .expect(&format!(
29                "'{}' must be set",
30                String::from(ServerEnvironmentVariables::DataBaseURL)
31            ));
32        info!("Connecting to postgres database");
33        let connection = PgPoolOptions::new()
34            .max_connections(
35                config
36                    .get(ServerEnvironmentVariables::PGMaxConnections)
37                    .map(|s| s.parse::<u32>().unwrap_or(20))
38                    .unwrap_or(20),
39            )
40            .connect(&database_url)
41            .await
42            .expect("Failed to create database connection pool");
43        connection
44    })
45    .await
46}
47
48#[derive(OperationOutcomeError, Debug)]
49pub enum ConfigError {
50    #[error(code = "invalid", diagnostic = "Invalid environment!")]
51    DotEnv(#[from] dotenvy::Error),
52    #[error(code = "invalid", diagnostic = "Invalid session!")]
53    Session(#[from] tower_sessions::session::Error),
54    #[error(code = "invalid", diagnostic = "Database error")]
55    Database(#[from] sqlx::Error),
56    #[error(code = "invalid", diagnostic = "Environment variable not set {arg0}")]
57    EnvironmentVariable(#[from] VarError),
58    #[error(code = "invalid", diagnostic = "Failed to render template.")]
59    TemplateRender,
60}
61
62#[derive(OperationOutcomeError, Debug)]
63pub enum CustomOpError {
64    #[error(code = "invalid", diagnostic = "FHIRPath error")]
65    FHIRPath(#[from] haste_fhirpath::FHIRPathError),
66    #[error(code = "invalid", diagnostic = "Failed to deserialize resource")]
67    Deserialize(#[from] serde_json::Error),
68    #[error(code = "invalid", diagnostic = "Internal server error")]
69    InternalServerError,
70}
71
72pub struct AppState<
73    Repo: Repository + Send + Sync + 'static,
74    Search: SearchEngine + Send + Sync + 'static,
75    Terminology: FHIRTerminology + Send + Sync + 'static,
76> {
77    pub terminology: Arc<Terminology>,
78    pub search: Arc<Search>,
79    pub repo: Arc<Repo>,
80    pub rate_limit: Arc<dyn haste_rate_limit::RateLimit>,
81    pub fhir_client: Arc<FHIRServerClient<Repo, Search, Terminology>>,
82    pub config: Arc<dyn Config<ServerEnvironmentVariables>>,
83}
84
85impl<
86    Repo: Repository + Send + Sync + 'static,
87    Search: SearchEngine + Send + Sync + 'static,
88    Terminology: FHIRTerminology + Send + Sync + 'static,
89> AppState<Repo, Search, Terminology>
90{
91    pub async fn transaction(&self) -> Result<Self, OperationOutcomeError> {
92        self.repo.transaction(true).await.map(|tx_repo| {
93            let tx_repo = Arc::new(tx_repo);
94            AppState {
95                terminology: self.terminology.clone(),
96                search: self.search.clone(),
97                repo: tx_repo.clone(),
98                rate_limit: self.rate_limit.clone(),
99                fhir_client: Arc::new(FHIRServerClient::new(ServerClientConfig::new(
100                    tx_repo,
101                    self.search.clone(),
102                    self.terminology.clone(),
103                    self.config.clone(),
104                ))),
105                config: self.config.clone(),
106            }
107        })
108    }
109    pub async fn commit(self) -> Result<(), OperationOutcomeError> {
110        let repo = self.repo.clone();
111        drop(self);
112
113        Arc::try_unwrap(repo)
114            .map_err(|_e| {
115                OperationOutcomeError::fatal(
116                    IssueType::Exception(None),
117                    "Failed to unwrap transaction client".to_string(),
118                )
119            })?
120            .commit()
121            .await?;
122
123        Ok(())
124    }
125}
126
127pub async fn create_services(
128    config: Arc<dyn Config<ServerEnvironmentVariables>>,
129) -> Result<
130    Arc<
131        AppState<
132            PGConnection,
133            ElasticSearchEngine<ElasticSearchParameterResolver<PGConnection>>,
134            FHIRCanonicalTerminology,
135        >,
136    >,
137    OperationOutcomeError,
138> {
139    let pool = Arc::new(PGConnection::pool(get_pool(config.as_ref()).await.clone()));
140    let es_client = create_es_client(
141        &config
142            .get(ServerEnvironmentVariables::ElasticSearchURL)
143            .expect(&format!(
144                "'{}' variable not set",
145                String::from(ServerEnvironmentVariables::ElasticSearchURL)
146            )),
147        config
148            .get(ServerEnvironmentVariables::ElasticSearchUsername)
149            .expect(&format!(
150                "'{}' variable not set",
151                String::from(ServerEnvironmentVariables::ElasticSearchUsername)
152            )),
153        config
154            .get(ServerEnvironmentVariables::ElasticSearchPassword)
155            .expect(&format!(
156                "'{}' variable not set",
157                String::from(ServerEnvironmentVariables::ElasticSearchPassword)
158            )),
159    )
160    .expect("Failed to create Elasticsearch client");
161
162    let search_engine = Arc::new(haste_fhir_search::elastic_search::ElasticSearchEngine::new(
163        Arc::new(ElasticSearchParameterResolver::new(
164            es_client.clone(),
165            pool.clone(),
166        )),
167        Arc::new(FPEngine::new()),
168        es_client,
169    ));
170
171    let terminology = Arc::new(FHIRCanonicalTerminology::new());
172
173    let can_mutate: String = config
174        .get(ServerEnvironmentVariables::AllowArtifactMutations)
175        .unwrap_or("false".into());
176
177    let fhir_client = Arc::new(FHIRServerClient::new(if can_mutate == "true" {
178        ServerClientConfig::allow_mutate_artifacts(
179            pool.clone(),
180            search_engine.clone(),
181            terminology.clone(),
182            config.clone(),
183        )
184    } else {
185        ServerClientConfig::new(
186            pool.clone(),
187            search_engine.clone(),
188            terminology.clone(),
189            config.clone(),
190        )
191    }));
192
193    let shared_state = Arc::new(AppState {
194        config,
195        rate_limit: pool.clone(),
196        repo: pool,
197        terminology: terminology,
198        search: search_engine,
199        fhir_client,
200    });
201
202    Ok(shared_state)
203}