1use crate::indexing_lock::IndexLockProvider;
2use haste_config::get_config;
3use haste_fhir_model::r4::generated::resources::ResourceTypeError;
4use haste_fhir_operation_error::{OperationOutcomeError, derive::OperationOutcomeError};
5use haste_fhir_search::{IndexResource, SearchEngine, elastic_search::ElasticSearchEngine};
6use haste_fhirpath::FHIRPathError;
7use haste_jwt::{TenantId, VersionId};
8use haste_repository::{fhir::FHIRRepository, types::SupportedFHIRVersions};
9use sqlx::{Pool, Postgres, query_as, types::time::OffsetDateTime};
10use std::sync::Arc;
11use tokio::sync::Mutex;
12
13mod indexing_lock;
14
/// Errors produced by the indexing worker.
///
/// Every variant is annotated with `#[fatal(code = "exception", ...)]` and a diagnostic
/// template; variants marked `#[from]` wrap the underlying error value, which lets `?`
/// convert that error type into this enum.
#[derive(OperationOutcomeError, Debug)]
pub enum IndexingWorkerError {
    /// Wraps a `sqlx::Error`.
    #[fatal(code = "exception", diagnostic = "Database error: '{arg0}'")]
    DatabaseConnectionError(#[from] sqlx::Error),
    /// Wraps any `OperationOutcomeError`.
    // NOTE(review): the diagnostic is labelled "Lock error", yet the `#[from]` conversion
    // applies to every `OperationOutcomeError`, not only lock failures - confirm the wording.
    #[fatal(code = "exception", diagnostic = "Lock error: '{arg0}'")]
    OperationError(#[from] OperationOutcomeError),
    /// Wraps an `elasticsearch::Error`.
    #[fatal(code = "exception", diagnostic = "Elasticsearch error: '{arg0}'")]
    ElasticsearchError(#[from] elasticsearch::Error),
    /// Wraps a `FHIRPathError`.
    #[fatal(code = "exception", diagnostic = "FHIRPath error: '{arg0}'")]
    FHIRPathError(#[from] FHIRPathError),
    /// Carries a `String` that is interpolated into the diagnostic.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[fatal(
        code = "exception",
        diagnostic = "Missing search parameters for resource: '{arg0}'"
    )]
    MissingSearchParameters(String),
    /// Generic failure; in this file it is returned when the number of indexed resources
    /// differs from the number of resources that were read.
    #[fatal(
        code = "exception",
        diagnostic = "Fatal error occurred during indexing"
    )]
    Fatal,
    /// Wraps a `ResourceTypeError`.
    #[fatal(
        code = "exception",
        diagnostic = "Artifact error: Invalid resource type '{arg0}'"
    )]
    ResourceTypeError(#[from] ResourceTypeError),
}
41
/// Row shape produced by the tenant query in `get_tenants`.
struct TenantReturn {
    /// Tenant identifier (the `id` column, decoded as `TenantId`).
    id: TenantId,
    /// Value of the `created_at` column; the worker loop uses it as its paging cursor.
    created_at: OffsetDateTime,
}
46
47async fn get_tenants(
48 client: &Pool<Postgres>,
49 cursor: &OffsetDateTime,
50 count: usize,
51) -> Result<Vec<TenantReturn>, OperationOutcomeError> {
52 let result = query_as!(
53 TenantReturn,
54 r#"SELECT id as "id: TenantId", created_at FROM tenants WHERE created_at > $1 ORDER BY created_at DESC LIMIT $2"#,
55 cursor,
56 count as i64
57 )
58 .fetch_all(client)
59 .await
60 .map_err(IndexingWorkerError::from)?;
61
62 Ok(result)
63}
64
/// Process-wide count of indexed resources, lazily initialised to zero.
///
/// It is incremented by `index_tenant_next_sequence` after each batch and read by
/// `run_worker` to log the running total. Access goes through the async `Mutex`
/// imported from `tokio::sync`.
static TOTAL_INDEXED: std::sync::LazyLock<Mutex<usize>> =
    std::sync::LazyLock::new(|| Mutex::new(0));
67
68async fn index_tenant_next_sequence<
69 Repo: FHIRRepository + IndexLockProvider,
70 Engine: SearchEngine,
71>(
72 search_client: Arc<Engine>,
73 tx: &Repo,
74 tenant_id: &TenantId,
75) -> Result<(), IndexingWorkerError> {
76 let start = std::time::Instant::now();
77 let tenant_locks = tx.get_available_locks(vec![tenant_id]).await?;
78
79 if tenant_locks.is_empty() {
80 return Ok(());
81 }
82
83 let resources = tx
84 .get_sequence(
85 tenant_id,
86 tenant_locks[0].index_sequence_position as u64,
87 Some(1000),
88 )
89 .await?;
90
91 let resources_total = resources.len();
92 let start_sequence = resources.first().map(|r| r.sequence);
93 let last_value = resources.last().cloned();
94
95 if !resources.is_empty() {
97 let result = search_client
98 .index(
99 SupportedFHIRVersions::R4,
100 resources
101 .into_iter()
102 .map(|r| IndexResource {
103 tenant: r.tenant,
104 id: r.id,
105 version_id: VersionId::new(r.version_id),
106 project: r.project,
107 fhir_method: r.fhir_method,
108 resource_type: r.resource_type,
109 resource: r.resource.0,
110 })
111 .collect(),
112 )
113 .await?;
114
115 if result.0 != resources_total {
116 tracing::error!(
117 "Indexed resource count '{}' does not match retrieved resource count '{}'",
118 result.0,
119 resources_total
120 );
121 return Err(IndexingWorkerError::Fatal);
122 }
123
124 if let Some(resource) = last_value {
125 let diff = (resource.sequence + 1) - start_sequence.unwrap_or(0);
126 let total = resources_total;
127
128 if total != diff as usize {
129 tracing::event!(
130 tracing::Level::INFO,
131 first_seq = start_sequence.unwrap_or(0),
133 last_seq = resource.sequence,
134 total = resources_total,
135 diff = (resource.sequence + 1) - start_sequence.unwrap_or(0)
136 );
137 }
138
139 tx.update_lock(tenant_id.as_ref(), resource.sequence as usize)
140 .await?;
141
142 let elapsed = start.elapsed();
143 tracing::info!(
144 "Indexed {} resources for tenant '{}' in {:.2?} (up to sequence {})",
145 result.0,
146 tenant_id.as_ref(),
147 elapsed,
148 resource.sequence
149 );
150 }
151
152 *(TOTAL_INDEXED.lock().await) += result.0;
153 }
154
155 Ok(())
156}
157
158async fn index_for_tenant<Search: SearchEngine, Repository: FHIRRepository + IndexLockProvider>(
159 repo: Arc<Repository>,
160 search_client: Arc<Search>,
161 tenant_id: &TenantId,
162) -> Result<(), IndexingWorkerError> {
163 let search_client = search_client.clone();
164
165 let tx = repo.transaction(false).await.unwrap();
166
167 let res = index_tenant_next_sequence(search_client, &tx, &tenant_id).await;
168
169 match res {
170 Ok(res) => {
171 tx.commit().await?;
172 Ok(res)
173 }
174 Err(e) => {
175 tx.rollback().await?;
176 Err(e)
177 }
178 }
179}
180
/// Configuration keys read by the indexing worker.
///
/// Each variant converts into the name of the environment variable it stands for via
/// `String::from` / `.into()`.
pub enum IndexingWorkerEnvironmentVariables {
    /// `DATABASE_URL`
    DatabaseURL,
    /// `ELASTICSEARCH_URL`
    ElasticSearchURL,
    /// `ELASTICSEARCH_USERNAME`
    ElasticSearchUsername,
    /// `ELASTICSEARCH_PASSWORD`
    ElasticSearchPassword,
}

impl From<IndexingWorkerEnvironmentVariables> for String {
    /// Returns the environment variable name for `value`.
    fn from(value: IndexingWorkerEnvironmentVariables) -> Self {
        // Pick the static name first, then allocate once at the end.
        let name: &'static str = match value {
            IndexingWorkerEnvironmentVariables::DatabaseURL => "DATABASE_URL",
            IndexingWorkerEnvironmentVariables::ElasticSearchURL => "ELASTICSEARCH_URL",
            IndexingWorkerEnvironmentVariables::ElasticSearchUsername => "ELASTICSEARCH_USERNAME",
            IndexingWorkerEnvironmentVariables::ElasticSearchPassword => "ELASTICSEARCH_PASSWORD",
        };
        name.to_owned()
    }
}
202
203pub async fn run_worker() -> Result<(), OperationOutcomeError> {
204 let config = get_config::<IndexingWorkerEnvironmentVariables>("environment".into());
205 let fp_engine = Arc::new(haste_fhirpath::FPEngine::new());
206
207 let pg_pool = sqlx::PgPool::connect(
208 &config
209 .get(IndexingWorkerEnvironmentVariables::DatabaseURL)
210 .unwrap(),
211 )
212 .await
213 .expect("Failed to connect to the database");
214 let repo = Arc::new(haste_repository::pg::PGConnection::pool(pg_pool.clone()));
215 let search_engine = Arc::new(
216 ElasticSearchEngine::new(
217 fp_engine.clone(),
218 &config
219 .get(IndexingWorkerEnvironmentVariables::ElasticSearchURL)
220 .expect(&format!(
221 "'{}' variable not set",
222 String::from(IndexingWorkerEnvironmentVariables::ElasticSearchURL)
223 )),
224 config
225 .get(IndexingWorkerEnvironmentVariables::ElasticSearchUsername)
226 .expect(&format!(
227 "'{}' variable not set",
228 String::from(IndexingWorkerEnvironmentVariables::ElasticSearchUsername)
229 )),
230 config
231 .get(IndexingWorkerEnvironmentVariables::ElasticSearchPassword)
232 .expect(&format!(
233 "'{}' variable not set",
234 String::from(IndexingWorkerEnvironmentVariables::ElasticSearchPassword)
235 )),
236 )
237 .expect("Failed to create Elasticsearch client"),
238 );
239
240 let mut attempts = 0;
241 while !search_engine.is_connected().await.is_ok() && attempts < 5 {
242 tracing::error!("Elasticsearch is not connected, retrying in 5 seconds...");
243 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
244 attempts += 1;
245 }
246
247 let mut cursor = OffsetDateTime::UNIX_EPOCH;
248 let tenants_limit: usize = 100;
249
250 tracing::info!("Starting indexing worker...");
251
252 let mut k = *TOTAL_INDEXED.lock().await;
253
254 loop {
255 let tenants_to_check = get_tenants(&pg_pool, &cursor, tenants_limit).await;
256 if let Ok(tenants_to_check) = tenants_to_check {
257 if tenants_to_check.is_empty() || tenants_to_check.len() < tenants_limit {
258 cursor = OffsetDateTime::UNIX_EPOCH; } else {
260 cursor = tenants_to_check[0].created_at;
261 }
262
263 for tenant in tenants_to_check {
264 let result =
265 index_for_tenant(repo.clone(), search_engine.clone(), &tenant.id).await;
266
267 if let Err(_error) = result {
268 tracing::error!(
269 "Failed to index tenant: '{}' cause: '{:?}'",
270 &tenant.id,
271 _error
272 );
273 }
274 }
275 } else if let Err(error) = tenants_to_check {
276 tracing::error!("Failed to retrieve tenants: {:?}", error);
277 }
278
279 if k != *TOTAL_INDEXED.lock().await {
280 k = *TOTAL_INDEXED.lock().await;
281 tracing::info!("TOTAL INDEXED SO FAR: {}", k);
282 }
283 }
284}