Skip to content
43 changes: 37 additions & 6 deletions src/flow_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct FlowUpdateService {
flow_types: Vec<FlowType>,
channel: Channel,
aquila_token: String,
definition_source: Option<String>,
}

impl FlowUpdateService {
Expand Down Expand Up @@ -64,9 +65,15 @@ impl FlowUpdateService {
flow_types,
channel,
aquila_token,
definition_source: None,
}
}

pub fn with_definition_source(mut self, source: String) -> Self {
self.definition_source = Some(source);
self
}

pub fn with_flow_types(mut self, flow_types: Vec<FlowType>) -> Self {
self.flow_types = flow_types;
self
Expand All @@ -90,24 +97,30 @@ impl FlowUpdateService {
self
}

pub async fn send(&self) {
pub async fn send(&mut self) {
let _ = self.send_with_status().await;
}

pub async fn send_with_status(&self) -> bool {
pub async fn send_with_status(&mut self) -> bool {
let data_types_success = self.update_data_types().await;
Comment thread
raphael-goetz marked this conversation as resolved.
let runtime_functions_success = self.update_runtime_functions().await;
let functions_success = self.update_functions().await;
let flow_types_success = self.update_flow_types().await;
data_types_success && runtime_functions_success && functions_success && flow_types_success
}

async fn update_data_types(&self) -> bool {
async fn update_data_types(&mut self) -> bool {
if self.data_types.is_empty() {
log::info!("No DataTypes present.");
return true;
}

if let Some(source) = &self.definition_source {
for data_type in self.data_types.iter_mut() {
data_type.definition_source = source.to_string();
}
}

log::info!("Updating {} DataTypes.", self.data_types.len());
let mut client = DataTypeServiceClient::new(self.channel.clone());
let request = Request::from_parts(
Expand Down Expand Up @@ -135,12 +148,18 @@ impl FlowUpdateService {
}
}

async fn update_functions(&self) -> bool {
async fn update_functions(&mut self) -> bool {
if self.functions.is_empty() {
log::info!("No FunctionDefinitions present.");
return true;
}

if let Some(source) = &self.definition_source {
for function in self.functions.iter_mut() {
function.definition_source = source.to_string();
}
};
Comment thread
raphael-goetz marked this conversation as resolved.
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s an unnecessary trailing semicolon after the if let Some(source) = ... block. It doesn’t change behavior, but it’s inconsistent with the other update methods and can look like an accidental empty statement; removing it would improve readability.

Suggested change
};
}

Copilot uses AI. Check for mistakes.

log::info!("Updating {} FunctionDefinitions.", self.functions.len());
let mut client = FunctionDefinitionServiceClient::new(self.channel.clone());
let request = Request::from_parts(
Expand All @@ -167,12 +186,18 @@ impl FlowUpdateService {
}
}

async fn update_runtime_functions(&self) -> bool {
async fn update_runtime_functions(&mut self) -> bool {
if self.runtime_functions.is_empty() {
log::info!("No RuntimeFunctionDefinitions present.");
return true;
}

if let Some(source) = &self.definition_source {
for runtime_function in self.runtime_functions.iter_mut() {
runtime_function.definition_source = source.to_string();
}
}

log::info!(
"Updating {} RuntimeFunctionDefinitions.",
self.runtime_functions.len()
Expand Down Expand Up @@ -202,12 +227,18 @@ impl FlowUpdateService {
}
}

async fn update_flow_types(&self) -> bool {
async fn update_flow_types(&mut self) -> bool {
if self.flow_types.is_empty() {
log::info!("No FlowTypes present.");
return true;
}

if let Some(source) = &self.definition_source {
for flow_type in self.flow_types.iter_mut() {
flow_type.definition_source = Some(source.to_string());
}
}

log::info!("Updating {} FlowTypes.", self.flow_types.len());
let mut client = FlowTypeServiceClient::new(self.channel.clone());
let request = Request::from_parts(
Expand Down