穿山甲组件 日志自动降级

2026-05-22

通过脚本加载所有的组件jar到服务器磁盘 配置javaagent jvm参数 ,然后执行启动之前 执行javaagent参数,执行agent - premain方法 将制制定字节码classFiletransfomer 添加到参数instrumentation中, classFileTransformer对classFile加载之前执行,比如abstractApplicationContext容器,主要完成1、外部字节码加载到URLClassLoader 中,2、 修改spring增强,比如invokeBeanFactoryPostProcessor这个是ioc容器启动的时候 refresh里的关键方法,可以将实现了beandefinitionRegistrypostProcessor 接口的方法进行实例化,也可以对bean生命周期增强,

然后其他的组件 比如 日志自动降级组件。auto-log-degrede.jar 可以定义beanDifinitionRegistryPostProcessor 然后在实例化的时候执行 对beanDefinition添加 降级监听器

组件加载 java-agent

静态加载方式添加 javaagent: .jar 脚本添加。
image 执行main方法之前添加 往 instrucmentation参数添加 transform
AppendSpringBeanTransformer PluginApiEnhanceTransformer spring增强transformer 插件增强transformer
作用增强字节码,是在类加载是拦截所有的class文件进行修改或增强。 classFilertransformer 会拦截所有加载的classFilter文件,但是可以在自定义的transformer 通过if判断 只对指定的class进行增强,比如ioc容器的bean增强。 插件bean增强。 增强逻辑: 将下载到磁盘上的jar包 通过RULClassLoader进行加载。 就是在加载 ioc容器的字节码的时候 加载 磁盘上组件的class 通过RULClassLoader 进行加载。
1、将磁盘所有jar所在路径添加到URLClassLoader url[]数组进行加载。 image 2、 将组件bean加入spring管理 对spring容器进行增强。 image

spring ioc启动的refresh方法里存在 invokeBeanFactoryPostProcessor 可以通过字节码修改bytebuddy 动态插入一段代码,用于执行指定bean容器初始化,加载插件,纳入spring管理 invokeBeanFactoryPostProcessor 执行invokeBeanFactoryPostProcessor 之后 就可以将所有实现了 beanDefinitionRetrisyPostProcessor接口的类进行实例化。

监听 bean增强&初始化

监听ducc 配置变更 beanDifinitionRegistryPostProcessor 增强加载日志降级bean的时候, 通过beanDefinitionRegistryPostProcessor 进行增强,在所有bean实例化之前进行执行,可以对自己其他ducc bean listener添加对应日志降级配置,该listener监听ducc配置, 包括日志级别,自动降级规则,最近10次 100ms 调用量统计,则降级或恢复;

对ducc的 _configObserver beanDefinition 添加 Listener用于监听ducc变更。
image

监听ducc初始化, 设置日志级别 , 将降级规则初始化计数器 , 将日志拦截器turborFilter添加到日志上下文 –> LOGGER_CONTEXT image

日志降级 监听器,对满足条件的计数进行 降级 恢复监听 ,用于监听计数器 image

计数监听器组件:  实现接口 AutoDegradeListener (作用:执行降级/恢复,发送通知记录消息)
更新 标志变量      private final AtomicReference<DegradeStatusEnum> currStatus;       
  标志变量对应3个状态。 未操作(初始化构造方法设置)  已降级  已恢复
   降级
<img width="1302" height="817" alt="image" src="https://github.com/user-attachments/assets/d286bbaf-a745-4ba2-afec-dabe95ea9267" />
降级:  未操作状态----> 已降级状态      
     
恢复: 已降级--->已恢复状态。    
  状态定义为 原子引用,通过无锁并发进行修改。    维护这个状态解决并发问题。       
  <img width="1666" height="966" alt="image" src="https://github.com/user-attachments/assets/1ea10a88-e1f6-4702-a75b-3ed7da9b0eca" />

计数组件: atomicReference atomicLong - 滑动窗口计数计数统计 rpc

abstractFilter 拦截器rpc进行调用次数统计
image

计数单元 累加increment
image

加载完ducc配置 ,初始化计数器。AutoDegradeWindowRateCounter
计数器 包括:
1、 counterUnit: 计数单元
2、 degradeStrategy: 降级策略 次数。 间隔时间。调用阈值 3、 recoverStrategy: 恢复策略: 次数。 间隔时间。调用阈值 4、 listener: 降级监听器。
5、 recoverTrigger 恢复监听周期线程。
Executors.newScheduledThreadPool
scheduleWithFixedDelay()

恢复策略 降级策略。
image

logback 降级 : turborFilter 等级别过滤器日志降级

logback-filter TurborFilter根据ducc日志级别进行初始化,或者根据 流量统计情况进行 降级/恢复
image

消息组件

降级恢复操作记录通知降级之后或恢复之后通知成功消息

在基础包实现 消息通知 数据上报等基础功能。
image

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.jd.cf.pangolin.plugin.logging.autodegrade.counter;

import com.jd.cf.pangolin.plugin.logging.autodegrade.listener.AutoDegradeListener;
import com.jd.cf.pangolin.plugin.logging.autodegrade.util.UmpUtils;
import java.util.Collections;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongBinaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AutoDegradeWindowRateCounter {
    private static final Logger log = LoggerFactory.getLogger(AutoDegradeWindowRateCounter.class);
    private final CounterUnit counterUnit;
    private final DegradeStrategy degradeStrategy;
    private final RecoverStrategy recoverStrategy;
    private final AutoDegradeListener listener;
    private final ConcurrentSkipListMap<Long, Long> durationRecords;
    private ScheduledExecutorService recoverTrigger;

    public AutoDegradeWindowRateCounter(int intervalInMs, long highWaterLevelNum, int degradeConsecutiveTimes, long lowWaterLevelNum, int recoverConsecutiveTimes, AutoDegradeListener listener) {
        this.durationRecords = new ConcurrentSkipListMap();
        checkParams(intervalInMs, highWaterLevelNum, degradeConsecutiveTimes, lowWaterLevelNum, recoverConsecutiveTimes);
        this.counterUnit = new CounterUnit(intervalInMs);
        this.degradeStrategy = new DegradeStrategy(intervalInMs, highWaterLevelNum, degradeConsecutiveTimes);
        this.recoverStrategy = new RecoverStrategy(intervalInMs, lowWaterLevelNum, recoverConsecutiveTimes);
        this.listener = listener;
        this.recoverTrigger = Executors.newScheduledThreadPool(1, (r) -> new Thread(r, "logAutoDegrade-recover-trigger"));
        this.recoverTrigger.scheduleWithFixedDelay(() -> this.accumulateNum(0), 5L, 5L, TimeUnit.MINUTES);
    }

    public AutoDegradeWindowRateCounter(int intervalInMs, long highWaterLevelNum, int degradeConsecutiveTimes, long lowWaterLevelNum, int recoverConsecutiveTimes) {
        this(intervalInMs, highWaterLevelNum, degradeConsecutiveTimes, lowWaterLevelNum, recoverConsecutiveTimes, (AutoDegradeListener)null);
    }

    public AutoDegradeWindowRateCounter(int intervalInMs, long highWaterLevelNum, int consecutiveTimes, long lowWaterLevelNum) {
        this(intervalInMs, highWaterLevelNum, consecutiveTimes, lowWaterLevelNum, consecutiveTimes);
    }

    public AutoDegradeWindowRateCounter(RateCounterConfig conf) {
        this(conf.getIntervalInMs(), conf.getHighWaterLevelNum(), conf.getDegradeConsecutiveTimes(), conf.getLowWaterLevelNum(), conf.getRecoverConsecutiveTimes(), conf.getListener());
    }

    private static void checkParams(int intervalInMs, long highWaterLevelNum, int degradeConsecutiveTimes, long lowWaterLevelNum, int recoverConsecutiveTimes) {
        if (intervalInMs < 10) {
            throw new IllegalArgumentException(String.format("intervalInMs cannot be less than 10 ms, value %d is invalid.", intervalInMs));
        } else if (highWaterLevelNum < 1L) {
            throw new IllegalArgumentException(String.format("highWaterLevelNum cannot be less than 1, value %d is invalid.", highWaterLevelNum));
        } else if (lowWaterLevelNum < 1L) {
            throw new IllegalArgumentException(String.format("lowWaterLevelNum cannot be less than 1, value %d is invalid.", lowWaterLevelNum));
        } else if (lowWaterLevelNum > highWaterLevelNum) {
            throw new IllegalArgumentException(String.format("lowWaterLevelNum cannot be greater than highWaterLevelNum, value (%d,%d) is invalid.", lowWaterLevelNum, highWaterLevelNum));
        } else if (degradeConsecutiveTimes < 2) {
            throw new IllegalArgumentException(String.format("degradeConsecutiveTimes cannot be less than 2, value %d is invalid.", degradeConsecutiveTimes));
        } else if (recoverConsecutiveTimes < 2) {
            throw new IllegalArgumentException(String.format("recoverConsecutiveTimes cannot be less than 2, value %d is invalid.", recoverConsecutiveTimes));
        } else if (degradeConsecutiveTimes > 1000) {
            throw new IllegalArgumentException(String.format("degradeConsecutiveTimes cannot be greater than 1000, value %d is invalid.", degradeConsecutiveTimes));
        } else if (recoverConsecutiveTimes > 1000) {
            throw new IllegalArgumentException(String.format("recoverConsecutiveTimes cannot be greater than 1000, value %d is invalid.", recoverConsecutiveTimes));
        }
    }

    public void accumulateNum(int num) {
        this.counterUnit.accumulateNum(num);
    }

    public void increment() {
        this.counterUnit.accumulateNum(1);
    }

    public boolean canDegrade() {
        return this.degradeStrategy.canDegrade();
    }

    public boolean canRecover() {
        return this.recoverStrategy.canRecover();
    }

    private void triggerSwitchTimeWindow(long preTime, long preNum) {
        long currentTime = System.currentTimeMillis();
        long degradeStartTime = this.degradeStrategy.updateStartTime(currentTime);
        long recoverStartTime = this.recoverStrategy.updateStartTime(currentTime);
        this.durationRecords.headMap(Math.min(degradeStartTime, recoverStartTime), true).clear();
        this.durationRecords.put(preTime, preNum);
        if (this.listener != null) {
            if (this.canDegrade()) {
                this.listener.degrade(Collections.unmodifiableSortedMap(this.durationRecords));
            } else if (this.canRecover()) {
                this.listener.recover(Collections.unmodifiableSortedMap(this.durationRecords));
            }
        }

    }

    private static long computeStartTime(long currentTime, int intervalInMs, int times) {
        return computeNextTime(currentTime, intervalInMs, -times);
    }

    private static long computeNextTime(long currentTime, int intervalInMs) {
        return computeNextTime(currentTime, intervalInMs, 1);
    }

    private static long computeNextTime(long currentTime, int intervalInMs, int times) {
        return currentTime - currentTime % (long)intervalInMs + (long)intervalInMs * (long)times;
    }

    public void destroy() {
        this.recoverTrigger.shutdown();
    }

    public AutoDegradeListener getListener() {
        return this.listener;
    }

    class CounterUnit {
        private final LongBinaryOperator ACCUMULATOR_FUNCTION;
        private final int intervalInMs;
        private final AtomicLong counter;
        private final AtomicLong nextRefreshTime;

        private CounterUnit(int intervalInMs) {
            this.ACCUMULATOR_FUNCTION = Long::sum;
            this.counter = new AtomicLong();
            this.nextRefreshTime = new AtomicLong();
            this.intervalInMs = intervalInMs;
            this.nextRefreshTime.set(AutoDegradeWindowRateCounter.computeNextTime(System.currentTimeMillis(), intervalInMs));
        }

        private void accumulateNum(int num) {
            long currentTime = System.currentTimeMillis();
            long nextTime = this.nextRefreshTime.get();
            if (currentTime < nextTime) {
                this.counter.accumulateAndGet((long)num, this.ACCUMULATOR_FUNCTION);
            } else {
                if (this.nextRefreshTime.compareAndSet(nextTime, AutoDegradeWindowRateCounter.computeNextTime(currentTime, this.intervalInMs))) {
                    long preTimeNum = this.counter.get();
                    this.counter.set((long)num);
                    UmpUtils.umpCountNum("AutoDegradeSlidingWindow.recordNum", preTimeNum);
                    AutoDegradeWindowRateCounter.this.triggerSwitchTimeWindow(nextTime, preTimeNum);
                } else {
                    this.counter.accumulateAndGet((long)num, this.ACCUMULATOR_FUNCTION);
                }

            }
        }
    }

    class DegradeStrategy {
        private final int intervalInMs;
        private final long highWaterLevelNum;
        private final int degradeConsecutiveTimes;
        private final AtomicLong lastStartTime;

        private DegradeStrategy(int intervalInMs, long highWaterLevelNum, int degradeConsecutiveTimes) {
            this.lastStartTime = new AtomicLong();
            this.intervalInMs = intervalInMs;
            this.highWaterLevelNum = highWaterLevelNum;
            this.degradeConsecutiveTimes = degradeConsecutiveTimes;
            this.lastStartTime.set(AutoDegradeWindowRateCounter.computeStartTime(System.currentTimeMillis(), intervalInMs, degradeConsecutiveTimes));
        }

        private boolean canDegrade() {
            Integer overloadTimes = (Integer)AutoDegradeWindowRateCounter.this.durationRecords.tailMap(this.lastStartTime.get()).values().stream().map((n) -> n > this.highWaterLevelNum ? 1 : 0).reduce(0, Integer::sum);
            return overloadTimes >= this.degradeConsecutiveTimes;
        }

        private long updateStartTime(long currentTime) {
            long startTime = AutoDegradeWindowRateCounter.computeStartTime(currentTime, this.intervalInMs, this.degradeConsecutiveTimes);
            this.lastStartTime.set(startTime);
            return startTime;
        }
    }

    class RecoverStrategy {
        private final int intervalInMs;
        private final long lowWaterLevelNum;
        private final int recoverConsecutiveTimes;
        private final AtomicLong lastStartTime;

        private RecoverStrategy(int intervalInMs, long lowWaterLevelNum, int recoverConsecutiveTimes) {
            this.lastStartTime = new AtomicLong();
            this.intervalInMs = intervalInMs;
            this.lowWaterLevelNum = lowWaterLevelNum;
            this.recoverConsecutiveTimes = recoverConsecutiveTimes;
        }

        private boolean canRecover() {
            Integer lowWaterTimes = (Integer)AutoDegradeWindowRateCounter.this.durationRecords.tailMap(this.lastStartTime.get()).values().stream().map((n) -> n > this.lowWaterLevelNum ? 1 : 0).reduce(0, Integer::sum);
            return lowWaterTimes == 0;
        }

        private long updateStartTime(long currentTime) {
            long startTime = AutoDegradeWindowRateCounter.computeStartTime(currentTime, this.intervalInMs, this.recoverConsecutiveTimes);
            this.lastStartTime.set(startTime);
            return startTime;
        }
    }
}

image

image

image

image

```滑动窗口计数 环形缓冲区方案-生产方案 package com.example.counter.v4_ring_buffer;

import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicLong;

/**

  • 版本4:环形缓冲区方案(最优方案)
  • 特点:
    • 固定内存占用(循环数组)
    • 无锁设计(CAS 操作)
    • 支持任意时间精度
    • 吞吐量最高
  • 原理:
  • 将滑动窗口视为一个环形数组,每次新请求更新对应的桶位置
  • 时间推进时自动覆盖过期数据 */ public class RingBufferCounter { // 环形缓冲区大小(秒级桶,60个 = 60秒窗口) private static final int BUFFER_SIZE = 60; private static final long BUCKET_DURATION_MS = 1000; // 每个桶代表1秒

    // 环形缓冲区:存储每秒的请求计数 private final AtomicLongArray buffer = new AtomicLongArray(BUFFER_SIZE);

    // 当前窗口的起始时间戳 private final AtomicLong windowStartTime = new AtomicLong(System.currentTimeMillis());

    /**

    • 添加请求(无锁) */ public void addRequest() { long now = System.currentTimeMillis(); addRequestAtTime(now); }

    private void addRequestAtTime(long timestamp) { long windowStart = windowStartTime.get(); long timeDiff = timestamp - windowStart;

     // 如果时间跨度超过窗口大小,重置窗口
     if (timeDiff >= BUFFER_SIZE * BUCKET_DURATION_MS) {
         slideWindow(timestamp);
         addRequestAtTime(timestamp);
         return;
     }
        
     // 计算在环形缓冲区中的位置
     int bucketIndex = (int) ((timeDiff / BUCKET_DURATION_MS) % BUFFER_SIZE);
     buffer.incrementAndGet(bucketIndex);  }
    

    /**

    • 获取窗口内的总计数 */ public long getCount() { long now = System.currentTimeMillis(); long windowStart = windowStartTime.get(); long timeDiff = now - windowStart;

      // 如果超过窗口,需要重置 if (timeDiff >= BUFFER_SIZE * BUCKET_DURATION_MS) { slideWindow(now); return getCount(); }

      long sum = 0; // 统计有效的桶(在当前窗口内) int validBuckets = (int) ((timeDiff / BUCKET_DURATION_MS) + 1); for (int i = 0; i < Math.min(validBuckets, BUFFER_SIZE); i++) { sum += buffer.get(i); } return sum; }

    /**

    • 滑动窗口(只在必要时调用) */ private synchronized void slideWindow(long now) { long windowStart = windowStartTime.get(); long timeDiff = now - windowStart;

      // 再次检查是否需要滑动 if (timeDiff < BUFFER_SIZE * BUCKET_DURATION_MS) { return; }

      // 计算需要清理的桶数 int bucketsToClean = (int) (timeDiff / BUCKET_DURATION_MS);

      // 重置即将使用的桶 for (int i = 0; i < Math.min(bucketsToClean, BUFFER_SIZE); i++) { buffer.set(i, 0); }

      // 更新窗口起始时间 long newWindowStart = windowStart + (bucketsToClean / BUFFER_SIZE) * BUFFER_SIZE * BUCKET_DURATION_MS; windowStartTime.set(newWindowStart); }

    /**

    • 获取详细的统计信息(用于调试) */ public void printStats() { long now = System.currentTimeMillis(); long windowStart = windowStartTime.get();

      System.out.println(“Window Start: “ + windowStart); System.out.println(“Current Time: “ + now); System.out.println(“Time Diff: “ + (now - windowStart) + “ms”); System.out.println(“Total Count: “ + getCount()); System.out.println(“Buffer content:”); for (int i = 0; i < BUFFER_SIZE; i++) { System.out.print(buffer.get(i) + “ “); if ((i + 1) % 10 == 0) System.out.println(); } }

    public static void main(String[] args) throws InterruptedException { RingBufferCounter counter = new RingBufferCounter();

     // 并发测试
     System.out.println("Starting concurrent test...");
     long startTime = System.currentTimeMillis();
        
     for (int t = 0; t < 10; t++) {
         new Thread(() -> {
             for (int i = 0; i < 10000; i++) {
                 counter.addRequest();
             }
         }).start();
     }
        
     Thread.sleep(2000);
     long endTime = System.currentTimeMillis();
        
     System.out.println("\n=== Results ===");
     System.out.println("Total time: " + (endTime - startTime) + "ms");
     System.out.println("Total count: " + counter.getCount());
     System.out.println("Expected: ~100000");
     counter.printStats();  } } ```
    

@Slf4j
@Service("slidingWindowRateLimiter")
public class SlidingWindowRateLimiter {

    private final ConcurrentLinkedDeque<Long> requestTimestamps = new ConcurrentLinkedDeque<>();
    private final AtomicInteger currentCount = new AtomicInteger(0);
    private static final long WINDOW_SIZE_MS = 1000;

    @LafValue("jmMsg.strategy.call.count")
    private Integer jmMsgStrategyCallCount = 20;

    /**
     * 滑动窗口限流检查
     */
    public synchronized boolean tryAcquire() {
        long now = System.currentTimeMillis();

        // 清理过期的时间戳
        while (!requestTimestamps.isEmpty() &&
                requestTimestamps.peekFirst() <= now - WINDOW_SIZE_MS) {
            requestTimestamps.pollFirst();
            currentCount.decrementAndGet();
        }

        // 检查是否超过限制
        if (currentCount.get() >= jmMsgStrategyCallCount) {
            return false;
        }

        // 添加当前请求
        requestTimestamps.addLast(now);
        currentCount.incrementAndGet();
        return true;
    }
}