class ServerKafka extends Server implements CustomTransportStrategy {
constructor(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
transportId: Transport.KAFKA
protected logger: Logger
protected client: Kafka
protected consumer: Consumer
protected producer: Producer
protected parser: KafkaParser
protected brokers: string[] | BrokersFunction
protected clientId: string
protected groupId: string
protected options: KafkaOptions['options']
listen(callback: (err?: unknown, ...optionalParams: unknown[]) => void): Promise<void>
close(): Promise<void>
start(callback: () => void): Promise<void>
createClient<T = any>(): T
bindEvents(consumer: Consumer)
getMessageHandler()
getPublisher(replyTopic: string, replyPartition: string, correlationId: string): (data: any) => Promise<RecordMetadata[]>
handleMessage(payload: EachMessagePayload)
sendMessage(message: OutgoingResponse, replyTopic: string, replyPartition: string, correlationId: string): Promise<RecordMetadata[]>
assignIsDisposedHeader(outgoingResponse: OutgoingResponse, outgoingMessage: Message)
assignErrorHeader(outgoingResponse: OutgoingResponse, outgoingMessage: Message)
assignCorrelationIdHeader(correlationId: string, outgoingMessage: Message)
assignReplyPartition(replyPartition: string, outgoingMessage: Message)
handleEvent(pattern: string, packet: ReadPacket<any>, context: KafkaContext): Promise<any>
protected initializeSerializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
protected initializeDeserializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
protected messageHandlers: Map<string, MessageHandler>
protected logger: LoggerService
protected serializer: ConsumerSerializer
protected deserializer: ConsumerDeserializer
addHandler(pattern: any, callback: MessageHandler<any, any, any>, isEventHandler: boolean = false, extras: Record<string, any> = {})
getHandlers(): Map<string, MessageHandler>
getHandlerByPattern(pattern: string): MessageHandler | null
send(stream$: Observable<any>, respond: (data: WritePacket<any>) => unknown): Subscription
handleEvent(pattern: string, packet: ReadPacket<any>, context: BaseRpcContext<unknown[]>): Promise<any>
transformToObservable(resultOrDeferred: any)
getOptionsProp<T extends MicroserviceOptions['options'], K extends keyof T>(obj: T, prop: K, defaultValue: T[K] = undefined)
protected handleError(error: string)
protected loadPackage<T = any>(name: string, ctx: string, loader?: Function): T
protected initializeSerializer(options: { url?: string; maxSendMessageLength?: number; maxReceiveMessageLength?: number; maxMetadataSize?: number; keepalive?: { keepaliveTimeMs?: number; keepaliveTimeoutMs?: number; keepalivePermitWithoutCalls?: number; http2MaxPingsWithoutData?: number; http2MinTimeBetweenPingsMs?: number; http2MinPingIntervalWithoutData...)
protected initializeDeserializer(options: { url?: string; maxSendMessageLength?: number; maxReceiveMessageLength?: number; maxMetadataSize?: number; keepalive?: { keepaliveTimeMs?: number; keepaliveTimeoutMs?: number; keepalivePermitWithoutCalls?: number; http2MaxPingsWithoutData?: number; http2MinTimeBetweenPingsMs?: number; http2MinPingIntervalWithoutData...)
protected getRouteFromPattern(pattern: string): string
protected normalizePattern(pattern: MsPattern): string
}
Constructor
constructor(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
Parameters
Option |
Type |
Description |
options
|
object |
|
|
Properties
Property |
Description |
transportId: Transport.KAFKA
|
Read-only.
|
protected logger: Logger
|
|
protected client: Kafka
|
|
protected consumer: Consumer
|
|
protected producer: Producer
|
|
protected parser: KafkaParser
|
|
protected brokers: string[] | BrokersFunction
|
|
protected clientId: string
|
|
protected groupId: string
|
|
protected options: KafkaOptions['options']
|
Read-only.
Declared in constructor.
|
Methods
listen()
|
listen(callback: (err?: unknown, ...optionalParams: unknown[]) => void): Promise<void>
Parameters
Option |
Type |
Description |
callback
|
(err?: unknown, ...optionalParams: unknown[]) => void |
|
Returns
Promise<void>
|
close()
|
close(): Promise<void>
Parameters
There are no parameters.
Returns
Promise<void>
|
start()
|
start(callback: () => void): Promise<void>
Parameters
Option |
Type |
Description |
callback
|
() => void |
|
Returns
Promise<void>
|
createClient()
|
createClient<T = any>(): T
Parameters
There are no parameters.
Returns
T
|
bindEvents()
|
bindEvents(consumer: Consumer)
Parameters
Option |
Type |
Description |
consumer
|
Consumer |
|
|
getMessageHandler()
|
getMessageHandler()
Parameters
There are no parameters.
|
getPublisher()
|
getPublisher(replyTopic: string, replyPartition: string, correlationId: string): (data: any) => Promise<RecordMetadata[]>
Parameters
Option |
Type |
Description |
replyTopic
|
string |
|
replyPartition
|
string |
|
correlationId
|
string |
|
Returns
(data: any) => Promise<RecordMetadata[]>
|
handleMessage()
|
handleMessage(payload: EachMessagePayload)
Parameters
Option |
Type |
Description |
payload
|
EachMessagePayload |
|
|
sendMessage()
|
sendMessage(message: OutgoingResponse, replyTopic: string, replyPartition: string, correlationId: string): Promise<RecordMetadata[]>
Parameters
Option |
Type |
Description |
message
|
OutgoingResponse |
|
replyTopic
|
string |
|
replyPartition
|
string |
|
correlationId
|
string |
|
Returns
Promise<RecordMetadata[]>
|
assignIsDisposedHeader()
|
assignIsDisposedHeader(outgoingResponse: OutgoingResponse, outgoingMessage: Message)
Parameters
Option |
Type |
Description |
outgoingResponse
|
OutgoingResponse |
|
outgoingMessage
|
Message |
|
|
assignErrorHeader()
|
assignErrorHeader(outgoingResponse: OutgoingResponse, outgoingMessage: Message)
Parameters
Option |
Type |
Description |
outgoingResponse
|
OutgoingResponse |
|
outgoingMessage
|
Message |
|
|
assignCorrelationIdHeader()
|
assignCorrelationIdHeader(correlationId: string, outgoingMessage: Message)
Parameters
Option |
Type |
Description |
correlationId
|
string |
|
outgoingMessage
|
Message |
|
|
assignReplyPartition()
|
assignReplyPartition(replyPartition: string, outgoingMessage: Message)
Parameters
Option |
Type |
Description |
replyPartition
|
string |
|
outgoingMessage
|
Message |
|
|
handleEvent()
|
handleEvent(pattern: string, packet: ReadPacket<any>, context: KafkaContext): Promise<any>
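Parameters
Option |
Type |
Description |
pattern
|
string |
|
packet
|
ReadPacket<any> |
|
context
|
KafkaContext |
|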
Returns
Promise<any>
|
initializeSerializer()
|
protected initializeSerializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
Parameters
Option |
Type |
Description |
options
|
object |
|
|
initializeDeserializer()
|
protected initializeDeserializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
Parameters
Option |
Type |
Description |
options
|
object |
|
|