Java监听链上智能合约事件
1、事件的监听在智能合约中定义的任何事件都将用Event方法在智能合约封装包中进行表示,该事件采用交易收据,并从中提取索引和非索引事件参数。监听机制是建立在日志基础之上,如果智能合约触发了一个事件,那么该事件将写入以太坊日志;如果外部应用订阅了这个事件,那么应用就可以在日志中出现该事件后及时监测到,进而获取日志信息。2、观察者模式和发布订阅模式区别观察者模式:所谓观察者模式,其实就是为了实现松耦合
1、事件的监听
在智能合约中定义的任何事件都将用Event方法在智能合约封装包中进行表示,该事件采用交易收据,并从中提取索引和非索引事件参数。
监听机制是建立在日志基础之上,如果智能合约触发了一个事件,那么该事件将写入以太坊日志;如果外部应用订阅了这个事件,那么应用就可以在日志中出现该事件后及时监测到,进而获取日志信息。
2、观察者模式和发布订阅模式区别
观察者模式:
所谓观察者模式,其实就是为了实现松耦合(loosely coupled)。
在观察者模式中,观察者需要直接订阅目标事件;在目标发出内容改变的事件后,直接接收事件并作出响应。
演示示例:
解释一下几个名词的含义:
Observable (可观察者,即被观察者)
Observer (观察者)
subscribe (订阅)事件。
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
导入依赖
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency>
//被观察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
});
//观察者
Observer<String> observer = new Observer<String>() {
//对监听到的内容做出逻辑解释
@Override
public void onNext(String value) {
Log.info("onNext : " + value);
}
//逻辑异常,会调用此方法(和onComplete()二者互斥)
@Override
public void onError(Throwable e) {
Log.info("onError");
}
//逻辑正常执行结束,会调用此方法
@Override
public void onComplete() {
Log.info("onComplete" );
}
};
//3,建立订阅关系
observable.subscribe(observer);
发布订阅模式:
发布订阅模式属于广义上的观察者模式。
发布订阅模式是最常用的一种观察者模式的实现,并且从解耦和重用角度来看,更优于典型的观察者模式。
示例可参考redis的发布订阅模式,有兴趣的可以查找一下,这里不做详细的解释了。
3、通过web3j实现观察者模式
web3j的响应式函数可以使观察者通过事件去通知消息订阅者。
通过web3j对链服务建立连接,根据具体的事件去监听链上事件,实现跨服务订阅的一个效果。
在Ethereum以太坊中支持 三种web3j过滤器:
1、块滤波器(Block filters)
2、未决交易过滤器(Pending transaction filters)
3、主题过滤器(Topic filters)
块过滤器和未决交易过滤器提供了在网络上创建新交易或块的通知。
主题过滤器更灵活。允许根据提供的特定标准创建过滤器。
主题过滤器做出的示例demo:
Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。
//添加要监听事件的主题,和合约地址
EthFilter filter = new EthFilter(new DefaultBlockParameterNumber(startBlock),DefaultBlockParameterName.LATEST, "合约地址");
//与链建立连接
Web3j web3j = Web3j.build(new HttpService("http://127.0.0.1:6792"));
web3j.ethLogObservable(filter).subscribe(new Subscriber<Log>() {
//在监听逻辑执行之前,先执行此方法
@Override
public void onStart() {
Log.info("onStart");
}
@Override
public void onNext(Log l) {
//处理逻辑
Log.info("onNext : " + l);
}
@Override
public void onError(Throwable e) {
Log.info("onError");
}
@Override
public void onComplete() {
Log.info("onComplete" );
}
});
块和交易过滤器:
可以根据具体的需求选择合适的过滤器。
/**
* 接收所有新块把它们添加到区块链(false参数指定我们只需要块)
*/
Subscription subscription = web3j.blockObservable(false).subscribe(block -> {
...
});
/**
* 接收所有新交易,把它们添加到块链
*/
Subscription subscription = web3j.transactionObservable().subscribe(tx -> {
...
});
/**
* 接收所有待提交交易并提交到网络(即在它们被分组在一起之前)
*/
Subscription subscription = web3j.pendingTransactionObservable().subscribe(tx -> {
...
});
/**
* 不再需要的时候取消订阅unsubscribe
*/
subscription.unsubscribe();
再现过滤器:
/**
* 从区块链再现一系列块
* startBlockNumber:开始的块高
* endBlockNumber:结束的块高
* fullTxObjects:如果为true,则提供嵌入在块中的交易,否则提供交易散列
*/
Subscription subscription = web3j.replayBlocksObservable(
startBlockNumber, endBlockNumber, fullTxObjects)
.subscribe(block -> {
...
});
/**
* 再现包含在一个块范围内的单个交易:
*/
Subscription subscription = web3j.replayTransactionsObservable(
startBlockNumber, endBlockNumber)
.subscribe(tx -> {
...
});
/**
* 重置所有的区块到最新的位置,那么当有新建区块的时候会通知你
*/
Subscription subscription = web3j.catchUpToLatestAndSubscribeToNewBlocksObservable(
startBlockNumber, fullTxObjects)
.subscribe(block -> {
...
});
/**
* 重置所有的交易到最新的位置,那么当有新建交易的时候会通知你
*/
Subscription subscription = web3j.catchUpToLatestAndSubscribeToNewTransactionsObservable(
startBlockNumber)
.subscribe(tx -> {
...
});
参考:https://www.jianshu.com/p/6954cd0defec
参考:https://www.wenjiangs.com/doc/hcjb47yt
参考:https://gank.io/post/560e15be2dca930e00da1083
更多推荐
所有评论(0)