
Data Flows


Module: Lens | Version: 1.0.0-RELEASE | Last Updated: October 25, 2025


Overview

This document describes the major data flows through the AWS Lens module, including:

  • 6 core request/response flows
  • Sequence diagrams for each flow
  • Step-by-step processing details
  • Performance metrics and optimization points

Flow Categories

  1. Synchronous Query Flows - Real-time cost data queries (80% of traffic)
  2. Asynchronous Processing Flows - Background jobs, cache warming
  3. Event-Driven Flows - Message queue processing
  4. Batch Processing Flows - Report generation, data aggregation
  5. External API Integration Flows - AWS SDK calls
  6. Cache Management Flows - Multi-level caching strategy

Flow 1: Cost Summary Query (Most Common)

Endpoint: GET /admin-pages/cost/summary | Frequency: ~10,000 requests/day | Avg Response Time: 1.2 seconds

Sequence Diagram

Step-by-Step Flow

Step 1: Request Reception

// AwsVsActualCostController.java:45
@GetMapping("/cost/summary")
public ResponseDto<List<CostSummaryDTO>> getAwsAndActualCostSummary(
        @Valid GenericRequestDTO genericRequestDTO, BindingResult bindingResult) {
    return new SuccessResponseDto<>(
            awsVsActualCostService.getCostSummary(genericRequestDTO));
}
  • Controller receives request with query parameters
  • @Valid triggers JSR-303 validation on GenericRequestDTO
  • If validation fails, ErrorHandler returns 400 Bad Request
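
The constraints that @Valid exercises live on GenericRequestDTO itself. A minimal sketch of what that DTO might look like is below; the specific annotations and messages are assumptions, not taken from the module source.

// Hypothetical sketch of GenericRequestDTO validation constraints (illustrative only)
public class GenericRequestDTO {

    @NotBlank(message = "customerId is required")
    private String customerId;

    @NotBlank(message = "accountId is required")
    private String accountId;

    @NotNull
    @DateTimeFormat(iso = DateTimeFormat.ISO.DATE)
    private LocalDate startDate;

    @NotNull
    @DateTimeFormat(iso = DateTimeFormat.ISO.DATE)
    private LocalDate endDate;

    // getters, setters and cacheKey() omitted
}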

Step 2: Authentication & Authorization

// Interceptor executed via @Secured annotation
@Secured(key = "LENS_AWSVSACTUALCOSTCONTROLLER")
public class AwsVsActualCostController { ... }
  • authX interceptor validates JWT from Authorization: Bearer <token> header
  • Extracts customerId and permissions from JWT claims
  • Checks user has LENS_AWSVSACTUALCOSTCONTROLLER permission
  • Returns 403 Forbidden if unauthorized
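
The authX interceptor is an external component, so its internals are not part of this module. The sketch below only illustrates the checks listed above; the jwtParser helper, claim accessors and resolveSecuredKey are assumptions.

// Illustrative only - the real check is performed by the authX library
public class SecuredAnnotationInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                             Object handler) {
        String token = request.getHeader("Authorization").replaceFirst("Bearer ", "");
        JwtClaims claims = jwtParser.parse(token);              // hypothetical helper
        String requiredKey = resolveSecuredKey(handler);        // reads @Secured(key = ...)

        if (!claims.getPermissions().contains(requiredKey)) {
            response.setStatus(HttpServletResponse.SC_FORBIDDEN); // 403 Forbidden
            return false;
        }
        request.setAttribute("customerId", claims.getCustomerId());
        return true;
    }
}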

Step 3: Service Layer Processing

// AwsVsActualCostServiceImpl.java:67
@Override
// 15-minute (900s) TTL for the "costSummary" cache is configured on the Redis cache manager
@Cacheable(value = "costSummary", key = "#request.cacheKey()")
public List<CostSummaryDTO> getCostSummary(GenericRequestDTO request) {
    List<CostSummaryDTO> costs = dao.queryCostSummary(request);
    return calculateVarianceAndSavings(costs);
}
  • Check Redis cache using key: costSummary:CUST-123:2024-01-01:2024-01-31
  • If cache hit: return immediately (response time: ~50ms)
  • If cache miss: proceed to DAO layer
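
The SpEL key #request.cacheKey() resolves to a method on the request DTO. A plausible sketch, inferred from the example key above (Spring prefixes it with the cache name costSummary):

// Hypothetical cacheKey() on GenericRequestDTO, inferred from the example key above
public String cacheKey() {
    // e.g. "CUST-123:2024-01-01:2024-01-31"; stored in Redis as "costSummary:<key>"
    return String.format("%s:%s:%s", customerId, startDate, endDate);
}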

Step 4: Data Access Layer

// AwsVsActualCostDaoImpl.java:89
@Override
public List<CostSummaryDTO> queryCostSummary(GenericRequestDTO request) {
    String sql = queryLoader.getQuery("query.cost.summary");
    return snowflakeJdbcTemplate.query(sql, costSummaryRowMapper,
            request.getAccountId(), request.getStartDate(), request.getEndDate());
}
  • Load SQL query from queries/cost/query.cost.summary.sql
  • Execute query against Snowflake with parameters
  • RowMapper converts ResultSet to DTOs
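
A sketch of what costSummaryRowMapper could look like; the column names come from the SQL in Step 5 and the setters mirror the JSON fields in Step 6, but the exact mapper is an assumption.

// Hypothetical RowMapper - columns match query.cost.summary.sql, setters match the response DTO
private final RowMapper<CostSummaryDTO> costSummaryRowMapper = (rs, rowNum) -> {
    CostSummaryDTO dto = new CostSummaryDTO();
    dto.setService(rs.getString("SERVICE"));
    dto.setAwsCost(rs.getDouble("AWS_COST"));
    dto.setActualCost(rs.getDouble("ACTUAL_COST"));
    dto.setVariance(rs.getDouble("VARIANCE"));
    dto.setVariancePercent(rs.getDouble("VARIANCE_PERCENT"));
    dto.setSavingsRealized(rs.getDouble("SAVINGS"));
    return dto;
};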

Step 5: Snowflake Query Execution

-- queries/cost/query.cost.summary.sql
SELECT
SERVICE,
SUM(CASE WHEN SOURCE='AWS' THEN COST ELSE 0 END) AS AWS_COST,
SUM(CASE WHEN SOURCE='ACTUAL' THEN COST ELSE 0 END) AS ACTUAL_COST,
(AWS_COST - ACTUAL_COST) AS VARIANCE,
(VARIANCE / AWS_COST * 100) AS VARIANCE_PERCENT,
GREATEST(0, VARIANCE) AS SAVINGS
FROM COST_DAILY
WHERE ACCOUNT_ID = ?
AND DATE BETWEEN ? AND ?
GROUP BY SERVICE
ORDER BY AWS_COST DESC
  • Query uses result cache (24-hour cache on identical queries)
  • Clustering on DATE column improves performance
  • Typical execution time: 800ms - 1.2s

Step 6: Response Construction

// Controller wraps in standardized response
return new SuccessResponseDto<>(costs);

// Resulting JSON:
{
  "status": "SUCCESS",
  "message": "Cost summary retrieved",
  "data": [
    {
      "service": "EC2",
      "awsCost": 15420.50,
      "actualCost": 12336.40,
      "variance": 3084.10,
      "variancePercent": 20.0,
      "savingsRealized": 3084.10
    }
  ]
}

Performance Metrics

  • Cache Hit Response: 50-80ms
  • Cache Miss Response: 1,000-1,500ms
  • Cache Hit Ratio: 85%
  • Throughput: 100 req/sec (cache enabled), 15 req/sec (no cache)

Flow 2: RI Utilization Calculation

Endpoint: GET /admin-pages/ri/utilization | Frequency: ~2,000 requests/day | Avg Response Time: 2.5 seconds

Sequence Diagram

Key Processing Steps

Step 1: Parallel Data Fetching

// RiUtilizationServiceImpl.java:112
@Override
public RiUtilizationResponseDTO getRiUtilization(GenericRequestDTO request) {
    // Parallel execution using CompletableFuture
    CompletableFuture<List<RiUtilizationDTO>> riDataFuture =
            CompletableFuture.supplyAsync(() -> dao.queryRiUtilization(request));

    CompletableFuture<List<RecommendationDTO>> recommendationsFuture =
            CompletableFuture.supplyAsync(() -> awsService.getRiRecommendations(request));

    // Wait for both to complete
    List<RiUtilizationDTO> riData = riDataFuture.join();
    List<RecommendationDTO> recommendations = recommendationsFuture.join();

    return combineResults(riData, recommendations);
}
  • Fetches RI usage data from Snowflake and AWS recommendations in parallel
  • Reduces total response time by ~40%
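
Note that supplyAsync without an explicit executor runs on the JVM's common ForkJoinPool. If these fetches should instead use the module's dedicated pool (the taskExecutor bean shown under Performance Optimization Techniques), the executor can be passed explicitly; a sketch, not the confirmed implementation:

// Sketch: running the parallel fetches on the lens-async pool instead of the common ForkJoinPool
CompletableFuture<List<RiUtilizationDTO>> riDataFuture =
        CompletableFuture.supplyAsync(() -> dao.queryRiUtilization(request), taskExecutor);

CompletableFuture<List<RecommendationDTO>> recommendationsFuture =
        CompletableFuture.supplyAsync(() -> awsService.getRiRecommendations(request), taskExecutor);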

Step 2: Utilization Calculation

// RiUtilizationServiceImpl.java:134
private void calculateUtilization(RiUtilizationDTO ri) {
    // Cast avoids integer division when the hour counters are whole numbers
    double utilizationPercent =
            ((double) ri.getRiHoursUsed() / ri.getRiHoursPurchased()) * 100;
    ri.setUtilizationPercent(utilizationPercent);

    if (utilizationPercent < 80) {
        double unusedHours = ri.getRiHoursPurchased() - ri.getRiHoursUsed();
        double wastedCost = unusedHours * ri.getHourlyRate();
        ri.setWastedCost(wastedCost);

        // Generate alert
        AlertDTO alert = new AlertDTO();
        alert.setSeverity("MEDIUM");
        alert.setMessage("RI utilization below 80%: " + wastedCost + " USD wasted");
        ri.setAlert(alert);
    }
}

Step 3: AWS API Rate Limiting

// AwsSdkServiceImpl.java:178
@RateLimiter(name = "awsApi", fallbackMethod = "fallbackGetRecommendations")
public List<RecommendationDTO> getRiRecommendations(GenericRequestDTO request) {
    GetReservationPurchaseRecommendationRequest awsRequest =
            new GetReservationPurchaseRecommendationRequest()
                    .withService("Amazon Elastic Compute Cloud - Compute")
                    .withLookbackPeriodInDays("THIRTY_DAYS");

    GetReservationPurchaseRecommendationResult result =
            costExplorerClient.getReservationPurchaseRecommendation(awsRequest);

    return mapToRecommendationDTOs(result);
}
  • Rate limited to 5 req/sec (AWS limit)
  • Exponential backoff on throttling
  • Fallback returns cached recommendations
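
The @RateLimiter annotation with a fallbackMethod suggests Resilience4j. A sketch of how the 5 req/sec limit and the fallback could be expressed; the concrete values, registry wiring and cached fallback source are assumptions:

// Hypothetical Resilience4j settings for the "awsApi" limiter (values are assumptions)
RateLimiterConfig awsApiConfig = RateLimiterConfig.custom()
        .limitForPeriod(5)                          // 5 permits...
        .limitRefreshPeriod(Duration.ofSeconds(1))  // ...per second (AWS limit)
        .timeoutDuration(Duration.ofMillis(500))    // wait briefly for a permit before failing
        .build();
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(awsApiConfig);
RateLimiter awsApiLimiter = rateLimiterRegistry.rateLimiter("awsApi");

// Fallback invoked when no permit is available (cached source is an assumption)
public List<RecommendationDTO> fallbackGetRecommendations(
        GenericRequestDTO request, RequestNotPermitted ex) {
    return cacheService.getCachedRecommendations(request);
}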

Performance Metrics

  • With Cache: 60-100ms
  • Without Cache (Snowflake only): 1,200ms
  • Without Cache (Snowflake + AWS): 2,500ms
  • Cache Hit Ratio: 92% (due to 1-hour TTL)

Flow 3: Cost Alert Generation (Event-Driven)

Trigger: RabbitMQ message or scheduled job | Frequency: Every 15 minutes | Processing Time: 5-10 seconds

Sequence Diagram

Key Processing Steps

Step 1: Message Consumption

// CostAlertListener.java:34
@RabbitListener(queues = "lens.cost.check")
public void handleCostCheckEvent(CostCheckEvent event) {
    log.info("Processing cost alert check for all customers");
    costAlertService.processCostAlerts();
}
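
The "every 15 minutes" trigger mentioned above implies a scheduled producer publishing onto the same queue. A sketch under that assumption; the producer method and event payload are illustrative:

// Hypothetical scheduled producer for the 15-minute cost check trigger
@Scheduled(cron = "0 0/15 * * * ?") // every 15 minutes
public void scheduleCostCheck() {
    rabbitTemplate.convertAndSend("lens.cost.check", new CostCheckEvent());
}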

Step 2: Alert Evaluation

// CostAlertServiceImpl.java:89
@Override
@Transactional(readOnly = true)
public void processCostAlerts() {
    List<CostAlertDTO> activeAlerts = dao.getActiveAlerts();

    for (CostAlertDTO alert : activeAlerts) {
        Double currentCost = dao.queryCurrentCost(
                alert.getCustomerId(),
                alert.getAccountId(),
                LocalDate.now().withDayOfMonth(1), // Start of month
                LocalDate.now()
        );

        evaluateThresholds(alert, currentCost);
    }
}

private void evaluateThresholds(CostAlertDTO alert, Double currentCost) {
    for (ThresholdDTO threshold : alert.getThresholds()) {
        if (currentCost > threshold.getThresholdValue()) {
            publishAlertEvent(alert, currentCost, threshold);
        }
    }
}

Step 3: Alert Event Publishing

// CostAlertServiceImpl.java:145
private void publishAlertEvent(CostAlertDTO alert, Double currentCost, ThresholdDTO threshold) {
    AlertEvent event = AlertEvent.builder()
            .alertId(alert.getAlertId())
            .customerId(alert.getCustomerId())
            .alertType("BUDGET_THRESHOLD")
            .severity(threshold.getSeverity())
            .currentCost(currentCost)
            .thresholdValue(threshold.getThresholdValue())
            .message(String.format("Cost alert: Current cost %.2f exceeds threshold %.2f",
                    currentCost, threshold.getThresholdValue()))
            .timestamp(LocalDateTime.now())
            .build();

    rabbitTemplate.convertAndSend("lens.alerts", "alert.threshold", event);
    log.warn("Alert published: {}", event);
}

Performance Metrics

  • Alerts Processed/Run: 50-200 alerts
  • Processing Time: 5-10 seconds
  • Alert Latency: 15 minutes (check interval)
  • False Positive Rate: <2%

Flow 4: Report Generation (Async Batch)

Endpoint: POST /admin-pages/reports/generate | Frequency: ~500 requests/day | Avg Processing Time: 30-60 seconds

Sequence Diagram

Key Processing Steps

Step 1: Async Request Submission

// ReportController.java:78
@PostMapping("/reports/generate")
public ResponseDto<ReportResponseDTO> generateReport(
        @Valid @RequestBody ReportRequestDTO request) {

    String reportId = reportService.submitReportGeneration(request);

    ReportResponseDTO response = ReportResponseDTO.builder()
            .reportId(reportId)
            .status("PENDING")
            .estimatedCompletionTime(LocalDateTime.now().plusSeconds(60))
            .build();

    return new SuccessResponseDto<>(response);
}
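
submitReportGeneration itself is not shown in the source. A plausible sketch of how it could hand the job off to the worker queue; the repository call, event fields and UUID-based reportId are assumptions:

// Hypothetical hand-off from the controller to the background worker
public String submitReportGeneration(ReportRequestDTO request) {
    String reportId = UUID.randomUUID().toString();

    reportRepository.updateStatus(reportId, "PENDING");     // record the PENDING entry (persistence details assumed)

    ReportGenerationEvent event = new ReportGenerationEvent();
    event.setReportId(reportId);
    event.setReportConfig(request.getReportConfig());       // assumed field
    event.setFormat(request.getFormat());                   // assumed field

    rabbitTemplate.convertAndSend("lens.reports", event);   // consumed by ReportWorker
    return reportId;
}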

Step 2: Background Processing

// ReportWorker.java:45
@RabbitListener(queues = "lens.reports", concurrency = "3-5")
public void processReportGeneration(ReportGenerationEvent event) {
    String reportId = event.getReportId();

    try {
        // Update status to IN_PROGRESS
        reportRepository.updateStatus(reportId, "IN_PROGRESS");

        // Query data
        List<ReportRowDTO> data = dao.queryReportData(event.getReportConfig());

        // Format based on requested format
        byte[] fileBytes = formatReport(data, event.getFormat());

        // Upload to S3
        String fileUrl = s3Service.uploadReport(reportId, fileBytes, event.getFormat());

        // Update status to COMPLETED
        reportRepository.updateStatus(reportId, "COMPLETED", fileUrl);

        log.info("Report {} generated successfully", reportId);

    } catch (Exception e) {
        reportRepository.updateStatus(reportId, "FAILED", null, e.getMessage());
        log.error("Report generation failed for {}", reportId, e);
    }
}

Step 3: Report Formatting

// ReportFormatterService.java:89
public byte[] formatReport(List<ReportRowDTO> data, String format) {
    switch (format) {
        case "CSV":
            return csvFormatter.format(data);
        case "EXCEL":
            return excelFormatter.format(data);
        case "PDF":
            return pdfFormatter.format(data);
        default:
            throw new IllegalArgumentException("Unsupported format: " + format);
    }
}
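
As an illustration of one branch, a minimal CSV formatter sketch is shown below; the header columns and the toCsvRow() helper on ReportRowDTO are assumptions, and the real formatter may well use a library.

// Hypothetical minimal CSV branch; ReportRowDTO#toCsvRow() is an assumed helper
public byte[] format(List<ReportRowDTO> data) {
    StringBuilder sb = new StringBuilder("service,awsCost,actualCost,variance\n");
    for (ReportRowDTO row : data) {
        sb.append(row.toCsvRow()).append('\n');
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
}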

Performance Metrics

  • Small Reports (<1000 rows): 10-15 seconds
  • Medium Reports (1000-10000 rows): 30-45 seconds
  • Large Reports (10000+ rows): 60-120 seconds
  • Concurrent Workers: 3-5 workers
  • Success Rate: 98.5%

Flow 5: CUDOS Dashboard Data Aggregation

Endpoint: GET /admin-pages/cudos/dashboard | Frequency: ~1,500 requests/day | Avg Response Time: 3.5 seconds

Sequence Diagram

Key Processing Steps

Step 1: Parallel DAO Calls

// CudosDashboardServiceImpl.java:123
@Override
public CudosDashboardDTO getCudosDashboard(GenericRequestDTO request) {
    // Execute 6 queries in parallel
    CompletableFuture<S3DashboardDTO> s3Future =
            CompletableFuture.supplyAsync(() -> s3Dao.getS3Metrics(request));

    CompletableFuture<Ec2DashboardDTO> ec2Future =
            CompletableFuture.supplyAsync(() -> ec2Dao.getEc2Metrics(request));

    CompletableFuture<RdsDashboardDTO> rdsFuture =
            CompletableFuture.supplyAsync(() -> rdsDao.getRdsMetrics(request));

    CompletableFuture<LambdaDashboardDTO> lambdaFuture =
            CompletableFuture.supplyAsync(() -> lambdaDao.getLambdaMetrics(request));

    CompletableFuture<NetworkDashboardDTO> networkFuture =
            CompletableFuture.supplyAsync(() -> networkDao.getNetworkMetrics(request));

    CompletableFuture<ComputeDashboardDTO> computeFuture =
            CompletableFuture.supplyAsync(() -> computeDao.getComputeMetrics(request));

    // Wait for all to complete
    CompletableFuture.allOf(s3Future, ec2Future, rdsFuture,
            lambdaFuture, networkFuture, computeFuture).join();

    // Aggregate results
    return aggregateDashboard(
            s3Future.join(),
            ec2Future.join(),
            rdsFuture.join(),
            lambdaFuture.join(),
            networkFuture.join(),
            computeFuture.join()
    );
}

Step 2: Dashboard Aggregation

// CudosDashboardServiceImpl.java:178
private CudosDashboardDTO aggregateDashboard(
        S3DashboardDTO s3, Ec2DashboardDTO ec2, RdsDashboardDTO rds,
        LambdaDashboardDTO lambda, NetworkDashboardDTO network, ComputeDashboardDTO compute) {

    CudosDashboardDTO dashboard = new CudosDashboardDTO();

    // Total costs
    double totalCost = s3.getTotalCost() + ec2.getTotalCost() +
            rds.getTotalCost() + lambda.getTotalCost() +
            network.getTotalCost() + compute.getTotalCost();
    dashboard.setTotalCost(totalCost);

    // Cost breakdown
    dashboard.setS3Cost(s3.getTotalCost());
    dashboard.setEc2Cost(ec2.getTotalCost());
    dashboard.setRdsCost(rds.getTotalCost());
    dashboard.setLambdaCost(lambda.getTotalCost());
    dashboard.setNetworkCost(network.getTotalCost());
    dashboard.setComputeCost(compute.getTotalCost());

    // Top resources
    dashboard.setTopResources(aggregateTopResources(s3, ec2, rds));

    // Recommendations
    dashboard.setRecommendations(aggregateRecommendations(s3, ec2, rds));

    return dashboard;
}

Performance Metrics

  • Sequential Processing: 12-15 seconds (6 queries × 2s each)
  • Parallel Processing: 3-4 seconds (slowest query determines time)
  • Cache Hit: 80ms
  • Performance Gain: 70% reduction in response time

Flow 6: Cache Warming (Scheduled)

Trigger: Scheduled job at 6 AM daily | Processing Time: 15-20 minutes

Sequence Diagram

Implementation

// CacheWarmingService.java:45
@Scheduled(cron = "0 0 6 * * ?") // 6 AM daily
public void warmCache() {
    log.info("Starting cache warming job");

    List<String> activeCustomers = customerService.getActiveCustomers();

    for (String customerId : activeCustomers) {
        warmCustomerCache(customerId);
    }

    log.info("Cache warming completed");
}

private void warmCustomerCache(String customerId) {
    LocalDate startDate = LocalDate.now().minusDays(30);
    LocalDate endDate = LocalDate.now();

    GenericRequestDTO request = new GenericRequestDTO();
    request.setCustomerId(customerId);
    request.setStartDate(startDate);
    request.setEndDate(endDate);

    // Warm common queries
    awsVsActualCostService.getCostSummary(request);   // Caches result
    riUtilizationService.getRiUtilization(request);   // Caches result
    billingConsoleService.getDashboardData(request);  // Caches result

    log.info("Warmed cache for customer {}", customerId);
}

Performance Impact

  • Cache Hit Ratio Before Warming: 60-70%
  • Cache Hit Ratio After Warming: 85-92%
  • First Request Response Time Reduction: 90% (1500ms → 150ms)

Cross-Cutting Data Flow Patterns

1. Multi-Level Caching Strategy

Request → Controller → Service, which then falls through four cache levels in order:

  1. L1: @Cacheable (Spring Cache Abstraction)
  2. L2: Redis (Application Cache)
  3. L3: Snowflake Result Cache (24h)
  4. L4: Snowflake Data Cache
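
Per-cache TTLs on the L2 Redis layer (for example the 15-minute costSummary TTL from Flow 1 and the 1-hour RI utilization TTL from Flow 2) are typically declared on the cache manager. A sketch under that assumption; the bean wiring and cache names other than costSummary are illustrative:

// Sketch: Redis cache manager with per-cache TTLs (durations taken from this document)
@Bean
public RedisCacheManager cacheManager(RedisConnectionFactory connectionFactory) {
    RedisCacheConfiguration defaults = RedisCacheConfiguration.defaultCacheConfig()
            .entryTtl(Duration.ofMinutes(15));

    return RedisCacheManager.builder(connectionFactory)
            .cacheDefaults(defaults)
            .withCacheConfiguration("costSummary",
                    RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofMinutes(15)))
            .withCacheConfiguration("riUtilization",  // assumed cache name
                    RedisCacheConfiguration.defaultCacheConfig().entryTtl(Duration.ofHours(1)))
            .build();
}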

Cache Key Strategy:

// CacheKeyGenerator.java:23
public String generateCacheKey(GenericRequestDTO request) {
    return String.format("%s:%s:%s:%s",
            request.getCustomerId(),
            request.getAccountId(),
            request.getStartDate(),
            request.getEndDate()
    );
}

2. Error Propagation

// Exception hierarchy
Exception
└── LensException (base)
    ├── DataAccessException
    │   ├── SnowflakeConnectionException
    │   └── QueryTimeoutException
    ├── ValidationException
    ├── AuthenticationException
    └── BusinessLogicException

// Global error handler
@RestControllerAdvice
public class GlobalExceptionHandler {

    @ExceptionHandler(SnowflakeConnectionException.class)
    public ResponseEntity<ErrorResponseDto> handleSnowflakeException(
            SnowflakeConnectionException ex) {

        ErrorResponseDto error = new ErrorResponseDto();
        error.setStatus("ERROR");
        error.setMessage("Database connection failed");
        error.setCode("DB_CONNECTION_ERROR");

        return ResponseEntity.status(503).body(error);
    }
}

3. Retry Logic

// RetryConfiguration.java:34
@Retryable(
    value = {SnowflakeConnectionException.class, TransientDataAccessException.class},
    maxAttempts = 3,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 10000)
)
public List<CostDTO> queryWithRetry(GenericRequestDTO request) {
    return dao.query(request);
}

// Execution timeline:
// Attempt 1: fails at T=0s
// Attempt 2: fails at T=1s   (after a 1,000 ms delay)
// Attempt 3: succeeds at T=3s (after a 2,000 ms delay)
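
If all three attempts fail, Spring Retry can route the call to an @Recover method with a matching signature; a sketch, with the cached fallback source assumed:

// Hypothetical recovery path once maxAttempts is exhausted
@Recover
public List<CostDTO> recoverQuery(SnowflakeConnectionException ex, GenericRequestDTO request) {
    log.warn("Snowflake unavailable after retries, serving cached data", ex);
    return cacheService.getCachedCosts(request);
}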

4. Circuit Breaker Pattern

// CircuitBreakerConfiguration.java:56
@CircuitBreaker(
    name = "snowflake",
    fallbackMethod = "fallbackGetCosts"
)
public List<CostDTO> getCostsWithCircuitBreaker(GenericRequestDTO request) {
    return dao.query(request);
}

public List<CostDTO> fallbackGetCosts(GenericRequestDTO request, Throwable ex) {
    // Return stale data from cache
    return cacheService.getCachedCosts(request);
}

// Circuit states:
// CLOSED:    normal operation
// OPEN:      requests fail fast and return cached data
// HALF_OPEN: a test request checks whether the service has recovered
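
With Resilience4j, the thresholds driving those state transitions are configurable per breaker. A sketch with illustrative values (none of these numbers come from the source):

// Hypothetical thresholds for the "snowflake" breaker (values are illustrative)
CircuitBreakerConfig snowflakeConfig = CircuitBreakerConfig.custom()
        .failureRateThreshold(50)                          // trip to OPEN at a 50% failure rate...
        .slidingWindowSize(20)                             // ...measured over the last 20 calls
        .waitDurationInOpenState(Duration.ofSeconds(30))   // stay OPEN for 30 seconds
        .permittedNumberOfCallsInHalfOpenState(3)          // probe calls allowed in HALF_OPEN
        .build();
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.of(snowflakeConfig);
CircuitBreaker snowflakeBreaker = circuitBreakerRegistry.circuitBreaker("snowflake");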

Performance Optimization Techniques

1. Query Optimization

Before:

-- Slow query (3.5s)
SELECT * FROM COST_DAILY
WHERE ACCOUNT_ID = '123456789012'
AND DATE >= '2024-01-01'
ORDER BY DATE DESC;

After:

-- Optimized query (0.8s)
SELECT DATE, SERVICE, SUM(COST) AS TOTAL_COST
FROM COST_DAILY
WHERE ACCOUNT_ID = '123456789012'
AND DATE >= '2024-01-01'
GROUP BY DATE, SERVICE
ORDER BY DATE DESC;
  • Optimization: Select only required columns, aggregate early
  • Performance Gain: 77% reduction

2. Connection Pooling

# HikariCP configuration
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.idle-timeout=600000
spring.datasource.hikari.max-lifetime=1800000

Impact:

  • Connection acquisition time: 500ms → 5ms
  • Throughput: 10 req/sec → 100 req/sec

3. Async Processing

@Async("taskExecutor")
public CompletableFuture<List<CostDTO>> queryAsync(GenericRequestDTO request) {
    List<CostDTO> result = dao.query(request);
    return CompletableFuture.completedFuture(result);
}

Thread Pool Configuration:

@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(20);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("lens-async-");
    executor.initialize();
    return executor;
}

Summary

6 Major Data Flows: Cost summary, RI utilization, alerts, reports, CUDOS dashboard, cache warming

Response Times:

  • Fast queries (cached): 50-100ms
  • Medium queries (Snowflake): 1-2s
  • Complex queries (multi-source): 3-5s
  • Batch processing: 30-60s

Optimization Techniques:

  • Multi-level caching (85% hit ratio)
  • Parallel query execution (70% time reduction)
  • Connection pooling (99% faster connection acquisition: 500ms → 5ms)
  • Async processing (10x throughput improvement)

Resilience Patterns:

  • Retry logic (3 attempts, exponential backoff)
  • Circuit breaker (fail fast + cached fallback)
  • Rate limiting (AWS API: 5 req/sec)

Document Version: 1.0 | Last Updated: October 25, 2025