Data Flows
Module: Lens
Version: 1.0.0-RELEASE
Last Updated: October 25, 2025
Overview
This document describes the major data flows through the AWS Lens module, including:
- 6 core request/response flows
- Sequence diagrams for each flow
- Step-by-step processing details
- Performance metrics and optimization points
Flow Categories
- Synchronous Query Flows - Real-time cost data queries (80% of traffic)
- Asynchronous Processing Flows - Background jobs, cache warming
- Event-Driven Flows - Message queue processing
- Batch Processing Flows - Report generation, data aggregation
- External API Integration Flows - AWS SDK calls
- Cache Management Flows - Multi-level caching strategy
Flow 1: Cost Summary Query (Most Common)
Endpoint: GET /admin-pages/cost/summary
Frequency: ~10,000 requests/day
Avg Response Time: 1.2 seconds
Sequence Diagram
Step-by-Step Flow
Step 1: Request Reception
// AwsVsActualCostController.java:45
@GetMapping("/cost/summary")
public ResponseDto<List<CostSummaryDTO>> getAwsAndActualCostSummary(
        @Valid GenericRequestDTO genericRequestDTO, BindingResult bindingResult) {
    return new SuccessResponseDto<>(
        awsVsActualCostService.getCostSummary(genericRequestDTO));
}
- Controller receives request with query parameters
- @Valid triggers JSR-303 validation on GenericRequestDTO
- If validation fails, ErrorHandler returns 400 Bad Request
Step 2: Authentication & Authorization
// Interceptor executed via @Secured annotation
@Secured(key = "LENS_AWSVSACTUALCOSTCONTROLLER")
public class AwsVsActualCostController { ... }
- authX interceptor validates JWT from Authorization: Bearer <token> header
- Extracts customerId and permissions from JWT claims
- Checks user has LENS_AWSVSACTUALCOSTCONTROLLER permission
- Returns 403 Forbidden if unauthorized
Step 3: Service Layer Processing
// AwsVsActualCostServiceImpl.java:67
@Override
// Spring's @Cacheable has no ttl attribute; the 900-second (15-minute) TTL is
// assumed to be set for the "costSummary" cache in the cache manager configuration.
@Cacheable(value = "costSummary", key = "#request.cacheKey()")
public List<CostSummaryDTO> getCostSummary(GenericRequestDTO request) {
    List<CostSummaryDTO> costs = dao.queryCostSummary(request);
    return calculateVarianceAndSavings(costs);
}
- Check Redis cache using key: costSummary:CUST-123:2024-01-01:2024-01-31
- If cache hit: return immediately (response time: ~50ms)
- If cache miss: proceed to DAO layer
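The hit/miss decision above follows the cache-aside pattern. The following is a minimal sketch of that pattern, assuming an in-memory map in place of Redis; `TtlCacheSketch` and its injectable clock are hypothetical names for illustration and are not part of the Lens codebase.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical in-memory stand-in for the Redis-backed "costSummary" cache. */
final class TtlCacheSketch<V> {
    private static final class Entry<T> {
        final T value;
        final long expiresAtMillis;

        Entry(T value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so expiry can be exercised in tests

    TtlCacheSketch(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Cache-aside read: return a live entry, otherwise load, store, and return. */
    V get(String key, Supplier<V> loader) {
        long now = clock.getAsLong();
        Entry<V> cached = entries.get(key);
        if (cached != null && cached.expiresAtMillis > now) {
            return cached.value; // cache hit: the loader is never invoked
        }
        V loaded = loader.get(); // cache miss: fall through to the DAO layer
        entries.put(key, new Entry<>(loaded, now + ttlMillis));
        return loaded;
    }
}
```

With the 900-second TTL stated for this cache, a call such as `new TtlCacheSketch<List<CostSummaryDTO>>(900_000L, System::currentTimeMillis)` would serve repeated requests for the same key from memory for 15 minutes before reloading.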
Step 4: Data Access Layer
// AwsVsActualCostDaoImpl.java:89
@Override
public List<CostSummaryDTO> queryCostSummary(GenericRequestDTO request) {
    String sql = queryLoader.getQuery("query.cost.summary");
    return snowflakeJdbcTemplate.query(sql, costSummaryRowMapper,
        request.getAccountId(), request.getStartDate(), request.getEndDate());
}
- Load SQL query from queries/cost/query.cost.summary.sql
- Execute query against Snowflake with parameters
- RowMapper converts ResultSet to DTOs
Step 5: Snowflake Query Execution
-- queries/cost/query.cost.summary.sql
SELECT
    SERVICE,
    SUM(CASE WHEN SOURCE = 'AWS' THEN COST ELSE 0 END) AS AWS_COST,
    SUM(CASE WHEN SOURCE = 'ACTUAL' THEN COST ELSE 0 END) AS ACTUAL_COST,
    (AWS_COST - ACTUAL_COST) AS VARIANCE,
    -- NULLIF avoids a division-by-zero error when a service has no AWS cost
    (VARIANCE / NULLIF(AWS_COST, 0) * 100) AS VARIANCE_PERCENT,
    GREATEST(0, VARIANCE) AS SAVINGS
FROM COST_DAILY
WHERE ACCOUNT_ID = ?
  AND DATE BETWEEN ? AND ?
GROUP BY SERVICE
ORDER BY AWS_COST DESC
- Query uses result cache (24-hour cache on identical queries)
- Clustering on DATE column improves performance
- Typical execution time: 800ms - 1.2s
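The derived columns in the query (VARIANCE, VARIANCE_PERCENT, SAVINGS) are simple arithmetic over the two cost sums. As a rough illustration, the same calculation can be written in Java; `VarianceSketch` is a hypothetical helper and is not the actual `calculateVarianceAndSavings` implementation, which this document does not show.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

/** Hypothetical helper mirroring the derived columns of the cost summary query. */
final class VarianceSketch {
    /** VARIANCE = AWS_COST - ACTUAL_COST */
    static BigDecimal variance(BigDecimal awsCost, BigDecimal actualCost) {
        return awsCost.subtract(actualCost);
    }

    /** VARIANCE / AWS_COST * 100, or null when AWS_COST is zero and the ratio is undefined. */
    static BigDecimal variancePercent(BigDecimal awsCost, BigDecimal actualCost) {
        if (awsCost.signum() == 0) {
            return null;
        }
        return variance(awsCost, actualCost)
            .multiply(BigDecimal.valueOf(100))
            .divide(awsCost, 2, RoundingMode.HALF_UP);
    }

    /** SAVINGS = GREATEST(0, VARIANCE): negative variance is not reported as savings. */
    static BigDecimal savings(BigDecimal awsCost, BigDecimal actualCost) {
        return variance(awsCost, actualCost).max(BigDecimal.ZERO);
    }
}
```

For an AWS cost of 15420.50 and an actual cost of 12336.40, this gives a variance of 3084.10, a variance percent of 20.00, and savings of 3084.10. BigDecimal is used here so that currency amounts do not pick up binary floating-point rounding.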
Step 6: Response Construction
// Controller wraps in standardized response
return new SuccessResponseDto<>(costs);
// Resulting JSON:
{
"status": "SUCCESS",
"message": "Cost summary retrieved",
"data": [
{
"service": "EC2",
"awsCost": 15420.50,
"actualCost": 12336.40,
"variance": 3084.10,
"variancePercent": 20.0,
"savingsRealized": 3084.10
}
]
}
Performance Metrics
- Cache Hit Response: 50-80ms
- Cache Miss Response: 1,000-1,500ms
- Cache Hit Ratio: 85%
- Throughput: 100 req/sec (cache enabled), 15 req/sec (no cache)
Flow 2: RI Utilization Calculation
Endpoint: GET /admin-pages/ri/utilization
Frequency: ~2,000 requests/day
Avg Response Time: 2.5 seconds
Sequence Diagram
Key Processing Steps
Step 1: Parallel Data Fetching
// RiUtilizationServiceImpl.java:112
@Override
public RiUtilizationResponseDTO getRiUtilization(GenericRequestDTO request) {
    // Parallel execution using CompletableFuture
    CompletableFuture<List<RiUtilizationDTO>> riDataFuture =
        CompletableFuture.supplyAsync(() -> dao.queryRiUtilization(request));
    CompletableFuture<List<RecommendationDTO>> recommendationsFuture =
        CompletableFuture.supplyAsync(() -> awsService.getRiRecommendations(request));
    // Wait for both to complete
    List<RiUtilizationDTO> riData = riDataFuture.join();
    List<RecommendationDTO> recommendations = recommendationsFuture.join();
    return combineResults(riData, recommendations);
}
- Fetches RI usage data from Snowflake and AWS recommendations in parallel
- Reduces total response time by ~40%
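One thing the snippet above leaves open is what happens when the slower or less reliable of the two calls fails or stalls. The sketch below shows one possible treatment, assuming that recommendations are optional and may degrade to an empty list; that policy, the timeout, the explicit executor, and the `ParallelFetchSketch` name are all assumptions for illustration. Strings stand in for the DTO types.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch: two loaders run concurrently, the second one is allowed to fail. */
final class ParallelFetchSketch {
    static final class Combined {
        final List<String> usage;
        final List<String> recommendations;

        Combined(List<String> usage, List<String> recommendations) {
            this.usage = usage;
            this.recommendations = recommendations;
        }
    }

    static Combined fetch(Supplier<List<String>> usageLoader,
                          Supplier<List<String>> recommendationLoader,
                          ExecutorService executor,
                          long timeoutMillis) {
        // An explicit executor keeps blocking I/O off the shared ForkJoinPool.commonPool()
        CompletableFuture<List<String>> usageFuture =
            CompletableFuture.supplyAsync(usageLoader, executor);
        CompletableFuture<List<String>> recommendationsFuture =
            CompletableFuture.supplyAsync(recommendationLoader, executor)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // Assumption: a failed or slow recommendation call degrades to "none"
                .exceptionally(ex -> List.of());
        // thenCombine fires once both futures have completed
        return usageFuture.thenCombine(recommendationsFuture, Combined::new).join();
    }
}
```

Note that `orTimeout` (Java 9+) only completes the future exceptionally; it does not interrupt the underlying task, so the executor thread stays busy until the call returns.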
Step 2: Utilization Calculation
// RiUtilizationServiceImpl.java:134
private void calculateUtilization(RiUtilizationDTO ri) {
    int purchasedHours = ri.getRiHoursPurchased();
    int usedHours = ri.getRiHoursUsed();
    if (purchasedHours <= 0) {
        // Nothing purchased: utilization is undefined, so report 0 and skip the alert
        ri.setUtilizationPercent(0.0);
        return;
    }
    // Cast to double first; integer division would truncate the ratio to 0 or 1
    double utilizationPercent = ((double) usedHours / purchasedHours) * 100;
    ri.setUtilizationPercent(utilizationPercent);
    if (utilizationPercent < 80) {
        int unusedHours = purchasedHours - usedHours;
        double wastedCost = unusedHours * ri.getHourlyRate();
        ri.setWastedCost(wastedCost);
        // Generate alert
        AlertDTO alert = new AlertDTO();
        alert.setSeverity("MEDIUM");
        alert.setMessage(String.format("RI utilization below 80%%: %.2f USD wasted", wastedCost));
        ri.setAlert(alert);
    }
}
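A short worked example may help when checking the utilization arithmetic, because the hour counts are integers and the ratio must be computed in floating point. The sketch below uses illustrative figures (540 of 720 purchased hours used at 0.25 USD per hour); `RiUtilizationExample` is a hypothetical class.

```java
/** Hypothetical worked example of the RI utilization arithmetic. */
final class RiUtilizationExample {
    /** Percentage of purchased RI hours actually used; 0 when nothing was purchased. */
    static double utilizationPercent(int hoursUsed, int hoursPurchased) {
        if (hoursPurchased <= 0) {
            return 0.0;
        }
        // The cast forces floating-point division; 540 / 720 in int arithmetic is 0
        return (double) hoursUsed / hoursPurchased * 100;
    }

    /** Cost of purchased hours that were never used. */
    static double wastedCost(int hoursUsed, int hoursPurchased, double hourlyRate) {
        return Math.max(0, hoursPurchased - hoursUsed) * hourlyRate;
    }

    public static void main(String[] args) {
        System.out.println(utilizationPercent(540, 720)); // 75.0, below the 80% alert threshold
        System.out.println(wastedCost(540, 720, 0.25));   // 45.0 (180 unused hours x 0.25)
    }
}
```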
Step 3: AWS API Rate Limiting
// AwsSdkServiceImpl.java:178
@RateLimiter(name = "awsApi", fallbackMethod = "fallbackGetRecommendations")
public List<RecommendationDTO> getRiRecommendations(GenericRequestDTO request) {
    GetReservationPurchaseRecommendationRequest awsRequest =
        new GetReservationPurchaseRecommendationRequest()
            .withService("Amazon Elastic Compute Cloud - Compute")
            .withLookbackPeriodInDays("THIRTY_DAYS");
    GetReservationPurchaseRecommendationResult result =
        costExplorerClient.getReservationPurchaseRecommendation(awsRequest);
    return mapToRecommendationDTOs(result);
}
- Rate limited to 5 req/sec (AWS limit)
- Exponential backoff on throttling
- Fallback returns cached recommendations
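To make the 5 req/sec limit concrete, here is a minimal token-bucket sketch. It is only an illustration of the general technique, not the algorithm used by the `@RateLimiter` annotation above; `TokenBucketSketch` and its injectable nanosecond clock are hypothetical.

```java
import java.util.function.LongSupplier;

/** Hypothetical token bucket: holds up to N permits and refills at N permits per second. */
final class TokenBucketSketch {
    private final double capacity;
    private final double refillPerNano;
    private final LongSupplier nanoClock; // injectable so tests can control time
    private double tokens;
    private long lastRefillNanos;

    TokenBucketSketch(int permitsPerSecond, LongSupplier nanoClock) {
        this.capacity = permitsPerSecond;
        this.refillPerNano = permitsPerSecond / 1_000_000_000.0;
        this.nanoClock = nanoClock;
        this.tokens = permitsPerSecond; // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Takes one permit if available; callers that get false should back off or use a fallback. */
    synchronized boolean tryAcquire() {
        long now = nanoClock.getAsLong();
        // Add the permits earned since the last call, never exceeding capacity
        tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * refillPerNano);
        lastRefillNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

In production the clock would be `System::nanoTime`. A rejected `tryAcquire()` corresponds to the point where the flow above would return cached recommendations instead of calling AWS.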
Performance Metrics
- With Cache: 60-100ms
- Without Cache (Snowflake only): 1,200ms
- Without Cache (Snowflake + AWS): 2,500ms
- Cache Hit Ratio: 92% (due to 1-hour TTL)
Flow 3: Cost Alert Generation (Event-Driven)
Trigger: RabbitMQ message or scheduled job
Frequency: Every 15 minutes
Processing Time: 5-10 seconds
Sequence Diagram
Key Processing Steps
Step 1: Message Consumption
// CostAlertListener.java:34
@RabbitListener(queues = "lens.cost.check")
public void handleCostCheckEvent(CostCheckEvent event) {
log.info("Processing cost alert check for all customers");
costAlertService.processCostAlerts();
}
Step 2: Alert Evaluation
// CostAlertServiceImpl.java:89
@Override
@Transactional(readOnly = true)
public void processCostAlerts() {
    List<CostAlertDTO> activeAlerts = dao.getActiveAlerts();
    // Capture the date once so every alert uses the same month-to-date window
    LocalDate today = LocalDate.now();
    LocalDate startOfMonth = today.withDayOfMonth(1);
    for (CostAlertDTO alert : activeAlerts) {
        Double currentCost = dao.queryCurrentCost(
            alert.getCustomerId(),
            alert.getAccountId(),
            startOfMonth,
            today
        );
        evaluateThresholds(alert, currentCost);
    }
}

private void evaluateThresholds(CostAlertDTO alert, Double currentCost) {
    // No cost data for the period: nothing to compare (also avoids an unboxing NPE)
    if (currentCost == null) {
        return;
    }
    for (ThresholdDTO threshold : alert.getThresholds()) {
        if (currentCost > threshold.getThresholdValue()) {
            publishAlertEvent(alert, currentCost, threshold);
        }
    }
}
Step 3: Alert Event Publishing
// CostAlertServiceImpl.java:145
private void publishAlertEvent(CostAlertDTO alert, Double currentCost, ThresholdDTO threshold) {
AlertEvent event = AlertEvent.builder()
.alertId(alert.getAlertId())
.customerId(alert.getCustomerId())
.alertType("BUDGET_THRESHOLD")
.severity(threshold.getSeverity())
.currentCost(currentCost)
.thresholdValue(threshold.getThresholdValue())
.message(String.format("Cost alert: Current cost %.2f exceeds threshold %.2f",
currentCost, threshold.getThresholdValue()))
.timestamp(LocalDateTime.now())
.build();
rabbitTemplate.convertAndSend("lens.alerts", "alert.threshold", event);
log.warn("Alert published: {}", event);
}
Performance Metrics
- Alerts Processed/Run: 50-200 alerts
- Processing Time: 5-10 seconds
- Alert Latency: 15 minutes (check interval)
- False Positive Rate: <2%
Flow 4: Report Generation (Async Batch)
Endpoint: POST /admin-pages/reports/generate
Frequency: ~500 requests/day
Avg Processing Time: 30-60 seconds
Sequence Diagram
Key Processing Steps
Step 1: Async Request Submission
// ReportController.java:78
@PostMapping("/reports/generate")
public ResponseDto<ReportResponseDTO> generateReport(
@Valid @RequestBody ReportRequestDTO request) {
String reportId = reportService.submitReportGeneration(request);
ReportResponseDTO response = ReportResponseDTO.builder()
.reportId(reportId)
.status("PENDING")
.estimatedCompletionTime(LocalDateTime.now().plusSeconds(60))
.build();
return new SuccessResponseDto<>(response);
}
Step 2: Background Processing
// ReportWorker.java:45
@RabbitListener(queues = "lens.reports", concurrency = "3-5")
public void processReportGeneration(ReportGenerationEvent event) {
    String reportId = event.getReportId();
    try {
        // Update status to IN_PROGRESS
        reportRepository.updateStatus(reportId, "IN_PROGRESS");
        // Query data
        List<ReportRowDTO> data = dao.queryReportData(event.getReportConfig());
        // Format based on requested format
        byte[] fileBytes = formatReport(data, event.getFormat());
        // Upload to S3
        String fileUrl = s3Service.uploadReport(reportId, fileBytes, event.getFormat());
        // Update status to COMPLETED
        reportRepository.updateStatus(reportId, "COMPLETED", fileUrl);
        log.info("Report {} generated successfully", reportId);
    } catch (Exception e) {
        reportRepository.updateStatus(reportId, "FAILED", null, e.getMessage());
        log.error("Report generation failed for {}", reportId, e);
    }
}
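The status strings used by the controller and worker (PENDING, IN_PROGRESS, COMPLETED, FAILED) form a small lifecycle. One way to make the allowed transitions explicit is an enum, sketched below. The `ReportStatus` type is hypothetical, and the PENDING to FAILED transition is an assumption covering a failure before processing starts.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical enum capturing the report lifecycle implied by the worker code. */
enum ReportStatus {
    PENDING, IN_PROGRESS, COMPLETED, FAILED;

    /** Statuses that may legally follow this one. */
    Set<ReportStatus> nextAllowed() {
        switch (this) {
            case PENDING:
                return EnumSet.of(IN_PROGRESS, FAILED); // FAILED here is an assumption
            case IN_PROGRESS:
                return EnumSet.of(COMPLETED, FAILED);
            default:
                return EnumSet.noneOf(ReportStatus.class); // COMPLETED and FAILED are terminal
        }
    }

    boolean canTransitionTo(ReportStatus next) {
        return nextAllowed().contains(next);
    }
}
```

A repository update could check `current.canTransitionTo(next)` before writing, which would guard against a redelivered message moving a COMPLETED report back to IN_PROGRESS.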
Step 3: Report Formatting
// ReportFormatterService.java:89
public byte[] formatReport(List<ReportRowDTO> data, String format) {
    switch (format) {
        case "CSV":
            return csvFormatter.format(data);
        case "EXCEL":
            return excelFormatter.format(data);
        case "PDF":
            return pdfFormatter.format(data);
        default:
            throw new IllegalArgumentException("Unsupported format: " + format);
    }
}
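The formatter implementations are not shown in this document. As an illustration of what the CSV branch has to handle, the sketch below quotes fields that contain commas, quotes, or line breaks. `CsvFormatterSketch` is hypothetical, and rows are modelled as lists of strings rather than `ReportRowDTO`.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical CSV formatter; the real csvFormatter may behave differently. */
final class CsvFormatterSketch {
    /** Quotes a field when it contains a delimiter, quote, or line break; doubles embedded quotes. */
    static String escape(String field) {
        if (field == null) {
            return "";
        }
        boolean needsQuotes = field.contains(",") || field.contains("\"")
            || field.contains("\n") || field.contains("\r");
        String escaped = field.replace("\"", "\"\"");
        return needsQuotes ? "\"" + escaped + "\"" : escaped;
    }

    /** Joins fields with commas and rows with CRLF, encoded as UTF-8. */
    static byte[] format(List<List<String>> rows) {
        String body = rows.stream()
            .map(row -> row.stream()
                .map(CsvFormatterSketch::escape)
                .collect(Collectors.joining(",")))
            .collect(Collectors.joining("\r\n"));
        return body.getBytes(StandardCharsets.UTF_8);
    }
}
```

Service names that include commas are the usual reason a naive `String.join(",", ...)` produces misaligned columns, which is why the escaping step is shown explicitly.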
Performance Metrics
- Small Reports (<1000 rows): 10-15 seconds
- Medium Reports (1000-10000 rows): 30-45 seconds
- Large Reports (10000+ rows): 60-120 seconds
- Concurrent Workers: 3-5 workers
- Success Rate: 98.5%
Flow 5: CUDOS Dashboard Data Aggregation
Endpoint: GET /admin-pages/cudos/dashboard
Frequency: ~1,500 requests/day
Avg Response Time: 3.5 seconds
Sequence Diagram
Key Processing Steps
Step 1: Parallel DAO Calls
// CudosDashboardServiceImpl.java:123
@Override
public CudosDashboardDTO getCudosDashboard(GenericRequestDTO request) {
    // Execute 6 queries in parallel
    CompletableFuture<S3DashboardDTO> s3Future =
        CompletableFuture.supplyAsync(() -> s3Dao.getS3Metrics(request));
    CompletableFuture<Ec2DashboardDTO> ec2Future =
        CompletableFuture.supplyAsync(() -> ec2Dao.getEc2Metrics(request));
    CompletableFuture<RdsDashboardDTO> rdsFuture =
        CompletableFuture.supplyAsync(() -> rdsDao.getRdsMetrics(request));
    CompletableFuture<LambdaDashboardDTO> lambdaFuture =
        CompletableFuture.supplyAsync(() -> lambdaDao.getLambdaMetrics(request));
    CompletableFuture<NetworkDashboardDTO> networkFuture =
        CompletableFuture.supplyAsync(() -> networkDao.getNetworkMetrics(request));
    CompletableFuture<ComputeDashboardDTO> computeFuture =
        CompletableFuture.supplyAsync(() -> computeDao.getComputeMetrics(request));
    // Wait for all to complete
    CompletableFuture.allOf(s3Future, ec2Future, rdsFuture,
        lambdaFuture, networkFuture, computeFuture).join();
    // Aggregate results
    return aggregateDashboard(
        s3Future.join(),
        ec2Future.join(),
        rdsFuture.join(),
        lambdaFuture.join(),
        networkFuture.join(),
        computeFuture.join()
    );
}
Step 2: Dashboard Aggregation
// CudosDashboardServiceImpl.java:178
private CudosDashboardDTO aggregateDashboard(
S3DashboardDTO s3, Ec2DashboardDTO ec2, RdsDashboardDTO rds,
LambdaDashboardDTO lambda, NetworkDashboardDTO network, ComputeDashboardDTO compute) {
CudosDashboardDTO dashboard = new CudosDashboardDTO();
// Total costs
double totalCost = s3.getTotalCost() + ec2.getTotalCost() +
rds.getTotalCost() + lambda.getTotalCost() +
network.getTotalCost() + compute.getTotalCost();
dashboard.setTotalCost(totalCost);
// Cost breakdown
dashboard.setS3Cost(s3.getTotalCost());
dashboard.setEc2Cost(ec2.getTotalCost());
dashboard.setRdsCost(rds.getTotalCost());
dashboard.setLambdaCost(lambda.getTotalCost());
dashboard.setNetworkCost(network.getTotalCost());
dashboard.setComputeCost(compute.getTotalCost());
// Top resources
dashboard.setTopResources(aggregateTopResources(s3, ec2, rds));
// Recommendations
dashboard.setRecommendations(aggregateRecommendations(s3, ec2, rds));
return dashboard;
}
Performance Metrics
- Sequential Processing: 12-15 seconds (6 queries × 2s each)
- Parallel Processing: 3-4 seconds (slowest query determines time)
- Cache Hit: 80ms
- Performance Gain: 70% reduction in response time
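The relationship between the sequential and parallel figures above comes down to sum versus maximum: run one after another, the queries cost the sum of their durations; run concurrently, they cost roughly the duration of the slowest one. The sketch below works through this with illustrative per-query durations (not measured values). It assumes enough threads and connections for all six queries to run at once; `FanOutTimingSketch` is a hypothetical name.

```java
import java.util.List;

/** Hypothetical arithmetic for sequential versus parallel fan-out latency. */
final class FanOutTimingSketch {
    /** Sequential execution: durations add up. */
    static long sequentialMillis(List<Long> queryMillis) {
        return queryMillis.stream().mapToLong(Long::longValue).sum();
    }

    /** Fully parallel execution: the slowest query determines the total. */
    static long parallelMillis(List<Long> queryMillis) {
        return queryMillis.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    /** Percentage reduction in response time from running in parallel. */
    static double reductionPercent(List<Long> queryMillis) {
        long sequential = sequentialMillis(queryMillis);
        if (sequential == 0) {
            return 0.0;
        }
        return 100.0 * (sequential - parallelMillis(queryMillis)) / sequential;
    }
}
```

For six illustrative durations of 1500, 1800, 2000, 2200, 2500, and 3500 ms, the sequential total is 13,500 ms, the parallel total is 3,500 ms, and the reduction is about 74%, in line with the roughly 70% figure quoted above.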
Flow 6: Cache Warming (Scheduled)
Trigger: Scheduled job at 6 AM daily
Processing Time: 15-20 minutes
Sequence Diagram
Implementation
// CacheWarmingService.java:45
@Scheduled(cron = "0 0 6 * * ?") // 6 AM daily
public void warmCache() {
    log.info("Starting cache warming job");
    List<String> activeCustomers = customerService.getActiveCustomers();
    for (String customerId : activeCustomers) {
        try {
            warmCustomerCache(customerId);
        } catch (Exception e) {
            // One customer's failure should not abort warming for the rest
            log.warn("Cache warming failed for customer {}", customerId, e);
        }
    }
    log.info("Cache warming completed");
}

private void warmCustomerCache(String customerId) {
    // Derive both dates from a single "today" so the window is always 30 days
    LocalDate endDate = LocalDate.now();
    LocalDate startDate = endDate.minusDays(30);
    GenericRequestDTO request = new GenericRequestDTO();
    request.setCustomerId(customerId);
    request.setStartDate(startDate);
    request.setEndDate(endDate);
    // Warm common queries
    awsVsActualCostService.getCostSummary(request); // Caches result
    riUtilizationService.getRiUtilization(request); // Caches result
    billingConsoleService.getDashboardData(request); // Caches result
    log.info("Warmed cache for customer {}", customerId);
}
Performance Impact
- Cache Hit Ratio Before Warming: 60-70%
- Cache Hit Ratio After Warming: 85-92%
- First Request Response Time Reduction: 90% (1500ms → 150ms)
Cross-Cutting Data Flow Patterns
1. Multi-Level Caching Strategy
Request → Controller → Service
↓
L1: @Cacheable (Spring Cache Abstraction)
↓
L2: Redis (Application Cache)
↓
L3: Snowflake Result Cache (24h)
↓
L4: Snowflake Data Cache
Cache Key Strategy:
// CacheKeyGenerator.java:23
public String generateCacheKey(GenericRequestDTO request) {
return String.format("%s:%s:%s:%s",
request.getCustomerId(),
request.getAccountId(),
request.getStartDate(),
request.getEndDate()
);
}
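To show what the generated key looks like, the sketch below applies the same format string to plain parameters. `CacheKeyExample` is a hypothetical stand-alone version that takes the four fields directly instead of a `GenericRequestDTO`, and it assumes the dates are `LocalDate` values.

```java
import java.time.LocalDate;

/** Hypothetical stand-alone version of the cache key format. */
final class CacheKeyExample {
    static String cacheKey(String customerId, String accountId,
                           LocalDate startDate, LocalDate endDate) {
        // LocalDate.toString() yields ISO-8601 (yyyy-MM-dd), so keys are stable and sortable
        return String.format("%s:%s:%s:%s", customerId, accountId, startDate, endDate);
    }

    public static void main(String[] args) {
        System.out.println(cacheKey("CUST-123", "123456789012",
            LocalDate.of(2024, 1, 1), LocalDate.of(2024, 1, 31)));
        // prints CUST-123:123456789012:2024-01-01:2024-01-31
    }
}
```

One property worth knowing: `%s` renders a null field as the literal text `null`, so a request without an account ID still produces a valid key (`CUST-123:null:...`) rather than failing.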
2. Error Propagation
// Exception hierarchy
Exception
└── LensException (base)
├── DataAccessException
│ ├── SnowflakeConnectionException
│ └── QueryTimeoutException
├── ValidationException
├── AuthenticationException
└── BusinessLogicException
// Global error handler
@RestControllerAdvice
public class GlobalExceptionHandler {
    @ExceptionHandler(SnowflakeConnectionException.class)
    public ResponseEntity<ErrorResponseDto> handleSnowflakeException(
            SnowflakeConnectionException ex) {
        ErrorResponseDto error = new ErrorResponseDto();
        error.setStatus("ERROR");
        error.setMessage("Database connection failed");
        error.setCode("DB_CONNECTION_ERROR");
        return ResponseEntity.status(503).body(error);
    }
}
3. Retry Logic
// RetryConfiguration.java:34
@Retryable(
    value = {SnowflakeConnectionException.class, TransientDataAccessException.class},
    maxAttempts = 3,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 10000)
)
public List<CostDTO> queryWithRetry(GenericRequestDTO request) {
    return dao.query(request);
}
// Execution timeline:
// Attempt 1: Fails at T=0s
// Attempt 2: Fails at T=1s (1000ms delay)
// Attempt 3: Succeeds at T=3s (2000ms delay)
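The timeline above follows from the backoff parameters: each delay is the previous one times the multiplier, capped at the maximum. A small sketch of that schedule, using a hypothetical `BackoffScheduleSketch` helper rather than Spring Retry itself:

```java
/** Hypothetical helper that reproduces an exponential backoff schedule with a cap. */
final class BackoffScheduleSketch {
    /**
     * Delay before retry number {@code retry} (1-based):
     * min(initialMillis * multiplier^(retry - 1), maxMillis).
     */
    static long delayMillis(int retry, long initialMillis, double multiplier, long maxMillis) {
        double raw = initialMillis * Math.pow(multiplier, retry - 1);
        return (long) Math.min(raw, maxMillis);
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 5; retry++) {
            System.out.println(delayMillis(retry, 1000, 2.0, 10000));
        }
        // prints 1000, 2000, 4000, 8000, 10000 (one per line)
    }
}
```

With maxAttempts = 3 only the first two delays are ever used (1000 ms and 2000 ms), which is why the third attempt lands at T=3s. The 10,000 ms cap would only come into play from the fifth retry onward.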
4. Circuit Breaker Pattern
// CircuitBreakerConfiguration.java:56
@CircuitBreaker(
    name = "snowflake",
    fallbackMethod = "fallbackGetCosts"
)
public List<CostDTO> getCostsWithCircuitBreaker(GenericRequestDTO request) {
    return dao.query(request);
}

public List<CostDTO> fallbackGetCosts(GenericRequestDTO request, Throwable ex) {
    // Return stale data from cache
    return cacheService.getCachedCosts(request);
}
// Circuit states:
// CLOSED: Normal operation
// OPEN: All requests fail fast, return cached data
// HALF_OPEN: Test request to check if service recovered
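The three states can be illustrated with a deliberately simplified state machine. This sketch trips on a count of consecutive failures, which is an assumption made for brevity; circuit breaker libraries typically use a failure rate over a sliding window and limit the number of trial calls in HALF_OPEN. `CircuitBreakerSketch` and its parameters are hypothetical.

```java
import java.util.function.LongSupplier;

/** Hypothetical, simplified circuit breaker driven by consecutive failures. */
final class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock; // injectable so tests can control time
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAtMillis;

    CircuitBreakerSketch(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** True if a call may proceed; moves OPEN to HALF_OPEN once the wait time has elapsed. */
    synchronized boolean allowRequest() {
        if (state == State.OPEN && clock.getAsLong() - openedAtMillis >= openMillis) {
            state = State.HALF_OPEN;
        }
        return state != State.OPEN;
    }

    /** A success closes the circuit and clears the failure count. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    /** A failed trial call, or too many failures in a row, opens the circuit. */
    synchronized void recordFailure() {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAtMillis = clock.getAsLong();
        }
    }

    synchronized State state() {
        return state;
    }
}
```

A caller would check `allowRequest()` before querying Snowflake, serve the cached fallback when it returns false, and report the outcome through `recordSuccess()` or `recordFailure()`.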
Performance Optimization Techniques
1. Query Optimization
Before:
-- Slow query (3.5s)
SELECT * FROM COST_DAILY
WHERE ACCOUNT_ID = '123456789012'
AND DATE >= '2024-01-01'
ORDER BY DATE DESC;
After:
-- Optimized query (0.8s)
SELECT DATE, SERVICE, SUM(COST) AS TOTAL_COST
FROM COST_DAILY
WHERE ACCOUNT_ID = '123456789012'
AND DATE >= '2024-01-01'
GROUP BY DATE, SERVICE
ORDER BY DATE DESC;
- Optimization: Select only required columns, aggregate early
- Performance Gain: 77% reduction
2. Connection Pooling
# HikariCP configuration
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.idle-timeout=600000
spring.datasource.hikari.max-lifetime=1800000
Impact:
- Connection acquisition time: 500ms → 5ms
- Throughput: 10 req/sec → 100 req/sec
3. Async Processing
@Async("taskExecutor")
public CompletableFuture<List<CostDTO>> queryAsync(GenericRequestDTO request) {
    List<CostDTO> result = dao.query(request);
    return CompletableFuture.completedFuture(result);
}
Thread Pool Configuration:
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("lens-async-");
executor.initialize();
return executor;
}
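The interaction between core size, max size, and queue capacity is easy to misread: threads beyond the core size are created only once the queue is full. The sketch below builds a plain `java.util.concurrent.ThreadPoolExecutor` with the same three numbers to show this. `PoolSizingSketch` is a hypothetical name, and the 60-second keep-alive is an assumption, since the configuration above does not set one.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical plain-JDK pool mirroring core=10, max=20, queue=100. */
final class PoolSizingSketch {
    static ThreadPoolExecutor newLensPool() {
        return new ThreadPoolExecutor(
            10,                             // core threads, created as the first tasks arrive
            20,                             // upper bound, reached only when the queue is full
            60L, TimeUnit.SECONDS,          // assumed keep-alive for threads above core size
            new LinkedBlockingQueue<>(100)  // tasks wait here once all core threads are busy
        );
    }
}
```

With these settings, the first 10 concurrent tasks each get a thread, the next 100 wait in the queue, and only the 111th causes an 11th thread to start. Once 20 threads are busy and the queue holds 100 tasks, further submissions are rejected, so the effective burst capacity is 120 in-flight tasks.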
Summary
6 Major Data Flows: Cost summary, RI utilization, alerts, reports, CUDOS dashboard, cache warming
Response Times:
- Fast queries (cached): 50-100ms
- Medium queries (Snowflake): 1-2s
- Complex queries (multi-source): 3-5s
- Batch processing: 30-60s
Optimization Techniques:
- Multi-level caching (85% hit ratio)
- Parallel query execution (70% time reduction)
- Connection pooling (connection acquisition 500ms → 5ms, a 99% reduction)
- Async processing (10x throughput improvement)
Resilience Patterns:
- Retry logic (3 attempts, exponential backoff)
- Circuit breaker (fail fast + cached fallback)
- Rate limiting (AWS API: 5 req/sec)
Document Version: 1.0 Last Updated: October 25, 2025