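A script downloads all component JARs to the server's disk and configures the `-javaagent` JVM argument. At startup the JVM processes that argument and runs the agent's `premain` method before the application's `main`; `premain` adds the custom bytecode transformers (`ClassFileTransformer`) to the `Instrumentation` parameter. A `ClassFileTransformer` runs before a class file is loaded, for example for the `AbstractApplicationContext` container class. It mainly does two things: 1. loads the external bytecode through a `URLClassLoader`; 2. enhances Spring, for example `invokeBeanFactoryPostProcessors`, a key method called from `refresh()` when the IoC container starts, so that classes implementing the `BeanDefinitionRegistryPostProcessor` interface are instantiated. The bean lifecycle can be enhanced in the same way.
Other components, such as the automatic log-degrade component `auto-log-degrede.jar`, can then define a `BeanDefinitionRegistryPostProcessor`; when it is instantiated and executed, it adds a degrade listener to the relevant `BeanDefinition`.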
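Component loading: java-agent
Static loading: the startup script adds the `-javaagent:` option pointing at the agent `.jar`.
Before the `main` method runs, the transformers are added to the `Instrumentation` parameter.
`AppendSpringBeanTransformer` is the Spring enhancement transformer; `PluginApiEnhanceTransformer` is the plugin enhancement transformer.
Purpose: bytecode enhancement. Class files are intercepted at class-load time and modified or enhanced. A `ClassFileTransformer` is called for every class file being loaded, but a custom transformer can use an `if` check
to enhance only the designated classes, for example IoC container beans and plugin beans.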
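The `premain` registration and the `if` check described above can be sketched as follows. This is a minimal sketch, not the project's agent: the names `SketchAgent` and `SelectiveTransformer` and the target set are assumptions for illustration, and the transformer returns the bytes unchanged where a real one would rewrite them.

```java
import java.lang.instrument.ClassFileTransformer;
import java.lang.instrument.Instrumentation;
import java.security.ProtectionDomain;
import java.util.Collections;
import java.util.Set;

// Hypothetical transformer: only touches classes whose internal name is in `targets`.
class SelectiveTransformer implements ClassFileTransformer {
    private final Set<String> targets;

    SelectiveTransformer(Set<String> targets) {
        this.targets = targets;
    }

    @Override
    public byte[] transform(ClassLoader loader, String className, Class<?> classBeingRedefined,
                            ProtectionDomain protectionDomain, byte[] classfileBuffer) {
        // className uses '/' separators and may be null for some generated classes.
        if (className == null || !targets.contains(className)) {
            return null; // null tells the JVM that this transformer made no change
        }
        // A real transformer would rewrite the bytecode here; this sketch returns a copy.
        return classfileBuffer.clone();
    }
}

// Hypothetical agent entry point.
class SketchAgent {
    // The JVM calls premain before the application's main when started with -javaagent.
    public static void premain(String agentArgs, Instrumentation inst) {
        inst.addTransformer(new SelectiveTransformer(Collections.singleton(
                "org/springframework/context/support/AbstractApplicationContext")));
    }
}
```

For the JVM to find `premain`, the agent JAR's manifest has to name the agent class in its `Premain-Class` attribute.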
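Enhancement logic: the JARs downloaded to disk are loaded through a `URLClassLoader`. In other words, when the IoC container's bytecode is being loaded, the component classes on disk are loaded via the `URLClassLoader`.
1. Add the paths of all JARs on disk to the `URLClassLoader`'s `URL[]` array so they can be loaded.
2. Register the component beans with Spring, enhancing the Spring container.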
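Step 1 can be illustrated with a small helper. This is a sketch under assumptions: the class name `PluginJarLoader` and the flat directory layout (all JARs directly under one directory) are hypothetical, not taken from the project.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that turns a directory of JARs into a URLClassLoader.
class PluginJarLoader {
    // Collects every *.jar directly under pluginDir as a URL, sorted by path for a stable order.
    static URL[] jarUrls(File pluginDir) {
        File[] jars = pluginDir.listFiles((dir, name) -> name.endsWith(".jar"));
        List<URL> urls = new ArrayList<>();
        if (jars != null) { // listFiles returns null if pluginDir is not a readable directory
            Arrays.sort(jars);
            for (File jar : jars) {
                try {
                    urls.add(jar.toURI().toURL());
                } catch (MalformedURLException e) {
                    throw new IllegalStateException("Bad jar path: " + jar, e);
                }
            }
        }
        return urls.toArray(new URL[0]);
    }

    // Builds a loader over the jars; class lookups delegate to `parent` first.
    static URLClassLoader newLoader(File pluginDir, ClassLoader parent) {
        return new URLClassLoader(jarUrls(pluginDir), parent);
    }
}
```

Passing the application's class loader as `parent` lets plugin classes see Spring and application types while keeping the plugin JARs off the main classpath.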
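The `refresh()` method that starts the Spring IoC container calls `invokeBeanFactoryPostProcessors`. Byte Buddy can modify the bytecode to insert a snippet there dynamically; the snippet initializes the designated beans, loads the plugins, and brings them under Spring's management. Running `invokeBeanFactoryPostProcessors` then instantiates and invokes every class that implements the `BeanDefinitionRegistryPostProcessor` interface.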
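Listening: bean enhancement & initialization
Listening for ducc configuration changes with `BeanDefinitionRegistryPostProcessor`: the log-degrade bean is wired in through a `BeanDefinitionRegistryPostProcessor`, which runs before any bean is instantiated, so it can attach the log-degrade configuration listener to the application's other ducc bean listeners. That listener watches the ducc configuration, including the log level and the auto-degrade rules (for example, call-volume statistics over the last 10 windows of 100 ms each), and degrades or recovers accordingly.
A listener is added to the `BeanDefinition` of ducc's `_configObserver` to observe ducc changes.
On ducc initialization: set the log level, initialize the counter from the degrade rules, and add the log interceptor `TurboFilter` to the logging context -> `LOGGER_CONTEXT`.
The log-degrade listener watches the counter and degrades or recovers when the counts meet the conditions.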
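Counter listener component: implements the `AutoDegradeListener` interface (role: perform the degrade/recover, send notifications, record messages).
It updates the flag variable `private final AtomicReference<DegradeStatusEnum> currStatus;`
The flag has three states: not operated (set in the constructor), degraded, and recovered.
Degrade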
<img width="1302" height="817" alt="image" src="https://github.com/user-attachments/assets/d286bbaf-a745-4ba2-afec-dabe95ea9267" />
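Degrade: not operated -> degraded.
Recover: degraded -> recovered.
The state is held in an atomic reference and modified lock-free (CAS). Maintaining the state this way resolves concurrent degrade/recover attempts.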
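The two transitions above map directly onto `AtomicReference.compareAndSet`. The sketch below is an illustration only: the enum `DegradeStatus`, its constant names, and the class `DegradeStateHolder` are assumptions (the notes name `DegradeStatusEnum` but not its constants), and it models only the two transitions listed, since the notes do not say whether a recovered state may degrade again.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for DegradeStatusEnum; constant names are assumed.
enum DegradeStatus { NOT_OPERATED, DEGRADED, RECOVERED }

class DegradeStateHolder {
    private final AtomicReference<DegradeStatus> currStatus =
            new AtomicReference<>(DegradeStatus.NOT_OPERATED);

    // Returns true only for the single caller that wins NOT_OPERATED -> DEGRADED.
    boolean tryDegrade() {
        return currStatus.compareAndSet(DegradeStatus.NOT_OPERATED, DegradeStatus.DEGRADED);
    }

    // Returns true only for the single caller that wins DEGRADED -> RECOVERED.
    boolean tryRecover() {
        return currStatus.compareAndSet(DegradeStatus.DEGRADED, DegradeStatus.RECOVERED);
    }

    DegradeStatus current() {
        return currStatus.get();
    }
}
```

Because each transition is a single CAS, only one of several concurrent callers performs the side effects (changing the log level, sending the notification); the others see `false` and skip them.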
<img width="1666" height="966" alt="image" src="https://github.com/user-attachments/assets/1ea10a88-e1f6-4702-a75b-3ed7da9b0eca" />
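Counting component: `AtomicReference` and `AtomicLong`; sliding-window counting of RPC calls.
An `AbstractFilter` interceptor counts RPC invocations.
The counter unit is incremented through `increment`.
After the ducc configuration is loaded, the counter `AutoDegradeWindowRateCounter` is initialized.
The counter consists of:
1. `counterUnit`: the counting unit.
2. `degradeStrategy`: the degrade strategy (consecutive times, interval, call threshold).
3. `recoverStrategy`: the recover strategy (consecutive times, interval, call threshold).
4. `listener`: the degrade listener.
5. `recoverTrigger`: the periodic recover-check thread,
created with `Executors.newScheduledThreadPool`
and scheduled with `scheduleWithFixedDelay()`.
Recover strategy and degrade strategy.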
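Why a periodic thread is needed: in `AutoDegradeWindowRateCounter`, a window is only closed when `accumulateNum` is called, so without traffic no window ever closes and recovery is never evaluated. The constructor therefore schedules `accumulateNum(0)` at a fixed delay. The sketch below shows the effect with an injected clock; `IdleAwareWindow` and its members are hypothetical names, and the rollover logic is simplified compared with the original class.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical, simplified window counter that rolls over only when accumulate() is called.
class IdleAwareWindow {
    private final long intervalMs;
    private final LongSupplier clock; // injected so the behaviour can be checked without sleeping
    private final AtomicLong counter = new AtomicLong();
    private final AtomicLong nextRefresh;
    private volatile long lastClosedCount = -1; // -1 means no window has been closed yet

    IdleAwareWindow(long intervalMs, LongSupplier clock) {
        this.intervalMs = intervalMs;
        this.clock = clock;
        long now = clock.getAsLong();
        this.nextRefresh = new AtomicLong(now - now % intervalMs + intervalMs);
    }

    // Adds num to the current window; closes the window first if its end has passed.
    void accumulate(int num) {
        long now = clock.getAsLong();
        long next = nextRefresh.get();
        if (now >= next && nextRefresh.compareAndSet(next, now - now % intervalMs + intervalMs)) {
            lastClosedCount = counter.getAndSet(num); // only the CAS winner publishes the closed window
        } else {
            counter.addAndGet(num);
        }
    }

    long lastClosedCount() {
        return lastClosedCount;
    }

    // Idle tick: a zero increment that exists only to force the rollover check.
    static ScheduledExecutorService startIdleTick(IdleAwareWindow w, long delay, TimeUnit unit) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        ses.scheduleWithFixedDelay(() -> w.accumulate(0), delay, delay, unit);
        return ses; // caller is responsible for shutdown()
    }
}
```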
logback degrade: a `TurboFilter` filters by level to degrade logging.
The logback `TurboFilter` is initialized from the ducc log level, and degrades or recovers based on the traffic statistics.
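The level decision such a filter makes can be sketched without logback. Everything here is an assumption for illustration: `LogLevel`, `LevelGate`, and in particular the choice of a stricter level while degraded, which the notes do not specify. In logback itself the equivalent logic would live in a `TurboFilter` subclass whose `decide` method returns a `FilterReply`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical level enum, ordered from most to least verbose.
enum LogLevel { TRACE, DEBUG, INFO, WARN, ERROR }

// Hypothetical gate holding the currently effective threshold.
class LevelGate {
    private final LogLevel configured; // level taken from configuration
    private final LogLevel degraded;   // stricter level used while degraded (assumed)
    private final AtomicReference<LogLevel> threshold;

    LevelGate(LogLevel configured, LogLevel degraded) {
        this.configured = configured;
        this.degraded = degraded;
        this.threshold = new AtomicReference<>(configured);
    }

    void degrade() {
        threshold.set(degraded);
    }

    void recover() {
        threshold.set(configured);
    }

    // true lets the event through; false corresponds to denying it in a filter.
    boolean allows(LogLevel eventLevel) {
        return eventLevel.compareTo(threshold.get()) >= 0;
    }
}
```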
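Message component
Degrade/recover operation records and notifications: after a degrade or a recover succeeds, a success message is sent.
The base package implements the basic functions such as message notification and data reporting.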
```java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package com.jd.cf.pangolin.plugin.logging.autodegrade.counter;

import com.jd.cf.pangolin.plugin.logging.autodegrade.listener.AutoDegradeListener;
import com.jd.cf.pangolin.plugin.logging.autodegrade.util.UmpUtils;
import java.util.Collections;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongBinaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AutoDegradeWindowRateCounter {
    private static final Logger log = LoggerFactory.getLogger(AutoDegradeWindowRateCounter.class);
    private final CounterUnit counterUnit;
    private final DegradeStrategy degradeStrategy;
    private final RecoverStrategy recoverStrategy;
    private final AutoDegradeListener listener;
    // Closed windows: key = end time of the window, value = call count in that window.
    private final ConcurrentSkipListMap<Long, Long> durationRecords;
    private ScheduledExecutorService recoverTrigger;

    public AutoDegradeWindowRateCounter(int intervalInMs, long highWaterLevelNum, int degradeConsecutiveTimes, long lowWaterLevelNum, int recoverConsecutiveTimes, AutoDegradeListener listener) {
        this.durationRecords = new ConcurrentSkipListMap<>();
        checkParams(intervalInMs, highWaterLevelNum, degradeConsecutiveTimes, lowWaterLevelNum, recoverConsecutiveTimes);
        this.counterUnit = new CounterUnit(intervalInMs);
        this.degradeStrategy = new DegradeStrategy(intervalInMs, highWaterLevelNum, degradeConsecutiveTimes);
        this.recoverStrategy = new RecoverStrategy(intervalInMs, lowWaterLevelNum, recoverConsecutiveTimes);
        this.listener = listener;
        this.recoverTrigger = Executors.newScheduledThreadPool(1, (r) -> new Thread(r, "logAutoDegrade-recover-trigger"));
        // A zero increment every 5 minutes forces a window switch even when there is no traffic.
        this.recoverTrigger.scheduleWithFixedDelay(() -> this.accumulateNum(0), 5L, 5L, TimeUnit.MINUTES);
    }

    public AutoDegradeWindowRateCounter(int intervalInMs, long highWaterLevelNum, int degradeConsecutiveTimes, long lowWaterLevelNum, int recoverConsecutiveTimes) {
        this(intervalInMs, highWaterLevelNum, degradeConsecutiveTimes, lowWaterLevelNum, recoverConsecutiveTimes, (AutoDegradeListener)null);
    }

    public AutoDegradeWindowRateCounter(int intervalInMs, long highWaterLevelNum, int consecutiveTimes, long lowWaterLevelNum) {
        this(intervalInMs, highWaterLevelNum, consecutiveTimes, lowWaterLevelNum, consecutiveTimes);
    }

    public AutoDegradeWindowRateCounter(RateCounterConfig conf) {
        this(conf.getIntervalInMs(), conf.getHighWaterLevelNum(), conf.getDegradeConsecutiveTimes(), conf.getLowWaterLevelNum(), conf.getRecoverConsecutiveTimes(), conf.getListener());
    }

    private static void checkParams(int intervalInMs, long highWaterLevelNum, int degradeConsecutiveTimes, long lowWaterLevelNum, int recoverConsecutiveTimes) {
        if (intervalInMs < 10) {
            throw new IllegalArgumentException(String.format("intervalInMs cannot be less than 10 ms, value %d is invalid.", intervalInMs));
        } else if (highWaterLevelNum < 1L) {
            throw new IllegalArgumentException(String.format("highWaterLevelNum cannot be less than 1, value %d is invalid.", highWaterLevelNum));
        } else if (lowWaterLevelNum < 1L) {
            throw new IllegalArgumentException(String.format("lowWaterLevelNum cannot be less than 1, value %d is invalid.", lowWaterLevelNum));
        } else if (lowWaterLevelNum > highWaterLevelNum) {
            throw new IllegalArgumentException(String.format("lowWaterLevelNum cannot be greater than highWaterLevelNum, value (%d,%d) is invalid.", lowWaterLevelNum, highWaterLevelNum));
        } else if (degradeConsecutiveTimes < 2) {
            throw new IllegalArgumentException(String.format("degradeConsecutiveTimes cannot be less than 2, value %d is invalid.", degradeConsecutiveTimes));
        } else if (recoverConsecutiveTimes < 2) {
            throw new IllegalArgumentException(String.format("recoverConsecutiveTimes cannot be less than 2, value %d is invalid.", recoverConsecutiveTimes));
        } else if (degradeConsecutiveTimes > 1000) {
            throw new IllegalArgumentException(String.format("degradeConsecutiveTimes cannot be greater than 1000, value %d is invalid.", degradeConsecutiveTimes));
        } else if (recoverConsecutiveTimes > 1000) {
            throw new IllegalArgumentException(String.format("recoverConsecutiveTimes cannot be greater than 1000, value %d is invalid.", recoverConsecutiveTimes));
        }
    }

    public void accumulateNum(int num) {
        this.counterUnit.accumulateNum(num);
    }

    public void increment() {
        this.counterUnit.accumulateNum(1);
    }

    public boolean canDegrade() {
        return this.degradeStrategy.canDegrade();
    }

    public boolean canRecover() {
        return this.recoverStrategy.canRecover();
    }

    // Called by the thread that closes a window: drops expired records, stores the closed window, notifies the listener.
    private void triggerSwitchTimeWindow(long preTime, long preNum) {
        long currentTime = System.currentTimeMillis();
        long degradeStartTime = this.degradeStrategy.updateStartTime(currentTime);
        long recoverStartTime = this.recoverStrategy.updateStartTime(currentTime);
        this.durationRecords.headMap(Math.min(degradeStartTime, recoverStartTime), true).clear();
        this.durationRecords.put(preTime, preNum);
        if (this.listener != null) {
            if (this.canDegrade()) {
                this.listener.degrade(Collections.unmodifiableSortedMap(this.durationRecords));
            } else if (this.canRecover()) {
                this.listener.recover(Collections.unmodifiableSortedMap(this.durationRecords));
            }
        }
    }

    private static long computeStartTime(long currentTime, int intervalInMs, int times) {
        return computeNextTime(currentTime, intervalInMs, -times);
    }

    private static long computeNextTime(long currentTime, int intervalInMs) {
        return computeNextTime(currentTime, intervalInMs, 1);
    }

    // Aligns currentTime down to a multiple of intervalInMs, then shifts by `times` intervals.
    private static long computeNextTime(long currentTime, int intervalInMs, int times) {
        return currentTime - currentTime % (long)intervalInMs + (long)intervalInMs * (long)times;
    }

    public void destroy() {
        this.recoverTrigger.shutdown();
    }

    public AutoDegradeListener getListener() {
        return this.listener;
    }

    class CounterUnit {
        private final LongBinaryOperator ACCUMULATOR_FUNCTION;
        private final int intervalInMs;
        private final AtomicLong counter;
        private final AtomicLong nextRefreshTime;

        private CounterUnit(int intervalInMs) {
            this.ACCUMULATOR_FUNCTION = Long::sum;
            this.counter = new AtomicLong();
            this.nextRefreshTime = new AtomicLong();
            this.intervalInMs = intervalInMs;
            this.nextRefreshTime.set(AutoDegradeWindowRateCounter.computeNextTime(System.currentTimeMillis(), intervalInMs));
        }

        private void accumulateNum(int num) {
            long currentTime = System.currentTimeMillis();
            long nextTime = this.nextRefreshTime.get();
            if (currentTime < nextTime) {
                this.counter.accumulateAndGet((long)num, this.ACCUMULATOR_FUNCTION);
            } else {
                // Only the thread that wins the CAS closes the window; the others keep counting.
                if (this.nextRefreshTime.compareAndSet(nextTime, AutoDegradeWindowRateCounter.computeNextTime(currentTime, this.intervalInMs))) {
                    long preTimeNum = this.counter.get();
                    this.counter.set((long)num);
                    UmpUtils.umpCountNum("AutoDegradeSlidingWindow.recordNum", preTimeNum);
                    AutoDegradeWindowRateCounter.this.triggerSwitchTimeWindow(nextTime, preTimeNum);
                } else {
                    this.counter.accumulateAndGet((long)num, this.ACCUMULATOR_FUNCTION);
                }
            }
        }
    }

    class DegradeStrategy {
        private final int intervalInMs;
        private final long highWaterLevelNum;
        private final int degradeConsecutiveTimes;
        private final AtomicLong lastStartTime;

        private DegradeStrategy(int intervalInMs, long highWaterLevelNum, int degradeConsecutiveTimes) {
            this.lastStartTime = new AtomicLong();
            this.intervalInMs = intervalInMs;
            this.highWaterLevelNum = highWaterLevelNum;
            this.degradeConsecutiveTimes = degradeConsecutiveTimes;
            this.lastStartTime.set(AutoDegradeWindowRateCounter.computeStartTime(System.currentTimeMillis(), intervalInMs, degradeConsecutiveTimes));
        }

        // Degrade when the number of recorded windows above the high-water level reaches degradeConsecutiveTimes.
        private boolean canDegrade() {
            Integer overloadTimes = AutoDegradeWindowRateCounter.this.durationRecords.tailMap(this.lastStartTime.get()).values().stream().map((n) -> n > this.highWaterLevelNum ? 1 : 0).reduce(0, Integer::sum);
            return overloadTimes >= this.degradeConsecutiveTimes;
        }

        private long updateStartTime(long currentTime) {
            long startTime = AutoDegradeWindowRateCounter.computeStartTime(currentTime, this.intervalInMs, this.degradeConsecutiveTimes);
            this.lastStartTime.set(startTime);
            return startTime;
        }
    }

    class RecoverStrategy {
        private final int intervalInMs;
        private final long lowWaterLevelNum;
        private final int recoverConsecutiveTimes;
        private final AtomicLong lastStartTime;

        private RecoverStrategy(int intervalInMs, long lowWaterLevelNum, int recoverConsecutiveTimes) {
            this.lastStartTime = new AtomicLong();
            this.intervalInMs = intervalInMs;
            this.lowWaterLevelNum = lowWaterLevelNum;
            this.recoverConsecutiveTimes = recoverConsecutiveTimes;
        }

        // Recover when no recorded window in the range is above the low-water level.
        private boolean canRecover() {
            Integer lowWaterTimes = AutoDegradeWindowRateCounter.this.durationRecords.tailMap(this.lastStartTime.get()).values().stream().map((n) -> n > this.lowWaterLevelNum ? 1 : 0).reduce(0, Integer::sum);
            return lowWaterTimes == 0;
        }

        private long updateStartTime(long currentTime) {
            long startTime = AutoDegradeWindowRateCounter.computeStartTime(currentTime, this.intervalInMs, this.recoverConsecutiveTimes);
            this.lastStartTime.set(startTime);
            return startTime;
        }
    }
}
```
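A worked example of the window arithmetic in `computeNextTime` and of the over-threshold count in `canDegrade`. The class `WindowMath` and its method names are hypothetical; the formula is the one from the class above, and the numbers are made up for illustration.

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical helper reproducing the arithmetic of AutoDegradeWindowRateCounter.
class WindowMath {
    // Align currentTime down to a multiple of intervalInMs, then shift by `times` intervals.
    static long alignedShift(long currentTime, int intervalInMs, int times) {
        return currentTime - currentTime % intervalInMs + (long) intervalInMs * times;
    }

    // Counts recorded windows with key >= startTime whose count exceeds highWater.
    static long overloadedWindows(ConcurrentSkipListMap<Long, Long> records, long startTime, long highWater) {
        return records.tailMap(startTime).values().stream().filter(n -> n > highWater).count();
    }
}
```

With `currentTime = 1234` and `intervalInMs = 100`, the current window ends at `alignedShift(1234, 100, 1) = 1300`, and a lookback of 3 windows starts at `alignedShift(1234, 100, -3) = 900`. Given records `{1000: 150, 1100: 90, 1200: 200}` and a high-water level of 100, two windows are over the level, so a `degradeConsecutiveTimes` of 3 would not yet trigger a degrade.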
Sliding-window counting: ring-buffer approach (production approach)

```java
package com.example.counter.v4_ring_buffer;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

/**
 * Version 4: ring-buffer approach (the best approach).
 * Characteristics:
 * - Fixed memory footprint (circular array)
 * - Lock-free on the counting path (CAS operations); only slideWindow is synchronized
 * - Supports arbitrary time precision
 * - Highest throughput
 * Principle:
 * - Treat the sliding window as a circular array; each new request updates the matching bucket
 * - Expired data is overwritten as time advances
 */
public class RingBufferCounter {
    // Ring buffer size (one-second buckets, 60 buckets = 60-second window)
    private static final int BUFFER_SIZE = 60;
    private static final long BUCKET_DURATION_MS = 1000; // each bucket covers 1 second

    // Ring buffer: request count per second
    private final AtomicLongArray buffer = new AtomicLongArray(BUFFER_SIZE);

    // Start timestamp of the current window
    private final AtomicLong windowStartTime = new AtomicLong(System.currentTimeMillis());

    /** Records one request (lock-free). */
    public void addRequest() {
        long now = System.currentTimeMillis();
        addRequestAtTime(now);
    }

    private void addRequestAtTime(long timestamp) {
        long windowStart = windowStartTime.get();
        long timeDiff = timestamp - windowStart;

        // If the elapsed time exceeds the window size, reset the window
        if (timeDiff >= BUFFER_SIZE * BUCKET_DURATION_MS) {
            slideWindow(timestamp);
            addRequestAtTime(timestamp);
            return;
        }
        // Compute the position in the ring buffer
        int bucketIndex = (int) ((timeDiff / BUCKET_DURATION_MS) % BUFFER_SIZE);
        buffer.incrementAndGet(bucketIndex);
    }

    /** Returns the total count within the window. */
    public long getCount() {
        long now = System.currentTimeMillis();
        long windowStart = windowStartTime.get();
        long timeDiff = now - windowStart;

        // If the window has expired, it must be reset first
        if (timeDiff >= BUFFER_SIZE * BUCKET_DURATION_MS) {
            slideWindow(now);
            return getCount();
        }

        long sum = 0;
        // Sum the valid buckets (those inside the current window)
        int validBuckets = (int) ((timeDiff / BUCKET_DURATION_MS) + 1);
        for (int i = 0; i < Math.min(validBuckets, BUFFER_SIZE); i++) {
            sum += buffer.get(i);
        }
        return sum;
    }

    /**
     * Slides the window (called only when necessary).
     * Note: as written, this is only reached once a full window has elapsed, so it
     * clears every bucket and the count restarts from zero.
     */
    private synchronized void slideWindow(long now) {
        long windowStart = windowStartTime.get();
        long timeDiff = now - windowStart;

        // Re-check whether sliding is still needed
        if (timeDiff < BUFFER_SIZE * BUCKET_DURATION_MS) {
            return;
        }

        // Number of buckets to clean
        int bucketsToClean = (int) (timeDiff / BUCKET_DURATION_MS);

        // Reset the buckets about to be reused
        for (int i = 0; i < Math.min(bucketsToClean, BUFFER_SIZE); i++) {
            buffer.set(i, 0);
        }

        // Advance the window start by a whole number of windows
        long newWindowStart = windowStart + (bucketsToClean / BUFFER_SIZE) * BUFFER_SIZE * BUCKET_DURATION_MS;
        windowStartTime.set(newWindowStart);
    }

    /** Prints detailed statistics (for debugging). */
    public void printStats() {
        long now = System.currentTimeMillis();
        long windowStart = windowStartTime.get();

        System.out.println("Window Start: " + windowStart);
        System.out.println("Current Time: " + now);
        System.out.println("Time Diff: " + (now - windowStart) + "ms");
        System.out.println("Total Count: " + getCount());
        System.out.println("Buffer content:");
        for (int i = 0; i < BUFFER_SIZE; i++) {
            System.out.print(buffer.get(i) + " ");
            if ((i + 1) % 10 == 0) System.out.println();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RingBufferCounter counter = new RingBufferCounter();

        // Concurrency test
        System.out.println("Starting concurrent test...");
        long startTime = System.currentTimeMillis();
        for (int t = 0; t < 10; t++) {
            new Thread(() -> {
                for (int i = 0; i < 10000; i++) {
                    counter.addRequest();
                }
            }).start();
        }
        Thread.sleep(2000);
        long endTime = System.currentTimeMillis();
        System.out.println("\n=== Results ===");
        System.out.println("Total time: " + (endTime - startTime) + "ms");
        System.out.println("Total count: " + counter.getCount());
        System.out.println("Expected: ~100000");
        counter.printStats();
    }
}
```
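`RingBufferCounter` clears the whole buffer once a full window has elapsed, so its count restarts from zero once per window. A variant that expires buckets one at a time tags each slot with the time bucket it belongs to. The sketch below is one possible design under assumptions, not the project's code: `EpochRingCounter` is a hypothetical name, timestamps are passed in explicitly and assumed non-negative, and an increment that races with a slot being replaced a full window later can be dropped.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical ring counter in which every slot remembers which time bucket (epoch) it holds.
class EpochRingCounter {
    // Immutable epoch tag plus a mutable count for one time bucket.
    private static final class Bucket {
        final long epoch;
        final AtomicLong count;

        Bucket(long epoch, long initial) {
            this.epoch = epoch;
            this.count = new AtomicLong(initial);
        }
    }

    private final long bucketMs;
    private final AtomicReferenceArray<Bucket> slots;

    EpochRingCounter(int buckets, long bucketMs) {
        this.bucketMs = bucketMs;
        this.slots = new AtomicReferenceArray<>(buckets);
    }

    // Records one event at nowMs (milliseconds, assumed >= 0).
    void add(long nowMs) {
        long epoch = nowMs / bucketMs;
        int idx = (int) (epoch % slots.length());
        while (true) {
            Bucket b = slots.get(idx);
            if (b != null && b.epoch >= epoch) {
                b.count.incrementAndGet(); // slot already holds this (or a newer) epoch
                return;
            }
            if (slots.compareAndSet(idx, b, new Bucket(epoch, 1))) {
                return; // replaced an empty or expired slot with a fresh bucket
            }
            // Lost the race: another thread replaced the slot, so re-read and retry.
        }
    }

    // Sums the buckets whose epoch lies inside the window ending at nowMs.
    long sum(long nowMs) {
        long epoch = nowMs / bucketMs;
        long oldest = epoch - slots.length() + 1;
        long total = 0;
        for (int i = 0; i < slots.length(); i++) {
            Bucket b = slots.get(i);
            if (b != null && b.epoch >= oldest && b.epoch <= epoch) {
                total += b.count.get();
            }
        }
        return total;
    }
}
```

Stale slots are skipped on read and replaced lazily on write, so no thread ever has to clear the buffer and no `synchronized` section is needed.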
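```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
// @LafValue is the project's configuration-injection annotation; its import is not shown in the original.

@Slf4j
@Service("slidingWindowRateLimiter")
public class SlidingWindowRateLimiter {
    // Timestamps of the requests admitted within the current window, oldest first
    private final ConcurrentLinkedDeque<Long> requestTimestamps = new ConcurrentLinkedDeque<>();
    private final AtomicInteger currentCount = new AtomicInteger(0);
    private static final long WINDOW_SIZE_MS = 1000;

    @LafValue("jmMsg.strategy.call.count")
    private Integer jmMsgStrategyCallCount = 20;

    /**
     * Sliding-window rate-limit check.
     * The method is synchronized, so the deque and the counter are only ever
     * touched by one thread at a time.
     */
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();
        // Drop expired timestamps
        while (!requestTimestamps.isEmpty() &&
                requestTimestamps.peekFirst() <= now - WINDOW_SIZE_MS) {
            requestTimestamps.pollFirst();
            currentCount.decrementAndGet();
        }
        // Reject if the limit has been reached
        if (currentCount.get() >= jmMsgStrategyCallCount) {
            return false;
        }
        // Record the current request
        requestTimestamps.addLast(now);
        currentCount.incrementAndGet();
        return true;
    }
}
```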






