ServerKafka

  
class ServerKafka extends Server implements CustomTransportStrategy {
  constructor(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
  transportId: Transport.KAFKA
  protected logger: Logger
  protected client: Kafka
  protected consumer: Consumer
  protected producer: Producer
  protected parser: KafkaParser
  protected brokers: string[] | BrokersFunction
  protected clientId: string
  protected groupId: string
  protected options: KafkaOptions['options']
  listen(callback: (err?: unknown, ...optionalParams: unknown[]) => void): Promise<void>
  close(): Promise<void>
  start(callback: () => void): Promise<void>
  createClient<T = any>(): T
  bindEvents(consumer: Consumer)
  getMessageHandler()
  getPublisher(replyTopic: string, replyPartition: string, correlationId: string): (data: any) => Promise<RecordMetadata[]>
  handleMessage(payload: EachMessagePayload)
  sendMessage(message: OutgoingResponse, replyTopic: string, replyPartition: string, correlationId: string): Promise<RecordMetadata[]>
  assignIsDisposedHeader(outgoingResponse: OutgoingResponse, outgoingMessage: Message)
  assignErrorHeader(outgoingResponse: OutgoingResponse, outgoingMessage: Message)
  assignCorrelationIdHeader(correlationId: string, outgoingMessage: Message)
  assignReplyPartition(replyPartition: string, outgoingMessage: Message)
  handleEvent(pattern: string, packet: ReadPacket<any>, context: KafkaContext): Promise<any>
  protected initializeSerializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })
  protected initializeDeserializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })

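  // inherited from nest/packages/microservices/Server
  // (logger, handleEvent, initializeSerializer and initializeDeserializer are also redeclared by ServerKafka above)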
  protected messageHandlers: Map<string, MessageHandler>
  protected logger: LoggerService
  protected serializer: ConsumerSerializer
  protected deserializer: ConsumerDeserializer
  addHandler(pattern: any, callback: MessageHandler<any, any, any>, isEventHandler: boolean = false, extras: Record<string, any> = {})
  getHandlers(): Map<string, MessageHandler>
  getHandlerByPattern(pattern: string): MessageHandler | null
  send(stream$: Observable<any>, respond: (data: WritePacket<any>) => unknown): Subscription
  handleEvent(pattern: string, packet: ReadPacket<any>, context: BaseRpcContext<unknown[]>): Promise<any>
  transformToObservable(resultOrDeferred: any)
  getOptionsProp<T extends MicroserviceOptions['options'], K extends keyof T>(obj: T, prop: K, defaultValue: T[K] = undefined)
  protected handleError(error: string)
  protected loadPackage<T = any>(name: string, ctx: string, loader?: Function): T
  protected initializeSerializer(options: { url?: string; maxSendMessageLength?: number; maxReceiveMessageLength?: number; maxMetadataSize?: number; keepalive?: { keepaliveTimeMs?: number; keepaliveTimeoutMs?: number; keepalivePermitWithoutCalls?: number; http2MaxPingsWithoutData?: number; http2MinTimeBetweenPingsMs?: number; http2MinPingIntervalWithoutData...)
  protected initializeDeserializer(options: { url?: string; maxSendMessageLength?: number; maxReceiveMessageLength?: number; maxMetadataSize?: number; keepalive?: { keepaliveTimeMs?: number; keepaliveTimeoutMs?: number; keepalivePermitWithoutCalls?: number; http2MaxPingsWithoutData?: number; http2MinTimeBetweenPingsMs?: number; http2MinPingIntervalWithoutData...)
  protected getRouteFromPattern(pattern: string): string
  protected normalizePattern(pattern: MsPattern): string
}

Constructor


constructor(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })

Parameters

Option Type Description
options KafkaOptions['options'] Kafka transport options; stored as the read-only `options` property.

Properties

Property Description
transportId: Transport.KAFKA Read-only.
protected logger: Logger
protected client: Kafka
protected consumer: Consumer
protected producer: Producer
protected parser: KafkaParser
protected brokers: string[] | BrokersFunction
protected clientId: string
protected groupId: string
protected options: KafkaOptions['options'] Read-only. Declared in constructor.

Methods

listen()


listen(callback: (err?: unknown, ...optionalParams: unknown[]) => void): Promise<void>

Parameters

Option Type Description
callback (err?: unknown, ...optionalParams: unknown[]) => void

Returns

Promise<void>

close()


close(): Promise<void>

Parameters

There are no parameters.

Returns

Promise<void>

start()


start(callback: () => void): Promise<void>

Parameters

Option Type Description
callback () => void

Returns

Promise<void>

createClient()


createClient<T = any>(): T

Parameters

There are no parameters.

Returns

T

bindEvents()


bindEvents(consumer: Consumer)

Parameters

Option Type Description
consumer Consumer

getMessageHandler()


getMessageHandler()

Parameters

There are no parameters.

getPublisher()


getPublisher(replyTopic: string, replyPartition: string, correlationId: string): (data: any) => Promise<RecordMetadata[]>

Parameters

Option Type Description
replyTopic string
replyPartition string
correlationId string

Returns

(data: any) => Promise<RecordMetadata[]>

handleMessage()


handleMessage(payload: EachMessagePayload)

Parameters

Option Type Description
payload EachMessagePayload

sendMessage()


sendMessage(message: OutgoingResponse, replyTopic: string, replyPartition: string, correlationId: string): Promise<RecordMetadata[]>

Parameters

Option Type Description
message OutgoingResponse
replyTopic string
replyPartition string
correlationId string

Returns

Promise<RecordMetadata[]>

assignIsDisposedHeader()


assignIsDisposedHeader(outgoingResponse: OutgoingResponse, outgoingMessage: Message)

Parameters

Option Type Description
outgoingResponse OutgoingResponse
outgoingMessage Message

assignErrorHeader()


assignErrorHeader(outgoingResponse: OutgoingResponse, outgoingMessage: Message)

Parameters

Option Type Description
outgoingResponse OutgoingResponse
outgoingMessage Message

assignCorrelationIdHeader()


assignCorrelationIdHeader(correlationId: string, outgoingMessage: Message)

Parameters

Option Type Description
correlationId string
outgoingMessage Message

assignReplyPartition()


assignReplyPartition(replyPartition: string, outgoingMessage: Message)

Parameters

Option Type Description
replyPartition string
outgoingMessage Message

handleEvent()


handleEvent(pattern: string, packet: ReadPacket<any>, context: KafkaContext): Promise<any>

Parameters

Option Type Description
pattern string
packet ReadPacket<any>
context KafkaContext

Returns

Promise<any>

initializeSerializer()


protected initializeSerializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })

Parameters

Option Type Description
options KafkaOptions['options'] Same options shape as accepted by the constructor.

initializeDeserializer()


protected initializeDeserializer(options: { postfixId?: string; client?: KafkaConfig; consumer?: ConsumerConfig; run?: Omit<ConsumerRunConfig, "eachBatch" | "eachMessage">; ... 6 more ...; producerOnlyMode?: boolean; })

Parameters

Option Type Description
options KafkaOptions['options'] Same options shape as accepted by the constructor.