나이가 들수록 기억력이 감퇴하여 이글을 쓰는 목적은 spring integration tcp를 정리하기 위함이다.
시간날때마다 프로젝트에서 경험했던 문제점들과 오류들 및 삽질들을 시간날때마다 하나씩 채워나가보겠다.
그래서 오타도 있을테고 잘 못 설명한 부분도 있을테니 궁금한점이 있으면 언제든 댓글 환영한다.
우선적으로 아래의 스프링 공식 문서 reference를 참고하기 바란다.
https://docs.spring.io/spring-integration/reference/html/ip.html
공식문서에서 tcp overview를 보면 아래와 같은 설명이 나온다.
For TCP, the configuration of the underlying connection is provided by using a connection factory. Two types of connection factory are provided: a client connection factory and a server connection factory. Client connection factories establish outgoing connections. Server connection factories listen for incoming connections.
서툰영어이지만 해석해보면 TCP에서, 연결을 위한 configuration으로 connection factory를 제공하고 있는데 두가지 방법이 있고, 클라이언트 컨넥션 팩토리와 서버 컨넥션 팩토리가 있다.
클라이언트 컨넥션 팩토리는 outgoing connection을 연결설정하고, 서버 컨텍션 팩토리는 incoming connection을 수신 대기한다.
소스코드를 보며 확인해보겠다.
필자는 인텔리제이를 이용해여 코드를 짜보도록 하겠고, build 환경은 spring boot 2.7.5와 gradle을 사용하도록 하겠다.
일단 필요한 라이브러리를 가져오기위해 build.gradle에 dependency를 추가한다.
필자의 프로젝트에는 여러가지 의존성이 추가되어 있지만, spring integration tcp를 사용하기 위해서는
을 dependency에 추가해준다. 뒤에 버젼 정보를 명시하여 해당버젼을 설정할 수 있는데 필자는 최신버젼을 명시했다.
intellij의 grdle 에서 Reload All Gradle Projects 버튼을 눌러 라이브러리를 가져온다.
인텔리제이의 왼쪽패널의 External Libraries를 보면 아래와 같이 org.springframework.integration:spring-integration-ip:6.0.2 라이브러리가 다운로드되어져 있는것을 볼 수 있다.
일단은 Annotation에 기반(Annotation-Based Configuration)한 코딩을하고 JAVA DSL 방식은 추후에 추가하도록 하겠다.
spring integration tcp client부터 작성해보도록 하자.
먼저 spring integrattion tcp client 설정을 위해 클래스를 하나 만들어보도록 하겠다. 필자는 IntegrationTcpClientConfig로 클래스를 만들었다.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.ip.tcp.TcpOutboundGateway;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@Configuration
public class IntegrationTcpClientConfig implements ApplicationEventPublisherAware {
private static final Logger log = LoggerFactory.getLogger(IntegrationTcpClientConfig.class);
@Value("${integration.tcp.server.host}")
private String host;
@Value("${integration.tcp.server.port}")
private int port;
@Value(value = "${integration.tcp.client.connection.timeout}")
private int connectionTimeout;
@Value("${integration.tcp.client.connection.poolSize}")
private int connectionPoolSize;
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Bean
public AbstractClientConnectionFactory integrationClientConnectionFactory() {
TcpNioClientConnectionFactory tcpNioClientConnectionFactory = new TcpNioClientConnectionFactory(host, 27021);
tcpNioClientConnectionFactory.setUsingDirectBuffers(true);
tcpNioClientConnectionFactory.setSingleUse(true);
tcpNioClientConnectionFactory.setSerializer(byteArrayCrLfSerializer());
tcpNioClientConnectionFactory.setDeserializer(byteArrayCrLfSerializer());
tcpNioClientConnectionFactory.setApplicationEventPublisher(applicationEventPublisher);
return new CachingClientConnectionFactory(tcpNioClientConnectionFactory, connectionPoolSize);
}
@Bean
public MessageChannel outboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler outboundGateway(AbstractClientConnectionFactory integrationClientConnectionFactory) {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(integrationClientConnectionFactory);
tcpOutboundGateway.setRemoteTimeout(connectionTimeout);
return tcpOutboundGateway;
}
public ByteArrayCrLfSerializer byteArrayCrLfSerializer() {
ByteArrayCrLfSerializer crLfSerializer = new ByteArrayCrLfSerializer();
crLfSerializer.setMaxMessageSize(409600000);
return crLfSerializer;
}
}
위의 코드는 tcp client connection factory 빈을 생성하고, MessageChannel 역활을 하는 outboundChannel Bean과 그 메시지채널의 게이트웨이 역활을 하는 ServiceActivator를 생성했다.
그리고 tcp client connection factory 빈을 생성할때 메시지의 직렬화/역직렬화를 위해서 ByteArrayCrlfSerializer를 사용하였고, 물론 각자 서버에서 outbound되는 byte에 따라 다른 직렬화/역직렬화 클래스를 참조할 수도 있다. 그건 스프링 공식문서를 참고하도록 하자.
마지막으로 MessagingGateway 역활을 하는 interface를 @Component로 만들었다.
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;
@Component
@MessagingGateway(defaultRequestChannel = "outboundChannel")
public interface IntegrationTcpClientGateway {
byte[] send(byte[] message);
}
spring integration 프레임워크에서 메시지 송수신을 하기위한 인터페이스의 하나로 @MessageGateway 어노테이션을 사용한다.
@MessageGateway 어노테이션과 함께 tcp client에서 서버로부터 메시지를 수신하기 위해 defaultRequestChannel로 Bean으로 생성했던 outboundChannel을 설정하였다.
빌드 후 실행을 해보자.
필자는 로컬호스트로 호스트를 설정하였다. netstate로 설정한 27021 포트가 정상적으로 생성되었는지 확인해보자.
그럼 tcp 클라이언트 설정은 끝이 난것이다.
이제 tcp integration server를 설정해 보도록 하자.
@Bean
public AbstractServerConnectionFactory serverConnectionFactory() {
TcpNioServerConnectionFactory serverConnectionFactory = new TcpNioServerConnectionFactory(27005);
serverConnectionFactory.setSerializer(byteArrayStxEtxSerializer());
serverConnectionFactory.setDeserializer(byteArrayStxEtxSerializer());
serverConnectionFactory.setSingleUse(true);
serverConnectionFactory.setUsingDirectBuffers(true);
return serverConnectionFactory;
}
Bean으로 AbstractServerConnectionFactory 추상메서드 타입으로 TcpNioServerConnectionFactory instance를 만들어주고 직렬화/역직렬화 하였다.
@Bean
public MessageChannel inboundChannel() {
return new DirectChannel();
}
메시지채널 타입으로 inbound 채널을 Bean으로 생성했다.
@Bean
public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory serverConnectionFactory,
MessageChannel inboundChannel) {
TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
tcpInboundGateway.setConnectionFactory(serverConnectionFactory);
tcpInboundGateway.setRequestChannel(inboundChannel);
return tcpInboundGateway;
}
인바운드 게이트웨이를 Bean으로 등록하자.
public ByteArrayStxEtxSerializer byteArrayStxEtxSerializer() {
// ByteArrayCrLfSerializer crLfSerializer = new ByteArrayCrLfSerializer();
ByteArrayStxEtxSerializer stxEtxSerializer = new ByteArrayStxEtxSerializer();
// crLfSerializer.setMaxMessageSize(409600000);
// return crLfSerializer;
stxEtxSerializer.setMaxMessageSize(409600000);
return stxEtxSerializer;
}
ByteArrayStxExtSerializer 타입으로 직렬화/역직렬화 함수를 만들었다. 내부에서 ByteArrayStxExtSerializer 인스턴스를 만든다.
@MessageEndpoint
위의 어노테이션으로 endpoint 클래스를 만들어주자.
@ServiceActivator(inputChannel = "inboundChannel", async = "true")
public byte[] process(byte[] message) throws ParserConfigurationException, IOException, SAXException {
return tcpServerMessageService.processMessage(message);
}
@ServiceActivator 어노테이션으로 inputChannel을 설정해주었다.
그럼 실행시켜서 tcp oupbound 포트가 제대로 떠 있는지 확인해보자.
여기까지...
궁금한 사항이 있으면 댓글달아주시면 최대한 성의껏 답변 드리겠습니다.
그럼 즐코딩하세요.^^
First Spring Boot Application (0) | 2019.08.19 |
---|---|
spring boot (intelliJ) + mybatis + mysql 연동 (1) (0) | 2019.08.11 |
spring boot swagger 사용 샘플 코드 (0) | 2019.08.05 |
IntelliJ IDEA 에서 GitHub로 소스 올리는 방법 (0) | 2019.08.01 |
IntelliJ IDEA 에서 spring boot 프로젝트 생성 (0) | 2019.07.24 |
댓글 영역