Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5897,7 +5897,7 @@ func GetMsgByStationIdAndMsgSeq(stationId, messageSeq, partitionNumber int) (boo
return true, message[0], nil
}

func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string, partitionNumber int) (int, bool, error) {
func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName string, poisonedCgs []string, messageDetails models.MessagePayload, tenantName string, partitionNumber int, validationError string) (int, bool, error) {
ctx, cancelfunc := context.WithTimeout(context.Background(), DbOperationTimeout*time.Second)
defer cancelfunc()
updated := false
Expand Down Expand Up @@ -5975,7 +5975,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", "", tenantName, partitionNumber)
rows, err := tx.Query(ctx, stmt.Name, stationId, messageSeq, producerName, poisonedCgs, messageDetails, updatedAt, "poison", validationError, tenantName, partitionNumber)
if err != nil {
return 0, updated, err
}
Expand Down Expand Up @@ -6004,7 +6004,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
}
}
} else { // then update
query = `UPDATE dls_messages SET poisoned_cgs = ARRAY_APPEND(poisoned_cgs, $1), updated_at = $4 WHERE station_id=$2 AND message_seq=$3 AND not($1 = ANY(poisoned_cgs)) AND tenant_name=$5 RETURNING id`
query = `UPDATE dls_messages SET poisoned_cgs = ARRAY_APPEND(poisoned_cgs, $1), updated_at = $4, validation_error = $6 WHERE station_id=$2 AND message_seq=$3 AND not($1 = ANY(poisoned_cgs)) AND tenant_name=$5 RETURNING id`
stmt, err := tx.Prepare(ctx, "update_poisoned_cgs", query)
if err != nil {
return 0, updated, err
Expand All @@ -6013,7 +6013,7 @@ func StorePoisonMsg(stationId, messageSeq int, cgName string, producerName strin
if tenantName != conf.GlobalAccount {
tenantName = strings.ToLower(tenantName)
}
rows, err = tx.Query(ctx, stmt.Name, poisonedCgs[0], stationId, messageSeq, updatedAt, tenantName)
rows, err = tx.Query(ctx, stmt.Name, poisonedCgs[0], stationId, messageSeq, updatedAt, tenantName, validationError)
if err != nil {
return 0, updated, err
}
Expand Down
8 changes: 8 additions & 0 deletions models/dead_letter_station.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ type SchemaVerseDlsMessageSdk struct {
PartitionNumber int `json:"partition_number"`
}

// NackedDlsMessageSdk is the JSON payload that handleNackedDlsMsg decodes
// when a nacked message is reported for dead-letter handling.
type NackedDlsMessageSdk struct {
// StationName is passed through StationNameFromStreamName to resolve the station.
StationName string `json:"station_name"`
// Error is stored alongside the DLS record as its validation error text.
Error string `json:"error"`
// CgName is recorded as the poisoned consumer group for the message.
CgName string `json:"cg_name"`
// Seq is the sequence used to fetch the original message from the partition stream.
Seq uint64 `json:"seq"`
// Partition selects the partition stream; handleNackedDlsMsg treats 0 as missing.
Partition int `json:"partition"`
}

type FunctionsDlsMessage struct {
StationID int `json:"station_id"`
TenantName string `json:"tenant_name"`
Expand Down
93 changes: 93 additions & 0 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const NOTIFICATION_EVENTS_SUBJ = "$memphis_notifications"
const PM_RESEND_ACK_SUBJ = "$memphis_pm_acks"
const TIERED_STORAGE_CONSUMER = "$memphis_tiered_storage_consumer"
const DLS_UNACKED_CONSUMER = "$memphis_dls_unacked_consumer"
// NACKED_DLS_SUBJ is the subject ListenForNackedDlsEvents queue-subscribes to.
const NACKED_DLS_SUBJ = "$memphis_nacked_dls"
// NACKED_DLS_INNER_SUBJ is the internal subject that ListenForNackedDlsEvents republishes received payloads on.
const NACKED_DLS_INNER_SUBJ = "$memphis_nacked_inner_dls"
// NACKED_DLS_CONSUMER is the consumer name ConsumeNackedDlsMessages fetches from; it also prefixes its reply subjects.
const NACKED_DLS_CONSUMER = "$memphis_nacked_dls_consumer"
const SCHEMAVERSE_DLS_SUBJ = "$memphis_schemaverse_dls"
const SCHEMAVERSE_DLS_INNER_SUBJ = "$memphis_schemaverse_inner_dls"
const SCHEMAVERSE_DLS_CONSUMER = "$memphis_schemaverse_dls_consumer"
Expand Down Expand Up @@ -211,6 +214,19 @@ func (s *Server) ListenForSchemaverseDlsEvents() error {
return nil
}

// ListenForNackedDlsEvents queue-subscribes to NACKED_DLS_SUBJ on the global
// account and republishes every received payload on NACKED_DLS_INNER_SUBJ.
// It returns the subscription error, if any.
func (s *Server) ListenForNackedDlsEvents() error {
	// forward republishes a private copy of the payload on the inner subject.
	forward := func(payload []byte) {
		s.sendInternalAccountMsg(s.MemphisGlobalAccount(), NACKED_DLS_INNER_SUBJ, payload)
	}

	queueGroup := NACKED_DLS_SUBJ + "_group"
	onMsg := func(_ *client, subject, reply string, msg []byte) {
		// The payload is copied before the handler returns, then forwarded
		// asynchronously so the subscription callback is not held up.
		go forward(copyBytes(msg))
	}

	if err := s.queueSubscribe(s.MemphisGlobalAccountString(), NACKED_DLS_SUBJ, queueGroup, onMsg); err != nil {
		return err
	}
	return nil
}

func (s *Server) ListenForPoisonMsgAcks() error {
err := s.queueSubscribe(s.MemphisGlobalAccountString(), PM_RESEND_ACK_SUBJ, PM_RESEND_ACK_SUBJ+"_group", func(_ *client, subject, reply string, msg []byte) {
go func(msg []byte) {
Expand Down Expand Up @@ -311,6 +327,11 @@ func (s *Server) StartBackgroundTasks() error {
return errors.New("Failed to subscribing for schemaverse dls" + err.Error())
}

err = s.ListenForNackedDlsEvents()
if err != nil {
return errors.New("Failed to subscribing for nacked dls" + err.Error())
}

err = s.ListenForPoisonMsgAcks()
if err != nil {
return errors.New("Failed subscribing for poison message acks: " + err.Error())
Expand All @@ -337,6 +358,7 @@ func (s *Server) StartBackgroundTasks() error {
}

go s.ConsumeSchemaverseDlsMessages()
go s.ConsumeNackedDlsMessages()
go s.ConsumeUnackedMsgs()
go s.ConsumeFunctionsDlsMessages()
go s.ConsumeTieredStorageMsgs()
Expand Down Expand Up @@ -607,6 +629,77 @@ func (s *Server) ConsumeSchemaverseDlsMessages() {
}
}

// ConsumeNackedDlsMessages loops forever pulling batches of nacked DLS
// messages and handing each one to handleNackedDlsMsg.
//
// Each round it subscribes to a fresh reply subject, sends a fetch request for
// up to 1000 messages, and collects replies until the batch is full or five
// seconds elapse. Every collected message whose handler returns nil is acked
// on its reply subject; messages whose handler fails are left un-acked.
// While the nacked stream or consumer has not been flagged as created, it
// logs a warning and retries after two seconds.
func (s *Server) ConsumeNackedDlsMessages() {
	type nackedDlsMsg struct {
		Msg          []byte
		ReplySubject string
	}
	amount := 1000
	req := []byte(strconv.FormatUint(uint64(amount), 10))
	for {
		if !DLS_NACKED_CONSUMER_CREATED || !DLS_NACKED_STREAM_CREATED {
			s.Warnf("ConsumeNackedDlsMessages: waiting for consumer and stream to be created")
			time.Sleep(2 * time.Second)
			continue
		}

		resp := make(chan nackedDlsMsg)
		// done is closed once this round stops collecting. Callback goroutines
		// still in flight then drop their message instead of blocking forever
		// on the unbuffered resp channel.
		done := make(chan struct{})
		replySubj := NACKED_DLS_CONSUMER + "_reply_" + s.memphis.nuid.Next()

		// subscribe to nacked dls messages
		sub, err := s.subscribeOnAcc(s.MemphisGlobalAccount(), replySubj, replySubj+"_sid", func(_ *client, subject, reply string, msg []byte) {
			go func(reply string, msg []byte) {
				// Ignore 409 Exceeded MaxWaiting cases
				if reply == _EMPTY_ {
					return
				}
				select {
				case resp <- nackedDlsMsg{Msg: msg, ReplySubject: reply}:
				case <-done:
				}
			}(reply, copyBytes(msg))
		})
		if err != nil {
			s.Errorf("Failed to subscribe to nacked dls messages: %v", err.Error())
			continue
		}

		// send JS API request to get more messages
		subject := fmt.Sprintf(JSApiRequestNextT, dlsNackedStream, NACKED_DLS_CONSUMER)
		s.sendInternalAccountMsgWithReply(s.MemphisGlobalAccount(), subject, replySubj, nil, req, true)

		s.Debugf("ConsumeNackedDlsMessages: sending fetch request")

		timeout := time.NewTimer(5 * time.Second)
		msgs := make([]nackedDlsMsg, 0)
	collect:
		for len(msgs) < amount {
			select {
			case received := <-resp:
				msgs = append(msgs, received)
				if len(msgs) == amount {
					s.Debugf("ConsumeNackedDlsMessages: finished appending %v messages", len(msgs))
				}
			case <-timeout.C:
				s.Debugf("ConsumeNackedDlsMessages: finished because of timer: %v messages", len(msgs))
				break collect
			}
		}
		// Release the timer when the batch filled before it fired.
		timeout.Stop()
		s.unsubscribeOnAcc(s.MemphisGlobalAccount(), sub)
		close(done)

		for _, message := range msgs {
			if err := s.handleNackedDlsMsg(message.Msg); err == nil {
				// send ack
				s.sendInternalAccountMsgWithEcho(s.MemphisGlobalAccount(), message.ReplySubject, []byte(_EMPTY_))
			}
		}
	}
}

func (s *Server) RemoveOldDlsMsgs() {
ticker := time.NewTicker(2 * time.Minute)
for range ticker.C {
Expand Down
4 changes: 4 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
THROUGHPUT_LEGACY_STREAM_EXIST = true
case dlsSchemaverseStream:
DLS_SCHEMAVERSE_STREAM_CREATED = true
case dlsNackedStream:
DLS_NACKED_STREAM_CREATED = true
case integrationsAuditLogsStream:
INTEGRATIONS_AUDIT_LOGS_STREAM_CREATED = true
case notificationsStreamName:
Expand Down Expand Up @@ -1386,6 +1388,8 @@ func (a *Account) EnableJetStream(limits map[string]JetStreamAccountLimits) erro
DLS_UNACKED_CONSUMER_CREATED = true
case dlsSchemaverseStream:
DLS_SCHEMAVERSE_CONSUMER_CREATED = true
case dlsNackedStream:
DLS_NACKED_CONSUMER_CREATED = true
case notificationsStreamName:
NOTIFICATIONS_BUFFER_CONSUMER_CREATED = true
case systemTasksStreamName:
Expand Down
106 changes: 103 additions & 3 deletions server/memphis_handlers_dls_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *Server) handleNewUnackedMsg(msg []byte) error {
Headers: headersJson,
}

dlsMsgId, updated, err := db.StorePoisonMsg(station.ID, int(messageSeq), cgName, producedByHeader, poisonedCgs, messageDetails, station.TenantName, partitionNumber)
dlsMsgId, updated, err := db.StorePoisonMsg(station.ID, int(messageSeq), cgName, producedByHeader, poisonedCgs, messageDetails, station.TenantName, partitionNumber, "")
if err != nil {
serv.Errorf("[tenant: %v]handleNewUnackedMsg at StorePoisonMsg: Error while getting notified about a poison message: %v", station.TenantName, err.Error())
return err
Expand Down Expand Up @@ -168,13 +168,14 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) error {
return err
}

exist, station, err := db.GetStationByName(message.StationName, tenantName)
stationName := StationNameFromStreamName(message.StationName)
exist, station, err := db.GetStationByName(stationName.Ext(), tenantName)
if err != nil {
serv.Errorf("[tenant: %v]handleSchemaverseDlsMsg: %v", tenantName, err.Error())
return err
}
if !exist {
serv.Warnf("[tenant: %v]handleSchemaverseDlsMsg: station %v couldn't been found", tenantName, message.StationName)
serv.Warnf("[tenant: %v]handleSchemaverseDlsMsg: station %v couldn't been found", tenantName, stationName.Ext())
return nil
}

Expand All @@ -198,6 +199,105 @@ func (s *Server) handleSchemaverseDlsMsg(msg []byte) error {
return nil
}

// handleNackedDlsMsg processes one nacked-message report: it decodes the
// payload, looks up the station, fetches the original message from the
// partition stream, stores it as a poison message with the reported error,
// forwards it to the DLS station when the record is new, and sends a
// notification.
//
// It returns nil both on success and when there is nothing to do (missing
// partition number, unknown station, poison DLS disabled for the station,
// original message no longer found, or a failed notification). The caller
// acks the message on a nil return. A non-nil error is returned for decode,
// lookup, fetch, header-decoding, storage and DLS-forwarding failures.
func (s *Server) handleNackedDlsMsg(msg []byte) error {
	tenantName, stringMessage, err := s.getTenantNameAndMessage(msg)
	if err != nil {
		s.Errorf("handleNackedDlsMsg at getTenantNameAndMessage: %v", err.Error())
		return err
	}
	var message models.NackedDlsMessageSdk
	err = json.Unmarshal([]byte(stringMessage), &message)
	if err != nil {
		serv.Errorf("[tenant: %v]handleNackedDlsMsg: %v", tenantName, err.Error())
		return err
	}

	if message.Partition == 0 {
		// err is nil at this point, so it must not be dereferenced. The
		// report itself is malformed and a retry cannot repair it, so return
		// nil to let the caller ack it.
		serv.Errorf("[tenant: %v]handleNackedDlsMsg - missing partition number", tenantName)
		return nil
	}

	stationName := StationNameFromStreamName(message.StationName)
	streamName := stationName.Intern() + "$" + strconv.Itoa(message.Partition)
	exist, station, err := db.GetStationByName(stationName.Ext(), tenantName)
	if err != nil {
		serv.Errorf("[tenant: %v]handleNackedDlsMsg: %v", tenantName, err.Error())
		return err
	}
	if !exist {
		serv.Warnf("[tenant: %v]handleNackedDlsMsg: station %v couldn't been found", tenantName, stationName.Ext())
		return nil
	}
	if !station.DlsConfigurationPoison {
		return nil
	}

	poisonMessageContent, err := s.memphisGetMessage(tenantName, streamName, uint64(message.Seq))
	if err != nil {
		if IsNatsErr(err, JSNoMessageFoundErr) {
			// The original message is gone; nothing left to store.
			return nil
		}
		serv.Errorf("[tenant: %v]handleNackedDlsMsg at memphisGetMessage: station: %v, Error while getting notified about a poison message: %v", tenantName, stationName.Ext(), err.Error())
		return err
	}

	timeSentTimeStamp := poisonMessageContent.Time
	data := poisonMessageContent.Data
	headers := poisonMessageContent.Header
	lenPayload := len(data) + len(headers)
	var headersJson map[string]string
	if headers != nil {
		headersJson, err = DecodeHeader(headers)
		if err != nil {
			serv.Errorf("handleNackedDlsMsg: %v", err.Error())
			return err
		}
	}

	// Reading from a nil map yields the zero value, so this is safe even when
	// the message carried no headers.
	producedByHeader := headersJson["$memphis_producedBy"]
	if producedByHeader == _EMPTY_ {
		producedByHeader = "unknown"
	}
	poisonedCgs := []string{message.CgName}

	messageDetails := models.MessagePayload{
		TimeSent: timeSentTimeStamp,
		Size:     lenPayload,
		Data:     hex.EncodeToString(data),
		Headers:  headersJson,
	}

	dlsMsgId, updated, err := db.StorePoisonMsg(station.ID, int(message.Seq), message.CgName, producedByHeader, poisonedCgs, messageDetails, tenantName, message.Partition, message.Error)
	if err != nil {
		serv.Errorf("[tenant: %v]handleNackedDlsMsg at StorePoisonMsg: Error while getting notified about a poison message: %v", station.TenantName, err.Error())
		return err
	}
	if !updated {
		err = s.sendToDlsStation(station, data, headersJson, "unacked", _EMPTY_)
		if err != nil {
			serv.Errorf("[tenant: %v]handleNackedDlsMsg at sendToDlsStation: station: %v, Error while getting notified about a poison message: %v", station.TenantName, station.DlsStation, err.Error())
			return err
		}
	}

	if dlsMsgId == 0 { // nothing to do
		return nil
	}

	idForUrl := strconv.Itoa(dlsMsgId)
	var msgUrl = s.opts.UiHost + "/stations/" + stationName.Ext() + "/" + idForUrl
	err = s.SendNotification(station.TenantName, PoisonMessageTitle, "Poison message has been identified, for more details head to: "+msgUrl, PoisonMAlert)
	if err != nil {
		// A failed notification is logged but does not fail the handler.
		serv.Warnf("[tenant: %v]handleNackedDlsMsg at SendNotification: Error while sending a poison message notification: %v", station.TenantName, err.Error())
		return nil
	}

	return nil
}

func (pmh PoisonMessagesHandler) GetDlsMsgsByStationLight(station models.Station, partitionNumber int) ([]models.LightDlsMessageResponse, []models.LightDlsMessageResponse, []models.LightDlsMessageResponse, int, error) {
poisonMessages := make([]models.LightDlsMessageResponse, 0)
schemaMessages := make([]models.LightDlsMessageResponse, 0)
Expand Down
1 change: 1 addition & 0 deletions server/memphis_handlers_rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func GetAllMemphisAndNatsInternalSubjects() []string {
var subjects []string

// Memphis subjects
subjects = append(subjects, NACKED_DLS_SUBJ)
subjects = append(subjects, SCHEMAVERSE_DLS_SUBJ)
subjects = append(subjects, sdkClientsUpdatesSubject)
subjects = append(subjects, PM_RESEND_ACK_SUBJ)
Expand Down
Loading