SpringBoot与InfluxDB整合,实现智能电表数据采集系统
InfluxDB是一个分布式时序数据库,专门用于存储和查询大规模的时间序列数据。广泛应用于监控系统、物联网(IoT)、金融数据分析等领域。
这个项目为什么使用InfluxDB?
- InfluxDB采用优化的数据结构和压缩算法,能够在高并发情况下保持高效的写入性能。
- 支持复杂的查询语言(如Flux),可以轻松进行聚合、过滤和分析操作。
- InfluxDB 提供了内置的数据保留策略(Retention Policies),可以根据需要自动删除旧数据,从而有效地管理存储空间。
- InfluxDB 支持多种高级数据分析功能,可以更好分析利用电表数据。
哪些公司使用了InfluxDB?
- Netflix 使用 InfluxDB 来监控其全球流媒体服务的性能指标。
- 特斯拉使用 InfluxDB 来存储和分析电动汽车的各种传感器数据。
- Spotify 使用 InfluxDB 来监控其音频流媒体平台的各项指标。
- 思科使用 InfluxDB 来收集和分析网络设备的日志和性能数据。
- Airbnb 使用 InfluxDB 来收集和分析用户活动数据。
- Bosch使用 InfluxDB 来收集和分析工业自动化系统的数据。
- 西门子使用 InfluxDB 来处理工业控制系统的数据。
代码实操
org.springframework.boot
spring-boot-starter-web
com.influxdb
influxdb-client-java
6.8.0
org.projectlombok
lombok
true
com.fasterxml.jackson.core
jackson-databind
application.properties
# InfluxDB Configuration
influx.url=http://localhost:8086
influx.token=my-token
influx.org=my-org
influx.bucket=my-bucket
电表数据的数据模型
package com.example.smartmeter.model;
import lombok.Data;
import java.time.Instant;
/**
* 表示电表数据的数据模型
*/
@Data
public class ElectricMeterData {
private String meterId; // 电表ID
private double energyConsumption; // 能耗(单位:千瓦时)
private Instant timestamp; // 时间戳
}
费率配置的数据模型
package com.example.smartmeter.model;
import lombok.Data;
import java.util.Map;
/**
* 表示费率配置的数据模型
*/
@Data
public class RateConfig {
private Map peakRates; // 峰时段费率,键为小时数(例如:"07", "08"),值为每千瓦时费率
private Map offPeakRates; // 谷时段费率,键为小时数(例如:"23", "00"),值为每千瓦时费率
private Map holidayRates; // 节假日费率,键为日期(例如:"2025-12-25"),值为每千瓦时费率
}
加载费率配置
我只是做个Demo出来,给大家感受一下,就简单一点吧。使用一个JSON文件中加载费率配置,位置就随手放在resources/rates.json
{
"peakRates": {
"07": 0.15,
"08": 0.15,
"09": 0.15,
"10": 0.15,
"11": 0.15,
"12": 0.15,
"13": 0.15,
"14": 0.15,
"15": 0.15,
"16": 0.15,
"17": 0.15,
"18": 0.15,
"19": 0.15,
"20": 0.15,
"21": 0.15,
"22": 0.15
},
"offPeakRates": {
"00": 0.10,
"01": 0.10,
"02": 0.10,
"03": 0.10,
"04": 0.10,
"05": 0.10,
"06": 0.10,
"23": 0.10
},
"holidayRates": {
"2025-12-25": 0.05,
"2025-01-01": 0.05
}
}
加载和管理费率配置的服务类
package com.example.smartmeter.service;
import com.example.smartmeter.model.RateConfig;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
/**
* 用于加载和管理费率配置的服务类
*/
@Service
public class RateConfigService {
@Value("classpath:rates.json")
private Resource ratesResource; // 指定费率配置文件路径
private RateConfig rateConfig; // 存储费率配置的对象
/**
* 在Bean初始化后加载费率配置文件
*/
@PostConstruct
public void init() throws IOException {
ObjectMapper objectMapper = new ObjectMapper(); // 使用Jackson解析JSON
rateConfig = objectMapper.readValue(ratesResource.getInputStream(), RateConfig.class); // 读取并解析JSON文件
}
/**
* 获取当前的费率配置
*
* @return 当前的费率配置对象
*/
public RateConfig getRateConfig() {
return rateConfig;
}
}
处理电表数据采集和服务的服务类
package com.example.smartmeter.service;
import com.example.smartmeter.model.ElectricMeterData;
import com.example.smartmeter.model.RateConfig;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
/**
* 用于处理电表数据采集和服务的服务类
*/
@Service
public class ElectricMeterService {
@Value("${influx.url}")
private String influxUrl; // InfluxDB URL
@Value("${influx.token}")
private String token; // InfluxDB Token
@Value("${influx.org}")
private String org; // InfluxDB Organization
@Value("${influx.bucket}")
private String bucket; // InfluxDB Bucket
@Autowired
private RateConfigService rateConfigService; // 注入费率配置服务
/**
* 将电表数据保存到InfluxDB
*
* @param data 电表数据对象
*/
public void saveData(ElectricMeterData data) {
try (var client = InfluxDBClientFactory.create(influxUrl, token.toCharArray())) { // 创建InfluxDB客户端
WriteApiBlocking writeApi = client.getWriteApiBlocking(); // 获取写API
var point = com.influxdb.client.write.Point.measurement("electric_meter") // 创建数据点
.addTag("meter_id", data.getMeterId()) // 添加标签
.addField("energy_consumption", data.getEnergyConsumption()) // 添加字段
.time(data.getTimestamp(), WritePrecision.S); // 设置时间戳
writeApi.writePoint(bucket, org, point); // 写入数据
}
}
/**
* 根据时间和能耗计算动态费率
*
* @param energyConsumption 能耗(单位:千瓦时)
* @param date 日期
* @param hourOfDay 小时数(0-23)
* @return 计算出的费用
*/
public double calculateDynamicRate(double energyConsumption, LocalDate date, int hourOfDay) {
RateConfig rateConfig = rateConfigService.getRateConfig(); // 获取费率配置
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); // 日期格式化器
String dateString = date.format(formatter); // 格式化日期字符串
if (rateConfig.getHolidayRates().containsKey(dateString)) {
// 如果是节假日,使用节假日费率
return energyConsumption * rateConfig.getHolidayRates().get(dateString);
} elseif (rateConfig.getPeakRates().containsKey(String.format("%02d", hourOfDay))) {
// 如果是峰时段,使用峰时段费率
return energyConsumption * rateConfig.getPeakRates().get(String.format("%02d", hourOfDay));
} elseif (rateConfig.getOffPeakRates().containsKey(String.format("%02d", hourOfDay))) {
// 如果是谷时段,使用谷时段费率
return energyConsumption * rateConfig.getOffPeakRates().get(String.format("%02d", hourOfDay));
} else {
throw new IllegalArgumentException("No rate configuration found for the given time."); // 抛出异常
}
}
}
Controller
package com.example.smartmeter.controller;
import com.example.smartmeter.model.ElectricMeterData;
import com.example.smartmeter.service.ElectricMeterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
@RestController
@RequestMapping("/api/meters")
public class ElectricMeterController {
@Autowired
private ElectricMeterService electricMeterService; // 注入电表服务
/**
* 接收POST请求保存电表数据并计算费用
*
* @param data 电表数据对象
* @return 成功消息及计算出的费用
*/
@PostMapping("/data")
public String saveMeterData(@RequestBody ElectricMeterData data) {
electricMeterService.saveData(data); // 保存数据到InfluxDB
LocalDate date = Instant.ofEpochSecond(data.getTimestamp().getEpochSecond()).atZone(ZoneId.systemDefault()).toLocalDate(); // 获取日期
int hourOfDay = data.getTimestamp().get(ChronoField.HOUR_OF_DAY); // 获取小时数
double rate = electricMeterService.calculateDynamicRate(data.getEnergyConsumption(), date, hourOfDay); // 计算费用
return"Data saved successfully! Calculated rate: $" + rate; // 返回成功消息及费用
}
}
Application
package com.example.smartmeter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SmartMeterApplication {
public static void main(String[] args) {
SpringApplication.run(SmartMeterApplication.class, args);
}
}
测试
curl -X POST http://localhost:8080/api/meters/data
-H "Content-Type: application/json"
-d '{
"meterId": "METER123",
"energyConsumption": 5.5,
"timestamp": 1696074000
}'
Respons
Data saved successfully! Calculated rate: $0.825