How to Integrate Akka with Spring

I. Overview

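I recently picked up Akka again and used it successfully in a web application to improve the system's performance. A web application usually has DAOs and services, all of which are Spring beans. When we define an actor, it may call a DAO to read or write the database, or call a service to run business logic. We therefore want the actor itself to come from the Spring container, so that services and DAOs can be injected into it.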

II. How to get actors from the Spring container

1. A producer that gets actors from the Spring container

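import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;

// Lets Akka obtain actor instances from the Spring container instead of calling a constructor directly.
public class SpringActorProducer implements IndirectActorProducer {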

    final ApplicationContext applicationContext;
    final String             actorBeanName;

    public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName){
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
    }

    @Override
    public Actor produce() {
        return (Actor) applicationContext.getBean(actorBeanName);
    }

    @SuppressWarnings("unchecked")
    @Override
    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }
}

2. An Akka extension for creating actors

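import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;

public class SpringExtension extends AbstractExtensionId<SpringExtension.SpringExt> {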

    /**
     * The identifier used to access the SpringExtension.
     */
    public static final SpringExtension SpringExtProvider = new SpringExtension();

    /**
     * Is used by Akka to instantiate the Extension identified by this ExtensionId, internal use only.
     */
    @Override
    public SpringExt createExtension(ExtendedActorSystem system) {
        return new SpringExt();
    }

    /**
     * The Extension implementation.
     */
    public static class SpringExt implements Extension {

        private volatile ApplicationContext applicationContext;

        /**
         * Used to initialize the Spring application context for the extension.
         * 
         * @param applicationContext the Spring application context used to look up actor beans
         */
        public void initialize(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }

        /**
         * Create a Props for the specified actorBeanName using the SpringActorProducer class.
         * 
         * @param actorBeanName The name of the actor bean to create Props for
         * @return a Props that will create the named actor bean using Spring
         */
        public Props props(String actorBeanName) {
            return Props.create(SpringActorProducer.class, applicationContext, actorBeanName);
        }
    }
}

3. A sample actor and the service it depends on

import akka.actor.UntypedActor; // needed by CountingActor below

public class CountingService {
  /**
   * Increment the given number by one.
   */
  public int increment(int count) {
    return count + 1;
  }
}
// Actor whose CountingService dependency is injected by Spring through the setter below
class CountingActor extends UntypedActor {

    public static class Count {}
    public static class Get {}

    // the service that will be automatically injected
    private CountingService countingService;

    private int count = 0;

    @Override
    public void onReceive(Object message) throws Exception {
        System.out.println(countingService);
        if (message instanceof Count) {
            count = countingService.increment(count);
        } else if (message instanceof Get) {
            getSender().tell(count, getSelf());
        } else {
            unhandled(message);
        }
    }

    public void setCountingService(CountingService countingService) {
        this.countingService = countingService;
    }
    
    public CountingActor() {
        System.out.println("CountingActor is Creating...");
    }
}

CountingActor depends on CountingService. Spring injects CountingService into CountingActor through its setter when the actor bean is created.
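The injection relies on the setter name matching a bean name (the configuration in the next step sets default-autowire="byName"). As a rough illustration of that idea only, here is a minimal plain-Java sketch with no Spring or Akka on the classpath. `ByNameInjectionSketch`, `CounterBean`, and `autowireByName` are hypothetical names made up for this example, and the property-name rule is simplified compared with what Spring really does.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class ByNameInjectionSketch {

    /** Same shape as the CountingService shown above. */
    public static class CountingService {
        public int increment(int count) {
            return count + 1;
        }
    }

    /** Hypothetical plain object standing in for CountingActor (no Akka needed). */
    public static class CounterBean {
        private CountingService countingService;
        private int count = 0;

        public void setCountingService(CountingService countingService) {
            this.countingService = countingService;
        }

        public int countOnce() {
            count = countingService.increment(count);
            return count;
        }
    }

    /**
     * Imitates byName autowiring: for every public one-argument setter setXxx,
     * look up a bean called "xxx" and pass it in if the type matches.
     */
    public static void autowireByName(Object target, Map<String, Object> beans) {
        for (Method method : target.getClass().getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || name.length() <= 3 || method.getParameterCount() != 1) {
                continue;
            }
            // Simplified property naming: setCountingService -> countingService
            String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            Object bean = beans.get(property);
            if (bean != null && method.getParameterTypes()[0].isInstance(bean)) {
                try {
                    method.invoke(target, bean);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Could not inject " + property, e);
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new HashMap<>();
        beans.put("countingService", new CountingService());

        CounterBean bean = new CounterBean();
        autowireByName(bean, beans);
        System.out.println(bean.countOnce()); // prints 1
    }
}
```

If the bean were registered under a different name, the setter would simply never be called, and the first message would hit a null service. That is worth keeping in mind when renaming beans or setters.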

4. Configuration file

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"
	default-autowire="byName">

	<bean id="countingService" class="com.bolin.young.akka.spring.CountingService" />
	<bean id="countingActor" class="com.bolin.young.akka.spring.CountingActor"  scope="prototype" />

	<bean id="myActorSystem" class="akka.actor.ActorSystem"
		factory-method="create" destroy-method="shutdown" scope="singleton">
		<constructor-arg value="mySpringAkkaSystem" />
	</bean>
	
</beans>
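One detail worth spelling out is scope="prototype" on countingActor. SpringActorProducer.produce() calls getBean every time Akka needs an actor instance, and Akka expects a fresh instance from each call (for example, when an actor is restarted). With Spring's default singleton scope, the same object would be handed back again. The sketch below shows the difference in plain Java only; `ScopeSketch`, `FakeActor`, `singletonScope`, and `prototypeScope` are hypothetical stand-ins, not Akka or Spring APIs.

```java
import java.util.function.Supplier;

public class ScopeSketch {

    /** Hypothetical stand-in for an actor object. */
    public static class FakeActor {
        public int count = 0;
    }

    /** Singleton-like scope: every lookup returns the one shared instance. */
    public static Supplier<FakeActor> singletonScope() {
        FakeActor shared = new FakeActor();
        return () -> shared;
    }

    /** Prototype-like scope: every lookup builds a new instance. */
    public static Supplier<FakeActor> prototypeScope() {
        return FakeActor::new;
    }

    public static void main(String[] args) {
        Supplier<FakeActor> singleton = singletonScope();
        Supplier<FakeActor> prototype = prototypeScope();

        // Two lookups each, as would happen on initial creation and on a restart.
        System.out.println(singleton.get() == singleton.get()); // true: same object reused
        System.out.println(prototype.get() == prototype.get()); // false: fresh object each time
    }
}
```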

5. Usage

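// Load the Spring context from beans.xml on the classpath
ApplicationContext ctx = new ClassPathXmlApplicationContext("beans.xml");

// The ActorSystem itself is a Spring bean
ActorSystem myActorSystem = (ActorSystem) ctx.getBean("myActorSystem");

System.out.println(myActorSystem);

// Hand the Spring context to the extension
// (SpringExtProvider is statically imported from SpringExtension)
SpringExtProvider.get(myActorSystem).initialize(ctx);

// Create the actor from the Spring bean named "countingActor"
final ActorRef myActor = myActorSystem.actorOf(
        SpringExtProvider.get(myActorSystem).props("countingActor"), "countingActor");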

III. Summary

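1. Usually actorA sends a message to actorB, and actorB replies to actorA. We can also send a message to actorA from our own (non-actor) code and then wait for actorA's reply, as in the following code: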

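// Requires: akka.actor.ActorRef, akka.pattern.Patterns, akka.util.Timeout,
// scala.concurrent.Await, scala.concurrent.Future, scala.concurrent.duration.Duration
BootstrapAnalyzeMsg bootstrapAnalyzeMsg = new BootstrapAnalyzeMsg();
bootstrapAnalyzeMsg.setFileItem(treeFile);

Timeout timeout = new Timeout(Duration.create(TIME_OUT, "seconds"));
Future<Object> future = Patterns.ask(bootstrapAnalyzeActor, bootstrapAnalyzeMsg, timeout);

try {
     // The current thread blocks here until the actor replies or the timeout expires.
     ProjectAnalyzeResultMsg projectAnalyzeResultMsg =
             (ProjectAnalyzeResultMsg) Await.result(future, timeout.duration());
     return projectAnalyzeResultMsg.getJarConflictInfoList();
} catch (Exception e) {
     e.printStackTrace();
} finally {
     // Ask the actor to stop once we are done with it; there is no sender to reply to.
     bootstrapAnalyzeActor.tell(new ActorStopMsg(), ActorRef.noSender());
}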

This idea is similar to Java's Future.

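To make the comparison concrete, here is a rough plain-Java analogue of the ask-then-wait pattern, using only java.util.concurrent. The `AskLikeFuture` class, its `analyze` method, and the reply string are hypothetical stand-ins for the actor's work. The point is only the shape: start the work asynchronously, block with a timeout, and handle the failure case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AskLikeFuture {

    /** Hypothetical stand-in for the work an actor does after receiving a message. */
    public static CompletableFuture<String> analyze(String fileName) {
        return CompletableFuture.supplyAsync(() -> "analyzed:" + fileName);
    }

    /** Blocks the calling thread until the reply arrives or the timeout expires. */
    public static String askAndWait(String fileName, long timeoutSeconds) {
        CompletableFuture<String> future = analyze(fileName);
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            // The work failed, or no reply arrived in time
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(askAndWait("tree.txt", 5)); // prints analyzed:tree.txt
    }
}
```

As with Await.result, the blocking call ties up the calling thread, so it suits the boundary between actor and non-actor code rather than code running inside an actor.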
2. An actor can also stop itself. Stop actors that are no longer needed as early as possible.

getContext().stop(getSelf());

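3. An actor's name must be unique within the system. More precisely, it must be unique among the children of the same parent, so actors created directly with ActorSystem.actorOf must each have a different name.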

4. I wrote a complete small example myself: https://github.com/yangbolin/akka-demo