Springboot Frame SHEDLOCK + WebSocket feature conflict solution

Background description:

The first encounter in project development When the server is deployed in cluster mode, the timing tasks of multiple nodes will be executed at each node. For this single problem, the previous article
https://www.cnblogs.com/Iris1998/p/11413099.html gave a springboot integrated shedlock solution, but after a period of time, there are new requirements in the project, and I want to realize the message initiative The function of pushing to the client
From a functional point of view, it can be realized by the websocket that comes with springboot, but the development process encounters a very strange error. The error message is as follows
{"time":"2019-09-27 11:07:22 626", "level":"WARN", "classname":"org.apache.juli.logging.DirectJDKLog", "method":"log", "line":"173", "msg":"The web application [ROOT] appears to have started a thread named [RxIoScheduler-1 (Evictor)] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
 sun.misc.Unsafe.park(Native Method)
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
 java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 java.lang.Thread.run(Thread.java:748)"}
{"time":"2019-09-27 11:07:22 666", "level":"ERROR", "classname":"org.springframework.boot.SpringApplication", "method":"reportFailure", " line":"858", "msg":"Application run failed"}
org.springframework.beans.factory.BeanCreationException: Error creating bean with name'stompWebSocketHandlerMapping' defined in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.web.servlet.HandlerMapping]: Factory method'stompWebSocketHandlerMapping' threw exception; nested exception is java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$Proxy220]. Overriding bean of same name declared in: class path resource [org/springframework /web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1288)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1127)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:538)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.hundunyun.wechat.App.main(App.java:19)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.web.servlet.HandlerMapping]: Factory method'stompWebSocketHandlerMapping' threw exception; nested exception is java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$Proxy220]. Overriding bean of same name declared in: class path resource [org /springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
... 19 common frames omitted
Caused by: java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$ Proxy220]. Overriding bean of same name declared in: class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.resolveBeanReference(ConfigurationClassEnhancer.java:418)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:366)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.messageBrokerTaskScheduler()
at org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurationSupport.stompWebSocketHandlerMapping(WebSocketMessageBrokerConfigurationSupport.java:76)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.CGLIB$stompWebSocketHandlerMapping$14()
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6$$FastClassBySpringCGLIB$$19a6d0cf.invoke()
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.stompWebSocketHandlerMapping()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 20 common frames omitted

 After experimentation, there is no problem using shedlock alone, and there is no problem using websocket alone, but there is a problem when the two are put together. Boldly guess that the problem lies at the bottom of springBoot’s shedlock and websocket. After trying to solve the problem, I decided to change to another implementation method. Springboot’s websocket+redis distributed lock realizes the compatibility of the two functions in a distributed environment.

The first step: introduce related packages

 
            org.springframework.boot
            spring-boot-starter-websocket
            2.2.1
        
        
            org.springframework.boot
            spring-boot-starter-data-redis
2.2.1
 
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            io.projectreactor
            reactor-test
            test
        

        
        
            net.javacrumbs.shedlock
            shedlock-spring
            2.2.1
        
        
            net.javacrumbs.shedlock
            shedlock-provider-redis-jedis
            2.2.1
        
    
       com.alibaba
                                    " version>1.2.41
    

Step 2: Add files related to redis distributed lock function

1. Add custom lock notes

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
    String lockedPrefix() default ""; //redis lock key prefix
    int expireTime() default 10; //The time the key exists in redis, 1000S
}

2. Add aop aspect class

import com.iris.websocket .annotation.CacheLock;
import com.iris.websocket.springwebsocket.redis.RedisClient;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/*
 * @Description TODO
 * @Author muruan.lt
 * @Date 2019/9/26 11:36
 */
@Aspect
@Slf4j
@Component
public class CacheLockAspect {
    private static final String LOCK_VALUE = "locked";

    @Autowired
    RedisClient redisClient;

    /*
     * 1. In the case of springboot-websocket, the @Synchronized annotation is not used, and the distributed locks caused by multi-threading may not take effect
     * 2. The @Synchronized annotation can be omitted without springboot-websocket
     */
    @Around("@annotation(com.iris.websocket.annotation.CacheLock)")
    @Synchronized
    public void cacheLockPoint(ProceedingJoinPoint pjp) {

        String name = pjp.getSignature().getName();
        Method[] methods = pjp.getTarget().getClass().getMethods();
        for (Method cacheMethod: methods) {
            if (null != cacheMethod.getAnnotation(CacheLock.class)
                    && name.equals(cacheMethod.getName())){
                try {
// String lockKey = pjp.getTarget().getClass().getName()+cacheMethod.getAnnotation(CacheLock.class).lockedPrefix();
                    String lockKey = cacheMethod.getAnnotation(CacheLock.class).lockedPrefix();
                    int timeOut = cacheMethod.getAnnotation(CacheLock.class).expireTime();
                    if(null == lockKey){
                        return;
                    }
                    if (redisClient.setnx(lockKey, LOCK_VALUE)) {
                        redisClient.expire(lockKey, timeOut);
// log.info("method:{}Acquire lock:{}, start running!",cacheMethod,lockKey);
                        pjp.proceed();
                        return;
                    }
// log.info("method: {} did not acquire the lock: {}, operation failed!", cacheMethod, lockKey);
                } catch (Throwable e) {
                    log.error("method:{},operation error!",cacheMethod,e);
                    return;
                }
                break;
            }
        }
    }
}

3. Add application.properties configuration file

server.port: 8800

spring.application.name: sun
-websocket

eureka.instance.hostname: localhost
eureka.client.registerWithEureka:
false
eureka.client.fetchRegistry:
false
serviceUrl.defaultZone: http:
//${eureka.instance. hostname}:${server.port}/eureka/

###################################### redis configuration information start### #####################################
redis.host
=127.0.0.1
redis.password
=123456
redis.port
=6379
#Maximum number of objects that can maintain idel state
redis.maxIdle
=300
#Maximum number of objects allocated
redis.maxActive
=600
#When there is no return object in the pool, the maximum waiting time
redis.maxWaitMillis
=1000
redis.maxTotal
=1000
#Whether the validity check is performed when the borrow Object method is called
redis.testOnBorrow
=true
#Whether the validity check is performed when the return Object method is called
redis.testOnReturn
=true
redis.timeout
=3600
###################################### redis configuration information end### #######################################

4. Add redis configuration related files

4.1 Add RedisPool.java file

import org.springframework.beans. factory.annotation.Value;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
* @Description: JedisPool configuration class, the configuration information is in the default "application.properties" file
* @Author: muruan.lt
* @CreateDate: 2019/9/27 9:50
* @Version: 1.0
*/
public abstract class RedisPool {

    public JedisPool jedisPool;

    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;
    @Value("${redis.password}")
    private String password;
    @Value("${redis.maxIdle}")
    private Integer maxIdle;
    @Value("${redis.maxTotal}")
    private Integer maxTotal;
    @Value("${redis.maxActive}")
    private Integer maxActive;
    @Value("${redis.maxWaitMillis}")
    private Integer maxWaitMillis;
    @Value("${redis.timeout}")
    private Integer timeout;
    @Value("${redis.testOnBorrow}")
    private Boolean testOnBorrow;
    @Value("${redis.testOnReturn}")
    private Boolean testOnReturn;

    protected RedisPool() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(maxIdle);
        jedisPoolConfig.setMaxTotal(maxTotal);
        jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
        jedisPoolConfig.setTestOnBorrow(testOnBorrow);
        jedisPoolConfig.setTestOnReturn(testOnReturn);
        jedisPool =
                new JedisPool(jedisPoolConfig, host, port, timeout, password);
    }
}

4.2 Add RedisClient.java file

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

@Component
public class RedisClient {
    private static final Logger loggger = LoggerFactory.getLogger(RedisClient.class);

    @Autowired
    private JedisPool jedisPool;

    private Jedis getJedis() {
        Jedis jedis = jedisPool.getResource();
// jedis.select(database);
        return jedis;
    }

    public void set(String key, String value) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.set(key, value);
        } finally {
            jedis.close();
        }
    }

    public void set(String key, String value, int expTime) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.setex(key, expTime, value);
        } finally {
            jedis.close();
        }
    }

    public String get(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            return jedis.get(key);
        } finally {
            jedis.close();
        }
    }

    public void expire(String key, int expTime) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.expire(key, expTime);
        } finally {
            jedis.close();
        }
    }

    public boolean setnx(String lockKey, String expires) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            Long result = jedis.setnx(lockKey, expires);
            return result == 1;
        } finally {
            jedis.close();
        }
    }
}

4.3 Add RedisConfig.java file

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import redis.clients.jedis.JedisPool;

import java.lang.reflect.Method;

/*
 * @Description serves shedlock-redis
 * @Author muruan.lt
 * @Date 2019/9/26 10:19
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;

    @Bean
    @Override
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object o: params) {
                    if (o != null) {
                        sb.append(o.toString());
                    }
                }
                return sb.toString();
            }
        };
    }

    @Bean
    public RedisTemplate myRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate srt = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer j2j = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        j2j.setObjectMapper(om);
        srt.setValueSerializer(j2j);
        srt.afterPropertiesSet();
        return srt;
    }

    @Bean
    public JedisPool jedisPool() {
        return new JedisPool(this.host, this.port);
    }
}

The above has completed the addition of all the files related to the redis distributed lock function. The following are the files that need to be added for websocket

Step 3: Add websocket related functions File

1. Add websocket configuration file

import org.springframework.context.annotation. Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/*
 * @Description springWebsoket configuration file
 * @Author muruan.lt
 * @Date 2019/9/24 09:23
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
    }
}

So far, all the documents related to the two functions have been completed. The following is the use and detection of the functions.

Step 4: Use and detection of functions

import com.alibaba.fastjson. JSONObject;
import com.iris.websocket.annotation.CacheLock;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

/*
 * @Description Timing task test class
 * @Author muruan.lt
 * @Date 2019/9/2 14:27
 */
@Component
@Slf4j
public class TaskTest {

    @Autowired
    private SimpMessageSendingOperations messageTemplate;
    
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "test",expireTime=9)
    public void test1(){
        System.out.println(new Date() + "hello1!");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 200);
        jsonObject.put("content", "hello1!");
        String resStr = jsonObject.toJSONString();
        log.info("send message to browser,message content [{}]", resStr);
        messageTemplate.convertAndSend("/topic/greetings", resStr);
    }

    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "test",expireTime=9)
    public void test2(){
        System.out.println(new Date() + "hello2!");
    }

    /**
     * Perform timing tasks
     **/
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "TimeTaskService",expireTime=9)
    public void executeTask() {
        System.out.println(new Date() +"hello3!");
    }

    /**
     * Perform timing tasks
     **/
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "TimeTaskService",expireTime=9)
    public void executeTask1() {
        System.out.println(new Date() +"hello4!");
    }
}

Use of websocket: add where you need to push messages to the front end

@Autowired
private SimpMessageSendingOperations messageTemplate;

Simply, send related to a specific topic, and the client can receive new messages through this topic

@CacheLock(lockedPrefix = "TimeTaskService",expireTime=9)

This annotation is the key to realizing distributed redis lock. Adding this annotation to the method to solve the problem of repeated execution of distributed deployment can solve the problem of repeated execution of distributed deployment.

{"time":"2019-09-27 11:07:22 626", "level":" WARN", "classname":"org.apache.juli.logging.DirectJDKLog", "method":"log", "line":"173", "msg":"The web application [ROOT] appears to have started a thread named [RxIoScheduler-1 (Evictor)] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
 sun.misc.Unsafe.park(Native Method)
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
 java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
 java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 java.lang.Thread.run(Thread.java:748)"}
{"time":"2019-09-27 11:07:22 666", "level":"ERROR", "classname":"org.springframework.boot.SpringApplication", "method":"reportFailure", " line":"858", "msg":"Application run failed"}
org.springframework.beans.factory.BeanCreationException: Error creating bean with name'stompWebSocketHandlerMapping' defined in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.web.servlet.HandlerMapping]: Factory method'stompWebSocketHandlerMapping' threw exception; nested exception is java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$Proxy220]. Overriding bean of same name declared in: class path resource [org/springframework /web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1288)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1127)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:538)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
at com.hundunyun.wechat.App.main(App.java:19)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.web.servlet.HandlerMapping]: Factory method'stompWebSocketHandlerMapping' threw exception; nested exception is java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$Proxy220]. Overriding bean of same name declared in: class path resource [org /springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
... 19 common frames omitted
Caused by: java.lang.IllegalStateException: @Bean method AbstractMessageBrokerConfiguration.messageBrokerTaskScheduler called as bean reference for type [org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler] but overridden by non-compatible bean instance of type [com.sun.proxy.$ Proxy220]. Overriding bean of same name declared in: class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.resolveBeanReference(ConfigurationClassEnhancer.java:418)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:366)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.messageBrokerTaskScheduler()
at org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurationSupport.stompWebSocketHandlerMapping(WebSocketMessageBrokerConfigurationSupport.java:76)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.CGLIB$stompWebSocketHandlerMapping$14()
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6$$FastClassBySpringCGLIB$$19a6d0cf.invoke()
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
at org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration$$EnhancerBySpringCGLIB$$748fbfa6.stompWebSocketHandlerMapping()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 20 common frames omitted

        
            org.springframework.boot
            spring-boot-starter-websocket
            2.2.1
        
        
            org.springframework.boot
            spring-boot-starter-data-redis
2.2.1
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            io.projectreactor
            reactor-test
            test
        

        
        
            net.javacrumbs.shedlock
            shedlock-spring
            2.2.1
        
        
            net.javacrumbs.shedlock
            shedlock-provider-redis-jedis
            2.2.1
        
    
       com.alibaba
       fastjson
       1.2.41
    

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CacheLock {
    String lockedPrefix() default "";   //redis 锁key的前缀
    int expireTime() default 10;      //key在redis里存在的时间,1000S
}

import com.iris.websocket.annotation.CacheLock;
import com.iris.websocket.springwebsocket.redis.RedisClient;
import lombok.Synchronized;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/*
 * @Description TODO
 * @Author muruan.lt
 * @Date 2019/9/26 11:36
 */
@Aspect
@Slf4j
@Component
public class CacheLockAspect {
    private static final String LOCK_VALUE = "locked";

    @Autowired
    RedisClient redisClient;

    /*
     * 1.有springboot-websocket情况下不用@Synchronized这个注解,可能多线程导致的分布式锁不生效
     * 2.没有springboot-websocket情况下可以不用@Synchronized这个注解
     */
    @Around("@annotation(com.iris.websocket.annotation.CacheLock)")
    @Synchronized
    public void cacheLockPoint(ProceedingJoinPoint pjp) {

        String name = pjp.getSignature().getName();
        Method[] methods = pjp.getTarget().getClass().getMethods();
        for (Method cacheMethod : methods) {
            if (null != cacheMethod.getAnnotation(CacheLock.class)
                    && name.equals(cacheMethod.getName())){
                try {
//                    String lockKey = pjp.getTarget().getClass().getName()+cacheMethod.getAnnotation(CacheLock.class).lockedPrefix();
                    String lockKey = cacheMethod.getAnnotation(CacheLock.class).lockedPrefix();
                    int timeOut = cacheMethod.getAnnotation(CacheLock.class).expireTime();
                    if(null == lockKey){
                        return;
                    }
                    if (redisClient.setnx(lockKey, LOCK_VALUE)) {
                        redisClient.expire(lockKey, timeOut);
//                        log.info("method:{}获取锁:{},开始运行!",cacheMethod,lockKey);
                        pjp.proceed();
                        return;
                    }
//                    log.info("method:{}未获取锁:{},运行失败!",cacheMethod,lockKey);
                } catch (Throwable e) {
                    log.error("method:{},运行错误!",cacheMethod,e);
                    return;
                }
                break;
            }
        }
    }
}

server.port: 8800

spring.application.name: sun
-websocket

eureka.instance.hostname: localhost
eureka.client.registerWithEureka:
false
eureka.client.fetchRegistry:
false
serviceUrl.defaultZone: http:
//${eureka.instance.hostname}:${server.port}/eureka/

####################################### redis 配置信息开始 ###########################################
redis.host
=127.0.0.1
redis.password
=123456
redis.port
=6379
#最大能够保持idel状态的对象数
redis.maxIdle
=300
#最大分配的对象数
redis.maxActive
=600
#当池内没有返回对象时,最大等待时间
redis.maxWaitMillis
=1000
redis.maxTotal
=1000
#当调用borrow Object方法时,是否进行有效性检查
redis.testOnBorrow
=true
#当调用return Object方法时,是否进行有效性检查
redis.testOnReturn
=true
redis.timeout
=3600
####################################### redis 配置信息结束 ###########################################

import org.springframework.beans.factory.annotation.Value;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/*
* @Description:    JedisPool配置类,配置信息在默认的"application.properties"文件中
* @Author:         muruan.lt
* @CreateDate:     2019/9/27 9:50
* @Version:        1.0
*/
public abstract class RedisPool {

    public JedisPool jedisPool;

    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;
    @Value("${redis.password}")
    private String password;
    @Value("${redis.maxIdle}")
    private Integer maxIdle;
    @Value("${redis.maxTotal}")
    private Integer maxTotal;
    @Value("${redis.maxActive}")
    private Integer maxActive;
    @Value("${redis.maxWaitMillis}")
    private Integer maxWaitMillis;
    @Value("${redis.timeout}")
    private Integer timeout;
    @Value("${redis.testOnBorrow}")
    private Boolean testOnBorrow;
    @Value("${redis.testOnReturn}")
    private Boolean testOnReturn;

    protected RedisPool() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(maxIdle);
        jedisPoolConfig.setMaxTotal(maxTotal);
        jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
        jedisPoolConfig.setTestOnBorrow(testOnBorrow);
        jedisPoolConfig.setTestOnReturn(testOnReturn);
        jedisPool =
                new JedisPool(jedisPoolConfig, host, port, timeout, password);
    }
}

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

@Component
public class RedisClient {
    private static final Logger loggger = LoggerFactory.getLogger(RedisClient.class);

    @Autowired
    private JedisPool jedisPool;

    private Jedis getJedis() {
        Jedis jedis = jedisPool.getResource();
//        jedis.select(database);
        return jedis;
    }

    public void set(String key, String value) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.set(key, value);
        } finally {
            jedis.close();
        }
    }

    public void set(String key, String value, int expTime) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.setex(key, expTime, value);
        } finally {
            jedis.close();
        }
    }

    public String get(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            return jedis.get(key);
        } finally {
            jedis.close();
        }
    }

    public void expire(String key, int expTime) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.expire(key, expTime);
        } finally {
            jedis.close();
        }
    }

    public boolean setnx(String lockKey, String expires) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            Long result = jedis.setnx(lockKey, expires);
            return result == 1;
        } finally {
            jedis.close();
        }
    }
}

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import redis.clients.jedis.JedisPool;

import java.lang.reflect.Method;

/*
 * @Description 服务于shedlock-redis
 * @Author muruan.lt
 * @Date 2019/9/26 10:19
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Value("${redis.host}")
    private String host;
    @Value("${redis.port}")
    private Integer port;

    @Bean
    @Override
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object o : params) {
                    if (o != null) {
                        sb.append(o.toString());
                    }
                }
                return sb.toString();
            }
        };
    }

    @Bean
    public RedisTemplate myRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate srt = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer j2j = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        j2j.setObjectMapper(om);
        srt.setValueSerializer(j2j);
        srt.afterPropertiesSet();
        return srt;
    }

    @Bean
    public JedisPool jedisPool() {
        return new JedisPool(this.host, this.port);
    }
}

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/*
 * @Description springWebsoket配置文件
 * @Author muruan.lt
 * @Date 2019/9/24 09:23
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").setAllowedOrigins("*").withSockJS();
    }
}

import com.alibaba.fastjson.JSONObject;
import com.iris.websocket.annotation.CacheLock;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

/*
 * @Description 定时任务测试类
 * @Author muruan.lt
 * @Date 2019/9/2 14:27
 */
@Component
@Slf4j
public class TaskTest {

    @Autowired
    private SimpMessageSendingOperations messageTemplate;
    
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "test",expireTime=9)
    public void test1(){
        System.out.println(new Date() + "hello1!");
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 200);
        jsonObject.put("content", "hello1!");
        String resStr = jsonObject.toJSONString();
        log.info("send message to browser,message content [{}]", resStr);
        messageTemplate.convertAndSend("/topic/greetings", resStr);
    }

    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "test",expireTime=9)
    public void test2(){
        System.out.println(new Date() + "hello2!");
    }

    /**
     * 执行定时任务
     **/
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "TimeTaskService",expireTime=9)
    public void executeTask() {
        System.out.println(new Date() +"hello3!");
    }

    /**
     * 执行定时任务
     **/
    @Scheduled(cron = "0/10 * * * * ?")
    @CacheLock(lockedPrefix = "TimeTaskService",expireTime=9)
    public void executeTask1() {
        System.out.println(new Date() +"hello4!");
    }
}

@Autowired
private SimpMessageSendingOperations messageTemplate;

@CacheLock(lockedPrefix = "TimeTaskService",expireTime=9)

Leave a Comment

Your email address will not be published.